Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/18 20:05:26 UTC

[arrow-datafusion] branch master updated: Fix issue in filter pushdown with overloaded projection index (#4281)

This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/master by this push:
     new 09e1c9148 Fix issue in filter pushdown with overloaded projection index (#4281)
09e1c9148 is described below

commit 09e1c9148514a87e3083d3644370d36f2e9fb87d
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Fri Nov 18 15:05:20 2022 -0500

    Fix issue in filter pushdown with overloaded projection index (#4281)
    
    * Fix issue in filter pushdown with overloaded projection index
    
    * Use BTreeSet for collecting distinct column indexes
---
 .../core/src/physical_plan/file_format/parquet.rs     |  5 +++--
 .../physical_plan/file_format/parquet/row_filter.rs   | 19 +++++++++++++------
 2 files changed, 16 insertions(+), 8 deletions(-)
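
The crux of the fix: while walking the predicate, FilterCandidateBuilder pushed a column's file-schema index into a Vec once per reference, so a predicate mentioning the same column twice produced a projection with a duplicate index. Collecting into a BTreeSet keeps the indices distinct and sorted. A minimal standalone sketch of the difference (the index value 1 for c2 is illustrative only, not taken from the commit):

    use std::collections::BTreeSet;

    fn main() {
        // Indices seen while visiting `c2 = 2 OR c2 = 1`: the file-schema
        // index of `c2` (assumed to be 1 here) is recorded once per reference.
        let visited = [1usize, 1];

        // Before the fix: a Vec keeps the duplicate, so the projection
        // handed to the Parquet reader lists the same column twice.
        let as_vec: Vec<usize> = visited.to_vec();
        assert_eq!(as_vec, vec![1, 1]);

        // After the fix: a BTreeSet stores each index once, in sorted
        // order, giving a distinct projection.
        let projection: Vec<usize> = visited
            .iter()
            .copied()
            .collect::<BTreeSet<usize>>()
            .into_iter()
            .collect();
        assert_eq!(projection, vec![1]);
    }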

diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 55f9219c5..ea512e85c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -989,7 +989,7 @@ mod tests {
         // batch2: c3(int8), c2(int64)
         let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
 
-        let filter = col("c2").eq(lit(2_i64));
+        let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64)));
 
         // read/write the files:
         let rt =
@@ -998,13 +998,14 @@ mod tests {
             "+----+----+----+",
             "| c1 | c3 | c2 |",
             "+----+----+----+",
+            "|    | 10 | 1  |",
             "|    | 20 | 2  |",
             "+----+----+----+",
         ];
         assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
         let metrics = rt.parquet_exec.metrics().unwrap();
         // Note there were 6 rows in total (across three batches)
-        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
+        assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 4);
     }
 
     #[tokio::test]
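
For context, the updated test predicate is exactly the shape that triggered the bug: it references c2 twice, so the old Vec-based collection recorded the same index twice. A sketch of the predicate using the public datafusion_expr builders (col, lit):

    use datafusion_expr::{col, lit};

    fn main() {
        // Two references to the same column `c2`, joined with OR. Rows
        // where c2 = 1 or c2 = 2 now pass the pushed-down filter, so 4 of
        // the 6 input rows are filtered out, matching the updated
        // pushdown_rows_filtered assertion above.
        let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64)));
        println!("{filter}");
    }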
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index 353162c7f..eb8ca6ce5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -21,6 +21,7 @@ use arrow::error::{ArrowError, Result as ArrowResult};
 use arrow::record_batch::RecordBatch;
 use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
 use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
+use std::collections::BTreeSet;
 
 use datafusion_expr::Expr;
 use datafusion_optimizer::utils::split_conjunction_owned;
@@ -174,7 +175,7 @@ struct FilterCandidateBuilder<'a> {
     expr: Expr,
     file_schema: &'a Schema,
     table_schema: &'a Schema,
-    required_column_indices: Vec<usize>,
+    required_column_indices: BTreeSet<usize>,
     non_primitive_columns: bool,
     projected_columns: bool,
 }
@@ -185,7 +186,7 @@ impl<'a> FilterCandidateBuilder<'a> {
             expr,
             file_schema,
             table_schema,
-            required_column_indices: vec![],
+            required_column_indices: BTreeSet::default(),
             non_primitive_columns: false,
             projected_columns: false,
         }
@@ -209,7 +210,7 @@ impl<'a> FilterCandidateBuilder<'a> {
                 expr,
                 required_bytes,
                 can_use_index,
-                projection: self.required_column_indices,
+                projection: self.required_column_indices.into_iter().collect(),
             }))
         }
     }
@@ -219,7 +220,7 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
     fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
         if let Expr::Column(column) = expr {
             if let Ok(idx) = self.file_schema.index_of(&column.name) {
-                self.required_column_indices.push(idx);
+                self.required_column_indices.insert(idx);
 
                 if DataType::is_nested(self.file_schema.field(idx).data_type()) {
                     self.non_primitive_columns = true;
@@ -284,7 +285,10 @@ fn remap_projection(src: &[usize]) -> Vec<usize> {
 /// Calculate the total compressed size of all `Column`s required for
 /// predicate `Expr`. This should represent the total amount of file IO
 /// required to evaluate the predicate.
-fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
+fn size_of_columns(
+    columns: &BTreeSet<usize>,
+    metadata: &ParquetMetaData,
+) -> Result<usize> {
     let mut total_size = 0;
     let row_groups = metadata.row_groups();
     for idx in columns {
@@ -299,7 +303,10 @@ fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usiz
 /// For a given set of `Column`s required for predicate `Expr`, determine whether all
 /// columns are sorted. Sorted columns may be queried more efficiently in the presence of
 /// a PageIndex.
-fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
+fn columns_sorted(
+    _columns: &BTreeSet<usize>,
+    _metadata: &ParquetMetaData,
+) -> Result<bool> {
     // TODO How do we know this?
     Ok(false)
 }
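
The hunk above shows only the head of size_of_columns; a plausible completion of the loop body, assuming the parquet crate's RowGroupMetaData::column and ColumnChunkMetaData::compressed_size accessors (a sketch under those assumptions, not necessarily the exact DataFusion code):

    use std::collections::BTreeSet;
    use parquet::file::metadata::ParquetMetaData;

    // Sum the compressed size of each required column chunk across all
    // row groups, approximating the file IO needed to evaluate the
    // pushed-down predicate.
    fn size_of_columns_sketch(
        columns: &BTreeSet<usize>,
        metadata: &ParquetMetaData,
    ) -> usize {
        let mut total_size = 0;
        for rg in metadata.row_groups() {
            for idx in columns {
                total_size += rg.column(*idx).compressed_size() as usize;
            }
        }
        total_size
    }

Because the indices now come from a BTreeSet, each column is counted once even when the predicate references it several times, which keeps a duplicated reference from inflating the IO estimate.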