You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by hv...@apache.org on 2016/11/28 10:56:33 UTC

spark git commit: [SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order.

Repository: spark
Updated Branches:
  refs/heads/master 87141622e -> 454b80499


[SPARK-18604][SQL] Make sure CollapseWindow returns the attributes in the same order.

## What changes were proposed in this pull request?
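The `CollapseWindow` optimizer rule changes the order of the output attributes. This modifies the output of the plan, which an optimizer rule must never do. It also breaks operations such as `collect()`, for which we use a `RowEncoder` that assumes that the output attributes of the executed plan are equal to those of the logical plan. The fix is to concatenate the window expressions of the inner (child) `Window` before those of the outer one (`we2 ++ we1` instead of `we1 ++ we2`), so that the collapsed operator emits its attributes in the same order as the original pair of operators.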

## How was this patch tested?
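I have corrected a test in `CollapseWindowSuite` that expected the wrong attribute order, and added an assertion that the output of the analyzed plan equals the output of the optimized plan.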

Author: Herman van Hovell <hv...@databricks.com>

Closes #16027 from hvanhovell/SPARK-18604.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/454b8049
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/454b8049
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/454b8049

Branch: refs/heads/master
Commit: 454b8049916a0353772a0ea5cfe14b62cbd81df4
Parents: 8714162
Author: Herman van Hovell <hv...@databricks.com>
Authored: Mon Nov 28 02:56:26 2016 -0800
Committer: Herman van Hovell <hv...@databricks.com>
Committed: Mon Nov 28 02:56:26 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/optimizer/Optimizer.scala       |  2 +-
 .../sql/catalyst/optimizer/CollapseWindowSuite.scala   | 13 ++++++++-----
 2 files changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/454b8049/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6ba8b33..2679e02 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -545,7 +545,7 @@ object CollapseRepartition extends Rule[LogicalPlan] {
 object CollapseWindow extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
     case w @ Window(we1, ps1, os1, Window(we2, ps2, os2, grandChild)) if ps1 == ps2 && os1 == os2 =>
-      w.copy(windowExpressions = we1 ++ we2, child = grandChild)
+      w.copy(windowExpressions = we2 ++ we1, child = grandChild)
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/454b8049/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
index 797076e..3f7d1d9 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/CollapseWindowSuite.scala
@@ -46,12 +46,15 @@ class CollapseWindowSuite extends PlanTest {
       .window(Seq(sum(b).as('sum_b)), partitionSpec1, orderSpec1)
       .window(Seq(avg(b).as('avg_b)), partitionSpec1, orderSpec1)
 
-    val optimized = Optimize.execute(query.analyze)
+    val analyzed = query.analyze
+    val optimized = Optimize.execute(analyzed)
+    assert(analyzed.output === optimized.output)
+
     val correctAnswer = testRelation.window(Seq(
-        avg(b).as('avg_b),
-        sum(b).as('sum_b),
-        max(a).as('max_a),
-        min(a).as('min_a)), partitionSpec1, orderSpec1)
+      min(a).as('min_a),
+      max(a).as('max_a),
+      sum(b).as('sum_b),
+      avg(b).as('avg_b)), partitionSpec1, orderSpec1)
 
     comparePlans(optimized, correctAnswer)
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org