You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by al...@apache.org on 2022/04/22 14:03:59 UTC
[arrow-datafusion] branch master updated: Enable filter pushdown when using In_list on parquet (#2282)
This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git
The following commit(s) were added to refs/heads/master by this push:
new 20d0c5b81 Enable filter pushdown when using In_list on parquet (#2282)
20d0c5b81 is described below
commit 20d0c5b818e098bacabb5bec0a29f039949103e5
Author: Yang Jiang <37...@users.noreply.github.com>
AuthorDate: Fri Apr 22 22:03:53 2022 +0800
Enable filter pushdown when using In_list on parquet (#2282)
* Enable filter pushdown when using In_list on parquet
* fix negated situation and add UT
* fix clippy
---
datafusion/core/src/physical_optimizer/pruning.rs | 74 +++++++++++++++++++++++
datafusion/core/tests/parquet_pruning.rs | 67 ++++++++++++++++----
2 files changed, 130 insertions(+), 11 deletions(-)
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 67b7476e5..737b4401f 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -688,6 +688,20 @@ fn build_predicate_expression(
return Ok(unhandled);
}
}
+ Expr::InList {
+ expr,
+ list,
+ negated,
+ } if !list.is_empty() && list.len() < 20 => {
+ let eq_fun = if *negated { Expr::not_eq } else { Expr::eq };
+ let re_fun = if *negated { Expr::and } else { Expr::or };
+ let change_expr = list
+ .iter()
+ .map(|e| eq_fun(*expr.clone(), e.clone()))
+ .reduce(re_fun)
+ .unwrap();
+ return build_predicate_expression(&change_expr, schema, required_columns);
+ }
_ => {
return Ok(unhandled);
}
@@ -1340,6 +1354,66 @@ mod tests {
Ok(())
}
+ #[test]
+ fn row_group_predicate_in_list() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]);
+ // test c1 in(1, 2, 3)
+ let expr = Expr::InList {
+ expr: Box::new(col("c1")),
+ list: vec![lit(1), lit(2), lit(3)],
+ negated: false,
+ };
+ let expected_expr = "#c1_min <= Int32(1) AND Int32(1) <= #c1_max OR #c1_min <= Int32(2) AND Int32(2) <= #c1_max OR #c1_min <= Int32(3) AND Int32(3) <= #c1_max";
+ let predicate_expr =
+ build_predicate_expression(&expr, &schema, &mut RequiredStatColumns::new())?;
+ assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+ Ok(())
+ }
+
+ #[test]
+ fn row_group_predicate_in_list_empty() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]);
+ // test c1 in()
+ let expr = Expr::InList {
+ expr: Box::new(col("c1")),
+ list: vec![],
+ negated: false,
+ };
+ let expected_expr = "Boolean(true)";
+ let predicate_expr =
+ build_predicate_expression(&expr, &schema, &mut RequiredStatColumns::new())?;
+ assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+ Ok(())
+ }
+
+ #[test]
+ fn row_group_predicate_in_list_negated() -> Result<()> {
+ let schema = Schema::new(vec![
+ Field::new("c1", DataType::Int32, false),
+ Field::new("c2", DataType::Int32, false),
+ ]);
+ // test c1 not in(1, 2, 3)
+ let expr = Expr::InList {
+ expr: Box::new(col("c1")),
+ list: vec![lit(1), lit(2), lit(3)],
+ negated: true,
+ };
+ let expected_expr = "#c1_min != Int32(1) OR Int32(1) != #c1_max AND #c1_min != Int32(2) OR Int32(2) != #c1_max AND #c1_min != Int32(3) OR Int32(3) != #c1_max";
+ let predicate_expr =
+ build_predicate_expression(&expr, &schema, &mut RequiredStatColumns::new())?;
+ assert_eq!(format!("{:?}", predicate_expr), expected_expr);
+
+ Ok(())
+ }
+
#[test]
fn prune_api() {
let schema = Arc::new(Schema::new(vec![
diff --git a/datafusion/core/tests/parquet_pruning.rs b/datafusion/core/tests/parquet_pruning.rs
index d5392e9dc..5ee4fcca4 100644
--- a/datafusion/core/tests/parquet_pruning.rs
+++ b/datafusion/core/tests/parquet_pruning.rs
@@ -185,7 +185,7 @@ async fn prune_int32_lt() {
let (expected_errors, expected_row_group_pruned, expected_results) =
(Some(0), Some(1), 11);
- // resulrt of sql "SELECT * FROM t where i < 1" is same as
+ // result of sql "SELECT * FROM t where i < 1" is same as
// "SELECT * FROM t where -i > -1"
let output = ContextWithParquet::new(Scenario::Int32)
.await
@@ -222,7 +222,7 @@ async fn prune_int32_lt() {
#[tokio::test]
async fn prune_int32_eq() {
- // resulrt of sql "SELECT * FROM t where i = 1"
+ // result of sql "SELECT * FROM t where i = 1"
let output = ContextWithParquet::new(Scenario::Int32)
.await
.query("SELECT * FROM t where i = 1")
@@ -237,7 +237,7 @@ async fn prune_int32_eq() {
#[tokio::test]
async fn prune_int32_scalar_fun_and_eq() {
- // resulrt of sql "SELECT * FROM t where abs(i) = 1 and i = 1"
+ // result of sql "SELECT * FROM t where abs(i) = 1 and i = 1"
// only use "i = 1" to prune
let output = ContextWithParquet::new(Scenario::Int32)
.await
@@ -253,7 +253,7 @@ async fn prune_int32_scalar_fun_and_eq() {
#[tokio::test]
async fn prune_int32_scalar_fun() {
- // resulrt of sql "SELECT * FROM t where abs(i) = 1" is not supported
+ // result of sql "SELECT * FROM t where abs(i) = 1" is not supported
let output = ContextWithParquet::new(Scenario::Int32)
.await
.query("SELECT * FROM t where abs(i) = 1")
@@ -269,7 +269,7 @@ async fn prune_int32_scalar_fun() {
#[tokio::test]
async fn prune_int32_complex_expr() {
- // resulrt of sql "SELECT * FROM t where i+1 = 1" is not supported
+ // result of sql "SELECT * FROM t where i+1 = 1" is not supported
let output = ContextWithParquet::new(Scenario::Int32)
.await
.query("SELECT * FROM t where i+1 = 1")
@@ -285,7 +285,7 @@ async fn prune_int32_complex_expr() {
#[tokio::test]
async fn prune_int32_complex_expr_subtract() {
- // resulrt of sql "SELECT * FROM t where 1-i > 1" is not supported
+ // result of sql "SELECT * FROM t where 1-i > 1" is not supported
let output = ContextWithParquet::new(Scenario::Int32)
.await
.query("SELECT * FROM t where 1-i > 1")
@@ -304,7 +304,7 @@ async fn prune_f64_lt() {
let (expected_errors, expected_row_group_pruned, expected_results) =
(Some(0), Some(1), 11);
- // resulrt of sql "SELECT * FROM t where i < 1" is same as
+ // result of sql "SELECT * FROM t where i < 1" is same as
// "SELECT * FROM t where -i > -1"
let output = ContextWithParquet::new(Scenario::Float64)
.await
@@ -341,7 +341,7 @@ async fn prune_f64_lt() {
#[tokio::test]
async fn prune_f64_scalar_fun_and_gt() {
- // resulrt of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
+ // result of sql "SELECT * FROM t where abs(f - 1) <= 0.000001 and f >= 0.1"
// only use "f >= 0" to prune
let output = ContextWithParquet::new(Scenario::Float64)
.await
@@ -357,7 +357,7 @@ async fn prune_f64_scalar_fun_and_gt() {
#[tokio::test]
async fn prune_f64_scalar_fun() {
- // resulrt of sql "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported
+ // result of sql "SELECT * FROM t where abs(f-1) <= 0.000001" is not supported
let output = ContextWithParquet::new(Scenario::Float64)
.await
.query("SELECT * FROM t where abs(f-1) <= 0.000001")
@@ -373,7 +373,7 @@ async fn prune_f64_scalar_fun() {
#[tokio::test]
async fn prune_f64_complex_expr() {
- // resulrt of sql "SELECT * FROM t where f+1 > 1.1"" is not supported
+ // result of sql "SELECT * FROM t where f+1 > 1.1"" is not supported
let output = ContextWithParquet::new(Scenario::Float64)
.await
.query("SELECT * FROM t where f+1 > 1.1")
@@ -389,7 +389,7 @@ async fn prune_f64_complex_expr() {
#[tokio::test]
async fn prune_f64_complex_expr_subtract() {
- // resulrt of sql "SELECT * FROM t where 1-f > 1" is not supported
+ // result of sql "SELECT * FROM t where 1-f > 1" is not supported
let output = ContextWithParquet::new(Scenario::Float64)
.await
.query("SELECT * FROM t where 1-f > 1")
@@ -403,6 +403,51 @@ async fn prune_f64_complex_expr_subtract() {
assert_eq!(output.result_rows, 9, "{}", output.description());
}
+#[tokio::test]
+async fn prune_int32_eq_in_list() {
+ // result of sql "SELECT * FROM t where in (1)"
+ let output = ContextWithParquet::new(Scenario::Int32)
+ .await
+ .query("SELECT * FROM t where i in (1)")
+ .await;
+
+ println!("{}", output.description());
+ // This should prune out groups without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(0));
+ assert_eq!(output.row_groups_pruned(), Some(3));
+ assert_eq!(output.result_rows, 1, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_int32_eq_in_list_2() {
+ // result of sql "SELECT * FROM t where in (1000)", prune all
+ let output = ContextWithParquet::new(Scenario::Int32)
+ .await
+ .query("SELECT * FROM t where i in (1000)")
+ .await;
+
+ println!("{}", output.description());
+ // This should prune out groups without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(0));
+ assert_eq!(output.row_groups_pruned(), Some(4));
+ assert_eq!(output.result_rows, 0, "{}", output.description());
+}
+
+#[tokio::test]
+async fn prune_int32_eq_in_list_negated() {
+ // result of sql "SELECT * FROM t where not in (1)" prune nothing
+ let output = ContextWithParquet::new(Scenario::Int32)
+ .await
+ .query("SELECT * FROM t where i not in (1)")
+ .await;
+
+ println!("{}", output.description());
+ // This should prune out groups without error
+ assert_eq!(output.predicate_evaluation_errors(), Some(0));
+ assert_eq!(output.row_groups_pruned(), Some(0));
+ assert_eq!(output.result_rows, 19, "{}", output.description());
+}
+
// ----------------------
// Begin test fixture
// ----------------------