You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/01/20 22:03:23 UTC

spark git commit: [SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan

Repository: spark
Updated Branches:
  refs/heads/master e20d9b156 -> 552e5f088


[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan

## What changes were proposed in this pull request?

Sort in a streaming plan should be allowed only after an aggregation in Complete output mode. Currently, a sort is incorrectly allowed whenever a streaming aggregation is present anywhere in the plan, even when the sort comes before the aggregation. This gives unpredictable, potentially incorrect results.

## How was this patch tested?
New test

Author: Tathagata Das <ta...@gmail.com>

Closes #16662 from tdas/SPARK-19314.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/552e5f08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/552e5f08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/552e5f08

Branch: refs/heads/master
Commit: 552e5f08841828e55f5924f1686825626da8bcd0
Parents: e20d9b1
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Jan 20 14:04:51 2017 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Jan 20 14:04:51 2017 -0800

----------------------------------------------------------------------
 .../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2 +-
 .../sql/catalyst/analysis/UnsupportedOperationsSuite.scala  | 9 +++++++--
 2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/552e5f08/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c2666b2..f4d016c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -87,7 +87,7 @@ object UnsupportedOperationChecker {
      * data.
      */
     def containsCompleteData(subplan: LogicalPlan): Boolean = {
-      val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
+      val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
       // Either the subplan has no streaming source, or it has aggregation with Complete mode
       !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/552e5f08/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 58e69f9..dcdb1ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -199,13 +199,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.intersect(_),
     streamStreamSupported = false)
 
-  // Sort: supported only on batch subplans and on aggregation + complete output mode
+  // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode
   testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
   assertSupportedInStreamingPlan(
-    "sort - sort over aggregated data in Complete output mode",
+    "sort - sort after aggregation in Complete output mode",
     streamRelation.groupBy()(Count("*")).sortBy(),
     Complete)
   assertNotSupportedInStreamingPlan(
+    "sort - sort before aggregation in Complete output mode",
+    streamRelation.sortBy().groupBy()(Count("*")),
+    Complete,
+    Seq("sort", "aggregat", "complete"))
+  assertNotSupportedInStreamingPlan(
     "sort - sort over aggregated data in Update output mode",
     streamRelation.groupBy()(Count("*")).sortBy(),
     Update,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org