You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2016/12/15 19:54:39 UTC

spark git commit: [SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets

Repository: spark
Updated Branches:
  refs/heads/master 01e14bf30 -> 4f7292c87


[SPARK-18870] Disallowed Distinct Aggregations on Streaming Datasets

## What changes were proposed in this pull request?

Check whether Aggregation operators on a streaming subplan have aggregate expressions with isDistinct = true.

## How was this patch tested?

Added unit test

Author: Tathagata Das <ta...@gmail.com>

Closes #16289 from tdas/SPARK-18870.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7292c8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7292c8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7292c8

Branch: refs/heads/master
Commit: 4f7292c87512a7da3542998d0e5aa21c27a511e9
Parents: 01e14bf
Author: Tathagata Das <ta...@gmail.com>
Authored: Thu Dec 15 11:54:35 2016 -0800
Committer: Tathagata Das <ta...@gmail.com>
Committed: Thu Dec 15 11:54:35 2016 -0800

----------------------------------------------------------------------
 .../analysis/UnsupportedOperationChecker.scala       | 15 +++++++++++++--
 .../analysis/UnsupportedOperationsSuite.scala        | 13 +++++++++++++
 2 files changed, 26 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4f7292c8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index c054fcb..c4a78f9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.{AnalysisException, InternalOutputModes}
 import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.streaming.OutputMode
@@ -95,6 +96,16 @@ object UnsupportedOperationChecker {
       // Operations that cannot exists anywhere in a streaming plan
       subPlan match {
 
+        case Aggregate(_, aggregateExpressions, child) =>
+          val distinctAggExprs = aggregateExpressions.flatMap { expr =>
+            expr.collect { case ae: AggregateExpression if ae.isDistinct => ae }
+          }
+          throwErrorIf(
+            child.isStreaming && distinctAggExprs.nonEmpty,
+            "Distinct aggregations are not supported on streaming DataFrames/Datasets, unless " +
+              "it is on aggregated DataFrame/Dataset in Complete output mode. Consider using " +
+              "approximate distinct aggregation (e.g. approx_count_distinct() instead of count()).")
+
         case _: Command =>
           throwError("Commands like CreateTable*, AlterTable*, Show* are not supported with " +
             "streaming DataFrames/Datasets")
@@ -143,7 +154,7 @@ object UnsupportedOperationChecker {
           throwError("Union between streaming and batch DataFrames/Datasets is not supported")
 
         case Except(left, right) if right.isStreaming =>
-          throwError("Except with a streaming DataFrame/Dataset on the right is not supported")
+          throwError("Except on a streaming DataFrame/Dataset on the right is not supported")
 
         case Intersect(left, right) if left.isStreaming && right.isStreaming =>
           throwError("Intersect between two streaming DataFrames/Datasets is not supported")
@@ -156,7 +167,7 @@ object UnsupportedOperationChecker {
 
         case Sort(_, _, _) | SortPartitions(_, _) if !containsCompleteData(subPlan) =>
           throwError("Sorting is not supported on streaming DataFrames/Datasets, unless it is on" +
-            "aggregated DataFrame/Dataset in Complete mode")
+            "aggregated DataFrame/Dataset in Complete output mode")
 
         case Sample(_, _, _, _, child) if child.isStreaming =>
           throwError("Sampling is not supported on streaming DataFrames/Datasets")

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7292c8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index ff1bb12..34e94c7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -98,6 +98,19 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     outputMode = Update,
     expectedMsgs = Seq("multiple streaming aggregations"))
 
+  // Aggregation: Distinct aggregates not supported on streaming relation
+  val distinctAggExprs = Seq(Count("*").toAggregateExpression(isDistinct = true).as("c"))
+  assertSupportedInStreamingPlan(
+    "distinct aggregate - aggregate on batch relation",
+    Aggregate(Nil, distinctAggExprs, batchRelation),
+    outputMode = Append)
+
+  assertNotSupportedInStreamingPlan(
+    "distinct aggregate - aggregate on streaming relation",
+    Aggregate(Nil, distinctAggExprs, streamRelation),
+    outputMode = Complete,
+    expectedMsgs = Seq("distinct aggregation"))
+
   // Inner joins: Stream-stream not supported
   testBinaryOperationInStreamingPlan(
     "inner join",


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org