You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text version).
Posted to commits@spark.apache.org by zs...@apache.org on 2017/10/06 20:10:06 UTC
spark git commit: [SPARK-21947][SS] Check and report error when
monotonically_increasing_id is used in streaming query
Repository: spark
Updated Branches:
refs/heads/master 08b204fd2 -> debcbec74
[SPARK-21947][SS] Check and report error when monotonically_increasing_id is used in streaming query
## What changes were proposed in this pull request?
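`monotonically_increasing_id` doesn't work in Structured Streaming: each ID is built from the partition index and a per-partition record counter, and that counter restarts in every micro-batch, so the generated IDs are not unique across batches. We should throw an exception if a streaming query uses it.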
## How was this patch tested?
Added a test case to `UnsupportedOperationsSuite`.
Author: Liang-Chi Hsieh <vi...@gmail.com>
Closes #19336 from viirya/SPARK-21947.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/debcbec7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/debcbec7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/debcbec7
Branch: refs/heads/master
Commit: debcbec7491d3a23b19ef149e50d2887590b6de0
Parents: 08b204f
Author: Liang-Chi Hsieh <vi...@gmail.com>
Authored: Fri Oct 6 13:10:04 2017 -0700
Committer: Shixiong Zhu <zs...@gmail.com>
Committed: Fri Oct 6 13:10:04 2017 -0700
----------------------------------------------------------------------
.../analysis/UnsupportedOperationChecker.scala | 15 ++++++++++++++-
.../analysis/UnsupportedOperationsSuite.scala | 10 +++++++++-
2 files changed, 23 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index dee6fbe..04502d0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -18,7 +18,7 @@
package org.apache.spark.sql.catalyst.analysis
import org.apache.spark.sql.AnalysisException
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, AttributeSet, MonotonicallyIncreasingID}
import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys
import org.apache.spark.sql.catalyst.plans._
@@ -129,6 +129,16 @@ object UnsupportedOperationChecker {
!subplan.isStreaming || (aggs.nonEmpty && outputMode == InternalOutputModes.Complete)
}
+ def checkUnsupportedExpressions(implicit operator: LogicalPlan): Unit = {
+ val unsupportedExprs = operator.expressions.flatMap(_.collect {
+ case m: MonotonicallyIncreasingID => m
+ }).distinct
+ if (unsupportedExprs.nonEmpty) {
+ throwError("Expression(s): " + unsupportedExprs.map(_.sql).mkString(", ") +
+ " is not supported with streaming DataFrames/Datasets")
+ }
+ }
+
plan.foreachUp { implicit subPlan =>
// Operations that cannot exists anywhere in a streaming plan
@@ -323,6 +333,9 @@ object UnsupportedOperationChecker {
case _ =>
}
+
+ // Check if there are unsupported expressions in streaming query plan.
+ checkUnsupportedExpressions(subPlan)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/debcbec7/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index e5057c4..60d1351 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
-import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, NamedExpression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, MonotonicallyIncreasingID, NamedExpression}
import org.apache.spark.sql.catalyst.expressions.aggregate.Count
import org.apache.spark.sql.catalyst.plans._
import org.apache.spark.sql.catalyst.plans.logical.{FlatMapGroupsWithState, _}
@@ -614,6 +614,14 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
testOutputMode(Update, shouldSupportAggregation = true, shouldSupportNonAggregation = true)
testOutputMode(Complete, shouldSupportAggregation = true, shouldSupportNonAggregation = false)
+ // Unsupported expressions in streaming plan
+ assertNotSupportedInStreamingPlan(
+ "MonotonicallyIncreasingID",
+ streamRelation.select(MonotonicallyIncreasingID()),
+ outputMode = Append,
+ expectedMsgs = Seq("monotonically_increasing_id"))
+
+
/*
=======================================================================================
TESTING FUNCTIONS
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org