You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2022/07/13 07:11:50 UTC

[spark] branch master updated: [SPARK-39385][SQL] Supports push down `REGR_AVGX` and `REGR_AVGY`

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new a79c91ec341 [SPARK-39385][SQL] Supports push down `REGR_AVGX` and `REGR_AVGY`
a79c91ec341 is described below

commit a79c91ec3417ad9edffacad56aa32e2b1df43b5a
Author: Jiaan Geng <be...@163.com>
AuthorDate: Wed Jul 13 15:11:35 2022 +0800

    [SPARK-39385][SQL] Supports push down `REGR_AVGX` and `REGR_AVGY`
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/36773 translate linear regression aggregate functions for pushdown.
    Although `REGR_AVGX` and `REGR_AVGY` are replaced to `AVG` in runtime, we can pushdown `AVG` to achieve the same result that push down `REGR_AVGX` and `REGR_AVGY`.
    
    Take `RegrAvgX` as an example, `RegrAvgX` replaced with `Average(If(And(IsNotNull(left), IsNotNull(right)), right, Literal.create(null, right.dataType)))` in runtime and then the latter will be optimized as `Average(CaseWhen(Seq[(And(IsNotNull(left), IsNotNull(right)), right)], Some(Literal.create(null, right.dataType))))`
    
    We can see `Literal.create(null, right.dataType)` here, `visitLiteral` of `JDBCSQLBuilder` cannot processing the null literal in the correct way. So we need to fix the issue too.
    
    ### Why are the changes needed?
    Let Aggregate pushdown supports `REGR_AVGX` and `REGR_AVGY`.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    New feature.
    
    ### How was this patch tested?
    New test cases.
    
    Closes #37126 from beliefer/SPARK-39385_followup.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../org/apache/spark/sql/jdbc/JdbcDialects.scala   |  5 ++--
 .../org/apache/spark/sql/jdbc/JDBCV2Suite.scala    | 32 ++++++++++++++++++++++
 2 files changed, 35 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
index d77299bdc0c..491e0231a23 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/JdbcDialects.scala
@@ -224,8 +224,9 @@ abstract class JdbcDialect extends Serializable with Logging {
 
   private[jdbc] class JDBCSQLBuilder extends V2ExpressionSQLBuilder {
     override def visitLiteral(literal: Literal[_]): String = {
-      compileValue(
-        CatalystTypeConverters.convertToScala(literal.value(), literal.dataType())).toString
+      Option(literal.value()).map(v =>
+        compileValue(CatalystTypeConverters.convertToScala(v, literal.dataType())).toString)
+        .getOrElse(super.visitLiteral(literal))
     }
 
     override def visitNamedReference(namedRef: NamedReference): String = {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index 7608c0b148d..ddcf28652e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -1824,6 +1824,38 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     checkPushedInfo(df2, "PushedFilters: [DEPT IS NOT NULL, DEPT > 0], ReadSchema:")
     checkAnswer(df2,
       Seq(Row(0.0, 1.0, 1.0, 20000.0), Row(0.0, 1.0, 1.0, 5000.0), Row(null, null, null, 0.0)))
+
+    val df3 = sql(
+      """
+        |SELECT
+        |  REGR_AVGX(bonus, bonus),
+        |  REGR_AVGY(bonus, bonus)
+        |FROM h2.test.employee WHERE dept > 0 GROUP BY DePt""".stripMargin)
+    checkFiltersRemoved(df3)
+    checkAggregateRemoved(df3)
+    checkPushedInfo(df3,
+      """
+        |PushedAggregates: [AVG(CASE WHEN BONUS IS NOT NULL THEN BONUS ELSE null END)],
+        |PushedFilters: [DEPT IS NOT NULL, DEPT > 0],
+        |PushedGroupByExpressions: [DEPT],
+        |""".stripMargin.replaceAll("\n", " "))
+    checkAnswer(df3, Seq(Row(1100.0, 1100.0), Row(1200.0, 1200.0), Row(1250.0, 1250.0)))
+
+    val df4 = sql(
+      """
+        |SELECT
+        |  REGR_AVGX(DISTINCT bonus, bonus),
+        |  REGR_AVGY(DISTINCT bonus, bonus)
+        |FROM h2.test.employee WHERE dept > 0 GROUP BY DePt""".stripMargin)
+    checkFiltersRemoved(df4)
+    checkAggregateRemoved(df4)
+    checkPushedInfo(df4,
+      """
+        |PushedAggregates: [AVG(DISTINCT CASE WHEN BONUS IS NOT NULL THEN BONUS ELSE null END)],
+        |PushedFilters: [DEPT IS NOT NULL, DEPT > 0],
+        |PushedGroupByExpressions: [DEPT],
+        |""".stripMargin.replaceAll("\n", " "))
+    checkAnswer(df4, Seq(Row(1100.0, 1100.0), Row(1200.0, 1200.0), Row(1250.0, 1250.0)))
   }
 
   test("scan with aggregate push-down: aggregate over alias push down") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org