You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/12/17 08:14:02 UTC
[spark] branch master updated: [SPARK-20636] Add the rule
TransposeWindow to the optimization batch
This is an automated email from the ASF dual-hosted git repository.
lixiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f6888f7 [SPARK-20636] Add the rule TransposeWindow to the optimization batch
f6888f7 is described below
commit f6888f7c944daff3d7c88b37e883673866eb148e
Author: gatorsmile <ga...@gmail.com>
AuthorDate: Mon Dec 17 00:13:51 2018 -0800
[SPARK-20636] Add the rule TransposeWindow to the optimization batch
## What changes were proposed in this pull request?
This PR is a follow-up of the PR https://github.com/apache/spark/pull/17899. It adds the rule TransposeWindow to the optimizer batch.
## How was this patch tested?
The existing tests.
Closes #23222 from gatorsmile/followupSPARK-20636.
Authored-by: gatorsmile <ga...@gmail.com>
Signed-off-by: gatorsmile <ga...@gmail.com>
---
.../spark/sql/catalyst/optimizer/Optimizer.scala | 1 +
.../spark/sql/DataFrameWindowFunctionsSuite.scala | 38 +++++++++++++++-------
2 files changed, 27 insertions(+), 12 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index f615757..3eb6bca 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -73,6 +73,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
CombineLimits,
CombineUnions,
// Constant folding and strength reduction
+ TransposeWindow,
NullPropagation,
ConstantPropagation,
FoldablePropagation,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
index 9a5d5a9..9277dc6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameWindowFunctionsSuite.scala
@@ -20,6 +20,8 @@ package org.apache.spark.sql
import org.scalatest.Matchers.the
import org.apache.spark.TestUtils.{assertNotSpilled, assertSpilled}
+import org.apache.spark.sql.catalyst.optimizer.TransposeWindow
+import org.apache.spark.sql.execution.exchange.Exchange
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction, Window}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
@@ -668,18 +670,30 @@ class DataFrameWindowFunctionsSuite extends QueryTest with SharedSQLContext {
("S2", "P2", 300)
).toDF("sno", "pno", "qty")
- val w1 = Window.partitionBy("sno")
- val w2 = Window.partitionBy("sno", "pno")
-
- checkAnswer(
- df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2"))
- .select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1")),
- Seq(
- Row("S1", "P1", 100, 800, 800),
- Row("S1", "P1", 700, 800, 800),
- Row("S2", "P1", 200, 200, 500),
- Row("S2", "P2", 300, 300, 500)))
-
+ Seq(true, false).foreach { transposeWindowEnabled =>
+ val excludedRules = if (transposeWindowEnabled) "" else TransposeWindow.ruleName
+ withSQLConf(SQLConf.OPTIMIZER_EXCLUDED_RULES.key -> excludedRules) {
+ val w1 = Window.partitionBy("sno")
+ val w2 = Window.partitionBy("sno", "pno")
+
+ val select = df.select($"sno", $"pno", $"qty", sum($"qty").over(w2).alias("sum_qty_2"))
+ .select($"sno", $"pno", $"qty", col("sum_qty_2"), sum("qty").over(w1).alias("sum_qty_1"))
+
+ val expectedNumExchanges = if (transposeWindowEnabled) 1 else 2
+ val actualNumExchanges = select.queryExecution.executedPlan.collect {
+ case e: Exchange => e
+ }.length
+ assert(actualNumExchanges == expectedNumExchanges)
+
+ checkAnswer(
+ select,
+ Seq(
+ Row("S1", "P1", 100, 800, 800),
+ Row("S1", "P1", 700, 800, 800),
+ Row("S2", "P1", 200, 200, 500),
+ Row("S2", "P2", 300, 300, 500)))
+ }
+ }
}
test("NaN and -0.0 in window partition keys") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org