Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/19 19:39:17 UTC

[GitHub] [spark] viirya commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

viirya commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r507995582



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",

Review comment:
       nit: the `s` interpolator isn't needed here; the string has nothing to interpolate.
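
To illustrate the nit, here is a minimal sketch (the object name and the `mode` value are hypothetical and not part of the PR). It shows that an `s"..."` literal with no `$` placeholders yields the same string as a plain literal, so the prefix only matters when something is actually interpolated.

```scala
// Hypothetical names for illustration; not taken from the PR.
object InterpolatorSketch {
  val mode = "update"

  // Plain literal: what the review asks for.
  val plain = "left semi join with stream-stream relations and update mode"

  // Interpolator with no placeholders: same resulting string, redundant prefix.
  val interpolatedNoArgs = s"left semi join with stream-stream relations and update mode"

  // Interpolator that actually substitutes a value: here the prefix is needed.
  val interpolated = s"left semi join with stream-stream relations and $mode mode"
}
```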

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",

Review comment:
       Ditto. There are a couple of similar cases below; I won't comment on each one.
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
+   *                                    This is used for right side of left semi join in
+   *                                    [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
+      joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched
+    }.map { keyIdxToValue =>

Review comment:
       I feel it is easier to read if:
   
   ```scala
   val keyIdxToValues = if (joinOnlyFirstTimeMatchedRow) {
     keyWithIndexToValue.getAll(key, numValues).filter { keyIdxToValue =>
       !keyIdxToValue.matched
     }
   } else {
     keyWithIndexToValue.getAll(key, numValues) 
   }
   
   keyIdxToValues.map { keyIdxToValue =>
     ...
   }
   ```
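
As a sanity check on the suggestion above, the following self-contained sketch compares the two shapes on plain Scala iterators. `KeyIdxToValue` here is a hypothetical stand-in case class, not the real state-store type, and `skipMatched` stands in for `joinOnlyFirstTimeMatchedRow`. Under those assumptions, both shapes should yield the same rows.

```scala
// Hypothetical stand-in for the state store entry; only the `matched` flag matters here.
final case class KeyIdxToValue(value: String, matched: Boolean)

object FilterSketch {
  // Shape used in the diff: a single filterNot that folds the flag into the predicate.
  def viaFilterNot(rows: Iterator[KeyIdxToValue], skipMatched: Boolean): Iterator[KeyIdxToValue] =
    rows.filterNot(r => skipMatched && r.matched)

  // Shape suggested in the review: branch on the flag first, then filter only if needed.
  def viaBranch(rows: Iterator[KeyIdxToValue], skipMatched: Boolean): Iterator[KeyIdxToValue] =
    if (skipMatched) rows.filter(r => !r.matched) else rows

  // Small sample with both matched and unmatched rows.
  def sample: List[KeyIdxToValue] = List(
    KeyIdxToValue("a", matched = true),
    KeyIdxToValue("b", matched = false),
    KeyIdxToValue("c", matched = true))
}
```

The branching form also avoids evaluating the flag once per row, although that cost is likely negligible next to the state store lookup.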

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
-        // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        // For inner and left semi joins, we have to remove unnecessary state rows from both sides
+        // if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.

Review comment:
       Doesn't this duplicate the comment at L371 ("For inner and left semi joins, ...")?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,37 +249,43 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here is how it is done.
     //
-    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)` generates all rows from
-    //    matching new left input with stored right input, and also stores all the left input
+    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)`
+    //    - inner, left outer, right outer join: generates all rows from matching new left input

Review comment:
       `Inner, Left Outer, Right Outer Join: ...`
   
   Capitalizing the join types reads better.
   
   Ditto for the entries below.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a

Review comment:
       Should `inner output completion` in this comment be updated too, now that the variable is `hashJoinOutputCompletionTimeNs`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       How about naming this `skipMatchedRowsInPreviousBatches`?



