You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 15:44:05 UTC
[arrow-datafusion] branch master updated: step1 add option in
ExecutionConfig to enable/disable parquet pruning (#749)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 7019b0f step1 add option in ExecutionConfig to enable/disable parquet pruning (#749)
7019b0f is described below
commit 7019b0f3b6de025ed95f88492466663284ada8e7
Author: Cui Wenzheng <cu...@ymail.com>
AuthorDate: Mon Jul 19 23:43:57 2021 +0800
step1 add option in ExecutionConfig to enable/disable parquet pruning (#749)
---
datafusion/src/datasource/parquet.rs | 22 +++++++++++++++++++++-
datafusion/src/execution/context.rs | 18 ++++++++++++++----
datafusion/src/physical_optimizer/pruning.rs | 4 ++--
3 files changed, 37 insertions(+), 7 deletions(-)
diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index e53fbbd..28f79a6 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -38,6 +38,7 @@ pub struct ParquetTable {
schema: SchemaRef,
statistics: Statistics,
max_concurrency: usize,
+ enable_pruning: bool,
}
impl ParquetTable {
@@ -51,6 +52,7 @@ impl ParquetTable {
schema,
statistics: parquet_exec.statistics().to_owned(),
max_concurrency,
+ enable_pruning: true,
})
}
@@ -58,6 +60,17 @@ impl ParquetTable {
pub fn path(&self) -> &str {
&self.path
}
+
+ /// Returns whether `scan` builds a pruning predicate from the pushed-down filters (see `with_enable_pruning`)
+ pub fn get_enable_pruning(&self) -> bool {
+ self.enable_pruning
+ }
+
+ /// Builder-style setter: consumes `self`, sets whether `scan` builds a pruning predicate, and returns the table
+ pub fn with_enable_pruning(mut self, enable_pruning: bool) -> Self {
+ self.enable_pruning = enable_pruning;
+ self
+ }
}
impl TableProvider for ParquetTable {
@@ -86,7 +99,14 @@ impl TableProvider for ParquetTable {
filters: &[Expr],
limit: Option<usize>,
) -> Result<Arc<dyn ExecutionPlan>> {
- let predicate = combine_filters(filters);
+ // If pruning is enabled, combine the filters to build the predicate.
+ // If pruning is disabled, set the predicate to None so that readers
+ // will not prune data based on the statistics.
+ let predicate = if self.enable_pruning {
+ combine_filters(filters)
+ } else {
+ None
+ };
Ok(Arc::new(ParquetExec::try_from_path(
&self.path,
projection.clone(),
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index d4d3a8a..bd939ce 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -322,10 +322,11 @@ impl ExecutionContext {
/// Registers a Parquet data source so that it can be referenced from SQL statements
/// executed against this context.
pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
- let table = ParquetTable::try_new(
- filename,
- self.state.lock().unwrap().config.concurrency,
- )?;
+ let table = {
+ let m = self.state.lock().unwrap();
+ ParquetTable::try_new(filename, m.config.concurrency)?
+ .with_enable_pruning(m.config.parquet_pruning)
+ };
self.register_table(name, Arc::new(table))?;
Ok(())
}
@@ -633,6 +634,8 @@ pub struct ExecutionConfig {
/// Should DataFusion repartition data using the partition keys to execute window functions in
/// parallel using the provided `concurrency` level
pub repartition_windows: bool,
+ /// Should the DataFusion parquet reader use the predicate to prune data
+ parquet_pruning: bool,
}
impl Default for ExecutionConfig {
@@ -663,6 +666,7 @@ impl Default for ExecutionConfig {
repartition_joins: true,
repartition_aggregations: true,
repartition_windows: true,
+ parquet_pruning: true,
}
}
}
@@ -765,6 +769,12 @@ impl ExecutionConfig {
self.repartition_windows = enabled;
self
}
+
+ /// Enables or disables the use of a pruning predicate by parquet readers to skip row groups
+ pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
+ self.parquet_pruning = enabled;
+ self
+ }
}
/// Holds per-execution properties and data (such as starting timestamps, etc).
diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 5585c4d..3625381 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -547,7 +547,7 @@ fn build_predicate_expression(
// allow partial failure in predicate expression generation
// this can still produce a useful predicate when multiple conditions are joined using AND
Err(_) => {
- return Ok(logical_plan::lit(true));
+ return Ok(unhandled);
}
};
let corrected_op = expr_builder.correct_operator(op);
@@ -596,7 +596,7 @@ fn build_predicate_expression(
.lt_eq(expr_builder.scalar_expr().clone())
}
// other expressions are not supported
- _ => logical_plan::lit(true),
+ _ => unhandled,
};
Ok(statistics_expr)
}