Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/01 13:04:22 UTC
[arrow-datafusion] branch master updated: Extract common parquet testing code to `parquet-test-util` crate (#4042)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 41467ab4f Extract common parquet testing code to `parquet-test-util` crate (#4042)
41467ab4f is described below
commit 41467ab4fea4172ecd4df34eb0c081ac7670c233
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 1 09:04:03 2022 -0400
Extract common parquet testing code to `parquet-test-util` crate (#4042)
* Extract common parquet testing code to `parquet-test-util` crate
* fix doc tests
---
Cargo.toml | 15 +-
benchmarks/Cargo.toml | 1 +
benchmarks/src/bin/parquet_filter_pushdown.rs | 147 ++----------------
Cargo.toml => parquet-test-utils/Cargo.toml | 29 ++--
parquet-test-utils/src/lib.rs | 214 ++++++++++++++++++++++++++
test-utils/src/data_gen.rs | 34 +++-
6 files changed, 287 insertions(+), 153 deletions(-)
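In short, the ad-hoc parquet writing and scanning code that previously lived in the benchmark binary moves into a new reusable `parquet-test-utils` crate. A minimal sketch of how a consumer drives the extracted API (assuming a tokio runtime; the output path, batch count, and function name are illustrative, everything else is taken from the diff below):

    use datafusion::logical_expr::lit;
    use datafusion::physical_plan::collect;
    use datafusion::prelude::{col, SessionContext};
    use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
    use test_utils::AccessLogGenerator;

    async fn scan_example() -> datafusion::error::Result<()> {
        // Write 100 generated batches with default page / row group sizes
        let generator = AccessLogGenerator::new();
        let test_file = TestParquetFile::try_new(
            "/tmp/logs.parquet".into(), // hypothetical output path
            generator.take(100),
            None, // page_size
            None, // row_group_size
        )?;

        // Scan the file back with all pushdown options enabled
        let scan_options = ParquetScanOptions {
            pushdown_filters: true,
            reorder_filters: true,
            enable_page_index: true,
        };
        let ctx = SessionContext::new();
        let plan = test_file
            .create_scan(col("response_status").eq(lit(400_u16)), scan_options)
            .await?;
        let batches = collect(plan, ctx.task_ctx()).await?;
        println!("read {} batches", batches.len());
        Ok(())
    }
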
diff --git a/Cargo.toml b/Cargo.toml
index 36a9405b0..1ab431ea3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,20 @@
[workspace]
exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
+members = [
+ "datafusion/common",
+ "datafusion/core",
+ "datafusion/expr",
+ "datafusion/jit",
+ "datafusion/optimizer",
+ "datafusion/physical-expr",
+ "datafusion/proto",
+ "datafusion/row",
+ "datafusion/sql",
+ "datafusion-examples",
+ "test-utils",
+ "parquet-test-utils",
+ "benchmarks",
]
[profile.release]
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 8795a8611..dd9253b8d 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -41,6 +41,7 @@ mimalloc = { version = "0.1", optional = true, default-features = false }
num_cpus = "1.13.0"
object_store = "0.5.0"
parquet = "25.0.0"
+parquet-test-utils = { path = "../parquet-test-utils/" }
rand = "0.8.4"
serde = { version = "1.0.136", features = ["derive"] }
serde_json = "1.0.78"
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index 3efa86f27..4ec2dc90c 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -15,30 +15,14 @@
// specific language governing permissions and limitations
// under the License.
-use arrow::datatypes::SchemaRef;
use arrow::util::pretty;
-use datafusion::common::{Result, ToDFSchema};
-use datafusion::config::{
- ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
- OPT_PARQUET_REORDER_FILTERS,
-};
-use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::execution::context::ExecutionProps;
+use datafusion::common::Result;
use datafusion::logical_expr::{lit, or, Expr};
use datafusion::optimizer::utils::disjunction;
-use datafusion::physical_expr::create_physical_expr;
use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-use datafusion::physical_plan::filter::FilterExec;
use datafusion::prelude::{col, SessionConfig, SessionContext};
-use object_store::path::Path;
-use object_store::ObjectMeta;
-use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
-use std::fs::File;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
use std::path::PathBuf;
-use std::sync::Arc;
use std::time::Instant;
use structopt::StructOpt;
use test_utils::AccessLogGenerator;
@@ -89,34 +73,16 @@ async fn main() -> Result<()> {
let path = opt.path.join("logs.parquet");
- let (schema, object_store_url, object_meta) =
- gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+ let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
- run_benchmarks(
- &mut ctx,
- schema,
- object_store_url,
- object_meta,
- opt.iterations,
- opt.debug,
- )
- .await?;
+ run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
Ok(())
}
-#[derive(Debug, Clone)]
-struct ParquetScanOptions {
- pushdown_filters: bool,
- reorder_filters: bool,
- enable_page_index: bool,
-}
-
async fn run_benchmarks(
ctx: &mut SessionContext,
- schema: SchemaRef,
- object_store_url: ObjectStoreUrl,
- object_meta: ObjectMeta,
+ test_file: &TestParquetFile,
iterations: usize,
debug: bool,
) -> Result<()> {
@@ -156,8 +122,7 @@ async fn run_benchmarks(
disjunction([
col("request_method").not_eq(lit("GET")),
col("response_status").eq(lit(400_u16)),
- // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
- // col("service").eq(lit("backend")),
+ col("service").eq(lit("backend")),
])
.unwrap(),
// Filter everything
@@ -174,9 +139,7 @@ async fn run_benchmarks(
let start = Instant::now();
let rows = exec_scan(
ctx,
- schema.clone(),
- object_store_url.clone(),
- object_meta.clone(),
+ test_file,
filter_expr.clone(),
scan_options.clone(),
debug,
@@ -197,52 +160,12 @@ async fn run_benchmarks(
async fn exec_scan(
ctx: &SessionContext,
- schema: SchemaRef,
- object_store_url: ObjectStoreUrl,
- object_meta: ObjectMeta,
+ test_file: &TestParquetFile,
filter: Expr,
scan_options: ParquetScanOptions,
debug: bool,
) -> Result<usize> {
- let ParquetScanOptions {
- pushdown_filters,
- reorder_filters,
- enable_page_index,
- } = scan_options;
-
- let mut config_options = ConfigOptions::new();
- config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
- config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
- config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
- let scan_config = FileScanConfig {
- object_store_url,
- file_schema: schema.clone(),
- file_groups: vec![vec![PartitionedFile {
- object_meta,
- partition_values: vec![],
- range: None,
- extensions: None,
- }]],
- statistics: Default::default(),
- projection: None,
- limit: None,
- table_partition_cols: vec![],
- config_options: config_options.into_shareable(),
- };
-
- let df_schema = schema.clone().to_dfschema()?;
-
- let physical_filter_expr = create_physical_expr(
- &filter,
- &df_schema,
- schema.as_ref(),
- &ExecutionProps::default(),
- )?;
-
- let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
-
- let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+ let exec = test_file.create_scan(filter, scan_options).await?;
let task_ctx = ctx.task_ctx();
let result = collect(exec, task_ctx).await?;
@@ -258,53 +181,15 @@ fn gen_data(
scale_factor: f32,
page_size: Option<usize>,
row_group_size: Option<usize>,
-) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+) -> Result<TestParquetFile> {
let generator = AccessLogGenerator::new();
- let file = File::create(&path).unwrap();
-
- let mut props_builder = WriterProperties::builder();
-
- if let Some(s) = page_size {
- props_builder = props_builder
- .set_data_pagesize_limit(s)
- .set_write_batch_size(s);
- }
-
- if let Some(s) = row_group_size {
- props_builder = props_builder.set_max_row_group_size(s);
- }
-
- let schema = generator.schema();
- let mut writer =
- ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
-
- let mut num_rows = 0;
-
let num_batches = 100_f32 * scale_factor;
- for batch in generator.take(num_batches as usize) {
- writer.write(&batch).unwrap();
- writer.flush()?;
- num_rows += batch.num_rows();
- }
- writer.close().unwrap();
-
- println!("Generated test dataset with {} rows", num_rows);
-
- let size = std::fs::metadata(&path)?.len() as usize;
-
- let canonical_path = path.canonicalize()?;
-
- let object_store_url =
- ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
- .object_store();
-
- let object_meta = ObjectMeta {
- location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
- last_modified: Default::default(),
- size,
- };
-
- Ok((schema, object_store_url, object_meta))
+ TestParquetFile::try_new(
+ path,
+ generator.take(num_batches as usize),
+ page_size,
+ row_group_size,
+ )
}
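The deleted TODO above (the `Dictionary(Int32, Utf8)` vs `Utf8` error in `FilterExec`) appears to be resolved by the expression coercion step added in `parquet-test-utils/src/lib.rs` below, which is what allows the `service` predicate to be re-enabled. For reference, a sketch of how that benchmark predicate is assembled; `disjunction` ORs a list of expressions together and returns `None` for an empty list:

    use datafusion::logical_expr::{lit, Expr};
    use datafusion::optimizer::utils::disjunction;
    use datafusion::prelude::col;

    // Rows whose method is not GET, OR whose status is 400, OR whose
    // service is "backend" (the predicate re-enabled by this commit)
    fn service_filter() -> Expr {
        disjunction([
            col("request_method").not_eq(lit("GET")),
            col("response_status").eq(lit(400_u16)),
            col("service").eq(lit("backend")),
        ])
        .unwrap() // safe: the list is non-empty
    }
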
diff --git a/Cargo.toml b/parquet-test-utils/Cargo.toml
similarity index 53%
copy from Cargo.toml
copy to parquet-test-utils/Cargo.toml
index 36a9405b0..599cdc35b 100644
--- a/Cargo.toml
+++ b/parquet-test-utils/Cargo.toml
@@ -15,25 +15,14 @@
# specific language governing permissions and limitations
# under the License.
-[workspace]
-exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
-]
+[package]
+name = "parquet-test-utils"
+version = "0.1.0"
+edition = "2021"
-[profile.release]
-codegen-units = 1
-lto = true
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-# the release profile takes a long time to build so we can use this profile during development to save time
-# cargo build --profile release-nonlto
-[profile.release-nonlto]
-codegen-units = 16
-debug = false
-debug-assertions = false
-incremental = false
-inherits = "release"
-lto = false
-opt-level = 3
-overflow-checks = false
-panic = 'unwind'
-rpath = false
+[dependencies]
+datafusion = { path = "../datafusion/core" }
+object_store = "0.5.0"
+parquet = "25.0.0"
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
new file mode 100644
index 000000000..6c1454016
--- /dev/null
+++ b/parquet-test-utils/src/lib.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helpers for writing parquet files and reading them back
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion::common::ToDFSchema;
+use datafusion::config::{
+ ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
+ OPT_PARQUET_REORDER_FILTERS,
+};
+use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::error::Result;
+use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
+use datafusion::physical_expr::create_physical_expr;
+use datafusion::physical_expr::execution_props::ExecutionProps;
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::Expr;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+
+/// A parquet file created for testing.
+pub struct TestParquetFile {
+ path: PathBuf,
+ schema: SchemaRef,
+ object_store_url: ObjectStoreUrl,
+ object_meta: ObjectMeta,
+}
+
+#[derive(Debug, Clone)]
+pub struct ParquetScanOptions {
+ pub pushdown_filters: bool,
+ pub reorder_filters: bool,
+ pub enable_page_index: bool,
+}
+
+impl TestParquetFile {
+ /// Creates a new parquet file at the specified location
+ pub fn try_new(
+ path: PathBuf,
+ batches: impl IntoIterator<Item = RecordBatch>,
+ page_size: Option<usize>,
+ row_group_size: Option<usize>,
+ ) -> Result<Self> {
+ let file = File::create(&path).unwrap();
+
+ let mut props_builder = WriterProperties::builder();
+
+ if let Some(s) = page_size {
+ props_builder = props_builder
+ .set_data_pagesize_limit(s)
+ .set_write_batch_size(s);
+ }
+
+ if let Some(s) = row_group_size {
+ props_builder = props_builder.set_max_row_group_size(s);
+ }
+
+ let mut batches = batches.into_iter();
+ let first_batch = batches.next().expect("need at least one record batch");
+ let schema = first_batch.schema();
+
+ let mut writer =
+ ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))
+ .unwrap();
+
+ writer.write(&first_batch).unwrap();
+ writer.flush()?;
+ let mut num_rows = first_batch.num_rows();
+
+ for batch in batches {
+ writer.write(&batch).unwrap();
+ writer.flush()?;
+ num_rows += batch.num_rows();
+ }
+ writer.close().unwrap();
+
+ println!("Generated test dataset with {} rows", num_rows);
+
+ let size = std::fs::metadata(&path)?.len() as usize;
+
+ let canonical_path = path.canonicalize()?;
+
+ let object_store_url =
+ ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
+ .object_store();
+
+ let object_meta = ObjectMeta {
+ location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
+ last_modified: Default::default(),
+ size,
+ };
+
+ Ok(Self {
+ path,
+ schema,
+ object_store_url,
+ object_meta,
+ })
+ }
+
+ /// Return a `FilterExec` wrapping a `ParquetExec`, built with the specified options, to scan this parquet file.
+ ///
+ /// This returns the same plan that DataFusion will make with a pushed down predicate followed by a filter:
+ ///
+ /// ```text
+ /// (FilterExec)
+ /// (ParquetExec)
+ /// ```
+ pub async fn create_scan(
+ &self,
+ filter: Expr,
+ scan_options: ParquetScanOptions,
+ ) -> Result<Arc<dyn ExecutionPlan>> {
+ let ParquetScanOptions {
+ pushdown_filters,
+ reorder_filters,
+ enable_page_index,
+ } = scan_options;
+
+ let mut config_options = ConfigOptions::new();
+ config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+ config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+ config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
+
+ let scan_config = FileScanConfig {
+ object_store_url: self.object_store_url.clone(),
+ file_schema: self.schema.clone(),
+ file_groups: vec![vec![PartitionedFile {
+ object_meta: self.object_meta.clone(),
+ partition_values: vec![],
+ range: None,
+ extensions: None,
+ }]],
+ statistics: Default::default(),
+ projection: None,
+ limit: None,
+ table_partition_cols: vec![],
+ config_options: config_options.into_shareable(),
+ };
+
+ let df_schema = self.schema.clone().to_dfschema_ref()?;
+
+ // run coercion on the filters to coerce types etc.
+ let props = ExecutionProps::new();
+ let context = SimplifyContext::new(&props).with_schema(df_schema.clone());
+ let simplifier = ExprSimplifier::new(context);
+ let filter = simplifier.coerce(filter, df_schema.clone()).unwrap();
+
+ let physical_filter_expr = create_physical_expr(
+ &filter,
+ &df_schema,
+ self.schema.as_ref(),
+ &ExecutionProps::default(),
+ )?;
+
+ let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
+
+ let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+
+ Ok(exec)
+ }
+
+ /// Retrieve metrics from the parquet exec returned from `create_scan`
+ ///
+ /// Recursively searches for ParquetExec and returns the metrics
+ /// on the first one it finds
+ pub fn parquet_metrics(&self, plan: Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
+ if let Some(parquet) = plan.as_any().downcast_ref::<ParquetExec>() {
+ return parquet.metrics();
+ }
+
+ for child in plan.children() {
+ if let Some(metrics) = self.parquet_metrics(child) {
+ return Some(metrics);
+ }
+ }
+ None
+ }
+
+ /// The schema of this parquet file
+ pub fn schema(&self) -> SchemaRef {
+ self.schema.clone()
+ }
+
+ /// The path to the parquet file
+ pub fn path(&self) -> &std::path::Path {
+ self.path.as_path()
+ }
+}
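Putting `create_scan` and `parquet_metrics` together, a hedged sketch of inspecting the `ParquetExec` metrics after a scan (the function name is illustrative; `Arc::clone` is cheap, so the same plan node can be both executed and inspected):

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_plan::collect;
    use datafusion::prelude::{Expr, SessionContext};
    use parquet_test_utils::{ParquetScanOptions, TestParquetFile};

    async fn report_parquet_metrics(
        ctx: &SessionContext,
        test_file: &TestParquetFile,
        filter: Expr,
        scan_options: ParquetScanOptions,
    ) -> Result<()> {
        let plan = test_file.create_scan(filter, scan_options).await?;
        let results = collect(Arc::clone(&plan), ctx.task_ctx()).await?;
        println!("scan produced {} batches", results.len());

        // parquet_metrics recursively locates the ParquetExec inside the
        // plan and returns its aggregated MetricsSet
        if let Some(metrics) = test_file.parquet_metrics(plan) {
            println!("parquet metrics: {}", metrics);
        }
        Ok(())
    }
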
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index a77728eea..dd516d5fe 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -190,6 +190,11 @@ pub struct AccessLogGenerator {
schema: SchemaRef,
rng: StdRng,
host_idx: usize,
+
+ /// Optional limit on the total number of rows to produce
+ row_limit: Option<usize>,
+ /// How many rows have been returned so far
+ row_count: usize,
}
impl Default for AccessLogGenerator {
@@ -209,6 +214,8 @@ impl AccessLogGenerator {
schema: BatchBuilder::schema(),
host_idx: 0,
rng: StdRng::from_seed(seed),
+ row_limit: None,
+ row_count: 0,
}
}
@@ -216,12 +223,25 @@ impl AccessLogGenerator {
pub fn schema(&self) -> SchemaRef {
self.schema.clone()
}
+
+ /// Return up to `row_limit` rows.
+ pub fn with_row_limit(mut self, row_limit: Option<usize>) -> Self {
+ self.row_limit = row_limit;
+ self
+ }
}
impl Iterator for AccessLogGenerator {
type Item = RecordBatch;
fn next(&mut self) -> Option<Self::Item> {
+ // if we have a limit and have passed it, stop generating
+ if let Some(limit) = self.row_limit {
+ if self.row_count >= limit {
+ return None;
+ }
+ }
+
let mut builder = BatchBuilder::default();
let host = format!(
@@ -236,6 +256,18 @@ impl Iterator for AccessLogGenerator {
}
builder.append(&mut self.rng, &host, service);
}
- Some(builder.finish(Arc::clone(&self.schema)))
+
+ let batch = builder.finish(Arc::clone(&self.schema));
+
+ // limit batch if needed to stay under row limit
+ let batch = if let Some(limit) = self.row_limit {
+ let num_rows = limit - self.row_count;
+ batch.slice(0, num_rows)
+ } else {
+ batch
+ };
+
+ self.row_count += batch.num_rows();
+ Some(batch)
}
}
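Finally, a short sketch of the new row-limit knob on `AccessLogGenerator`. Note that the final `batch.slice(0, limit - row_count)` above does not clamp to the batch length, so this usage assumes the remaining allowance fits inside the batch being generated (small limits relative to the batch size are the safe case):

    use test_utils::AccessLogGenerator;

    fn main() {
        // Produce at most 100 rows; the generator slices the last batch so
        // the total comes out exact
        let generator = AccessLogGenerator::new().with_row_limit(Some(100));
        let total: usize = generator.map(|batch| batch.num_rows()).sum();
        assert_eq!(total, 100);
    }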