Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/21 11:12:18 UTC

[GitHub] [spark] xuanyuanking commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

xuanyuanking commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509130528



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left semi joins: stream-stream allowed with join on watermark attribute
+  // Note that the attribute need not be watermarked on both sides.
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with left watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attributeWithWatermark === attribute)),
+    OutputMode.Append())
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with right watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attributeWithWatermark)),
+    OutputMode.Append())
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on non-watermarked attribute",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Append(),
+    Seq("watermark in the join keys"))

Review comment:
       Personally, I'd prefer to change the match string to `without a watermark in the join keys`. I see this is the same string the original tests use, so maybe we can change them all together in this PR.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -291,17 +291,17 @@ object UnsupportedOperationChecker extends Logging {
                 throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
               }
 
-            case LeftSemi | LeftAnti =>
+            case LeftAnti =>
               if (right.isStreaming) {
-                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+                throwError("Left anti joins with a streaming DataFrame/Dataset " +
                     "on the right are not supported")
               }
 
             // We support streaming left outer joins with static on the right always, and with

Review comment:
       nit: Also change this comment?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+   *                                  This is used for right side of left semi join in
+   *                                  [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>

Review comment:
       It would make more sense to put this filter logic in the `predicate` param (i.e. `postJoinFilter` for `OneSideHashJoiner`), and for `rightSideJoiner` only, in line with the comment at https://github.com/apache/spark/pull/30076/files#diff-6cd66da710d8d54025c1edf658bbec5230e8b4e748f9f2f884a60b1ba1efed42R264
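
       To illustrate the suggestion, here is a minimal sketch using simplified, hypothetical types. `StoredRow`, `Joined`, and `excludingAlreadyMatched` are stand-ins invented for this example and are not Spark classes. It also assumes the matched flag is visible to the predicate, which may not hold for the real `JoinedRow`.

```scala
// Hypothetical, simplified stand-ins for Spark's internal row types (not the real API).
final case class StoredRow(value: String, matched: Boolean)
final case class Joined(input: String, stored: StoredRow)

object PredicateFilterSketch {
  // Simplified analogue of getJoinedRows: no extra boolean flag, only the predicate.
  def getJoinedRows(
      storedRows: Seq[StoredRow],
      generateJoinedRow: StoredRow => Joined,
      predicate: Joined => Boolean): Iterator[Joined] =
    storedRows.iterator.map(generateJoinedRow).filter(predicate)

  // Caller-side composition: only the right-side joiner of a left semi join
  // would wrap its post-join filter this way (assumption: the matched flag
  // is reachable from the joined row).
  def excludingAlreadyMatched(postJoinFilter: Joined => Boolean): Joined => Boolean =
    joined => !joined.stored.matched && postJoinFilter(joined)
}
```

       With this shape the state manager keeps a single code path, and the semi-join-specific behaviour lives with the one caller that needs it.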



