You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/11/18 20:05:26 UTC
[arrow-datafusion] branch master updated: Fix issue in filter pushdown with overloaded projection index (#4281)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 09e1c9148 Fix issue in filter pushdown with overloaded projection index (#4281)
09e1c9148 is described below
commit 09e1c9148514a87e3083d3644370d36f2e9fb87d
Author: Dan Harris <13...@users.noreply.github.com>
AuthorDate: Fri Nov 18 15:05:20 2022 -0500
Fix issue in filter pushdown with overloaded projection index (#4281)
* Fix issue in filter pushdown with overloaded projection index
* Use BTreeSet for collecting distinct column indexes
---
.../core/src/physical_plan/file_format/parquet.rs | 5 +++--
.../physical_plan/file_format/parquet/row_filter.rs | 19 +++++++++++++------
2 files changed, 16 insertions(+), 8 deletions(-)
diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs
index 55f9219c5..ea512e85c 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet.rs
@@ -989,7 +989,7 @@ mod tests {
// batch2: c3(int8), c2(int64)
let batch2 = create_batch(vec![("c3", c3), ("c2", c2)]);
- let filter = col("c2").eq(lit(2_i64));
+ let filter = col("c2").eq(lit(2_i64)).or(col("c2").eq(lit(1_i64)));
// read/write the files:
let rt =
@@ -998,13 +998,14 @@ mod tests {
"+----+----+----+",
"| c1 | c3 | c2 |",
"+----+----+----+",
+ "| | 10 | 1 |",
"| | 20 | 2 |",
"+----+----+----+",
];
assert_batches_sorted_eq!(expected, &rt.batches.unwrap());
let metrics = rt.parquet_exec.metrics().unwrap();
// Note there were 6 rows in total (across three batches)
- assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 5);
+ assert_eq!(get_value(&metrics, "pushdown_rows_filtered"), 4);
}
#[tokio::test]
diff --git a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
index 353162c7f..eb8ca6ce5 100644
--- a/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
+++ b/datafusion/core/src/physical_plan/file_format/parquet/row_filter.rs
@@ -21,6 +21,7 @@ use arrow::error::{ArrowError, Result as ArrowResult};
use arrow::record_batch::RecordBatch;
use datafusion_common::{Column, DataFusionError, Result, ScalarValue, ToDFSchema};
use datafusion_expr::expr_rewriter::{ExprRewritable, ExprRewriter, RewriteRecursion};
+use std::collections::BTreeSet;
use datafusion_expr::Expr;
use datafusion_optimizer::utils::split_conjunction_owned;
@@ -174,7 +175,7 @@ struct FilterCandidateBuilder<'a> {
expr: Expr,
file_schema: &'a Schema,
table_schema: &'a Schema,
- required_column_indices: Vec<usize>,
+ required_column_indices: BTreeSet<usize>,
non_primitive_columns: bool,
projected_columns: bool,
}
@@ -185,7 +186,7 @@ impl<'a> FilterCandidateBuilder<'a> {
expr,
file_schema,
table_schema,
- required_column_indices: vec![],
+ required_column_indices: BTreeSet::default(),
non_primitive_columns: false,
projected_columns: false,
}
@@ -209,7 +210,7 @@ impl<'a> FilterCandidateBuilder<'a> {
expr,
required_bytes,
can_use_index,
- projection: self.required_column_indices,
+ projection: self.required_column_indices.into_iter().collect(),
}))
}
}
@@ -219,7 +220,7 @@ impl<'a> ExprRewriter for FilterCandidateBuilder<'a> {
fn pre_visit(&mut self, expr: &Expr) -> Result<RewriteRecursion> {
if let Expr::Column(column) = expr {
if let Ok(idx) = self.file_schema.index_of(&column.name) {
- self.required_column_indices.push(idx);
+ self.required_column_indices.insert(idx);
if DataType::is_nested(self.file_schema.field(idx).data_type()) {
self.non_primitive_columns = true;
@@ -284,7 +285,10 @@ fn remap_projection(src: &[usize]) -> Vec<usize> {
/// Calculate the total compressed size of all `Column's required for
/// predicate `Expr`. This should represent the total amount of file IO
/// required to evaluate the predicate.
-fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usize> {
+fn size_of_columns(
+ columns: &BTreeSet<usize>,
+ metadata: &ParquetMetaData,
+) -> Result<usize> {
let mut total_size = 0;
let row_groups = metadata.row_groups();
for idx in columns {
@@ -299,7 +303,10 @@ fn size_of_columns(columns: &[usize], metadata: &ParquetMetaData) -> Result<usiz
/// For a given set of `Column`s required for predicate `Expr` determine whether all
/// columns are sorted. Sorted columns may be queried more efficiently in the presence of
/// a PageIndex.
-fn columns_sorted(_columns: &[usize], _metadata: &ParquetMetaData) -> Result<bool> {
+fn columns_sorted(
+ _columns: &BTreeSet<usize>,
+ _metadata: &ParquetMetaData,
+) -> Result<bool> {
// TODO How do we know this?
Ok(false)
}