You are viewing a plain text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to commits@spark.apache.org by jt...@apache.org on 2019/07/02 16:59:31 UTC

[spark] branch master updated: [SPARK-28223][SS] stream-stream joins should fail unsupported checker in update mode

This is an automated email from the ASF dual-hosted git repository.

jtorres pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ebff5b  [SPARK-28223][SS] stream-stream joins should fail unsupported checker in update mode
4ebff5b is described below

commit 4ebff5b6d68f26cc1ff9265a5489e0d7c2e05449
Author: Jose Torres <to...@gmail.com>
AuthorDate: Tue Jul 2 09:59:11 2019 -0700

    [SPARK-28223][SS] stream-stream joins should fail unsupported checker in update mode
    
    ## What changes were proposed in this pull request?
    
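    Currently, the unsupported-operation checker rejects stream-stream joins in non-Append output modes (e.g. Update) only when the join is an inner join, because the check was written when inner joins were the only supported stream-stream join type. This change applies the check to all stream-stream join types.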
    
    ## How was this patch tested?
    
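    New unit tests (left outer stream-stream joins in Update and Complete output modes), plus an updated expected error message for the existing inner-join test.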
    
    Closes #25023 from jose-torres/changevalidation.
    
    Authored-by: Jose Torres <to...@gmail.com>
    Signed-off-by: Jose Torres <to...@gmail.com>
---
 .../catalyst/analysis/UnsupportedOperationChecker.scala  | 11 +++++------
 .../catalyst/analysis/UnsupportedOperationsSuite.scala   | 16 +++++++++++++++-
 2 files changed, 20 insertions(+), 7 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index 41ba6d3..288ff1a 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -230,15 +230,14 @@ object UnsupportedOperationChecker {
             "streaming DataFrame/Dataset")
 
         case Join(left, right, joinType, condition, _) =>
+          if (left.isStreaming && right.isStreaming && outputMode != InternalOutputModes.Append) {
+            throwError("Join between two streaming DataFrames/Datasets is not supported" +
+              s" in ${outputMode} output mode, only in Append output mode")
+          }
 
           joinType match {
-
             case _: InnerLike =>
-              if (left.isStreaming && right.isStreaming &&
-                outputMode != InternalOutputModes.Append) {
-                throwError("Inner join between two streaming DataFrames/Datasets is not supported" +
-                  s" in ${outputMode} output mode, only in Append output mode")
-              }
+              // no further validations needed
 
             case FullOuter =>
               if (left.isStreaming || right.isStreaming) {
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index 28a164b..0fe646e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -404,7 +404,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = Inner),
     outputMode = Update,
     streamStreamSupported = false,
-    expectedMsg = "inner join")
+    expectedMsg = "is not supported in Update output mode")
 
   // Full outer joins: only batch-batch is allowed
   testBinaryOperationInStreamingPlan(
@@ -422,6 +422,20 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     streamStreamSupported = false,
     expectedMsg = "outer join")
 
+  // Left outer joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left outer join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftOuter,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left outer join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftOuter,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
   // Left outer joins: stream-stream allowed with join on watermark attribute
   // Note that the attribute need not be watermarked on both sides.
   assertSupportedInStreamingPlan(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org