You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/09 06:26:54 UTC
[spark] branch branch-3.3 updated: [SPARK-38997][SPARK-39037][SQL][FOLLOWUP] `PushableColumnWithoutNestedColumn` needs to be translated to predicate too
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push:
new ea0571e001e [SPARK-38997][SPARK-39037][SQL][FOLLOWUP] `PushableColumnWithoutNestedColumn` needs to be translated to predicate too
ea0571e001e is described below
commit ea0571e001e6ce4ac415f20142c39eedc18250e1
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Jun 9 14:26:18 2022 +0800
[SPARK-38997][SPARK-39037][SQL][FOLLOWUP] `PushableColumnWithoutNestedColumn` needs to be translated to predicate too
### What changes were proposed in this pull request?
https://github.com/apache/spark/pull/35768 assume the expression in `And`, `Or` and `Not` must be predicate.
https://github.com/apache/spark/pull/36370 and https://github.com/apache/spark/pull/36325 supported push down expressions in `GROUP BY` and `ORDER BY`. But the children of `And`, `Or` and `Not` can be `FieldReference.column(name)`.
`FieldReference.column(name)` is not a predicate, so the assert may fail.
### Why are the changes needed?
This PR fix the bug for `PushableColumnWithoutNestedColumn`.
### Does this PR introduce _any_ user-facing change?
'Yes'.
It makes the push-down framework behave more correctly.
### How was this patch tested?
New tests
Closes #36776 from beliefer/SPARK-38997_SPARK-39037_followup.
Authored-by: Jiaan Geng <be...@163.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
(cherry picked from commit 125555cf2c1388b28fcc34beae09f971c5fadcb7)
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../sql/catalyst/util/V2ExpressionBuilder.scala | 13 ++++---
.../org/apache/spark/sql/jdbc/JDBCV2Suite.scala | 42 ++++++++++++++++++++++
2 files changed, 50 insertions(+), 5 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 487b809d48a..c77a040bc64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -49,14 +49,17 @@ class V2ExpressionBuilder(
case Literal(true, BooleanType) => Some(new AlwaysTrue())
case Literal(false, BooleanType) => Some(new AlwaysFalse())
case Literal(value, dataType) => Some(LiteralValue(value, dataType))
- case col @ pushableColumn(name) if nestedPredicatePushdownEnabled =>
+ case col @ pushableColumn(name) =>
+ val ref = if (nestedPredicatePushdownEnabled) {
+ FieldReference(name)
+ } else {
+ FieldReference.column(name)
+ }
if (isPredicate && col.dataType.isInstanceOf[BooleanType]) {
- Some(new V2Predicate("=", Array(FieldReference(name), LiteralValue(true, BooleanType))))
+ Some(new V2Predicate("=", Array(ref, LiteralValue(true, BooleanType))))
} else {
- Some(FieldReference(name))
+ Some(ref)
}
- case pushableColumn(name) if !nestedPredicatePushdownEnabled =>
- Some(FieldReference.column(name))
case in @ InSet(child, hset) =>
generateExpression(child).map { v =>
val children =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 91526cef507..858aeaa1365 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -851,6 +851,48 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
|[DEPT, CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END],
|""".stripMargin.replaceAll("\n", " "))
checkAnswer(df5, Seq(Row(1, 0, 10000), Row(1, 9000, 9000), Row(2, 0, 22000), Row(6, 0, 12000)))
+
+ val df6 = sql(
+ """
+ |SELECT CASE WHEN SALARY > 8000 AND is_manager <> false THEN SALARY ELSE 0 END as key,
+ | SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
+ checkAggregateRemoved(df6)
+ checkPushedInfo(df6,
+ """
+ |PushedAggregates: [SUM(SALARY)],
+ |PushedFilters: [],
+ |PushedGroupByExpressions:
+ |[CASE WHEN (SALARY > 8000.00) AND (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
+ |""".stripMargin.replaceAll("\n", " "))
+ checkAnswer(df6, Seq(Row(0, 21000), Row(10000, 20000), Row(12000, 12000)))
+
+ val df7 = sql(
+ """
+ |SELECT CASE WHEN SALARY > 8000 OR is_manager <> false THEN SALARY ELSE 0 END as key,
+ | SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
+ checkAggregateRemoved(df7)
+ checkPushedInfo(df7,
+ """
+ |PushedAggregates: [SUM(SALARY)],
+ |PushedFilters: [],
+ |PushedGroupByExpressions:
+ |[CASE WHEN (SALARY > 8000.00) OR (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
+ |""".stripMargin.replaceAll("\n", " "))
+ checkAnswer(df7, Seq(Row(10000, 20000), Row(12000, 24000), Row(9000, 9000)))
+
+ val df8 = sql(
+ """
+ |SELECT CASE WHEN NOT(is_manager <> false) THEN SALARY ELSE 0 END as key,
+ | SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
+ checkAggregateRemoved(df8)
+ checkPushedInfo(df8,
+ """
+ |PushedAggregates: [SUM(SALARY)],
+ |PushedFilters: [],
+ |PushedGroupByExpressions:
+ |[CASE WHEN NOT (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
+ |""".stripMargin.replaceAll("\n", " "))
+ checkAnswer(df8, Seq(Row(0, 32000), Row(12000, 12000), Row(9000, 9000)))
}
test("scan with aggregate push-down: DISTINCT SUM with group by") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org