You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2023/05/11 11:14:37 UTC
[arrow-datafusion] branch main updated: [parquet] Avoid read parquet index when there is no filter pushdown. (#6317)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/main by this push:
new a07d6ebc9f [parquet] Avoid read parquet index when there is no filter pushdown. (#6317)
a07d6ebc9f is described below
commit a07d6ebc9f31fb65bff4d5c66536ed3485d6db78
Author: Yang Jiang <ya...@ebay.com>
AuthorDate: Thu May 11 19:14:30 2023 +0800
[parquet] Avoid read parquet index when there is no filter pushdown. (#6317)
* [parquet] Avoid read parquet index when there is no filter pushdown.
* fix clippy
* fix fmt
* fix fmt2
---
.../core/src/physical_plan/file_format/parquet.rs | 17 +++++++++-
.../file_format/parquet/page_filter.rs | 5 +++
datafusion/core/tests/parquet/page_pruning.rs | 39 ++++++++++++++++++++++
3 files changed, 60 insertions(+), 1 deletion(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 641be002b6..9843ecc22e 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -482,7 +482,10 @@ impl FileOpener for ParquetOpener {
let table_schema = self.table_schema.clone();
let reorder_predicates = self.reorder_filters;
let pushdown_filters = self.pushdown_filters;
- let enable_page_index = self.enable_page_index;
+ let enable_page_index = should_enable_page_index(
+ self.enable_page_index,
+ &self.page_pruning_predicate,
+ );
let limit = self.limit;
Ok(Box::pin(async move {
@@ -572,6 +575,18 @@ impl FileOpener for ParquetOpener {
}
}
+/// Decides whether the parquet page index should be read: the option must be
+/// enabled and the page pruning predicate must hold at least one filter.
+fn should_enable_page_index(
+ enable_page_index: bool,
+ page_pruning_predicate: &Option<Arc<PagePruningPredicate>>,
+) -> bool {
+ enable_page_index
+ && page_pruning_predicate
+ .as_ref()
+ .map_or(false, |p| p.filter_number() > 0)
+}
+
/// Factory of parquet file readers.
///
/// Provides means to implement custom data access interface.
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
index 410c43c57d..00e55c41ad 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/page_filter.rs
@@ -222,6 +222,11 @@ impl PagePruningPredicate {
file_metrics.page_index_rows_filtered.add(total_skip);
Ok(Some(final_selection))
}
+
+ /// Returns how many predicates this [`PagePruningPredicate`] holds (the length of `self.predicates`).
+ pub fn filter_number(&self) -> usize {
+ self.predicates.len()
+ }
}
/// Returns the column index in the row group metadata for the single
diff --git a/datafusion/core/tests/parquet/page_pruning.rs b/datafusion/core/tests/parquet/page_pruning.rs
index baf9d2d36a..1d444326bb 100644
--- a/datafusion/core/tests/parquet/page_pruning.rs
+++ b/datafusion/core/tests/parquet/page_pruning.rs
@@ -23,6 +23,7 @@ use datafusion::datasource::listing::PartitionedFile;
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::SessionState;
use datafusion::physical_plan::file_format::{FileScanConfig, ParquetExec};
+use datafusion::physical_plan::metrics::MetricValue;
use datafusion::physical_plan::ExecutionPlan;
use datafusion::prelude::SessionContext;
use datafusion_common::{ScalarValue, Statistics, ToDFSchema};
@@ -714,3 +715,41 @@ async fn prune_decimal_in_list() {
)
.await;
}
+
+#[tokio::test]
+async fn without_pushdown_filter() {
+ let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+
+ let output1 = context.query("SELECT * FROM t").await;
+
+ let mut context = ContextWithParquet::new(Scenario::Timestamps, Page).await;
+
+ let output2 = context
+ .query("SELECT * FROM t where nanos < to_timestamp('2023-01-02 01:01:11Z')")
+ .await;
+
+ let bytes_scanned_without_filter = cast_count_metric(
+ output1
+ .parquet_metrics
+ .sum_by_name("bytes_scanned")
+ .unwrap(),
+ )
+ .unwrap();
+ let bytes_scanned_with_filter = cast_count_metric(
+ output2
+ .parquet_metrics
+ .sum_by_name("bytes_scanned")
+ .unwrap(),
+ )
+ .unwrap();
+
+ // The unfiltered query skips the page index, so it should scan fewer bytes.
+ assert!(bytes_scanned_with_filter > bytes_scanned_without_filter);
+}
+
+fn cast_count_metric(metric: MetricValue) -> Option<usize> {
+ if let MetricValue::Count { count, .. } = metric {
+ return Some(count.value()); // only the `Count` variant yields a value here
+ }
+ None
+}