Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/11 11:14:37 UTC

[arrow-datafusion] branch main updated: [parquet] Avoid read parquet index when there is no filter pushdown. (#6317)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new a07d6ebc9f [parquet] Avoid read parquet index when there is no filter pushdown. (#6317)
a07d6ebc9f is described below

commit a07d6ebc9f31fb65bff4d5c66536ed3485d6db78
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Thu May 11 19:14:30 2023 +0800

    [parquet] Avoid read parquet index when there is no filter pushdown. (#6317)
    
    * [parquet] Avoid read parquet index when there is no filter pushdown.
    
    * fix clippy
    
    * fix fmt
    
    * fix fmt2
---
 .../core/src/physical_plan/file_format/parquet.rs  | 17 +++++++++-
 .../file_format/parquet/page_filter.rs             |  5 +++
 datafusion/core/tests/parquet/page_pruning.rs      | 39 ++++++++++++++++++++++
 3 files changed, 60 insertions(+), 1 deletion(-)

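The change gates reading of the Parquet page index on whether there is actually a page pruning predicate with at least one filter to apply. Below is a minimal, self-contained sketch of that gating; the PagePruningPredicate struct here is a stand-in for DataFusion's type (illustration only, not the real definition), and the committed helper's additional is_some() check is condensed away since it is equivalent.

use std::sync::Arc;

/// Stand-in for DataFusion's `PagePruningPredicate` (illustration only):
/// the gating only needs to know how many page-level filters it holds.
struct PagePruningPredicate {
    predicates: Vec<String>,
}

impl PagePruningPredicate {
    /// Mirrors the `filter_number()` accessor added by this commit.
    fn filter_number(&self) -> usize {
        self.predicates.len()
    }
}

/// Read the page index only when the option is on *and* there is a pruning
/// predicate with at least one usable filter.
fn should_enable_page_index(
    enable_page_index: bool,
    page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
) -> bool {
    enable_page_index
        && page_pruning_predicate
            .as_ref()
            .map(|p| p.filter_number() > 0)
            .unwrap_or(false)
}

fn main() {
    let none: Option<Arc<PagePruningPredicate>> = None;
    let empty = Some(Arc::new(PagePruningPredicate { predicates: vec![] }));
    let one = Some(Arc::new(PagePruningPredicate {
        predicates: vec!["nanos < to_timestamp(...)".to_string()],
    }));

    // No predicate at all (e.g. `SELECT * FROM t`): skip the page index.
    assert!(!should_enable_page_index(true, &none));
    // A predicate exists but yields no page-level filters: still skip it.
    assert!(!should_enable_page_index(true, &empty));
    // A usable filter with the option enabled: read the page index.
    assert!(should_enable_page_index(true, &one));
    // Option disabled: never read it.
    assert!(!should_enable_page_index(false, &one));
}

The practical effect is that a plain `SELECT *` scan no longer fetches the page index bytes at all, which is what the new test below asserts via the `bytes_scanned` metric.
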
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 641be002b6..9843ecc22e 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -482,7 +482,10 @@ impl FileOpener for ParquetOpener {
         let table_schema = self.table_schema.clone();
         let reorder_predicates = self.reorder_filters;
         let pushdown_filters = self.pushdown_filters;
-        let enable_page_index = self.enable_page_index;
+        let enable_page_index = should_enable_page_index(
+            self.enable_page_index,
+            &self.page_pruning_predicate,
+        );
         let limit = self.limit;
 
         Ok(Box::pin(async move {
@@ -572,6 +575,18 @@ impl FileOpener for ParquetOpener {
     }
 }
 
+fn should_enable_page_index(
+    enable_page_index: bool,
+    page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
+) -> bool {
+    enable_page_index
+        && page_pruning_predicate.is_some()
+        && page_pruning_predicate
+            .as_ref()
+            .map(|p| p.filter_number() > 0)
+            .unwrap_or(false)
+}
+
 /// Factory of parquet file readers.
 ///
 /// Provides means to implement custom data access interface.
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 410c43c57d..00e55c41ad 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -222,6 +222,11 @@ impl PagePruningPredicate {
         file_metrics.page_index_rows_filtered.add(total_skip);
         Ok(Some(final_selection))
     }
+
+    /// Returns the number of filters in the [`PagePruningPredicate`]
+    pub fn filter_number(&self) -> usize {
+        self.predicates.len()
+    }
 }
 
 /// Returns the column index in the row group metadata for the single
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index baf9d2d36a..1d444326bb 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -23,6 +23,7 @@ use datafusion::datasource::listing::PartitionedFile;
 use datafusion::datasource::object_store::ObjectStoreUrl;
 use datafusion::execution::context::SessionState;
 use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::metrics::MetricValue;
 use datafusion::physical_plan::ExecutionPlan;
 use datafusion::prelude::SessionContext;
 use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
@@ -714,3 +715,41 @@ async fn prune_decimal_in_list() {
     )
     .await;
 }
+
+#[tokio::test]
+async fn without_pushdown_filter() {
+    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+
+    let output1 = context.query("SELECT * FROM t").await;
+
+    let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+
+    let output2 = context
+        .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
+        .await;
+
+    let bytes_scanned_without_filter = cast_count_metric(
+        output1
+            .parquet_metrics
+            .sum_by_name("bytes_scanned")
+            .unwrap(),
+    )
+    .unwrap();
+    let bytes_scanned_with_filter = cast_count_metric(
+        output2
+            .parquet_metrics
+            .sum_by_name("bytes_scanned")
+            .unwrap(),
+    )
+    .unwrap();
+
+    // Without a filter the page index is not read, so fewer bytes are scanned.
+    assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
+}
+
+fn cast_count_metric(metric: MetricValue) -> Option<usize> {
+    match metric {
+        MetricValue::Count { count, .. } => Some(count.value()),
+        _ => None,
+    }
+}