You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/12/17 08:14:02 UTC

[spark] branch master updated: [SPARK-20636] Add the rule TransposeWindow to the optimization batch

This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new f6888f7  [SPARK-20636] Add the rule TransposeWindow to the optimization batch
f6888f7 is described below

commit f6888f7c944daff3d7c88b37e883673866eb148e
Author: gatorsmile <ga...@gmail.com>
AuthorDate: Mon Dec 17 00:13:51 2018 -0800

    [SPARK-20636] Add the rule TransposeWindow to the optimization batch
    
    ## What changes were proposed in this pull request?
    
    This PR is a follow-up of the PR https://github.com/apache/spark/pull/17899. It adds the rule TransposeWindow to the optimizer batch.
    
    ## How was this patch tested?
    The existing tests.
    
    Closes #23222 from gatorsmile/followupSPARK-20636.
    
    Authored-by: gatorsmile <ga...@gmail.com>
    Signed-off-by: gatorsmile <ga...@gmail.com>
---
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  1 +
 .../spark/sql/DataFrameWindowFunctionsSuite.scala  | 38 +++++++++++++++-------
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f615757..3eb6bca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -73,6 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
         CombineLimits,
         CombineUnions,
         // Constant folding and strength reduction
+        TransposeWindow,
         NullPropagation,
         ConstantPropagation,
         FoldablePropagation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9a5d5a9..9277dc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql
 import org.scalatest.Matchers.the
 
 import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+import org.apache.spark.sql.execution.exchange.Exchange
 import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
@@ -668,18 +670,30 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
       ("S2", "P2", 300)
     ).toDF("sno", "pno", "qty")
 
-    val w1 = Window.partitionBy("sno")
-    val w2 = Window.partitionBy("sno", "pno")
-
-    checkAnswer(
-      df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2"))
-        .select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1")),
-      Seq(
-        Row("S1", "P1", 100, 800, 800),
-        Row("S1", "P1", 700, 800, 800),
-        Row("S2", "P1", 200, 200, 500),
-        Row("S2", "P2", 300, 300, 500)))
-
+    Seq(true, false).foreach { transposeWindowEnabled =>
+      val excludedRules = if (transposeWindowEnabled) "" else TransposeWindow.ruleName
+      withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> excludedRules) {
+        val w1 = Window.partitionBy("sno")
+        val w2 = Window.partitionBy("sno", "pno")
+
+        val select = df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2"))
+          .select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1"))
+
+        val expectedNumExchanges = if (transposeWindowEnabled) 1 else 2
+        val actualNumExchanges = select.queryExecution.executedPlan.collect {
+          case e: Exchange => e
+        }.length
+        assert(actualNumExchanges == expectedNumExchanges)
+
+        checkAnswer(
+          select,
+          Seq(
+            Row("S1", "P1", 100, 800, 800),
+            Row("S1", "P1", 700, 800, 800),
+            Row("S2", "P1", 200, 200, 500),
+            Row("S2", "P2", 300, 300, 500)))
+      }
+    }
   }
 
   test("NaN and -0.0 in window partition keys") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org