You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text copy.
Posted to commits@spark.apache.org by jt...@apache.org on 2019/07/02 16:59:31 UTC
[spark] branch master updated: [SPARK-28223][SS] stream-stream joins should fail unsupported checker in update mode
This is an automated email from the ASF dual-hosted git repository.
jtorres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4ebff5b [SPARK-28223][SS] stream-stream joins should fail unsupported checker in update mode
4ebff5b is described below
commit 4ebff5b6d68f26cc1ff9265a5489e0d7c2e05449
Author: Jose Torres <to...@gmail.com>
AuthorDate: Tue Jul 2 09:59:11 2019 -0700
[SPARK-28223][SS] stream-stream joins should fail unsupported checker in update mode
## What changes were proposed in this pull request?
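Currently, the unsupported-operation checker rejects stream-stream joins in a non-Append output mode only for inner joins. The check was implemented when inner joins were the only supported stream-stream join type. This change applies it to all stream-stream join types.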
## How was this patch tested?
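Updated and added unit tests in UnsupportedOperationsSuite, covering stream-stream left outer joins in Update and Complete output modes.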
Closes #25023 from jose-torres/changevalidation.
Authored-by: Jose Torres <to...@gmail.com>
Signed-off-by: Jose Torres <to...@gmail.com>
---
.../catalyst/analysis/UnsupportedOperationChecker.scala | 11 +++++------
.../catalyst/analysis/UnsupportedOperationsSuite.scala | 16 +++++++++++++++-
2 files changed, 20 insertions(+), 7 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 41ba6d3..288ff1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -230,15 +230,14 @@ object UnsupportedOperationChecker {
"streaming DataFrame/Dataset")
case Join(left, right, joinType, condition, _) =>
+ if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) {
+ throwError("Join between two streaming DataFrames/Datasets is not supported" +
+ s" in ${outputMode} output mode, only in Append output mode")
+ }
joinType match {
-
case _: InnerLike =>
- if (left.isStreaming && right.isStreaming &&
- outputMode != InternalOutputModes.Append) {
- throwError("Inner join between two streaming DataFrames/Datasets is not supported" +
- s" in ${outputMode} output mode, only in Append output mode")
- }
+ // no further validations needed
case FullOuter =>
if (left.isStreaming || right.isStreaming) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 28a164b..0fe646e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -404,7 +404,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
_.join(_, joinType = Inner),
outputMode = Update,
streamStreamSupported = false,
- expectedMsg = "inner join")
+ expectedMsg = "is not supported in Update output mode")
// Full outer joins: only batch-batch is allowed
testBinaryOperationInStreamingPlan(
@@ -422,6 +422,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
streamStreamSupported = false,
expectedMsg = "outer join")
+ // Left outer joins: update and complete mode not allowed
+ assertNotSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and update mode",
+ streamRelation.join(streamRelation, joinType = LeftOuter,
+ condition = Some(attribute === attribute)),
+ OutputMode.Update(),
+ Seq("is not supported in Update output mode"))
+ assertNotSupportedInStreamingPlan(
+ s"left outer join with stream-stream relations and complete mode",
+ Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftOuter,
+ condition = Some(attribute === attribute))),
+ OutputMode.Complete(),
+ Seq("is not supported in Complete output mode"))
+
// Left outer joins: stream-stream allowed with join on watermark attribute
// Note that the attribute need not be watermarked on both sides.
assertSupportedInStreamingPlan(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org