You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/08 11:12:14 UTC

[arrow-datafusion] branch master updated: Add parquet predicate pushdown tests with smaller pages (#4131)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 175adbda6 Add parquet predicate pushdown tests with smaller pages (#4131)
175adbda6 is described below

commit 175adbda616ca2f339cfba375c1f86535f6e2ad1
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 8 06:12:09 2022 -0500

    Add parquet predicate pushdown tests with smaller pages (#4131)
---
 benchmarks/src/bin/parquet_filter_pushdown.rs      |  36 ++---
 datafusion/core/src/physical_optimizer/pruning.rs  |   7 +
 .../file_format/parquet/page_filter.rs             |  13 +-
 datafusion/core/tests/parquet_filter_pushdown.rs   | 158 ++++++++++++++++++---
 parquet-test-utils/src/lib.rs                      |  26 +---
 5 files changed, 182 insertions(+), 58 deletions(-)

diff --git a/benchmarks/src/bin/parquet_filter_pushdown.rs b/benchmarks/src/bin/parquet_filter_pushdown.rs
index 4ec2dc90c..a3ff7bee5 100644
--- a/benchmarks/src/bin/parquet_filter_pushdown.rs
+++ b/benchmarks/src/bin/parquet_filter_pushdown.rs
@@ -21,6 +21,7 @@ use datafusion::logical_expr::{lit, or, Expr};
 use datafusion::optimizer::utils::disjunction;
 use datafusion::physical_plan::collect;
 use datafusion::prelude::{col, SessionConfig, SessionContext};
+use parquet::file::properties::WriterProperties;
 use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use std::path::PathBuf;
 use std::time::Instant;
@@ -73,7 +74,19 @@ async fn main() -> Result<()> {
 
     let path = opt.path.join("logs.parquet");
 
-    let test_file = gen_data(path, opt.scale_factor, opt.page_size, opt.row_group_size)?;
+    let mut props_builder = WriterProperties::builder();
+
+    if let Some(s) = opt.page_size {
+        props_builder = props_builder
+            .set_data_pagesize_limit(s)
+            .set_write_batch_size(s);
+    }
+
+    if let Some(s) = opt.row_group_size {
+        props_builder = props_builder.set_max_row_group_size(s);
+    }
+
+    let test_file = gen_data(path, opt.scale_factor, props_builder.build())?;
 
     run_benchmarks(&mut ctx, &test_file, opt.iterations, opt.debug).await?;
 
@@ -137,14 +150,9 @@ async fn run_benchmarks(
             println!("Using scan options {:?}", scan_options);
             for i in 0..iterations {
                 let start = Instant::now();
-                let rows = exec_scan(
-                    ctx,
-                    test_file,
-                    filter_expr.clone(),
-                    scan_options.clone(),
-                    debug,
-                )
-                .await?;
+                let rows =
+                    exec_scan(ctx, test_file, filter_expr.clone(), *scan_options, debug)
+                        .await?;
                 println!(
                     "Iteration {} returned {} rows in {} ms",
                     i,
@@ -179,17 +187,11 @@ async fn exec_scan(
 fn gen_data(
     path: PathBuf,
     scale_factor: f32,
-    page_size: Option<usize>,
-    row_group_size: Option<usize>,
+    props: WriterProperties,
 ) -> Result<TestParquetFile> {
     let generator = AccessLogGenerator::new();
 
     let num_batches = 100_f32 * scale_factor;
 
-    TestParquetFile::try_new(
-        path,
-        generator.take(num_batches as usize),
-        page_size,
-        row_group_size,
-    )
+    TestParquetFile::try_new(path, props, generator.take(num_batches as usize))
 }
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 3adf0ad7b..a47815377 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -51,6 +51,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
 use datafusion_physical_expr::create_physical_expr;
+use log::trace;
 
 /// Interface to pass statistics information to [`PruningPredicate`]
 ///
@@ -415,6 +416,12 @@ fn build_statistics_record_batch<S: PruningStatistics>(
     let mut options = RecordBatchOptions::default();
     options.row_count = Some(statistics.num_containers());
 
+    trace!(
+        "Creating statistics batch for {:#?} with {:#?}",
+        required_columns,
+        arrays
+    );
+
     RecordBatch::try_new_with_options(schema, arrays, &options).map_err(|err| {
         DataFusionError::Plan(format!("Can not create statistics record batch: {}", err))
     })
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 828d21375..95c93151a 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -22,7 +22,7 @@ use arrow::{array::ArrayRef, datatypes::SchemaRef, error::ArrowError};
 use datafusion_common::{Column, DataFusionError, Result};
 use datafusion_expr::utils::expr_to_columns;
 use datafusion_optimizer::utils::split_conjunction;
-use log::{debug, error};
+use log::{debug, error, trace};
 use parquet::{
     arrow::arrow_reader::{RowSelection, RowSelector},
     errors::ParquetError,
@@ -143,6 +143,9 @@ pub(crate) fn build_page_filter(
                         }),
                     );
                 } else {
+                    trace!(
+                        "Did not have enough metadata to prune with page indexes, falling back, falling back to all rows",
+                    );
                     // fallback select all rows
                     let all_selected =
                         vec![RowSelector::select(groups[*r].num_rows() as usize)];
@@ -150,8 +153,9 @@ pub(crate) fn build_page_filter(
                 }
             }
             debug!(
-                "Use filter and page index create RowSelection {:?} from predicate:{:?}",
-                &selectors, predicate
+                "Use filter and page index create RowSelection {:?} from predicate: {:?}",
+                &selectors,
+                predicate.predicate_expr(),
             );
             row_selections.push_back(selectors.into_iter().flatten().collect::<Vec<_>>());
         }
@@ -321,7 +325,7 @@ fn prune_pages_in_one_row_group(
                 assert_eq!(row_vec.len(), values.len());
                 let mut sum_row = *row_vec.first().unwrap();
                 let mut selected = *values.first().unwrap();
-
+                trace!("Pruned to to {:?} using {:?}", values, pruning_stats);
                 for (i, &f) in values.iter().skip(1).enumerate() {
                     if f == selected {
                         sum_row += *row_vec.get(i).unwrap();
@@ -376,6 +380,7 @@ fn create_row_count_in_each_page(
 
 /// Wraps one col page_index in one rowGroup statistics in a way
 /// that implements [`PruningStatistics`]
+#[derive(Debug)]
 struct PagesPruningStatistics<'a> {
     col_page_indexes: &'a Index,
     col_offset_indexes: &'a Vec<PageLocation>,
diff --git a/datafusion/core/tests/parquet_filter_pushdown.rs b/datafusion/core/tests/parquet_filter_pushdown.rs
index 2c0d75cc2..54b7d8d16 100644
--- a/datafusion/core/tests/parquet_filter_pushdown.rs
+++ b/datafusion/core/tests/parquet_filter_pushdown.rs
@@ -30,12 +30,12 @@ use std::time::Instant;
 
 use arrow::compute::concat_batches;
 use arrow::record_batch::RecordBatch;
-use datafusion::logical_expr::{lit, Expr};
 use datafusion::physical_plan::collect;
 use datafusion::physical_plan::metrics::MetricsSet;
-use datafusion::prelude::{col, SessionContext};
+use datafusion::prelude::{col, lit, lit_timestamp_nano, Expr, SessionContext};
 use datafusion_optimizer::utils::{conjunction, disjunction, split_conjunction};
 use itertools::Itertools;
+use parquet::file::properties::WriterProperties;
 use parquet_test_utils::{ParquetScanOptions, TestParquetFile};
 use tempfile::TempDir;
 use test_utils::AccessLogGenerator;
@@ -43,24 +43,29 @@ use test_utils::AccessLogGenerator;
 /// how many rows of generated data to write to our parquet file (arbitrary)
 const NUM_ROWS: usize = 53819;
 
+#[cfg(test)]
+#[ctor::ctor]
+fn init() {
+    // enable logging so RUST_LOG works
+    let _ = env_logger::try_init();
+}
+
 #[cfg(not(target_family = "windows"))]
 #[tokio::test]
 async fn single_file() {
     // Only create the parquet file once as it is fairly large
+
     let tempdir = TempDir::new().unwrap();
 
     let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
 
-    // TODO: set the max page rows with some various / arbitrary sizes 8311
-    // (using https://github.com/apache/arrow-rs/issues/2941) to ensure we get multiple pages
-    let page_size = None;
-    let row_group_size = None;
+    // default properties
+    let props = WriterProperties::builder().build();
     let file = tempdir.path().join("data.parquet");
 
     let start = Instant::now();
     println!("Writing test data to {:?}", file);
-    let test_parquet_file =
-        TestParquetFile::try_new(file, generator, page_size, row_group_size).unwrap();
+    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
     println!(
         "Completed generating test data in {:?}",
         Instant::now() - start
@@ -225,6 +230,77 @@ async fn single_file() {
         .run()
         .await;
 }
+
+#[cfg(not(target_family = "windows"))]
+#[tokio::test]
+async fn single_file_small_data_pages() {
+    let tempdir = TempDir::new().unwrap();
+
+    let generator = AccessLogGenerator::new().with_row_limit(Some(NUM_ROWS));
+
+    // set the max page rows with arbitrary sizes 8311 to increase
+    // effectiveness of page filtering
+    let props = WriterProperties::builder()
+        .set_data_page_row_count_limit(8311)
+        .build();
+    let file = tempdir.path().join("data_8311.parquet");
+
+    let start = Instant::now();
+    println!("Writing test data to {:?}", file);
+    let test_parquet_file = TestParquetFile::try_new(file, props, generator).unwrap();
+    println!(
+        "Completed generating test data in {:?}",
+        Instant::now() - start
+    );
+
+    // The statistics on the 'pod' column are as follows:
+    //
+    // parquet-tools dump -d ~/Downloads/data_8311.parquet
+    //
+    // ...
+    // pod TV=53819 RL=0 DL=0 DS:                 8 DE:PLAIN
+    // ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+    // page 0:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: aqcathnxqsphdhgjtgvxsfyiwbmhlmg, max: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 1:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bvjjmytpfzdfsvlzfhbunasihjgxpesbmxv, max: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 2:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: bxyubzxbbmhroqhrdzttngxcpwwgkpaoizvgzd, max: djzdyiecnumrsrcbizwlqzdhnpoiqdh, num_nulls not defined] CRC:[none] SZ:10 VC:9216
+    // page 3:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: djzdyiecnumrsrcbizwlqzdhnpoiqdh, max: fktdcgtmzvoedpwhfevcvvrtaurzgex, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 4:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fktdcgtmzvoedpwhfevcvvrtaurzgex, max: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, num_nulls not defined] CRC:[none] SZ:7 VC:9216
+    // page 5:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: fwtdpgtxwqkkgtgvthhwycrvjiizdifyp, max: iadnalqpdzthpifrvewossmpqibgtsuin, num_nulls not defined] CRC:[none] SZ:7 VC:7739
+    //
+    // This test currently fails due to https://github.com/apache/arrow-datafusion/issues/3833
+    // (page index pruning not implemented for byte array)
+
+    // TestCase::new(&test_parquet_file)
+    //     .with_name("selective")
+    //     // predicagte is chosen carefully to prune pages 0, 1, 2, 3, 4
+    //     // pod = 'iadnalqpdzthpifrvewossmpqibgtsuin'
+    //     .with_filter(col("pod").eq(lit("iadnalqpdzthpifrvewossmpqibgtsuin")))
+    //     .with_pushdown_expected(PushdownExpected::Some)
+    //     .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+    //     .with_expected_rows(2574)
+    //     .run()
+    //     .await;
+
+    // time TV=53819 RL=0 DL=0 DS:                7092 DE:PLAIN
+    // --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+    // page 0:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004133888, num_nulls not defined] CRC:[none] SZ:13844 VC:9216
+    // page 1:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.006397952, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 2:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005650432, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 3:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.004269056, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 4:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.007261184, num_nulls not defined] CRC:[none] SZ:14996 VC:9216
+    // page 5:                                     DLE:RLE RLE:RLE VLE:RLE_DICTIONARY ST:[min: 1970-01-01T00:00:00.000000000, max: 1970-01-01T00:00:00.005330944, num_nulls not defined] CRC:[none] SZ:12601 VC:7739
+    TestCase::new(&test_parquet_file)
+        .with_name("selective")
+        // predicagte is chosen carefully to prune pages
+        // time > 1970-01-01T00:00:00.004300000
+        .with_filter(col("time").gt(lit_timestamp_nano(4300000)))
+        .with_pushdown_expected(PushdownExpected::Some)
+        .with_page_index_filtering_expected(PageIndexFilteringExpected::Some)
+        .with_expected_rows(9745)
+        .run()
+        .await;
+}
+
 /// Expected pushdown behavior
 #[derive(Debug, Clone, Copy)]
 enum PushdownExpected {
@@ -234,6 +310,15 @@ enum PushdownExpected {
     Some,
 }
 
+/// Expected pushdown behavior
+#[derive(Debug, Clone, Copy)]
+enum PageIndexFilteringExpected {
+    /// How many pages were expected to be pruned
+    None,
+    /// Expected that more than 0 were pruned
+    Some,
+}
+
 /// parameters for running a test
 struct TestCase<'a> {
     test_parquet_file: &'a TestParquetFile,
@@ -243,6 +328,10 @@ struct TestCase<'a> {
     filter: Expr,
     /// Did we expect the pushdown filtering to have filtered any rows?
     pushdown_expected: PushdownExpected,
+
+    /// Did we expect page filtering to filter out pages
+    page_index_filtering_expected: PageIndexFilteringExpected,
+
     /// How many rows are expected to pass the predicate overall?
     expected_rows: usize,
 }
@@ -255,6 +344,7 @@ impl<'a> TestCase<'a> {
             // default to a filter that passes everything
             filter: lit(true),
             pushdown_expected: PushdownExpected::None,
+            page_index_filtering_expected: PageIndexFilteringExpected::None,
             expected_rows: 0,
         }
     }
@@ -271,8 +361,17 @@ impl<'a> TestCase<'a> {
     }
 
     /// Set the expected predicate pushdown
-    fn with_pushdown_expected(mut self, pushdown_expected: PushdownExpected) -> Self {
-        self.pushdown_expected = pushdown_expected;
+    fn with_pushdown_expected(mut self, v: PushdownExpected) -> Self {
+        self.pushdown_expected = v;
+        self
+    }
+
+    /// Set the expected page filtering
+    fn with_page_index_filtering_expected(
+        mut self,
+        v: PageIndexFilteringExpected,
+    ) -> Self {
+        self.page_index_filtering_expected = v;
         self
     }
 
@@ -307,8 +406,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: false,
                     enable_page_index: false,
                 },
-                // always expect no pushdown
-                PushdownExpected::None,
                 filter,
             )
             .await;
@@ -320,7 +417,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: false,
                     enable_page_index: false,
                 },
-                self.pushdown_expected,
                 filter,
             )
             .await;
@@ -334,7 +430,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: true,
                     enable_page_index: false,
                 },
-                self.pushdown_expected,
                 filter,
             )
             .await;
@@ -348,8 +443,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: false,
                     enable_page_index: true,
                 },
-                // pushdown is off so no pushdown is expected
-                PushdownExpected::None,
                 filter,
             )
             .await;
@@ -362,7 +455,6 @@ impl<'a> TestCase<'a> {
                     reorder_filters: true,
                     enable_page_index: true,
                 },
-                self.pushdown_expected,
                 filter,
             )
             .await;
@@ -374,7 +466,6 @@ impl<'a> TestCase<'a> {
     async fn read_with_options(
         &self,
         scan_options: ParquetScanOptions,
-        pushdown_expected: PushdownExpected,
         filter: &Expr,
     ) -> RecordBatch {
         println!("  scan options: {scan_options:?}");
@@ -404,6 +495,14 @@ impl<'a> TestCase<'a> {
         // verify expected pushdown
         let metrics =
             TestParquetFile::parquet_metrics(exec).expect("found parquet metrics");
+
+        let pushdown_expected = if scan_options.pushdown_filters {
+            self.pushdown_expected
+        } else {
+            // if filter pushdown is not enabled we don't expect it to filter rows
+            PushdownExpected::None
+        };
+
         let pushdown_rows_filtered = get_value(&metrics, "pushdown_rows_filtered");
         println!("  pushdown_rows_filtered: {}", pushdown_rows_filtered);
 
@@ -419,6 +518,29 @@ impl<'a> TestCase<'a> {
             }
         };
 
+        let page_index_rows_filtered = get_value(&metrics, "page_index_rows_filtered");
+        println!(" page_index_rows_filtered: {}", page_index_rows_filtered);
+
+        let page_index_filtering_expected = if scan_options.enable_page_index {
+            self.page_index_filtering_expected
+        } else {
+            // if page index filtering is not enabled, don't expect it
+            // to filter rows
+            PageIndexFilteringExpected::None
+        };
+
+        match page_index_filtering_expected {
+            PageIndexFilteringExpected::None => {
+                assert_eq!(page_index_rows_filtered, 0);
+            }
+            PageIndexFilteringExpected::Some => {
+                assert!(
+                    page_index_rows_filtered > 0,
+                    "Expected to filter rows via page index but none were",
+                );
+            }
+        };
+
         batch
     }
 }
diff --git a/parquet-test-utils/src/lib.rs b/parquet-test-utils/src/lib.rs
index 6a3e6e0ec..41066584e 100644
--- a/parquet-test-utils/src/lib.rs
+++ b/parquet-test-utils/src/lib.rs
@@ -51,7 +51,7 @@ pub struct TestParquetFile {
     object_meta: ObjectMeta,
 }
 
-#[derive(Debug, Clone)]
+#[derive(Debug, Clone, Copy)]
 pub struct ParquetScanOptions {
     pub pushdown_filters: bool,
     pub reorder_filters: bool,
@@ -59,34 +59,20 @@ pub struct ParquetScanOptions {
 }
 
 impl TestParquetFile {
-    /// Creates a new parquet file at the specified location
+    /// Creates a new parquet file at the specified location with the
+    /// given properties
     pub fn try_new(
         path: PathBuf,
+        props: WriterProperties,
         batches: impl IntoIterator<Item = RecordBatch>,
-        page_size: Option<usize>,
-        row_group_size: Option<usize>,
     ) -> Result<Self> {
         let file = File::create(&path).unwrap();
 
-        let mut props_builder = WriterProperties::builder();
-
-        if let Some(s) = page_size {
-            props_builder = props_builder
-                .set_data_pagesize_limit(s)
-                .set_write_batch_size(s);
-        }
-
-        if let Some(s) = row_group_size {
-            props_builder = props_builder.set_max_row_group_size(s);
-        }
-
         let mut batches = batches.into_iter();
         let first_batch = batches.next().expect("need at least one record batch");
         let schema = first_batch.schema();
 
-        let mut writer =
-            ArrowWriter::try_new(file, schema.clone(), Some(props_builder.build()))
-                .unwrap();
+        let mut writer = ArrowWriter::try_new(file, schema.clone(), Some(props)).unwrap();
 
         writer.write(&first_batch).unwrap();
         writer.flush()?;
@@ -122,7 +108,9 @@ impl TestParquetFile {
             object_meta,
         })
     }
+}
 
+impl TestParquetFile {
     /// return a `ParquetExec` and `FilterExec` with the specified options to scan this parquet file.
     ///
     /// This returns the same plan that DataFusion will make with a pushed down predicate followed by a filter: