You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2017/10/06 20:10:06 UTC

spark git commit: [SPARK-21947][SS] Check and report error when monotonically_increasing_id is used in streaming query

Repository: spark
Updated Branches:
  refs/heads/master 08b204fd2 -> debcbec74


[SPARK-21947][SS] Check and report error when monotonically_increasing_id is used in streaming query

## What changes were proposed in this pull request?

`monotonically_increasing_id` doesn't work in Structured Streaming. We should throw an exception if a streaming query uses it.

## How was this patch tested?

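Added a test to `UnsupportedOperationsSuite` that asserts a streaming plan selecting `MonotonicallyIncreasingID()` is rejected in Append mode with an error message mentioning `monotonically_increasing_id`.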

Author: Liang-Chi Hsieh <vi...@gmail.com>

Closes #19336 from viirya/SPARK-21947.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/debcbec7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/debcbec7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/debcbec7

Branch: refs/heads/master
Commit: debcbec7491d3a23b19ef149e50d2887590b6de0
Parents: 08b204f
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri Oct 6 13:10:04 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Fri Oct 6 13:10:04 2017 -0700

----------------------------------------------------------------------
 .../analysis/UnsupportedOperationChecker.scala       | 15 ++++++++++++++-
 .../analysis/UnsupportedOperationsSuite.scala        | 10 +++++++++-
 2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index dee6fbe..04502d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.catalyst.analysis
 
 import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
 import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
 import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
 import org.apache.spark.sql.catalyst.plans._
@@ -129,6 +129,16 @@ object UnsupportedOperationChecker {
       !subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
     }
 
+    def checkUnsupportedExpressions(implicit operator: LogicalPlan): Unit = {
+      val unsupportedExprs = operator.expressions.flatMap(_.collect {
+        case m: MonotonicallyIncreasingID => m
+      }).distinct
+      if (unsupportedExprs.nonEmpty) {
+        throwError("Expression(s): " + unsupportedExprs.map(_.sql).mkString(", ") +
+          " is not supported with streaming DataFrames/Datasets")
+      }
+    }
+
     plan.foreachUp { implicit subPlan =>
 
       // Operations that cannot exists anywhere in a streaming plan
@@ -323,6 +333,9 @@ object UnsupportedOperationChecker {
 
         case _ =>
       }
+
+      // Check if there are unsupported expressions in streaming query plan.
+      checkUnsupportedExpressions(subPlan)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index e5057c4..60d1351 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
 import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MonotonicallyIncreasingID, NamedExpression}
 import org.apache.spark.sql.catalyst.expressions.aggregate.Count
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
@@ -614,6 +614,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
   testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
   testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
 
+  // Unsupported expressions in streaming plan
+  assertNotSupportedInStreamingPlan(
+    "MonotonicallyIncreasingID",
+    streamRelation.select(MonotonicallyIncreasingID()),
+    outputMode = Append,
+    expectedMsgs = Seq("monotonically_increasing_id"))
+
+
   /*
     =======================================================================================
                                      TESTING FUNCTIONS


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org