You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/06/09 06:26:54 UTC

[spark] branch branch-3.3 updated: [SPARK-38997][SPARK-39037][SQL][FOLLOWUP] PushableColumnWithoutNestedColumn` need be translated to predicate too

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new ea0571e001e [SPARK-38997][SPARK-39037][SQL][FOLLOWUP] PushableColumnWithoutNestedColumn` need be translated to predicate too
ea0571e001e is described below

commit ea0571e001e6ce4ac415f20142c39eedc18250e1
Author: Jiaan Geng <be...@163.com>
AuthorDate: Thu Jun 9 14:26:18 2022 +0800

    [SPARK-38997][SPARK-39037][SQL][FOLLOWUP] PushableColumnWithoutNestedColumn` need be translated to predicate too
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/35768 assumed that the expressions in `And`, `Or` and `Not` must be predicates.
    https://github.com/apache/spark/pull/36370 and https://github.com/apache/spark/pull/36325 supported pushing down expressions in `GROUP BY` and `ORDER BY`. However, the children of `And`, `Or` and `Not` can be `FieldReference.column(name)`.
    `FieldReference.column(name)` is not a predicate, so the assert may fail.
    
    ### Why are the changes needed?
    This PR fixes the bug for `PushableColumnWithoutNestedColumn`.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'.
    It makes the push-down framework behave more correctly.
    
    ### How was this patch tested?
    New tests
    
    Closes #36776 from beliefer/SPARK-38997_SPARK-39037_followup.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 125555cf2c1388b28fcc34beae09f971c5fadcb7)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/util/V2ExpressionBuilder.scala    | 13 ++++---
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 42 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
index 487b809d48a..c77a040bc64 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalyst/util/V2ExpressionBuilder.scala
@@ -49,14 +49,17 @@ class V2ExpressionBuilder(
     case Literal(true, BooleanType) => Some(new AlwaysTrue())
     case Literal(false, BooleanType) => Some(new AlwaysFalse())
     case Literal(value, dataType) => Some(LiteralValue(value, dataType))
-    case col @ pushableColumn(name) if nestedPredicatePushdownEnabled =>
+    case col @ pushableColumn(name) =>
+      val ref = if (nestedPredicatePushdownEnabled) {
+        FieldReference(name)
+      } else {
+        FieldReference.column(name)
+      }
       if (isPredicate && col.dataType.isInstanceOf[BooleanType]) {
-        Some(new V2Predicate("=", Array(FieldReference(name), LiteralValue(true, BooleanType))))
+        Some(new V2Predicate("=", Array(ref, LiteralValue(true, BooleanType))))
       } else {
-        Some(FieldReference(name))
+        Some(ref)
       }
-    case pushableColumn(name) if !nestedPredicatePushdownEnabled =>
-      Some(FieldReference.column(name))
     case in @ InSet(child, hset) =>
       generateExpression(child).map { v =>
         val children =
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 91526cef507..858aeaa1365 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -851,6 +851,48 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
         |[DEPT, CASE WHEN (SALARY > 8000.00) AND (SALARY < 10000.00) THEN SALARY ELSE 0.00 END],
         |""".stripMargin.replaceAll("\n", " "))
     checkAnswer(df5, Seq(Row(1, 0, 10000), Row(1, 9000, 9000), Row(2, 0, 22000), Row(6, 0, 12000)))
+
+    val df6 = sql(
+      """
+        |SELECT CASE WHEN SALARY > 8000 AND is_manager <> false THEN SALARY ELSE 0 END as key,
+        |  SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
+    checkAggregateRemoved(df6)
+    checkPushedInfo(df6,
+      """
+        |PushedAggregates: [SUM(SALARY)],
+        |PushedFilters: [],
+        |PushedGroupByExpressions:
+        |[CASE WHEN (SALARY > 8000.00) AND (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
+        |""".stripMargin.replaceAll("\n", " "))
+    checkAnswer(df6, Seq(Row(0, 21000), Row(10000, 20000), Row(12000, 12000)))
+
+    val df7 = sql(
+      """
+        |SELECT CASE WHEN SALARY > 8000 OR is_manager <> false THEN SALARY ELSE 0 END as key,
+        |  SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
+    checkAggregateRemoved(df7)
+    checkPushedInfo(df7,
+      """
+        |PushedAggregates: [SUM(SALARY)],
+        |PushedFilters: [],
+        |PushedGroupByExpressions:
+        |[CASE WHEN (SALARY > 8000.00) OR (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
+        |""".stripMargin.replaceAll("\n", " "))
+    checkAnswer(df7, Seq(Row(10000, 20000), Row(12000, 24000), Row(9000, 9000)))
+
+    val df8 = sql(
+      """
+        |SELECT CASE WHEN NOT(is_manager <> false) THEN SALARY ELSE 0 END as key,
+        |  SUM(SALARY) FROM h2.test.employee GROUP BY key""".stripMargin)
+    checkAggregateRemoved(df8)
+    checkPushedInfo(df8,
+      """
+        |PushedAggregates: [SUM(SALARY)],
+        |PushedFilters: [],
+        |PushedGroupByExpressions:
+        |[CASE WHEN NOT (IS_MANAGER = true) THEN SALARY ELSE 0.00 END],
+        |""".stripMargin.replaceAll("\n", " "))
+    checkAnswer(df8, Seq(Row(0, 32000), Row(12000, 12000), Row(9000, 9000)))
   }
 
   test("scan with aggregate push-down: DISTINCT SUM with group by") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org