You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by xu...@apache.org on 2022/11/22 10:38:56 UTC
[arrow-datafusion] branch master updated: Reduce Expr copies in parquet exec (#4283)
This is an automated email from the ASF dual-hosted git repository.
xudong963 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new eac254c0e Reduce Expr copies in parquet exec (#4283)
eac254c0e is described below
commit eac254c0ef347c5bd12e8f6401a6fb0113090294
Author: Andrew Lamb <an...@nerdnetworks.org>
AuthorDate: Tue Nov 22 05:38:50 2022 -0500
Reduce Expr copies in parquet exec (#4283)
---
.../core/src/physical_plan/file_format/parquet.rs | 23 ++++++++++++---------
.../file_format/parquet/row_filter.rs | 8 ++++----
.../file_format/parquet/row_groups.rs | 24 +++++++++++-----------
3 files changed, 29 insertions(+), 26 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index e62cdf323..2ba8203a5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -76,9 +76,9 @@ pub struct ParquetExec {
/// Execution metrics
metrics: ExecutionPlanMetricsSet,
/// Optional predicate for row filtering during parquet scan
- predicate: Option<Expr>,
+ predicate: Option<Arc<Expr>>,
/// Optional predicate for pruning row groups
- pruning_predicate: Option<PruningPredicate>,
+ pruning_predicate: Option<Arc<PruningPredicate>>,
/// Optional hint for the size of the parquet metadata
metadata_size_hint: Option<usize>,
/// Optional user defined parquet file reader factory
@@ -106,7 +106,7 @@ impl ParquetExec {
predicate_expr,
base_config.file_schema.clone(),
) {
- Ok(pruning_predicate) => Some(pruning_predicate),
+ Ok(pruning_predicate) => Some(Arc::new(pruning_predicate)),
Err(e) => {
debug!("Could not create pruning predicate for: {}", e);
predicate_creation_errors.add(1);
@@ -123,6 +123,9 @@ impl ParquetExec {
}
});
+ // Save original predicate
+ let predicate = predicate.map(Arc::new);
+
let (projected_schema, projected_statistics) = base_config.project();
Self {
@@ -143,7 +146,7 @@ impl ParquetExec {
}
/// Optional reference to this parquet scan's pruning predicate
- pub fn pruning_predicate(&self) -> Option<&PruningPredicate> {
+ pub fn pruning_predicate(&self) -> Option<&Arc<PruningPredicate>> {
self.pruning_predicate.as_ref()
}
@@ -376,8 +379,8 @@ struct ParquetOpener {
partition_index: usize,
projection: Arc<[usize]>,
batch_size: usize,
- predicate: Option<Expr>,
- pruning_predicate: Option<PruningPredicate>,
+ predicate: Option<Arc<Expr>>,
+ pruning_predicate: Option<Arc<PruningPredicate>>,
table_schema: SchemaRef,
metadata_size_hint: Option<usize>,
metrics: ExecutionPlanMetricsSet,
@@ -435,7 +438,7 @@ impl FileOpener for ParquetOpener {
// Filter pushdown: evaluate predicates during scan
if let Some(predicate) = pushdown_filters.then_some(predicate).flatten() {
let row_filter = row_filter::build_row_filter(
- predicate.clone(),
+ predicate.as_ref(),
builder.schema().as_ref(),
table_schema.as_ref(),
builder.metadata(),
@@ -463,7 +466,7 @@ impl FileOpener for ParquetOpener {
let row_groups = row_groups::prune_row_groups(
file_metadata.row_groups(),
file_range,
- pruning_predicate.clone(),
+ pruning_predicate.as_ref().map(|p| p.as_ref()),
&file_metrics,
);
@@ -473,7 +476,7 @@ impl FileOpener for ParquetOpener {
if let Some(row_selection) = (enable_page_index && !row_groups.is_empty())
.then(|| {
page_filter::build_page_filter(
- pruning_predicate.as_ref(),
+ pruning_predicate.as_ref().map(|p| p.as_ref()),
builder.schema().clone(),
&row_groups,
file_metadata.as_ref(),
@@ -1615,7 +1618,7 @@ mod tests {
// but does still have a pushdown predicate
let predicate = rt.parquet_exec.predicate.as_ref();
- assert_eq!(predicate, Some(&filter));
+ assert_eq!(predicate.unwrap().as_ref(), &filter);
}
/// returns the sum of all the metrics with the specified name
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index 5b193c4f8..a41ea8a05 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -25,7 +25,7 @@ use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecurs
use std::collections::BTreeSet;
use datafusion_expr::Expr;
-use datafusion_optimizer::utils::split_conjunction_owned;
+use datafusion_optimizer::utils::split_conjunction;
use datafusion_physical_expr::execution_props::ExecutionProps;
use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
use parquet::arrow::arrow_reader::{ArrowPredicate, RowFilter};
@@ -309,7 +309,7 @@ fn columns_sorted(
/// Build a [`RowFilter`] from the given predicate `Expr`
pub fn build_row_filter(
- expr: Expr,
+ expr: &Expr,
file_schema: &Schema,
table_schema: &Schema,
metadata: &ParquetMetaData,
@@ -319,13 +319,13 @@ pub fn build_row_filter(
let rows_filtered = &file_metrics.pushdown_rows_filtered;
let time = &file_metrics.pushdown_eval_time;
- let predicates = split_conjunction_owned(expr);
+ let predicates = split_conjunction(expr);
let mut candidates: Vec<FilterCandidate> = predicates
.into_iter()
.flat_map(|expr| {
if let Ok(candidate) =
- FilterCandidateBuilder::new(expr, file_schema, table_schema)
+ FilterCandidateBuilder::new(expr.clone(), file_schema, table_schema)
.build(metadata)
{
candidate
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
index c7a281198..dab2a4226 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_groups.rs
@@ -39,7 +39,7 @@ use super::ParquetFileMetrics;
pub(crate) fn prune_row_groups(
groups: &[RowGroupMetaData],
range: Option<FileRange>,
- predicate: Option<PruningPredicate>,
+ predicate: Option<&PruningPredicate>,
metrics: &ParquetFileMetrics,
) -> Vec<usize> {
let mut filtered = Vec::with_capacity(groups.len());
@@ -51,7 +51,7 @@ pub(crate) fn prune_row_groups(
}
}
- if let Some(predicate) = &predicate {
+ if let Some(predicate) = predicate {
let pruning_stats = RowGroupPruningStatistics {
row_group_metadata: metadata,
parquet_schema: predicate.schema().as_ref(),
@@ -297,7 +297,7 @@ mod tests {
let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![1]
);
}
@@ -331,7 +331,7 @@ mod tests {
// missing statistics for first row group mean that the result from the predicate expression
// is null / undefined so the first row group can't be filtered out
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![0, 1]
);
}
@@ -372,7 +372,7 @@ mod tests {
// the first row group is still filtered out because the predicate expression can be partially evaluated
// when conditions are joined using AND
assert_eq!(
- prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
vec![1]
);
@@ -384,7 +384,7 @@ mod tests {
// if conditions in predicate are joined with OR and an unsupported expression is used
// this bypasses the entire predicate expression and no row groups are filtered out
assert_eq!(
- prune_row_groups(groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(groups, None, Some(&pruning_predicate), &metrics),
vec![0, 1]
);
}
@@ -426,7 +426,7 @@ mod tests {
let metrics = parquet_file_metrics();
// First row group was filtered out because it contains no null value on "c2".
assert_eq!(
- prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
vec![1]
);
}
@@ -451,7 +451,7 @@ mod tests {
// bool = NULL always evaluates to NULL (and thus will not
// pass predicates). Ideally these should both be false
assert_eq!(
- prune_row_groups(&groups, None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&groups, None, Some(&pruning_predicate), &metrics),
vec![1]
);
}
@@ -500,7 +500,7 @@ mod tests {
);
let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![0]
);
@@ -556,7 +556,7 @@ mod tests {
prune_row_groups(
&[rgm1, rgm2, rgm3],
None,
- Some(pruning_predicate),
+ Some(&pruning_predicate),
&metrics
),
vec![0, 1]
@@ -597,7 +597,7 @@ mod tests {
);
let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![1]
);
@@ -656,7 +656,7 @@ mod tests {
);
let metrics = parquet_file_metrics();
assert_eq!(
- prune_row_groups(&[rgm1, rgm2], None, Some(pruning_predicate), &metrics),
+ prune_row_groups(&[rgm1, rgm2], None, Some(&pruning_predicate), &metrics),
vec![1]
);