Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/01 13:04:22 UTC

[arrow-datafusion] branch master updated: Extract common parquet testing code to `parquet-test-util` crate (#4042)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 41467ab4f Extract common parquet testing code to `parquet-test-util` crate (#4042)
41467ab4f is described below

commit 41467ab4fea4172ecd4df34eb0c081ac7670c233
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 1 09:04:03 2022 -0400

    Extract common parquet testing code to `parquet-test-util` crate (#4042)
    
    * Extract common parquet testing code to `parquet-test-util` crate
    
    * fix doc tests
---
 Cargo.toml                                    |  15 +-
 benchmarks/Cargo.toml                         |   1 +
 benchmarks/src/bin/parquet_filter_pushdown.rs | 147 ++----------------
 Cargo.toml => parquet-test-utils/Cargo.toml   |  29 ++--
 parquet-test-utils/src/lib.rs                 | 214 ++++++++++++++++++++++++++
 test-utils/src/data_gen.rs                    |  34 +++-
 6 files changed, 287 insertions(+), 153 deletions(-)
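
At a high level, this change moves the parquet-writing and ParquetExec-plumbing code that previously lived inside the benchmark into a reusable `TestParquetFile` type. A minimal sketch of the new call pattern, assembled from the diff below (variables such as `path`, `batches`, `filter` and `scan_options` are placeholders):

    use parquet_test_utils::{ParquetScanOptions, TestParquetFile};

    // write the batches to a parquet file, remembering its schema and location
    let test_file = TestParquetFile::try_new(path, batches, page_size, row_group_size)?;

    // build a FilterExec wrapping a ParquetExec over that file, with the given pushdown settings
    let plan = test_file.create_scan(filter, scan_options).await?;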

diff --git a/Cargo.toml b/Cargo.toml
index 36a9405b0..1ab431ea3 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -17,7 +17,20 @@
 
 [workspace]
 exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
+members = [
+    "datafusion/common",
+    "datafusion/core",
+    "datafusion/expr",
+    "datafusion/jit",
+    "datafusion/optimizer",
+    "datafusion/physical-expr",
+    "datafusion/proto",
+    "datafusion/row",
+    "datafusion/sql",
+    "datafusion-examples",
+    "test-utils",
+    "parquet-test-utils",
+    "benchmarks",
 ]
 
 [profile.release]
diff --git a/benchmarks/Cargo.toml b/benchmarks/Cargo.toml
index 8795a8611..dd9253b8d 100644
--- a/benchmarks/Cargo.toml
+++ b/benchmarks/Cargo.toml
@@ -41,6 +41,7 @@ mimalloc = { version = "0.1", optional = true, default-features = false }
 num_cpus = "1.13.0"
 object_store = "0.5.0"
 parquet = "25.0.0"
+parquet-test-utils = { path = "../parquet-test-utils/" }
 rand = "0.8.4"
 serde = { version = "1.0.136", features = ["derive"] }
 serde_json = "1.0.78"
diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index 3efa86f27..4ec2dc90c 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -15,30 +15,14 @@
 // specific language governing permissions and limitations
 // under the License.
 
-use arrow::datatypes::SchemaRef;
 use arrow::util::pretty;
-use datafusion::common::{Result, ToDFSchema};
-use datafusion::config::{
-    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
-    OPT_PARQUET_REORDER_FILTERS,
-};
-use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
-use datafusion::datasource::object_store::ObjectStoreUrl;
-use datafusion::execution::context::ExecutionProps;
+use datafusion::common::Result;
 use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
-use datafusion::physical_expr::create_physical_expr;
 use datafusion::physical_plan::collect;
-use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
-use datafusion::physical_plan::filter::FilterExec;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
-use object_store::path::Path;
-use object_store::ObjectMeta;
-use parquet::arrow::ArrowWriter;
-use parquet::file::properties::WriterProperties;
-use std::fs::File;
+use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
-use std::sync::Arc;
 use std::time::Instant;
 use structopt::StructOpt;
 use test_utils::AccessLogGenerator;
@@ -89,34 +73,16 @@ async fn main() -> Result<()> {
 
     let path = opt.path.join("logs.parquet");
 
-    let (schema, object_store_url, object_meta) =
-        gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
 
-    run_benchmarks(
-        &mut ctx,
-        schema,
-        object_store_url,
-        object_meta,
-        opt.iterations,
-        opt.debug,
-    )
-    .await?;
+    run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
 
     Ok(())
 }
 
-#[derive(Debug, Clone)]
-struct ParquetScanOptions {
-    pushdown_filters: bool,
-    reorder_filters: bool,
-    enable_page_index: bool,
-}
-
 async fn run_benchmarks(
     ctx: &mut SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     iterations: usize,
     debug: bool,
 ) -> Result<()> {
@@ -156,8 +122,7 @@ async fn run_benchmarks(
         disjunction([
             col("request_method").not_eq(lit("GET")),
             col("response_status").eq(lit(400_u16)),
-            // TODO this fails in the FilterExec with Error: Internal("The type of Dictionary(Int32, Utf8) = Utf8 of binary physical should be same")
-            // col("service").eq(lit("backend")),
+            col("service").eq(lit("backend")),
         ])
         .unwrap(),
         // Filter everything
@@ -174,9 +139,7 @@ async fn run_benchmarks(
                 let start = Instant::now();
                 let rows = exec_scan(
                     ctx,
-                    schema.clone(),
-                    object_store_url.clone(),
-                    object_meta.clone(),
+                    test_file,
                     filter_expr.clone(),
                     scan_options.clone(),
                     debug,
@@ -197,52 +160,12 @@ async fn run_benchmarks(
 
 async fn exec_scan(
     ctx: &SessionContext,
-    schema: SchemaRef,
-    object_store_url: ObjectStoreUrl,
-    object_meta: ObjectMeta,
+    test_file: &TestParquetFile,
     filter: Expr,
     scan_options: ParquetScanOptions,
     debug: bool,
 ) -> Result<usize> {
-    let ParquetScanOptions {
-        pushdown_filters,
-        reorder_filters,
-        enable_page_index,
-    } = scan_options;
-
-    let mut config_options = ConfigOptions::new();
-    config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
-    config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
-    config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
-
-    let scan_config = FileScanConfig {
-        object_store_url,
-        file_schema: schema.clone(),
-        file_groups: vec![vec![PartitionedFile {
-            object_meta,
-            partition_values: vec![],
-            range: None,
-            extensions: None,
-        }]],
-        statistics: Default::default(),
-        projection: None,
-        limit: None,
-        table_partition_cols: vec![],
-        config_options: config_options.into_shareable(),
-    };
-
-    let df_schema = schema.clone().to_dfschema()?;
-
-    let physical_filter_expr = create_physical_expr(
-        &filter,
-        &df_schema,
-        schema.as_ref(),
-        &ExecutionProps::default(),
-    )?;
-
-    let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
-
-    let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+    let exec = test_file.create_scan(filter, scan_options).await?;
 
     let task_ctx = ctx.task_ctx();
     let result = collect(exec, task_ctx).await?;
@@ -258,53 +181,15 @@ fn gen_data(
     scale_factor: f32,
     page_size: Option<usize>,
     row_group_size: Option<usize>,
-) -> Result<(SchemaRef, ObjectStoreUrl, ObjectMeta)> {
+) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();
 
-    let file = File::create(&path).unwrap();
-
-    let mut props_builder = WriterProperties::builder();
-
-    if let Some(s) = page_size {
-        props_builder = props_builder
-            .set_data_pagesize_limit(s)
-            .set_write_batch_size(s);
-    }
-
-    if let Some(s) = row_group_size {
-        props_builder = props_builder.set_max_row_group_size(s);
-    }
-
-    let schema = generator.schema();
-    let mut writer =
-        ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build())).unwrap();
-
-    let mut num_rows = 0;
-
     let num_batches = 100_f32 * scale_factor;
 
-    for batch in generator.take(num_batches as usize) {
-        writer.write(&batch).unwrap();
-        writer.flush()?;
-        num_rows += batch.num_rows();
-    }
-    writer.close().unwrap();
-
-    println!("Generated test dataset with {} rows", num_rows);
-
-    let size = std::fs::metadata(&path)?.len() as usize;
-
-    let canonical_path = path.canonicalize()?;
-
-    let object_store_url =
-        ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
-            .object_store();
-
-    let object_meta = ObjectMeta {
-        location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
-        last_modified: Default::default(),
-        size,
-    };
-
-    Ok((schema, object_store_url, object_meta))
+    TestParquetFile::try_new(
+        path,
+        generator.take(num_batches as usize),
+        page_size,
+        row_group_size,
+    )
 }
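
With the helper in place, the benchmark's data generation and per-iteration scan construction collapse to the calls sketched below (a rough composite of the hunks above; the `opt` fields come from the benchmark's structopt options):

    // generate roughly 100 record batches per unit of scale factor and write them out
    let generator = AccessLogGenerator::new();
    let num_batches = (100_f32 * opt.scale_factor) as usize;
    let test_file = TestParquetFile::try_new(
        opt.path.join("logs.parquet"),
        generator.take(num_batches),
        opt.page_size,
        opt.row_group_size,
    )?;

    // each iteration asks the file for a FilterExec(ParquetExec) plan and executes it
    let exec = test_file.create_scan(filter_expr.clone(), scan_options.clone()).await?;
    let result = collect(exec, ctx.task_ctx()).await?;
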
diff --git a/Cargo.toml b/parquet-test-utils/Cargo.toml
similarity index 53%
copy from Cargo.toml
copy to parquet-test-utils/Cargo.toml
index 36a9405b0..599cdc35b 100644
--- a/Cargo.toml
+++ b/parquet-test-utils/Cargo.toml
@@ -15,25 +15,14 @@
 # specific language governing permissions and limitations
 # under the License.
 
-[workspace]
-exclude = ["datafusion-cli"]
-members = ["datafusion/common", "datafusion/core", "datafusion/expr", "datafusion/jit", "datafusion/optimizer", "datafusion/physical-expr", "datafusion/proto", "datafusion/row", "datafusion/sql", "datafusion-examples", "benchmarks",
-]
+[package]
+name = "parquet-test-utils"
+version = "0.1.0"
+edition = "2021"
 
-[profile.release]
-codegen-units = 1
-lto = true
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
 
-# the release profile takes a long time to build so we can use this profile during development to save time
-# cargo build --profile release-nonlto
-[profile.release-nonlto]
-codegen-units = 16
-debug = false
-debug-assertions = false
-incremental = false
-inherits = "release"
-lto = false
-opt-level = 3
-overflow-checks = false
-panic = 'unwind'
-rpath = false
+[dependencies]
+datafusion = { path = "../datafusion/core" }
+object_store = "0.5.0"
+parquet = "25.0.0"
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
new file mode 100644
index 000000000..6c1454016
--- /dev/null
+++ b/parquet-test-utils/src/lib.rs
@@ -0,0 +1,214 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! Helpers for writing parquet files and reading them back
+
+use std::fs::File;
+use std::path::PathBuf;
+use std::sync::Arc;
+
+use datafusion::arrow::{datatypes::SchemaRef, record_batch::RecordBatch};
+use datafusion::common::ToDFSchema;
+use datafusion::config::{
+    ConfigOptions, OPT_PARQUET_ENABLE_PAGE_INDEX, OPT_PARQUET_PUSHDOWN_FILTERS,
+    OPT_PARQUET_REORDER_FILTERS,
+};
+use datafusion::datasource::listing::{ListingTableUrl, PartitionedFile};
+use datafusion::datasource::object_store::ObjectStoreUrl;
+use datafusion::error::Result;
+use datafusion::optimizer::simplify_expressions::{ExprSimplifier, SimplifyContext};
+use datafusion::physical_expr::create_physical_expr;
+use datafusion::physical_expr::execution_props::ExecutionProps;
+use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::filter::FilterExec;
+use datafusion::physical_plan::metrics::MetricsSet;
+use datafusion::physical_plan::ExecutionPlan;
+use datafusion::prelude::Expr;
+use object_store::path::Path;
+use object_store::ObjectMeta;
+use parquet::arrow::ArrowWriter;
+use parquet::file::properties::WriterProperties;
+
+/// A parquet file that has been created for testing.
+pub struct TestParquetFile {
+    path: PathBuf,
+    schema: SchemaRef,
+    object_store_url: ObjectStoreUrl,
+    object_meta: ObjectMeta,
+}
+
+#[derive(Debug, Clone)]
+pub struct ParquetScanOptions {
+    pub pushdown_filters: bool,
+    pub reorder_filters: bool,
+    pub enable_page_index: bool,
+}
+
+impl TestParquetFile {
+    /// Creates a new parquet file at the specified location
+    pub fn try_new(
+        path: PathBuf,
+        batches: impl IntoIterator<Item = RecordBatch>,
+        page_size: Option<usize>,
+        row_group_size: Option<usize>,
+    ) -> Result<Self> {
+        let file = File::create(&path).unwrap();
+
+        let mut props_builder = WriterProperties::builder();
+
+        if let Some(s) = page_size {
+            props_builder = props_builder
+                .set_data_pagesize_limit(s)
+                .set_write_batch_size(s);
+        }
+
+        if let Some(s) = row_group_size {
+            props_builder = props_builder.set_max_row_group_size(s);
+        }
+
+        let mut batches = batches.into_iter();
+        let first_batch = batches.next().expect("need at least one record batch");
+        let schema = first_batch.schema();
+
+        let mut writer =
+            ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))
+                .unwrap();
+
+        writer.write(&first_batch).unwrap();
+        writer.flush()?;
+        let mut num_rows = first_batch.num_rows();
+
+        for batch in batches {
+            writer.write(&batch).unwrap();
+            writer.flush()?;
+            num_rows += batch.num_rows();
+        }
+        writer.close().unwrap();
+
+        println!("Generated test dataset with {} rows", num_rows);
+
+        let size = std::fs::metadata(&path)?.len() as usize;
+
+        let canonical_path = path.canonicalize()?;
+
+        let object_store_url =
+            ListingTableUrl::parse(canonical_path.to_str().unwrap_or_default())?
+                .object_store();
+
+        let object_meta = ObjectMeta {
+            location: Path::parse(canonical_path.to_str().unwrap_or_default())?,
+            last_modified: Default::default(),
+            size,
+        };
+
+        Ok(Self {
+            path,
+            schema,
+            object_store_url,
+            object_meta,
+        })
+    }
+
+    /// Return a `FilterExec` (wrapping a `ParquetExec`) that scans this parquet file with the specified options.
+    ///
+    /// This returns the same plan that DataFusion will make with a pushed down predicate followed by a filter:
+    ///
+    /// ```text
+    /// (FilterExec)
+    ///   (ParquetExec)
+    /// ```
+    pub async fn create_scan(
+        &self,
+        filter: Expr,
+        scan_options: ParquetScanOptions,
+    ) -> Result<Arc<dyn ExecutionPlan>> {
+        let ParquetScanOptions {
+            pushdown_filters,
+            reorder_filters,
+            enable_page_index,
+        } = scan_options;
+
+        let mut config_options = ConfigOptions::new();
+        config_options.set_bool(OPT_PARQUET_PUSHDOWN_FILTERS, pushdown_filters);
+        config_options.set_bool(OPT_PARQUET_REORDER_FILTERS, reorder_filters);
+        config_options.set_bool(OPT_PARQUET_ENABLE_PAGE_INDEX, enable_page_index);
+
+        let scan_config = FileScanConfig {
+            object_store_url: self.object_store_url.clone(),
+            file_schema: self.schema.clone(),
+            file_groups: vec![vec![PartitionedFile {
+                object_meta: self.object_meta.clone(),
+                partition_values: vec![],
+                range: None,
+                extensions: None,
+            }]],
+            statistics: Default::default(),
+            projection: None,
+            limit: None,
+            table_partition_cols: vec![],
+            config_options: config_options.into_shareable(),
+        };
+
+        let df_schema = self.schema.clone().to_dfschema_ref()?;
+
+        // run coercion on the filters to coerce types etc.
+        let props = ExecutionProps::new();
+        let context = SimplifyContext::new(&props).with_schema(df_schema.clone());
+        let simplifier = ExprSimplifier::new(context);
+        let filter = simplifier.coerce(filter, df_schema.clone()).unwrap();
+
+        let physical_filter_expr = create_physical_expr(
+            &filter,
+            &df_schema,
+            self.schema.as_ref(),
+            &ExecutionProps::default(),
+        )?;
+
+        let parquet_exec = Arc::new(ParquetExec::new(scan_config, Some(filter), None));
+
+        let exec = Arc::new(FilterExec::try_new(physical_filter_expr, parquet_exec)?);
+
+        Ok(exec)
+    }
+
+    /// Retrieve metrics from the parquet exec returned from `create_scan`
+    ///
+    /// Recursively searches for ParquetExec and returns the metrics
+    /// on the first one it finds
+    pub fn parquet_metrics(&self, plan: Arc<dyn ExecutionPlan>) -> Option<MetricsSet> {
+        if let Some(parquet) = plan.as_any().downcast_ref::<ParquetExec>() {
+            return parquet.metrics();
+        }
+
+        for child in plan.children() {
+            if let Some(metrics) = self.parquet_metrics(child) {
+                return Some(metrics);
+            }
+        }
+        None
+    }
+
+    /// The schema of this parquet file
+    pub fn schema(&self) -> SchemaRef {
+        self.schema.clone()
+    }
+
+    /// The path to the parquet file
+    pub fn path(&self) -> &std::path::Path {
+        self.path.as_path()
+    }
+}
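
For orientation, here is a hedged end-to-end sketch of driving the new API from an async test; the file path, filter and `batches` source are made up for illustration, while the type and method names come from the file above:

    use datafusion::physical_plan::collect;
    use datafusion::prelude::{col, lit, SessionContext};
    use parquet_test_utils::{ParquetScanOptions, TestParquetFile};

    let ctx = SessionContext::new();

    // write some RecordBatches to a test file (page size / row group size left at defaults)
    let test_file = TestParquetFile::try_new(
        std::path::PathBuf::from("/tmp/logs.parquet"), // hypothetical location
        batches,                                       // any IntoIterator<Item = RecordBatch>
        None,
        None,
    )?;

    let scan_options = ParquetScanOptions {
        pushdown_filters: true,
        reorder_filters: true,
        enable_page_index: true,
    };

    // FilterExec wrapping a ParquetExec, with the predicate also pushed into the scan
    let plan = test_file
        .create_scan(col("response_status").eq(lit(400_u16)), scan_options)
        .await?;
    let results = collect(plan.clone(), ctx.task_ctx()).await?;

    // inspect pushdown behaviour via the ParquetExec metrics
    let metrics = test_file.parquet_metrics(plan);
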
diff --git a/test-utils/src/data_gen.rs b/test-utils/src/data_gen.rs
index a77728eea..dd516d5fe 100644
--- a/test-utils/src/data_gen.rs
+++ b/test-utils/src/data_gen.rs
@@ -190,6 +190,11 @@ pub struct AccessLogGenerator {
     schema: SchemaRef,
     rng: StdRng,
     host_idx: usize,
+
+    /// Optional limit on the number of rows produced
+    row_limit: Option<usize>,
+    /// How many rows have been returned so far
+    row_count: usize,
 }
 
 impl Default for AccessLogGenerator {
@@ -209,6 +214,8 @@ impl AccessLogGenerator {
             schema: BatchBuilder::schema(),
             host_idx: 0,
             rng: StdRng::from_seed(seed),
+            row_limit: None,
+            row_count: 0,
         }
     }
 
@@ -216,12 +223,25 @@ impl AccessLogGenerator {
     pub fn schema(&self) -> SchemaRef {
         self.schema.clone()
     }
+
+    /// Limit the generator to produce at most `row_limit` rows
+    pub fn with_row_limit(mut self, row_limit: Option<usize>) -> Self {
+        self.row_limit = row_limit;
+        self
+    }
 }
 
 impl Iterator for AccessLogGenerator {
     type Item = RecordBatch;
 
     fn next(&mut self) -> Option<Self::Item> {
+        // if we have a limit and have passed it, stop generating
+        if let Some(limit) = self.row_limit {
+            if self.row_count >= limit {
+                return None;
+            }
+        }
+
         let mut builder = BatchBuilder::default();
 
         let host = format!(
@@ -236,6 +256,18 @@ impl Iterator for AccessLogGenerator {
             }
             builder.append(&mut self.rng, &host, service);
         }
-        Some(builder.finish(Arc::clone(&self.schema)))
+
+        let batch = builder.finish(Arc::clone(&self.schema));
+
+        // limit batch if needed to stay under row limit
+        let batch = if let Some(limit) = self.row_limit {
+            let num_rows = (limit - self.row_count).min(batch.num_rows());
+            batch.slice(0, num_rows)
+        } else {
+            batch
+        };
+
+        self.row_count += batch.num_rows();
+        Some(batch)
     }
 }
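
The row limit lets callers cap exactly how many rows the generator produces, slicing the final batch down if it would overshoot; a small hedged example (the limit value is arbitrary):

    use test_utils::AccessLogGenerator;

    // produce at most 1000 rows in total across all generated batches
    let generator = AccessLogGenerator::new().with_row_limit(Some(1000));
    let total_rows: usize = generator.map(|batch| batch.num_rows()).sum();
    assert!(total_rows <= 1000);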