You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by xu...@apache.org on 2022/11/22 10:38:56 UTC

[arrow-datafusion] branch master updated: Reduce Expr copies in parquet exec (#4283)

This is an automated email from the ASF dual-hosted git repository.

xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new eac254c0e Reduce Expr copies in parquet exec (#4283)
eac254c0e is described below

commit eac254c0ef347c5bd12e8f6401a6fb0113090294
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 22 05:38:50 2022 -0500

    Reduce Expr copies in parquet exec (#4283)
---
 .../core/src/physical_plan/file_format/parquet.rs  | 23 ++++++++++++---------
 .../file_format/parquet/row_filter.rs              |  8 ++++----
 .../file_format/parquet/row_groups.rs              | 24 +++++++++++-----------
 3 files changed, 29 insertions(+), 26 deletions(-)

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e62cdf323..2ba8203a5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -76,9 +76,9 @@ pub struct ParquetExec {
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
     /// Optional predicate for row filtering during parquet scan
-    predicate: Option<Expr>,
+    predicate: Option<Arc<Expr>>,
     /// Optional predicate for pruning row groups
-    pruning_predicate: Option<PruningPredicate>,
+    pruning_predicate: Option<Arc<PruningPredicate>>,
     /// Optional hint for the size of the parquet metadata
     metadata_size_hint: Option<usize>,
     /// Optional user defined parquet file reader factory
@@ -106,7 +106,7 @@ impl ParquetExec {
                     predicate_expr,
                     base_config.file_schema.clone(),
                 ) {
-                    Ok(pruning_predicate) => Some(pruning_predicate),
+                    Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
                     Err(e) => {
                         debug!("Could not create pruning predicate for: {}", e);
                         predicate_creation_errors.add(1);
@@ -123,6 +123,9 @@ impl ParquetExec {
                 }
             });
 
+        // Save original predicate
+        let predicate = predicate.map(Arc::new);
+
         let (projected_schema, projected_statistics) = base_config.project();
 
         Self {
@@ -143,7 +146,7 @@ impl ParquetExec {
     }
 
     /// Optional reference to this parquet scan's pruning predicate
-    pub fn pruning_predicate(&self) -> Option<&PruningPredicate> {
+    pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
         self.pruning_predicate.as_ref()
     }
 
@@ -376,8 +379,8 @@ struct ParquetOpener {
     partition_index: usize,
     projection: Arc<[usize]>,
     batch_size: usize,
-    predicate: Option<Expr>,
-    pruning_predicate: Option<PruningPredicate>,
+    predicate: Option<Arc<Expr>>,
+    pruning_predicate: Option<Arc<PruningPredicate>>,
     table_schema: SchemaRef,
     metadata_size_hint: Option<usize>,
     metrics: ExecutionPlanMetricsSet,
@@ -435,7 +438,7 @@ impl FileOpener for ParquetOpener {
             // Filter pushdown: evaluate predicates during scan
             if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
                 let row_filter = row_filter::build_row_filter(
-                    predicate.clone(),
+                    predicate.as_ref(),
                     builder.schema().as_ref(),
                     table_schema.as_ref(),
                     builder.metadata(),
@@ -463,7 +466,7 @@ impl FileOpener for ParquetOpener {
             let row_groups = row_groups::prune_row_groups(
                 file_metadata.row_groups(),
                 file_range,
-                pruning_predicate.clone(),
+                pruning_predicate.as_ref().map(|p| p.as_ref()),
                 &file_metrics,
             );
 
@@ -473,7 +476,7 @@ impl FileOpener for ParquetOpener {
             if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
                 .then(|| {
                     page_filter::build_page_filter(
-                        pruning_predicate.as_ref(),
+                        pruning_predicate.as_ref().map(|p| p.as_ref()),
                         builder.schema().clone(),
                         &row_groups,
                         file_metadata.as_ref(),
@@ -1615,7 +1618,7 @@ mod tests {
 
 // but still has a pushed-down predicate
         let predicate = rt.parquet_exec.predicate.as_ref();
-        assert_eq!(predicate, Some(&filter));
+        assert_eq!(predicate.unwrap().as_ref(), &filter);
     }
 
     /// returns the sum of all the metrics with the specified name
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index 5b193c4f8..a41ea8a05 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -25,7 +25,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecurs
 use std::collections::BTreeSet;
 
 use datafusion_expr::Expr;
-use datafusion_optimizer::utils::split_conjunction_owned;
+use datafusion_optimizer::utils::split_conjunction;
 use datafusion_physical_expr::execution_props::ExecutionProps;
 use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
 use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
@@ -309,7 +309,7 @@ fn columns_sorted(
 
 /// Build a [`RowFilter`] from the given predicate `Expr`
 pub fn build_row_filter(
-    expr: Expr,
+    expr: &Expr,
     file_schema: &Schema,
     table_schema: &Schema,
     metadata: &ParquetMetaData,
@@ -319,13 +319,13 @@ pub fn build_row_filter(
     let rows_filtered = &file_metrics.pushdown_rows_filtered;
     let time = &file_metrics.pushdown_eval_time;
 
-    let predicates = split_conjunction_owned(expr);
+    let predicates = split_conjunction(expr);
 
     let mut candidates: Vec<FilterCandidate> = predicates
         .into_iter()
         .flat_map(|expr| {
             if let Ok(candidate) =
-                FilterCandidateBuilder::new(expr, file_schema, table_schema)
+                FilterCandidateBuilder::new(expr.clone(), file_schema, table_schema)
                     .build(metadata)
             {
                 candidate
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
index c7a281198..dab2a4226 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -39,7 +39,7 @@ use super::ParquetFileMetrics;
 pub(crate) fn prune_row_groups(
     groups: &[RowGroupMetaData],
     range: Option<FileRange>,
-    predicate: Option<PruningPredicate>,
+    predicate: Option<&PruningPredicate>,
     metrics: &ParquetFileMetrics,
 ) -> Vec<usize> {
     let mut filtered = Vec::with_capacity(groups.len());
@@ -51,7 +51,7 @@ pub(crate) fn prune_row_groups(
             }
         }
 
-        if let Some(predicate) = &predicate {
+        if let Some(predicate) = predicate {
             let pruning_stats = RowGroupPruningStatistics {
                 row_group_metadata: metadata,
                 parquet_schema: predicate.schema().as_ref(),
@@ -297,7 +297,7 @@ mod tests {
 
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
             vec![1]
         );
     }
@@ -331,7 +331,7 @@ mod tests {
         // missing statistics for first row group mean that the result from the predicate expression
         // is null / undefined so the first row group can't be filtered out
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
             vec![0, 1]
         );
     }
@@ -372,7 +372,7 @@ mod tests {
         // the first row group is still filtered out because the predicate expression can be partially evaluated
         // when conditions are joined using AND
         assert_eq!(
-            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
             vec![1]
         );
 
@@ -384,7 +384,7 @@ mod tests {
         // if conditions in predicate are joined with OR and an unsupported expression is used
         // this bypasses the entire predicate expression and no row groups are filtered out
         assert_eq!(
-            prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+            prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
             vec![0, 1]
         );
     }
@@ -426,7 +426,7 @@ mod tests {
         let metrics = parquet_file_metrics();
         // First row group was filtered out because it contains no null value on "c2".
         assert_eq!(
-            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
             vec![1]
         );
     }
@@ -451,7 +451,7 @@ mod tests {
         // bool = NULL always evaluates to NULL (and thus will not
         // pass predicates). Ideally these should both be false
         assert_eq!(
-            prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+            prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
             vec![1]
         );
     }
@@ -500,7 +500,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
             vec![0]
         );
 
@@ -556,7 +556,7 @@ mod tests {
             prune_row_groups(
                 &[rgm1, rgm2, rgm3],
                 None,
-                Some(pruning_predicate),
+                Some(&pruning_predicate),
                 &metrics
             ),
             vec![0, 1]
@@ -597,7 +597,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
             vec![1]
         );
 
@@ -656,7 +656,7 @@ mod tests {
         );
         let metrics = parquet_file_metrics();
         assert_eq!(
-            prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+            prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
             vec![1]
         );