You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2017/01/20 22:03:23 UTC
spark git commit: [SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan
Repository: spark
Updated Branches:
refs/heads/master e20d9b156 -> 552e5f088
[SPARK-19314][SS][CATALYST] Do not allow sort before aggregation in Structured Streaming plan
## What changes were proposed in this pull request?
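Sort in a streaming plan should be allowed only after an aggregation in Complete output mode. Currently it is incorrectly allowed when present anywhere in the plan, which gives unpredictable and potentially incorrect results. The root cause is that `containsCompleteData` collected streaming aggregations from the entire `plan` instead of from the `subplan` beneath the Sort. As a result, a Sort placed before an aggregation was wrongly accepted whenever the plan as a whole contained a streaming aggregation in Complete mode.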
## How was this patch tested?
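Added a new test case to `UnsupportedOperationsSuite` that asserts a sort before an aggregation is rejected in Complete output mode.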
Author: Tathagata Das <ta...@gmail.com>
Closes #16662 from tdas/SPARK-19314.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/552e5f08
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/552e5f08
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/552e5f08
Branch: refs/heads/master
Commit: 552e5f08841828e55f5924f1686825626da8bcd0
Parents: e20d9b1
Author: Tathagata Das <ta...@gmail.com>
Authored: Fri Jan 20 14:04:51 2017 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Fri Jan 20 14:04:51 2017 -0800
----------------------------------------------------------------------
.../sql/catalyst/analysis/UnsupportedOperationChecker.scala | 2 +-
.../sql/catalyst/analysis/UnsupportedOperationsSuite.scala | 9 +++++++--
2 files changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/552e5f08/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c2666b2..f4d016c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -87,7 +87,7 @@ object UnsupportedOperationChecker {
* data.
*/
def containsCompleteData(subplan: LogicalPlan): Boolean = {
- val aggs = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
+ val aggs = subplan.collect { case a@Aggregate(_, _, _) if a.isStreaming => a }
// Either the subplan has no streaming source, or it has aggregation with Complete mode
!subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/552e5f08/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 58e69f9..dcdb1ae 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -199,13 +199,18 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.intersect(_),
streamStreamSupported = false)
- // Sort: supported only on batch subplans and on aggregation + complete output mode
+ // Sort: supported only on batch subplans and after aggregation on streaming plan + complete mode
testUnaryOperatorInStreamingPlan("sort", Sort(Nil, true, _))
assertSupportedInStreamingPlan(
- "sort - sort over aggregated data in Complete output mode",
+ "sort - sort after aggregation in Complete output mode",
streamRelation.groupBy()(Count("*")).sortBy(),
Complete)
assertNotSupportedInStreamingPlan(
+ "sort - sort before aggregation in Complete output mode",
+ streamRelation.sortBy().groupBy()(Count("*")),
+ Complete,
+ Seq("sort", "aggregat", "complete"))
+ assertNotSupportedInStreamingPlan(
"sort - sort over aggregated data in Update output mode",
streamRelation.groupBy()(Count("*")).sortBy(),
Update,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org