Posted to commits@spark.apache.org by we...@apache.org on 2022/01/10 13:49:07 UTC

[spark] branch master updated: [SPARK-37527][SQL] Compile `COVAR_POP`, `COVAR_SAMP` and `CORR` in `H2Dialect`

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ac69e8  [SPARK-37527][SQL] Compile `COVAR_POP`, `COVAR_SAMP` and `CORR` in `H2Dialect`
2ac69e8 is described below

commit 2ac69e8cb0231edd753b9a91ac6d2b07e1d10525
Author: Jiaan Geng <be...@163.com>
AuthorDate: Mon Jan 10 21:47:55 2022 +0800

    [SPARK-37527][SQL] Compile `COVAR_POP`, `COVAR_SAMP` and `CORR` in `H2Dialect`
    
    ### What changes were proposed in this pull request?
    https://github.com/apache/spark/pull/35101 translated `COVAR_POP`, `COVAR_SAMP` and `CORR` for push-down, but the lower H2 version used at the time did not support them.
    
    After https://github.com/apache/spark/pull/35013, we can now compile these three aggregate functions in `H2Dialect`.
    
    ### Why are the changes needed?
    This completes the aggregate push-down support in `H2Dialect`.
    
    ### Does this PR introduce _any_ user-facing change?
    'Yes'. Spark can now fully push down `COVAR_POP`, `COVAR_SAMP` and `CORR` to H2.
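    
    For illustration, a query of the shape used in the updated `JDBCV2Suite` tests can now be pushed down (sketch only; the `h2` catalog and `test.employee` table come from that suite, not from this patch):
    
        // Sketch only: assumes an active SparkSession `spark` and the H2 test catalog
        // registered by JDBCV2Suite (table h2.test.employee with dept and bonus columns).
        val df = spark.sql(
          "SELECT COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus), CORR(bonus, bonus) " +
            "FROM h2.test.employee WHERE dept > 0 GROUP BY dept")
        // With this change the aggregates appear under PushedAggregates in the scan node.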
    
    ### How was this patch tested?
    Existing tests updated.
    
    Closes #35145 from beliefer/SPARK-37527_followup.
    
    Authored-by: Jiaan Geng <be...@163.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala | 12 ++++++++++++
 .../test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala   | 12 ++++++++----
 2 files changed, 20 insertions(+), 4 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
index 087c357..1f422e5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/H2Dialect.scala
@@ -47,6 +47,18 @@ private object H2Dialect extends JdbcDialect {
           assert(f.inputs().length == 1)
           val distinct = if (f.isDistinct) "DISTINCT " else ""
           Some(s"STDDEV_SAMP($distinct${f.inputs().head})")
+        case f: GeneralAggregateFunc if f.name() == "COVAR_POP" =>
+          assert(f.inputs().length == 2)
+          val distinct = if (f.isDistinct) "DISTINCT " else ""
+          Some(s"COVAR_POP($distinct${f.inputs().head}, ${f.inputs().last})")
+        case f: GeneralAggregateFunc if f.name() == "COVAR_SAMP" =>
+          assert(f.inputs().length == 2)
+          val distinct = if (f.isDistinct) "DISTINCT " else ""
+          Some(s"COVAR_SAMP($distinct${f.inputs().head}, ${f.inputs().last})")
+        case f: GeneralAggregateFunc if f.name() == "CORR" =>
+          assert(f.inputs().length == 2)
+          val distinct = if (f.isDistinct) "DISTINCT " else ""
+          Some(s"CORR($distinct${f.inputs().head}, ${f.inputs().last})")
         case _ => None
       }
     )
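
The three new cases follow the same pattern already used for `STDDEV_SAMP` just above: match on the aggregate function's name, assert the expected number of inputs, and emit the H2 SQL text, keeping DISTINCT when requested. A minimal standalone sketch of that translation logic (sketch only; `Agg` is a stand-in for the real `GeneralAggregateFunc` connector API, so the snippet can run outside the dialect plumbing):

    // Sketch only: `Agg` mirrors the pieces of GeneralAggregateFunc used in the hunk
    // above (name, isDistinct, inputs); it is not the real connector API.
    case class Agg(name: String, isDistinct: Boolean, inputs: Seq[String])

    def compileAgg(f: Agg): Option[String] = {
      val distinct = if (f.isDistinct) "DISTINCT " else ""
      f.name match {
        case "COVAR_POP" | "COVAR_SAMP" | "CORR" if f.inputs.length == 2 =>
          Some(s"${f.name}($distinct${f.inputs.head}, ${f.inputs.last})")
        case _ => None
      }
    }

    // compileAgg(Agg("CORR", isDistinct = false, Seq("BONUS", "BONUS")))
    // => Some("CORR(BONUS, BONUS)")
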
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
index f99e40a..c5e1a6a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCV2Suite.scala
@@ -745,11 +745,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val df = sql("select COVAR_POP(bonus, bonus), COVAR_SAMP(bonus, bonus)" +
       " FROM h2.test.employee where dept > 0 group by DePt")
     checkFiltersRemoved(df)
-    checkAggregateRemoved(df, false)
+    checkAggregateRemoved(df)
     df.queryExecution.optimizedPlan.collect {
       case _: DataSourceV2ScanRelation =>
         val expected_plan_fragment =
-          "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]"
+          "PushedAggregates: [COVAR_POP(BONUS, BONUS), COVAR_SAMP(BONUS, BONUS)], " +
+            "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
+            "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(10000d, 20000d), Row(2500d, 5000d), Row(0d, null)))
@@ -759,11 +761,13 @@ class JDBCV2Suite extends QueryTest with SharedSparkSession with ExplainSuiteHel
     val df = sql("select CORR(bonus, bonus) FROM h2.test.employee where dept > 0" +
       " group by DePt")
     checkFiltersRemoved(df)
-    checkAggregateRemoved(df, false)
+    checkAggregateRemoved(df)
     df.queryExecution.optimizedPlan.collect {
       case _: DataSourceV2ScanRelation =>
         val expected_plan_fragment =
-          "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)]"
+          "PushedAggregates: [CORR(BONUS, BONUS)], " +
+            "PushedFilters: [IsNotNull(DEPT), GreaterThan(DEPT,0)], " +
+            "PushedGroupByColumns: [DEPT]"
         checkKeywordsExistsInExplain(df, expected_plan_fragment)
     }
     checkAnswer(df, Seq(Row(1d), Row(1d), Row(null)))
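
With the aggregates compiled by the dialect, both tests can now assert `checkAggregateRemoved(df)` (the Spark-side aggregate disappears because it is pushed to H2) and look for `PushedAggregates` and `PushedGroupByColumns` in the scan's explain output. To eyeball the same information interactively, a sketch assuming the `h2` catalog registered by this suite:

    // Prints the formatted plan; the scan node lists PushedAggregates,
    // PushedFilters and PushedGroupByColumns when push-down succeeded.
    val df = spark.sql(
      "SELECT CORR(bonus, bonus) FROM h2.test.employee WHERE dept > 0 GROUP BY dept")
    df.explain("formatted")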
