You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/18 16:19:35 UTC
[arrow-datafusion] branch master updated: Skip useless pruning predicates in `ParquetExec` (#4280)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new a584ff553 Skip useless pruning predicates in `ParquetExec` (#4280)
a584ff553 is described below
commit a584ff553a13aa2ab85a53ad610dcbf9362db9fe
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Fri Nov 18 11:19:29 2022 -0500
Skip useless pruning predicates in `ParquetExec` (#4280)
* Skip useless pruning predicates
* Update datafusion/core/src/physical_plan/file_format/parquet.rs
Co-authored-by: Daniël Heres <da...@gmail.com>
Co-authored-by: Daniël Heres <da...@gmail.com>
---
datafusion/core/src/physical_optimizer/pruning.rs | 10 ++++
.../core/src/physical_plan/file_format/parquet.rs | 68 ++++++++++++++++++----
2 files changed, 66 insertions(+), 12 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 170911096..ce525d2cf 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -50,6 +50,7 @@ use datafusion_expr::expr::{BinaryExpr, Cast};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter};
use datafusion_expr::{binary_expr, cast, try_cast, ExprSchemable};
use datafusion_physical_expr::create_physical_expr;
+use datafusion_physical_expr::expressions::Literal;
use log::trace;
/// Interface to pass statistics information to [`PruningPredicate`]
@@ -224,6 +225,15 @@ impl PruningPredicate {
&self.predicate_expr
}
+ /// Returns true if this pruning predicate is "always true" (i.e., it will not prune anything)
+ pub fn allways_true(&self) -> bool {
+ self.predicate_expr
+ .as_any()
+ .downcast_ref::<Literal>()
+ .map(|l| matches!(l.value(), ScalarValue::Boolean(Some(true))))
+ .unwrap_or_default()
+ }
+
/// Returns all need column indexes to evaluate this pruning predicate
pub(crate) fn need_input_columns_ids(&self) -> HashSet<usize> {
let mut set = HashSet::new();
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index f5f3fcac6..55f9219c5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -97,19 +97,28 @@ impl ParquetExec {
let predicate_creation_errors =
MetricBuilder::new(&metrics).global_counter("num_predicate_creation_errors");
- let pruning_predicate = predicate.and_then(|predicate_expr| {
- match PruningPredicate::try_new(
- predicate_expr,
- base_config.file_schema.clone(),
- ) {
- Ok(pruning_predicate) => Some(pruning_predicate),
- Err(e) => {
- debug!("Could not create pruning predicate for: {}", e);
- predicate_creation_errors.add(1);
+ let pruning_predicate = predicate
+ .and_then(|predicate_expr| {
+ match PruningPredicate::try_new(
+ predicate_expr,
+ base_config.file_schema.clone(),
+ ) {
+ Ok(pruning_predicate) => Some(pruning_predicate),
+ Err(e) => {
+ debug!("Could not create pruning predicate for: {}", e);
+ predicate_creation_errors.add(1);
+ None
+ }
+ }
+ })
+ .and_then(|pruning_predicate| {
+ // If the pruning predicate can't prune anything, don't try
+ if pruning_predicate.allways_true() {
None
+ } else {
+ Some(pruning_predicate)
}
- }
- });
+ });
let (projected_schema, projected_statistics) = base_config.project();
@@ -680,7 +689,7 @@ mod tests {
use chrono::{TimeZone, Utc};
use datafusion_common::assert_contains;
use datafusion_common::ScalarValue;
- use datafusion_expr::{col, lit};
+ use datafusion_expr::{col, lit, when};
use futures::StreamExt;
use object_store::local::LocalFileSystem;
use object_store::path::Path;
@@ -1544,6 +1553,10 @@ mod tests {
let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
+ // should have a pruning predicate
+ let pruning_predicate = &rt.parquet_exec.pruning_predicate;
+ assert!(pruning_predicate.is_some());
+
// convert to explain plan form
let display = displayable(rt.parquet_exec.as_ref()).indent().to_string();
@@ -1554,6 +1567,37 @@ mod tests {
assert_contains!(&display, "projection=[c1]");
}
+ #[tokio::test]
+ async fn parquet_exec_skip_empty_pruning() {
+ let c1: ArrayRef = Arc::new(StringArray::from(vec![
+ Some("Foo"),
+ None,
+ Some("bar"),
+ Some("bar"),
+ Some("bar"),
+ Some("bar"),
+ Some("zzz"),
+ ]));
+
+ // batch1: c1(string)
+ let batch1 = create_batch(vec![("c1", c1.clone())]);
+
+ // filter is too complicated for pruning
+ let filter = when(col("c1").not_eq(lit("bar")), lit(true))
+ .otherwise(lit(false))
+ .unwrap();
+
+ let rt = round_trip(vec![batch1], None, None, Some(filter), true, false).await;
+
+ // Should not contain a pruning predicate
+ let pruning_predicate = &rt.parquet_exec.pruning_predicate;
+ assert!(
+ pruning_predicate.is_none(),
+ "Still had pruning predicate: {:?}",
+ pruning_predicate
+ )
+ }
+
/// returns the sum of all the metrics with the specified name
/// the returned set.
///