You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2021/07/19 15:44:05 UTC

[arrow-datafusion] branch master updated: step1 add option in ExecutionConfig to enable/disable parquet pruning (#749)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 7019b0f  step1 add option in ExecutionConfig to enable/disable parquet pruning (#749)
7019b0f is described below

commit 7019b0f3b6de025ed95f88492466663284ada8e7
Author: Cui Wenzheng <cu...@ymail.com>
AuthorDate: Mon Jul 19 23:43:57 2021 +0800

    step1 add option in ExecutionConfig to enable/disable parquet pruning (#749)
---
 datafusion/src/datasource/parquet.rs         | 22 +++++++++++++++++++++-
 datafusion/src/execution/context.rs          | 18 ++++++++++++++----
 datafusion/src/physical_optimizer/pruning.rs |  4 ++--
 3 files changed, 37 insertions(+), 7 deletions(-)

diff --git a/datafusion/src/datasource/parquet.rs b/datafusion/src/datasource/parquet.rs
index e53fbbd..28f79a6 100644
--- a/datafusion/src/datasource/parquet.rs
+++ b/datafusion/src/datasource/parquet.rs
@@ -38,6 +38,7 @@ pub struct ParquetTable {
     schema: SchemaRef,
     statistics: Statistics,
     max_concurrency: usize,
+    enable_pruning: bool,
 }
 
 impl ParquetTable {
@@ -51,6 +52,7 @@ impl ParquetTable {
             schema,
             statistics: parquet_exec.statistics().to_owned(),
             max_concurrency,
+            enable_pruning: true,
         })
     }
 
@@ -58,6 +60,17 @@ impl ParquetTable {
     pub fn path(&self) -> &str {
         &self.path
     }
+
+    /// Get parquet pruning option
+    pub fn get_enable_pruning(&self) -> bool {
+        self.enable_pruning
+    }
+
+    /// Set parquet pruning option
+    pub fn with_enable_pruning(mut self, enable_pruning: bool) -> Self {
+        self.enable_pruning = enable_pruning;
+        self
+    }
 }
 
 impl TableProvider for ParquetTable {
@@ -86,7 +99,14 @@ impl TableProvider for ParquetTable {
         filters: &[Expr],
         limit: Option<usize>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        let predicate = combine_filters(filters);
+        // If pruning is enabled, combine the filters to build the predicate.
+        // If pruning is disabled, set the predicate to None so that readers
+        // do not prune data based on the statistics.
+        let predicate = if self.enable_pruning {
+            combine_filters(filters)
+        } else {
+            None
+        };
         Ok(Arc::new(ParquetExec::try_from_path(
             &self.path,
             projection.clone(),
diff --git a/datafusion/src/execution/context.rs b/datafusion/src/execution/context.rs
index d4d3a8a..bd939ce 100644
--- a/datafusion/src/execution/context.rs
+++ b/datafusion/src/execution/context.rs
@@ -322,10 +322,11 @@ impl ExecutionContext {
     /// Registers a Parquet data source so that it can be referenced from SQL statements
     /// executed against this context.
     pub fn register_parquet(&mut self, name: &str, filename: &str) -> Result<()> {
-        let table = ParquetTable::try_new(
-            filename,
-            self.state.lock().unwrap().config.concurrency,
-        )?;
+        let table = {
+            let m = self.state.lock().unwrap();
+            ParquetTable::try_new(filename, m.config.concurrency)?
+                .with_enable_pruning(m.config.parquet_pruning)
+        };
         self.register_table(name, Arc::new(table))?;
         Ok(())
     }
@@ -633,6 +634,8 @@ pub struct ExecutionConfig {
     /// Should DataFusion repartition data using the partition keys to execute window functions in
     /// parallel using the provided `concurrency` level
     pub repartition_windows: bool,
+    /// Should DataFusion's parquet reader use the predicate to prune data
+    parquet_pruning: bool,
 }
 
 impl Default for ExecutionConfig {
@@ -663,6 +666,7 @@ impl Default for ExecutionConfig {
             repartition_joins: true,
             repartition_aggregations: true,
             repartition_windows: true,
+            parquet_pruning: true,
         }
     }
 }
@@ -765,6 +769,12 @@ impl ExecutionConfig {
         self.repartition_windows = enabled;
         self
     }
+
+    /// Enables or disables the use of a pruning predicate for parquet readers to skip row groups
+    pub fn with_parquet_pruning(mut self, enabled: bool) -> Self {
+        self.parquet_pruning = enabled;
+        self
+    }
 }
 
 /// Holds per-execution properties and data (such as starting timestamps, etc).
diff --git a/datafusion/src/physical_optimizer/pruning.rs b/datafusion/src/physical_optimizer/pruning.rs
index 5585c4d..3625381 100644
--- a/datafusion/src/physical_optimizer/pruning.rs
+++ b/datafusion/src/physical_optimizer/pruning.rs
@@ -547,7 +547,7 @@ fn build_predicate_expression(
         // allow partial failure in predicate expression generation
         // this can still produce a useful predicate when multiple conditions are joined using AND
         Err(_) => {
-            return Ok(logical_plan::lit(true));
+            return Ok(unhandled);
         }
     };
     let corrected_op = expr_builder.correct_operator(op);
@@ -596,7 +596,7 @@ fn build_predicate_expression(
                 .lt_eq(expr_builder.scalar_expr().clone())
         }
         // other expressions are not supported
-        _ => logical_plan::lit(true),
+        _ => unhandled,
     };
     Ok(statistics_expr)
 }