Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/17 02:17:39 UTC

[GitHub] [spark] c21 opened a new pull request #30076: [SPARK-32862][SQL] Left semi stream-stream join

c21 opened a new pull request #30076:
URL: https://github.com/apache/spark/pull/30076


   
   ### What changes were proposed in this pull request?
   This PR adds support for left semi join in stream-stream joins. The implementation (mostly in `StreamingSymmetricHashJoinExec` and `SymmetricHashJoinStateManager`) works as follows:
   * For each left side input row, check whether there is a match in the right side state store.
     * If there is a match, output the left side row, but do not put the row in the left side state store (it has already been output, so there is no need to keep it).
     * If there is no match, output nothing, but put the row in the left side state store (with the "matched" field set to false).
   * For each right side input row, check whether there is a match in the left side state store.
     * Among the matched left rows in the state store, output only those whose "matched" field is false, then set the "matched" field of all matched left rows to true. Outputting a left side row only the first time it is matched guarantees left semi join semantics.
   * State store eviction: evict rows below the watermark from the left/right side state stores, same as inner join.
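
The steps above can be sketched in plain Scala. This is a minimal in-memory model, not the code in this PR: `LeftSemiSketch`, `Joiner`, `LeftEntry` and `leftStateSize` are hypothetical names, the state stores are ordinary mutable maps, and watermark-based eviction is left out.

```scala
import scala.collection.mutable

// Hypothetical, simplified model of the join state; these are not Spark classes.
object LeftSemiSketch {
  final case class Row(key: Int, value: String)

  // A buffered left row together with the "matched" flag described above.
  final class LeftEntry(val row: Row, var matched: Boolean)

  final class Joiner {
    private val leftState = mutable.Map.empty[Int, mutable.ArrayBuffer[LeftEntry]]
    private val rightState = mutable.Map.empty[Int, mutable.ArrayBuffer[Row]]

    // Left input: output at once if the right state has a match (and skip buffering),
    // otherwise buffer the row with matched = false and output nothing.
    def onLeft(row: Row): Seq[Row] =
      if (rightState.get(row.key).exists(_.nonEmpty)) {
        Seq(row)
      } else {
        leftState.getOrElseUpdate(row.key, mutable.ArrayBuffer.empty) += new LeftEntry(row, false)
        Seq.empty
      }

    // Right input: output buffered left rows that match for the first time,
    // flag them as matched, and buffer the right row for later left inputs.
    def onRight(row: Row): Seq[Row] = {
      val firstTime = leftState
        .getOrElse(row.key, mutable.ArrayBuffer.empty[LeftEntry])
        .filterNot(_.matched)
        .toList
      firstTime.foreach(e => e.matched = true)
      rightState.getOrElseUpdate(row.key, mutable.ArrayBuffer.empty) += row
      firstTime.map(_.row)
    }

    // Number of rows currently buffered on the left side.
    def leftStateSize: Int = leftState.valuesIterator.map(_.size).sum
  }
}
```

In this model a second right row with the same key outputs nothing, because the buffered left row is already flagged as matched, and a later left row with that key is output without growing the left state.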
   
   Note: a follow-up optimization could evict matched left side rows from the state store earlier, even while the rows are still above the watermark. However, this needs more changes in `SymmetricHashJoinStateManager`, so it is left as a follow-up.
   
   ### Why are the changes needed?
   
   The current stream-stream join supports inner, left outer and right outer joins (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L166 ). Internally, we see many users running left semi stream-stream joins (not in Spark Structured Streaming). For example: I want to get the ad impressions (join left side) that have a click (join right side), but I don't care how many clicks each ad has (left semi semantics).
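
To make the impression/click example concrete, here is a small sketch on plain Scala collections (not the streaming API) that contrasts inner and left semi semantics. `SemiVsInner`, `Impression`, `Click` and their fields are hypothetical names used only for illustration.

```scala
// Hypothetical batch illustration of join semantics; not streaming code.
object SemiVsInner {
  final case class Impression(adId: Int)
  final case class Click(adId: Int, clickId: String)

  // Inner join: one output row per matching (impression, click) pair.
  def innerJoin(imps: Seq[Impression], clicks: Seq[Click]): Seq[(Impression, Click)] =
    for (i <- imps; c <- clicks if c.adId == i.adId) yield (i, c)

  // Left semi join: each impression appears at most once, with left side columns only.
  def leftSemiJoin(imps: Seq[Impression], clicks: Seq[Click]): Seq[Impression] = {
    val clickedAds = clicks.map(_.adId).toSet
    imps.filter(i => clickedAds.contains(i.adId))
  }
}
```

An impression with two clicks yields two rows under the inner join but a single row under the left semi join, which is why the number of clicks per ad does not matter.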
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Added unit tests in `UnsupportedOperationChecker.scala` and `StreamingJoinSuite.scala`.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716287855








[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713855893


   **[Test build #130106 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130106/testReport)** for PR 30076 at commit [`9cd222f`](https://github.com/apache/spark/commit/9cd222f833cc47cd7880028f42fd13d8396e2414).




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713359180


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34681/
   




[GitHub] [spark] viirya commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508012673



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       `skipMatchedRowsInPreviousRuns`? `first-time matched row` reads a bit confusing to me. Can you rephrase it to say that we only join with rows that have not been matched previously?






[GitHub] [spark] HeartSaVioR commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716297080


   Thanks, merging to master.




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710751015








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712391500


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34617/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710751015








[GitHub] [spark] c21 commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-715465899


   @viirya - wondering if you have any more comments? Thanks.




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713855893


   **[Test build #130106 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130106/testReport)** for PR 30076 at commit [`9cd222f`](https://github.com/apache/spark/commit/9cd222f833cc47cd7880028f42fd13d8396e2414).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716257756








[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716238353








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710764179


   **[Test build #129934 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129934/testReport)** for PR 30076 at commit [`ee16690`](https://github.com/apache/spark/commit/ee166901bbaa3500a9fa8704f2a5598a8ce4b6d7).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710748251


   Merged build finished. Test FAILed.




[GitHub] [spark] HeartSaVioR commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716297489


   Thanks for your contribution! Merged into master.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712391522








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710738844


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34536/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508269154



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+abstract class StreamingJoinSuite
+  extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
 
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
+  import testImplicits._
 
   before {
-    SparkSession.setActiveSessionInternal(spark)  // set this before force initializing 'joinExec'
-    spark.streams.stateStoreCoordinator   // initialize the lazy coordinator
+    SparkSession.setActiveSessionInternal(spark) // set this before force initializing 'joinExec'
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
   }
 
   after {
     StateStore.stop()
   }
 
+  protected def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = {
+    val input = MemoryStream[Int]
+    val df = input.toDF
+      .select(
+        'value as "key",
+        timestamp_seconds($"value")  as s"${prefix}Time",
+        ('value * multiplier) as s"${prefix}Value")
+      .withWatermark(s"${prefix}Time", "10 seconds")
+
+    (input, df)
+  }
+
+  protected def setupWindowedJoin(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (input1, df1) = setupStream("left", 2)
+    val (input2, df2) = setupStream("right", 3)
+    val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue)
+    val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+    val select = if (joinType == "left_semi") {

Review comment:
       For reviewers: this is equivalent to `StreamingOuterJoinSuite.setupWindowedJoin`, with two changes:
   1) the visibility changes from `private` to `protected`
   2) the select is conditional on left_semi vs. other join types, since for left_semi only the left side columns are available
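
As a worked illustration of what the left_semi select returns, the sketch below derives the `(key, window.end, leftValue)` triples that the test expects, from the definitions in `setupStream("left", 2)` and the 10 second window. `ExpectedRows`, `windowEnd` and `leftSemiRow` are hypothetical helpers, and the sketch assumes tumbling windows aligned to the epoch and non-negative input values.

```scala
// Hypothetical helper that mirrors the test setup arithmetic; not part of the suite.
object ExpectedRows {
  val WindowSeconds = 10L

  // End (in seconds) of the tumbling window that contains event time t,
  // assuming windows aligned to the epoch and t >= 0.
  def windowEnd(t: Long): Long = (t / WindowSeconds + 1) * WindowSeconds

  // (key, window.end, leftValue) for one left input value: the key and the event
  // time are both the input value, and leftValue is the value times the multiplier 2.
  def leftSemiRow(value: Int): (Int, Long, Int) =
    (value, windowEnd(value.toLong), value * 2)
}
```

For input 3 this gives `(3, 10, 6)` and for input 22 it gives `(22, 30, 44)`, matching `Row(3, 10, 6)` and `Row(22, 30, 44)` in the windowed left semi join test.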

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+abstract class StreamingJoinSuite
+  extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
 
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
+  import testImplicits._
 
   before {
-    SparkSession.setActiveSessionInternal(spark)  // set this before force initializing 'joinExec'
-    spark.streams.stateStoreCoordinator   // initialize the lazy coordinator
+    SparkSession.setActiveSessionInternal(spark) // set this before force initializing 'joinExec'
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
   }
 
   after {
     StateStore.stop()
   }
 
+  protected def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = {

Review comment:
       It would be very helpful to provide guide comments for tracking refactors.
   e.g. this is equivalent to `StreamingOuterJoinSuite.setupStream`, with the visibility changed from `private` to `protected` so it can be shared.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10]
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 12),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12
+      CheckNewAnswer(),
+      AddData(rightInput, 11),
+      // Row not add as 11 < state key watermark = 12
+      CheckNewAnswer()
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join row and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      assertNumStateRows(total = 4, updated = 4),

Review comment:
       Can you describe the state rows in detail, and add the same state row count verification below, after each CheckNewAnswer?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10]
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 12),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      assertNumStateRows(total = 2, updated = 0),

Review comment:
       It might be better to add a code comment explaining why there's no change in state.
   (e.g. Unlike inner/outer joins, when a left input row matches a right input row, we don't buffer the left input row in the state.)
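
The watermark values quoted in the test comments ("Watermark = 11", "state key watermark = 12") can be reproduced with a small sketch. It assumes the global watermark is the minimum of the per-stream watermarks (Spark's default multiple-watermark policy, as far as I know) and uses the 10 second delay from `setupStream`. `WatermarkSketch` and its methods are hypothetical names.

```scala
// Hypothetical arithmetic sketch; it does not call any Spark API.
object WatermarkSketch {
  val DelaySeconds = 10

  // Per-stream watermark: maximum event time seen so far minus the allowed delay.
  def streamWatermark(maxEventTime: Int): Int = maxEventTime - DelaySeconds

  // Global watermark under the assumed "min" policy across both join inputs.
  def globalWatermark(leftMax: Int, rightMax: Int): Int =
    math.min(streamWatermark(leftMax), streamWatermark(rightMax))
}
```

With left 21 and right 22 the result is 11, and once the left side also reaches 22 the result is 12, which is consistent with the later inputs 1 and 11 being dropped in the test.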

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+abstract class StreamingJoinSuite
+  extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
 
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
+  import testImplicits._
 
   before {
-    SparkSession.setActiveSessionInternal(spark)  // set this before force initializing 'joinExec'
-    spark.streams.stateStoreCoordinator   // initialize the lazy coordinator
+    SparkSession.setActiveSessionInternal(spark) // set this before force initializing 'joinExec'
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
   }
 
   after {
     StateStore.stop()
   }
 
+  protected def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = {
+    val input = MemoryStream[Int]
+    val df = input.toDF
+      .select(
+        'value as "key",
+        timestamp_seconds($"value")  as s"${prefix}Time",
+        ('value * multiplier) as s"${prefix}Value")
+      .withWatermark(s"${prefix}Time", "10 seconds")
+
+    (input, df)
+  }
+
+  protected def setupWindowedJoin(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (input1, df1) = setupStream("left", 2)
+    val (input2, df2) = setupStream("right", 3)
+    val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue)
+    val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+    val select = if (joinType == "left_semi") {
+      joined.select('key, $"window.end".cast("long"), 'leftValue)
+    } else {
+      joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+    }
+
+    (input1, input2, select)
+  }
+
+  protected def setupWindowedJoinWithLeftCondition(joinType: String)

Review comment:
       For reviewers: this is extracted from `test("left outer early state exclusion on left")` / `test("right outer early state exclusion on left")`, with a select added per join type.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+abstract class StreamingJoinSuite
+  extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
 
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
+  import testImplicits._
 
   before {
-    SparkSession.setActiveSessionInternal(spark)  // set this before force initializing 'joinExec'
-    spark.streams.stateStoreCoordinator   // initialize the lazy coordinator
+    SparkSession.setActiveSessionInternal(spark) // set this before force initializing 'joinExec'
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
   }
 
   after {
     StateStore.stop()
   }
 
+  protected def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = {
+    val input = MemoryStream[Int]
+    val df = input.toDF
+      .select(
+        'value as "key",
+        timestamp_seconds($"value")  as s"${prefix}Time",
+        ('value * multiplier) as s"${prefix}Value")
+      .withWatermark(s"${prefix}Time", "10 seconds")
+
+    (input, df)
+  }
+
+  protected def setupWindowedJoin(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (input1, df1) = setupStream("left", 2)
+    val (input2, df2) = setupStream("right", 3)
+    val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue)
+    val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+    val select = if (joinType == "left_semi") {
+      joined.select('key, $"window.end".cast("long"), 'leftValue)
+    } else {
+      joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+    }
+
+    (input1, input2, select)
+  }
+
+  protected def setupWindowedJoinWithLeftCondition(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (leftInput, df1) = setupStream("left", 2)
+    val (rightInput, df2) = setupStream("right", 3)
+    // Use different schemas to ensure the null row is being generated from the correct side.
+    val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val right = df2.select('key, window('rightTime, "10 second"), 'rightValue.cast("string"))
+
+    val joined = left.join(
+      right,
+      left("key") === right("key")
+        && left("window") === right("window")
+        && 'leftValue > 4,
+      joinType)
+
+    val select = if (joinType == "left_semi") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+    } else if (joinType == "left_outer") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else if (joinType == "right_outer") {
+      joined.select(right("key"), right("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else {
+      joined
+    }
+
+    (leftInput, rightInput, select)
+  }
+
+  protected def setupWindowedJoinWithRightCondition(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (leftInput, df1) = setupStream("left", 2)
+    val (rightInput, df2) = setupStream("right", 3)
+    // Use different schemas to ensure the null row is being generated from the correct side.
+    val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val right = df2.select('key, window('rightTime, "10 second"), 'rightValue.cast("string"))
+
+    val joined = left.join(
+      right,
+      left("key") === right("key")
+        && left("window") === right("window")
+        && 'rightValue.cast("int") > 7,
+      joinType)
+
+    val select = if (joinType == "left_semi") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+    } else if (joinType == "left_outer") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else if (joinType == "right_outer") {
+      joined.select(right("key"), right("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else {
+      joined
+    }
+
+    (leftInput, rightInput, select)
+  }
+
+  protected def setupWindowedJoinWithRangeCondition(joinType: String)
+    : (MemoryStream[(Int, Int)], MemoryStream[(Int, Int)], DataFrame) = {
+
+    val leftInput = MemoryStream[(Int, Int)]
+    val rightInput = MemoryStream[(Int, Int)]
+
+    val df1 = leftInput.toDF.toDF("leftKey", "time")
+      .select('leftKey, timestamp_seconds($"time") as "leftTime", ('leftKey * 2) as "leftValue")
+      .withWatermark("leftTime", "10 seconds")
+
+    val df2 = rightInput.toDF.toDF("rightKey", "time")
+      .select('rightKey, timestamp_seconds($"time") as "rightTime",
+        ('rightKey * 3) as "rightValue")
+      .withWatermark("rightTime", "10 seconds")
+
+    val joined =
+      df1.join(
+        df2,
+        expr("leftKey = rightKey AND " +
+          "leftTime BETWEEN rightTime - interval 5 seconds AND rightTime + interval 5 seconds"),
+        joinType)
+
+    val select = if (joinType == "left_semi") {
+      joined.select('leftKey, 'leftTime.cast("int"))
+    } else {
+      joined.select('leftKey, 'rightKey, 'leftTime.cast("int"), 'rightTime.cast("int"))
+    }
+
+    (leftInput, rightInput, select)
+  }
+
+  protected def setupWindowedSelfJoin(joinType: String)

Review comment:
       For reviewers: this is extracted from `test("SPARK-26187 self left outer join should not return outer nulls for already matched rows")`, with a conditional select for left_semi versus the other join types, because a left semi join exposes only the left side's columns.
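
To illustrate why the extracted helper needs a separate select for left_semi, here is a minimal sketch in plain Scala collections (not Spark code). `LeftRow`, `RightRow`, and `LeftSemiSketch` are hypothetical names used only for this illustration; the values follow the `key * 2` / `key * 3` convention of `setupStream`.

```scala
// Hypothetical row types, for illustration only; these are not Spark classes.
final case class LeftRow(key: Int, leftValue: Int)
final case class RightRow(key: Int, rightValue: Int)

object LeftSemiSketch {
  // Inner join: one output row per matching (left, right) pair,
  // carrying columns from both sides.
  def innerJoin(left: Seq[LeftRow], right: Seq[RightRow]): Seq[(Int, Int, Int)] =
    for {
      l <- left
      r <- right
      if l.key == r.key
    } yield (l.key, l.leftValue, r.rightValue)

  // Left semi join: each left row appears at most once, and only
  // left-side columns are present in the output.
  def leftSemiJoin(left: Seq[LeftRow], right: Seq[RightRow]): Seq[(Int, Int)] = {
    val rightKeys = right.map(_.key).toSet
    left.filter(l => rightKeys.contains(l.key)).map(l => (l.key, l.leftValue))
  }
}
```

Since the semi join result has no `rightValue` column, a select that references it would not resolve, which is presumably why the helpers branch on `joinType == "left_semi"`.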

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+abstract class StreamingJoinSuite
+  extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
 
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
+  import testImplicits._
 
   before {
-    SparkSession.setActiveSessionInternal(spark)  // set this before force initializing 'joinExec'
-    spark.streams.stateStoreCoordinator   // initialize the lazy coordinator
+    SparkSession.setActiveSessionInternal(spark) // set this before force initializing 'joinExec'
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
   }
 
   after {
     StateStore.stop()
   }
 
+  protected def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = {
+    val input = MemoryStream[Int]
+    val df = input.toDF
+      .select(
+        'value as "key",
+        timestamp_seconds($"value")  as s"${prefix}Time",
+        ('value * multiplier) as s"${prefix}Value")
+      .withWatermark(s"${prefix}Time", "10 seconds")
+
+    (input, df)
+  }
+
+  protected def setupWindowedJoin(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (input1, df1) = setupStream("left", 2)
+    val (input2, df2) = setupStream("right", 3)
+    val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue)
+    val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+    val select = if (joinType == "left_semi") {
+      joined.select('key, $"window.end".cast("long"), 'leftValue)
+    } else {
+      joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+    }
+
+    (input1, input2, select)
+  }
+
+  protected def setupWindowedJoinWithLeftCondition(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (leftInput, df1) = setupStream("left", 2)
+    val (rightInput, df2) = setupStream("right", 3)
+    // Use different schemas to ensure the null row is being generated from the correct side.
+    val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val right = df2.select('key, window('rightTime, "10 second"), 'rightValue.cast("string"))
+
+    val joined = left.join(
+      right,
+      left("key") === right("key")
+        && left("window") === right("window")
+        && 'leftValue > 4,
+      joinType)
+
+    val select = if (joinType == "left_semi") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+    } else if (joinType == "left_outer") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else if (joinType == "right_outer") {
+      joined.select(right("key"), right("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else {
+      joined
+    }
+
+    (leftInput, rightInput, select)
+  }
+
+  protected def setupWindowedJoinWithRightCondition(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (leftInput, df1) = setupStream("left", 2)
+    val (rightInput, df2) = setupStream("right", 3)
+    // Use different schemas to ensure the null row is being generated from the correct side.
+    val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val right = df2.select('key, window('rightTime, "10 second"), 'rightValue.cast("string"))
+
+    val joined = left.join(
+      right,
+      left("key") === right("key")
+        && left("window") === right("window")
+        && 'rightValue.cast("int") > 7,
+      joinType)
+
+    val select = if (joinType == "left_semi") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+    } else if (joinType == "left_outer") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else if (joinType == "right_outer") {
+      joined.select(right("key"), right("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else {
+      joined
+    }
+
+    (leftInput, rightInput, select)
+  }
+
+  protected def setupWindowedJoinWithRangeCondition(joinType: String)

Review comment:
       For reviewers: this is extracted from `test(s"${joinType.replaceAllLiterally("_", " ")} with watermark range condition")`, with a conditional select for left_semi versus the other join types, because a left semi join exposes only the left side's columns.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -41,18 +41,174 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+abstract class StreamingJoinSuite
+  extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
 
-class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
+  import testImplicits._
 
   before {
-    SparkSession.setActiveSessionInternal(spark)  // set this before force initializing 'joinExec'
-    spark.streams.stateStoreCoordinator   // initialize the lazy coordinator
+    SparkSession.setActiveSessionInternal(spark) // set this before force initializing 'joinExec'
+    spark.streams.stateStoreCoordinator // initialize the lazy coordinator
   }
 
   after {
     StateStore.stop()
   }
 
+  protected def setupStream(prefix: String, multiplier: Int): (MemoryStream[Int], DataFrame) = {
+    val input = MemoryStream[Int]
+    val df = input.toDF
+      .select(
+        'value as "key",
+        timestamp_seconds($"value")  as s"${prefix}Time",
+        ('value * multiplier) as s"${prefix}Value")
+      .withWatermark(s"${prefix}Time", "10 seconds")
+
+    (input, df)
+  }
+
+  protected def setupWindowedJoin(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (input1, df1) = setupStream("left", 2)
+    val (input2, df2) = setupStream("right", 3)
+    val windowed1 = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val windowed2 = df2.select('key, window('rightTime, "10 second"), 'rightValue)
+    val joined = windowed1.join(windowed2, Seq("key", "window"), joinType)
+    val select = if (joinType == "left_semi") {
+      joined.select('key, $"window.end".cast("long"), 'leftValue)
+    } else {
+      joined.select('key, $"window.end".cast("long"), 'leftValue, 'rightValue)
+    }
+
+    (input1, input2, select)
+  }
+
+  protected def setupWindowedJoinWithLeftCondition(joinType: String)
+    : (MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+
+    val (leftInput, df1) = setupStream("left", 2)
+    val (rightInput, df2) = setupStream("right", 3)
+    // Use different schemas to ensure the null row is being generated from the correct side.
+    val left = df1.select('key, window('leftTime, "10 second"), 'leftValue)
+    val right = df2.select('key, window('rightTime, "10 second"), 'rightValue.cast("string"))
+
+    val joined = left.join(
+      right,
+      left("key") === right("key")
+        && left("window") === right("window")
+        && 'leftValue > 4,
+      joinType)
+
+    val select = if (joinType == "left_semi") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue)
+    } else if (joinType == "left_outer") {
+      joined.select(left("key"), left("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else if (joinType == "right_outer") {
+      joined.select(right("key"), right("window.end").cast("long"), 'leftValue, 'rightValue)
+    } else {
+      joined
+    }
+
+    (leftInput, rightInput, select)
+  }
+
+  protected def setupWindowedJoinWithRightCondition(joinType: String)

Review comment:
       For reviewers: this is extracted from `test("left outer early state exclusion on right")` / `test("right outer early state exclusion on right")`, with a select added per join type.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10]
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 12),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),

Review comment:
       I see you've referred to the existing test code:
   
   ```
         AddData(input2, 1),
         CheckNewAnswer(),                             // Should not join as < 15 removed
         assertNumStateRows(total = 2, updated = 0),   // row not add as 1 < state key watermark = 15
   
         AddData(input1, 5),
         CheckNewAnswer(),                             // Same reason as above
         assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
   ```
   
   could you please add `assertNumStateRows` per case here as well?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10]
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 12),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12
+      CheckNewAnswer(),
+      AddData(rightInput, 11),
+      // Row not add as 11 < state key watermark = 12
+      CheckNewAnswer()
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join row and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40))
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40))
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckAnswer(),
+      AddData(rightInput, (1, 10), (2, 5)),
+      CheckNewAnswer((1, 5)),
+      AddData(rightInput, (1, 11)),
+      // No match as left time is too low and left row is already matched.
+      CheckNewAnswer(),
+      assertNumStateRows(total = 5, updated = 5),
+
+      // Increase event time watermark to 20s by adding data with time = 30s on left input.
+      AddData(leftInput, (1, 7), (1, 30)),
+      CheckNewAnswer((1, 7)),
+      assertNumStateRows(total = 6, updated = 1),
+      // Watermark = 30 - 10 = 20, no matched row.
+      AddData(rightInput, (0, 30)),
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("self left semi join") {
+    val (inputStream, query) = setupWindowedSelfJoin("left_semi")
+
+    testStream(query)(
+      AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+      CheckNewAnswer((2, 2), (4, 4)),

Review comment:
       Same here.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10]
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 12),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12
+      CheckNewAnswer(),
+      AddData(rightInput, 11),
+      // Row not add as 11 < state key watermark = 12
+      CheckNewAnswer()
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join row and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40))
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40))
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckAnswer(),

Review comment:
       Same here.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,104 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10]
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 12),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12
+      CheckNewAnswer(),
+      AddData(rightInput, 11),
+      // Row not add as 11 < state key watermark = 12
+      CheckNewAnswer()
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join row and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40))
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      assertNumStateRows(total = 4, updated = 4),

Review comment:
       Same here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716287855








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713888225


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34715/
   




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713376913


   **[Test build #130082 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130082/testReport)** for PR 30076 at commit [`765a233`](https://github.com/apache/spark/commit/765a233237e309f112058e73cef4f4c516f39a8d).




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716287424


   **[Test build #130246 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130246/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] xuanyuanking commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714261583


   @HeartSaVioR Agreed, posting my LGTM.




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713431994








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713517218


   **[Test build #130082 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130082/testReport)** for PR 30076 at commit [`765a233`](https://github.com/apache/spark/commit/765a233237e309f112058e73cef4f4c516f39a8d).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710764300


   Merged build finished. Test FAILed.




[GitHub] [spark] viirya commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508012673



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       `skipMatchedRowsInPreviousBatches`? `first-time matched row` reads as a bit confusing to me. Can you rephrase it along the lines of "we only join with rows that were not matched in previous batches"?
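
The semantics being discussed can be sketched with a simplified in-memory model. This is an assumption-laden illustration, not the `SymmetricHashJoinStateManager` implementation: `MatchedFlagSketch`, `Buffered`, `getJoinedRows`, and `skipAlreadyMatched` are hypothetical names, and the real state store is keyed and versioned differently.

```scala
import scala.collection.mutable

object MatchedFlagSketch {
  // A buffered row plus a flag recording whether it has already been joined.
  final class Buffered(val key: Int, val value: Int, var matched: Boolean = false)

  // Returns the buffered rows that join with `key` and marks them as matched.
  // When skipAlreadyMatched is true, rows matched by an earlier call are excluded,
  // so each buffered row is returned at most once across calls.
  def getJoinedRows(
      state: mutable.Buffer[Buffered],
      key: Int,
      skipAlreadyMatched: Boolean): Seq[(Int, Int)] = {
    val hits = state
      .filter(r => r.key == key && !(skipAlreadyMatched && r.matched))
      .toList
    hits.foreach(r => r.matched = true)
    hits.map(r => (r.key, r.value))
  }
}
```

In this model, a second lookup for the same key returns nothing when the flag is set, which is the "not matched in previous batches" behavior the suggested wording describes.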






[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508129405



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
   }
 
-  if (stateFormatVersion < 2 && joinType != Inner) {
+  if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) {

Review comment:
       @HeartSaVioR - sure, thanks for pointing it out in the first place.






[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509771569



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10].
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join rows and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3
+      // right: 3, 4, 5
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3 (below watermark)
+      // right: 3, 4, 5 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3, 4, 5
+      // right: 3
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3, 4, 5 (below watermark)
+      // right: 3 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: nothing
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, (1, 10), (2, 5)),
+      // Match left row in the state.
+      CheckNewAnswer((1, 5)),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5)
+      assertNumStateRows(total = 4, updated = 2),
+      AddData(rightInput, (1, 11)),
+      // No match as left time is too low and left row is already matched.

Review comment:
       @HeartSaVioR - sure, updated.






[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716251124


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34848/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509736400



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10].
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join rows and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3
+      // right: 3, 4, 5
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3 (below watermark)
+      // right: 3, 4, 5 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3, 4, 5
+      // right: 3
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3, 4, 5 (below watermark)
+      // right: 3 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: nothing
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, (1, 10), (2, 5)),
+      // Match left row in the state.
+      CheckNewAnswer((1, 5)),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5)
+      assertNumStateRows(total = 4, updated = 2),
+      AddData(rightInput, (1, 11)),
+      // No match as left time is too low and left row is already matched.

Review comment:
       Yeah, thanks! Please modify the comment as well.






[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710748253


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/129931/
   Test FAILed.




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712486187


   **[Test build #130010 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130010/testReport)** for PR 30076 at commit [`3918727`](https://github.com/apache/spark/commit/3918727a08c8d0d4c65ccc8ea902f77051b78b1d).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713431963


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34691/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508092673



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
   }
 
-  if (stateFormatVersion < 2 && joinType != Inner) {
+  if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) {

Review comment:
       In practice this shouldn't occur. Changing the state format version is not something we want to expose to end users; making it a config is merely for convenience, if I'm not missing something.
   
   Please consider my comment as a kind of defensive programming - left semi doesn't work with state format version 1, hence we prevent it. Nothing much.
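
   A minimal sketch of the defensive check described above, assuming the guard simply rejects every non-inner join when the state format version is below 2. The `JoinType` objects and `validateStateFormat` are hypothetical stand-ins for illustration, not the real Catalyst classes or the code in this PR.

   ```scala
   // Hypothetical stand-ins for Spark's join types; not the real Catalyst classes.
   sealed trait JoinType
   case object Inner extends JoinType
   case object LeftOuter extends JoinType
   case object RightOuter extends JoinType
   case object LeftSemi extends JoinType

   object StateFormatGuard {
     // Returns an error message when the combination is rejected, None otherwise.
     // Assumption: only inner join is allowed with state format version 1.
     def validateStateFormat(stateFormatVersion: Int, joinType: JoinType): Option[String] = {
       if (stateFormatVersion < 2 && joinType != Inner) {
         Some(s"$joinType stream-stream join requires state format version >= 2, " +
           s"but got $stateFormatVersion")
       } else {
         None
       }
     }
   }
   ```

   With a check shaped like this, left semi is rejected under version 1 in the same way as the outer joins, rather than being silently let through.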






[GitHub] [spark] xuanyuanking commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509273007



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+   *                                  This is used for right side of left semi join in
+   *                                  [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>

Review comment:
       Yes, after taking a further look, the `joinedRow` has already dropped the `matched` information, so it's hard to do now. +1 for the current change.
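
   To illustrate what `excludeRowsAlreadyMatched` is meant to do, here is a simplified sketch over plain Scala collections. `StoredRow` and the in-memory `Seq` are hypothetical stand-ins for the state store entries. The real method works on `UnsafeRow`/`JoinedRow` and also marks rows as matched, which is omitted here.

   ```scala
   // Hypothetical, simplified model of a buffered row with its `matched` flag.
   final case class StoredRow(value: Int, matched: Boolean)

   object MatchedFilterSketch {
     // Returns the stored rows that satisfy `predicate`. When
     // `excludeRowsAlreadyMatched` is true, rows matched earlier are skipped first.
     def getJoinedRows(
         stored: Seq[StoredRow],
         predicate: StoredRow => Boolean,
         excludeRowsAlreadyMatched: Boolean = false): Iterator[StoredRow] = {
       stored.iterator
         .filterNot(row => excludeRowsAlreadyMatched && row.matched)
         .filter(predicate)
     }
   }
   ```

   The filter has to run on the stored entry, before the joined row is built, because the `matched` flag is no longer available once only the `JoinedRow` is left.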






[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712354760


   **[Test build #130010 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130010/testReport)** for PR 30076 at commit [`3918727`](https://github.com/apache/spark/commit/3918727a08c8d0d4c65ccc8ea902f77051b78b1d).




[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508201850



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       That would depend on whether the follow-up PR would revert the change done here (say, back and forth) or not. If a back-and-forth is likely and the follow-up is not a big thing (e.g. fewer than 100 lines of code change, doable in a couple of days), then it's probably better to address it here, to save effort on reviewing something that will be reverted later.






[GitHub] [spark] viirya commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714604162


   Will go through this again today.




[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710729510


   **[Test build #129931 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129931/testReport)** for PR 30076 at commit [`e5af8e1`](https://github.com/apache/spark/commit/e5af8e12112533150321a292a261489b3523e23a).




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712379994


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34617/
   




[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509658931



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+   *                                  This is used for right side of left semi join in
+   *                                  [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>

Review comment:
       FYI I created https://issues.apache.org/jira/browse/SPARK-33211 for this followup.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -291,17 +291,17 @@ object UnsupportedOperationChecker extends Logging {
                 throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
               }
 
-            case LeftSemi | LeftAnti =>
+            case LeftAnti =>
               if (right.isStreaming) {
-                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+                throwError("Left anti joins with a streaming DataFrame/Dataset " +
                     "on the right are not supported")
               }
 
             // We support streaming left outer joins with static on the right always, and with

Review comment:
       @xuanyuanking - updated.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10].
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join rows and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3
+      // right: 3, 4, 5
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3 (below watermark)
+      // right: 3, 4, 5 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3, 4, 5
+      // right: 3
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3, 4, 5 (below watermark)
+      // right: 3 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: nothing
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, (1, 10), (2, 5)),
+      // Match left row in the state.
+      CheckNewAnswer((1, 5)),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5)
+      assertNumStateRows(total = 4, updated = 2),
+      AddData(rightInput, (1, 11)),
+      // No match as left time is too low and left row is already matched.
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5), (1, 11)
+      assertNumStateRows(total = 5, updated = 1),
+      // Increase event time watermark to 20s by adding data with time = 30s on both inputs.
+      AddData(leftInput, (1, 7), (1, 30)),
+      CheckNewAnswer((1, 7)),
+      // states
+      // left: (1, 5), (3, 5), (1, 30)
+      // right: (1, 10), (2, 5), (1, 11)
+      assertNumStateRows(total = 6, updated = 1),
+      // Watermark = 30 - 10 = 20, no matched row.
+      AddData(rightInput, (0, 30)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 30)
+      // right: (0, 30)
+      //
+      // states evicted
+      // left: (1, 5), (3, 5) (below watermark = 20)
+      // right: (1, 10), (2, 5), (1, 11) (below watermark = 20)
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("self left semi join") {
+    val (inputStream, query) = setupWindowedSelfJoin("left_semi")
+
+    testStream(query)(
+      AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+      CheckNewAnswer((2, 2), (4, 4)),
+      // batch 1 - global watermark = 0
+      // states
+      // left: (2, 2L), (4, 4L)
+      //       (left rows with value % 2 != 0 is filtered per [[PushDownLeftSemiAntiJoin]])
+      // right: (2, 2L), (4, 4L)

Review comment:
       @HeartSaVioR - updated. I also realized the optimization rule is `PushPredicateThroughJoin`, not `PushDownLeftSemiAntiJoin`, and updated the comment accordingly.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10].
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join rows and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3
+      // right: 3, 4, 5
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3 (below watermark)
+      // right: 3, 4, 5 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3, 4, 5
+      // right: 3
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3, 4, 5 (below watermark)
+      // right: 3 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: nothing
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, (1, 10), (2, 5)),
+      // Match left row in the state.
+      CheckNewAnswer((1, 5)),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5)
+      assertNumStateRows(total = 4, updated = 2),
+      AddData(rightInput, (1, 11)),
+      // No match as left time is too low and left row is already matched.

Review comment:
       @HeartSaVioR - sounds good, updated.

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left semi joins: stream-stream allowed with join on watermark attribute
+  // Note that the attribute need not be watermarked on both sides.
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with left watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attributeWithWatermark === attribute)),
+    OutputMode.Append())
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with right watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attributeWithWatermark)),
+    OutputMode.Append())
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on non-watermarked attribute",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Append(),
+    Seq("watermark in the join keys"))

Review comment:
       @xuanyuanking - yeah, I agree that adding "without" would be better. I updated it for the left semi join here. A refactoring for all joins (inner, outer, semi, anti, etc.) is needed anyway as a follow-up JIRA (https://issues.apache.org/jira/browse/SPARK-33209), so I want to clean up the other places in a separate PR; e.g. "appropriate range condition" has a similar problem.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508188143



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       Or like `excludeRowsAlreadyMatched`. I guess we'd like to have another method to handle left semi join efficiently, as I commented (and as noted in the PR description as well).






[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712391522








[GitHub] [spark] xuanyuanking commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509130528



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left semi joins: stream-stream allowed with join on watermark attribute
+  // Note that the attribute need not be watermarked on both sides.
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with left watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attributeWithWatermark === attribute)),
+    OutputMode.Append())
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with right watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attributeWithWatermark)),
+    OutputMode.Append())
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on non-watermarked attribute",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Append(),
+    Seq("watermark in the join keys"))

Review comment:
       Personally, I'd prefer changing the match string to `without a watermark in the join keys`. I see this is the same as in the original tests; maybe we can change them together in this PR.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
##########
@@ -291,17 +291,17 @@ object UnsupportedOperationChecker extends Logging {
                 throwError("Full outer joins with streaming DataFrames/Datasets are not supported")
               }
 
-            case LeftSemi | LeftAnti =>
+            case LeftAnti =>
               if (right.isStreaming) {
-                throwError("Left semi/anti joins with a streaming DataFrame/Dataset " +
+                throwError("Left anti joins with a streaming DataFrame/Dataset " +
                     "on the right are not supported")
               }
 
             // We support streaming left outer joins with static on the right always, and with

Review comment:
       nit: Also change this comment?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+   *                                  This is used for right side of left semi join in
+   *                                  [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>

Review comment:
       It makes more sense to add this filter logic to the `predicate` param (i.e., `postJoinFilter` for `OneSideHashJoiner`), for `rightSideJoiner` only, corresponding to the comment https://github.com/apache/spark/pull/30076/files#diff-6cd66da710d8d54025c1edf658bbec5230e8b4e748f9f2f884a60b1ba1efed42R264
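
For readers following the diff above, the effect of the `excludeRowsAlreadyMatched` flag can be sketched roughly as follows. This is only an illustration under stated assumptions: `StoredRow`, the `(Int, Int)` joined-row pairs, and the `MatchedFilterSketch` object are hypothetical stand-ins, not the actual `SymmetricHashJoinStateManager` types. The sketch filters out already-matched rows before any joined row is generated, and marks the rows that pass the predicate.

```scala
object MatchedFilterSketch {
  // Hypothetical stand-in for a stored state row and its "matched" flag.
  final case class StoredRow(value: Int, var matched: Boolean)

  // Joined rows are modelled as (inputValue, storedValue) pairs for illustration.
  def getJoinedRows(
      stored: Iterator[StoredRow],
      generateJoinedRow: Int => (Int, Int),
      predicate: ((Int, Int)) => Boolean,
      excludeRowsAlreadyMatched: Boolean = false): Iterator[(Int, Int)] = {
    stored
      // Skip already-matched rows first, so no joined row is built for them.
      .filterNot(row => excludeRowsAlreadyMatched && row.matched)
      .map(row => (row, generateJoinedRow(row.value)))
      .filter { case (_, joined) => predicate(joined) }
      .map { case (row, joined) =>
        row.matched = true // mark the stored row as matched
        joined
      }
  }
}
```

With the flag left at its default of `false`, every stored row is still considered, which mirrors the default value shown in the diff.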






[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713968416


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34719/
   




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713922731


   **[Test build #130110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130110/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).




[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509080312



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1088,39 +1088,79 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
     testStream(joined)(
       MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
       CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
       MultiAddData(leftInput, 21)(rightInput, 22),
-      // Watermark = 11, should remove rows having window=[0,10]
+      // Watermark = 11, should remove rows having window=[0,10].
       CheckNewAnswer(),
-      assertNumStateRows(total = 2, updated = 12),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
       AddData(leftInput, 22),
       CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
       assertNumStateRows(total = 2, updated = 0),
       StopStream,
       StartStream(),
 
       AddData(leftInput, 1),
-      // Row not add as 1 < state key watermark = 12
+      // Row not add as 1 < state key watermark = 12.
       CheckNewAnswer(),
-      AddData(rightInput, 11),
-      // Row not add as 11 < state key watermark = 12
-      CheckNewAnswer()
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
     )
   }
 
   test("left semi early state exclusion on left") {
     val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
 
     testStream(joined)(
-      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
-      // The left rows with leftValue <= 4 should not generate their semi join row and
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),

Review comment:
       I see you excluded 1 and 2 from the right side in this commit, and nothing changed in either the output or the state rows. Could you please explain this, i.e. why 1 and 2 on the right side are not added to the state?






[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r507713923



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
   }
 
-  if (stateFormatVersion < 2 && joinType != Inner) {
+  if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) {

Review comment:
       I see you're only considering state format version 2 while explaining how to implement left semi join. If that's the case, this if condition still applies to left semi as well, and you should make it fail. Otherwise, please describe how you handle left semi with state format version 1.
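
The guard being discussed can be sketched in isolation as below. This is a minimal sketch, assuming the original condition from the diff (`stateFormatVersion < 2 && joinType != Inner`) is kept so that left semi is rejected along with the outer joins. The `JoinType` hierarchy, the `validate` helper, and the error text are hypothetical stand-ins, not Spark's actual classes or messages.

```scala
object StateFormatCheckSketch {
  // Hypothetical, simplified join types for illustration only.
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType

  // Mirrors the original guard: under state format version 1,
  // only inner joins are accepted; every other join type fails.
  def validate(stateFormatVersion: Int, joinType: JoinType): Either[String, Unit] =
    if (stateFormatVersion < 2 && joinType != Inner) {
      Left(s"State format version $stateFormatVersion does not support $joinType")
    } else {
      Right(())
    }
}
```

Narrowing the guard to the outer join types only, as in the diff hunk above, would let `LeftSemi` through under version 1, which is the case the comment asks about.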

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left ousemiter joins: stream-stream allowed with join on watermark attribute

Review comment:
       nit: ousemiter -> semi

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed

Review comment:
       I see the code for left outer join/right outer join/left semi join is very similar and would be good to deduplicate, but let's consider that optional in this PR.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -330,11 +338,17 @@ case class StreamingSymmetricHashJoinExec(
           }
         }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
 
-        innerOutputIter ++ outerOutputIter
+        hashJoinOutputIter ++ outerOutputIter
+      case LeftSemi =>

Review comment:
       This could be consolidated with Inner.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,24 +361,29 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
         // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.
         val cleanupIter = joinType match {
           case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
           case LeftOuter => rightSideJoiner.removeOldState()
           case RightOuter => leftSideJoiner.removeOldState()
+          case LeftSemi => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()

Review comment:
       Both the case statement and the code comment could be consolidated with `Inner`.
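
   A minimal sketch of what such a consolidation could look like, using a Scala pattern alternative. The `JoinType` hierarchy here is a simplified stand-in rather than Spark's Catalyst classes, and `sidesToClean` is a hypothetical helper that only names the sides to clean up instead of calling `removeOldState()`:

   ```scala
   // Hypothetical stand-ins for the join types; not Spark's Catalyst classes.
   sealed trait JoinType
   case object Inner extends JoinType
   case object LeftOuter extends JoinType
   case object RightOuter extends JoinType
   case object LeftSemi extends JoinType

   object CleanupSketch {
     // Returns which sides would have their old state removed for a join type.
     def sidesToClean(joinType: JoinType): Seq[String] = joinType match {
       // Inner and LeftSemi share one branch through a pattern alternative.
       case Inner | LeftSemi => Seq("left", "right")
       case LeftOuter => Seq("right")
       case RightOuter => Seq("left")
     }
   }
   ```

   With this shape, the code comment for inner joins can cover left semi joins as well, since both take the same branch.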

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,15 +99,22 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
+   *                                    This is used for right side of left semi join in
+   *                                    [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
     keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
       val joinedRow = generateJoinedRow(keyIdxToValue.value)
-      if (predicate(joinedRow)) {
+      if (joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched) {

Review comment:
       Unless I'm missing something, this check can be applied as a filter on `keyWithIndexToValue.getAll(key, numValues)` to avoid unnecessary calls to `generateJoinedRow`, like:
   
   ```
   keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
     joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched
   }.map { keyIdxToValue =>
     val joinedRow = generateJoinedRow(keyIdxToValue.value)
     if (predicate(joinedRow)) {
       if (!keyIdxToValue.matched) {
         keyWithIndexToValue.put(key, keyIdxToValue.valueIndex, keyIdxToValue.value,
           matched = true)
       }
       joinedRow
     } else {
       null
     }
   }.filter(_ != null)
   ```
   
   I'm OK with keeping the check inside `map` if someone is concerned about adding another filter. The point is that calling `generateJoinedRow` is unnecessary when `joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched` holds.
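
   To illustrate the point, here is a small self-contained sketch, not the actual Spark code. `StoredRow`, `joinedValues`, `skipMatched`, and the counting `generateJoinedRow` are hypothetical stand-ins; the sketch only shows that filtering before `map` skips the row-generation call for rows that would be discarded anyway:

   ```scala
   object FilterBeforeMapSketch {
     // Hypothetical stand-in for a stored state row; not Spark's actual class.
     final case class StoredRow(value: Int, matched: Boolean)

     // Returns the joined values together with the number of generateJoinedRow calls.
     def joinedValues(rows: Seq[StoredRow], skipMatched: Boolean): (List[Int], Int) = {
       var generateCalls = 0
       // Stand-in for the real row generation; counts how often it is invoked.
       def generateJoinedRow(value: Int): Int = { generateCalls += 1; value * 2 }

       val joined = rows.iterator
         .filterNot(row => skipMatched && row.matched) // drop rows before any join work
         .map(row => generateJoinedRow(row.value))
         .toList // force the lazy iterator so the counter is final
       (joined, generateCalls)
     }
   }
   ```

   With one already-matched row out of three and `skipMatched = true`, the generator runs twice instead of three times.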




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508090637



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left ousemiter joins: stream-stream allowed with join on watermark attribute

Review comment:
       (Just thinking out loud: there's no need to apologize. I'm not spending my time reviewing PRs to point out someone's faults. We all make mistakes, and the review process is not about blame. It's completely OK to simply fix the issue without replying if you agree with the comment.)






[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713345766


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34681/
   




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710729510


   **[Test build #129931 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129931/testReport)** for PR 30076 at commit [`e5af8e1`](https://github.com/apache/spark/commit/e5af8e12112533150321a292a261489b3523e23a).




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713357229








[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508191906



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       IMO it would be good to add early eviction of matched left-side rows in a follow-up PR. If people feel strongly that it should be part of this PR from the start, I can add it here as well, but I would need more time to polish it.






[GitHub] [spark] c21 commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716240797


   retest this please




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710738846








[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713357229








[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509080975



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       I'm OK with leaving it as a follow-up.






[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713359205








[GitHub] [spark] viirya commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r507995582



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",

Review comment:
       nit: the `s""` interpolator is not needed here, since the string has no placeholders.
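
   For illustration, a tiny sketch (the `InterpolatorSketch` object and the `mode` value are hypothetical): without a `$` placeholder, the `s` interpolator yields the same string as a plain literal, so it only adds noise.

   ```scala
   object InterpolatorSketch {
     // Plain literal: nothing to substitute.
     val plain = "left semi join with stream-stream relations and update mode"
     // Same text with the `s` prefix but no placeholder; the result is identical.
     val interpolated = s"left semi join with stream-stream relations and update mode"
     // The prefix is only needed once a value is actually spliced in.
     val mode = "update"
     val withPlaceholder = s"left semi join with stream-stream relations and $mode mode"
   }
   ```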

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",

Review comment:
       Ditto. There are a couple of similar cases below; I won't comment on each one.
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
+   *                                    This is used for right side of left semi join in
+   *                                    [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
+      joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched
+    }.map { keyIdxToValue =>

Review comment:
       I feel it would be easier to read as:
   
   ```scala
   val keyIdxToValues = if (joinOnlyFirstTimeMatchedRow) {
     keyWithIndexToValue.getAll(key, numValues).filter { keyIdxToValue =>
       !keyIdxToValue.matched
     }
   } else {
     keyWithIndexToValue.getAll(key, numValues) 
   }
   
   keyIdxToValues.map { keyIdxToValue =>
     ...
   }
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
-        // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        // For inner and left semi joins, we have to remove unnecessary state rows from both sides
+        // if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.

Review comment:
       Does this duplicate L371?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,37 +249,43 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here is how it is done.
     //
-    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)` generates all rows from
-    //    matching new left input with stored right input, and also stores all the left input
+    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)`
+    //    - inner, left outer, right outer join: generates all rows from matching new left input

Review comment:
       `Inner, Left Outer, Right Outer Join: ...`
   
   Capitalizing the join types is better for readability.
   
   Ditto for the lines below.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a

Review comment:
       Should the `inner output completion` wording in this comment be updated too?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       How about renaming it to `skipMatchedRowsInPreviousBatches`?






[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713877922


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34715/
   




[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508086669



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
+   *                                    This is used for right side of left semi join in
+   *                                    [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>
+      joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched
+    }.map { keyIdxToValue =>

Review comment:
       I think the current code is more concise and less distracting. If someone finds `filterNot` confusing, we can simply use `filter` and negate the condition.
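
   The two spellings select the same rows. A minimal sketch, assuming a simplified `StoredRow` type and a `skipMatched` flag (both hypothetical, not Spark's classes or parameter names):

   ```scala
   object FilterEquivalenceSketch {
     // Hypothetical stand-in for a stored state row.
     final case class StoredRow(value: Int, matched: Boolean)

     // Drop rows for which "skip matched rows" applies.
     def viaFilterNot(rows: Seq[StoredRow], skipMatched: Boolean): Seq[StoredRow] =
       rows.filterNot(row => skipMatched && row.matched)

     // Same selection with the condition negated (De Morgan's law).
     def viaFilter(rows: Seq[StoredRow], skipMatched: Boolean): Seq[StoredRow] =
       rows.filter(row => !skipMatched || !row.matched)
   }
   ```

   Either form keeps the single-pipeline shape of the current code, so the choice is purely about which reads more naturally.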






[GitHub] [spark] HeartSaVioR commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509245494



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10].
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join rows and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3
+      // right: 3, 4, 5
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3 (below watermark)
+      // right: 3, 4, 5 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3, 4, 5
+      // right: 3
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3, 4, 5 (below watermark)
+      // right: 3 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: nothing
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, (1, 10), (2, 5)),
+      // Match left row in the state.
+      CheckNewAnswer((1, 5)),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5)
+      assertNumStateRows(total = 4, updated = 2),
+      AddData(rightInput, (1, 11)),
+      // No match as left time is too low and left row is already matched.

Review comment:
       That said, it would be nice to add or change a right-input row so that the test verifies the single condition `left row is already matched` on its own. Perhaps something like `(1, 9)`?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1041,3 +1077,204 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     )
   }
 }
+
+class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
+
+  import testImplicits._
+
+  test("windowed left semi join") {
+    val (leftInput, rightInput, joined) = setupWindowedJoin("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
+      CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
+      MultiAddData(leftInput, 21)(rightInput, 22),
+      // Watermark = 11, should remove rows having window=[0,10].
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(leftInput, 22),
+      CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0),
+      StopStream,
+      StartStream(),
+
+      AddData(leftInput, 1),
+      // Row not add as 1 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
+    )
+  }
+
+  test("left semi early state exclusion on left") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
+      // The left rows with leftValue <= 4 should not generate their semi join rows and
+      // not get added to the state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3
+      // right: 3, 4, 5
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3 (below watermark)
+      // right: 3, 4, 5 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi early state exclusion on right") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRightCondition("left_semi")
+
+    testStream(joined)(
+      MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      // The right row with rightValue = 9 > 7, hence joined and added to state.
+      CheckNewAnswer(Row(3, 10, 6)),
+      // states
+      // left: 3, 4, 5
+      // right: 3
+      assertNumStateRows(total = 4, updated = 4),
+      // We shouldn't get more semi join rows when the watermark advances.
+      MultiAddData(leftInput, 20)(rightInput, 21),
+      CheckNewAnswer(),
+      // states
+      // left: 20
+      // right: 21
+      //
+      // states evicted
+      // left: 3, 4, 5 (below watermark)
+      // right: 3 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, 20),
+      CheckNewAnswer((20, 30, 40)),
+      // states
+      // left: 20
+      // right: 21, 20
+      assertNumStateRows(total = 3, updated = 1)
+    )
+  }
+
+  test("left semi join with watermark range condition") {
+    val (leftInput, rightInput, joined) = setupWindowedJoinWithRangeCondition("left_semi")
+
+    testStream(joined)(
+      AddData(leftInput, (1, 5), (3, 5)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: nothing
+      assertNumStateRows(total = 2, updated = 2),
+      AddData(rightInput, (1, 10), (2, 5)),
+      // Match left row in the state.
+      CheckNewAnswer((1, 5)),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5)
+      assertNumStateRows(total = 4, updated = 2),
+      AddData(rightInput, (1, 11)),
+      // No match as left time is too low and left row is already matched.
+      CheckNewAnswer(),
+      // states
+      // left: (1, 5), (3, 5)
+      // right: (1, 10), (2, 5), (1, 11)
+      assertNumStateRows(total = 5, updated = 1),
+      // Increase event time watermark to 20s by adding data with time = 30s on both inputs.
+      AddData(leftInput, (1, 7), (1, 30)),
+      CheckNewAnswer((1, 7)),
+      // states
+      // left: (1, 5), (3, 5), (1, 30)
+      // right: (1, 10), (2, 5), (1, 11)
+      assertNumStateRows(total = 6, updated = 1),
+      // Watermark = 30 - 10 = 20, no matched row.
+      AddData(rightInput, (0, 30)),
+      CheckNewAnswer(),
+      // states
+      // left: (1, 30)
+      // right: (0, 30)
+      //
+      // states evicted
+      // left: (1, 5), (3, 5) (below watermark = 20)
+      // right: (1, 10), (2, 5), (1, 11) (below watermark = 20)
+      assertNumStateRows(total = 2, updated = 1)
+    )
+  }
+
+  test("self left semi join") {
+    val (inputStream, query) = setupWindowedSelfJoin("left_semi")
+
+    testStream(query)(
+      AddData(inputStream, (1, 1L), (2, 2L), (3, 3L), (4, 4L), (5, 5L)),
+      CheckNewAnswer((2, 2), (4, 4)),
+      // batch 1 - global watermark = 0
+      // states
+      // left: (2, 2L), (4, 4L)
+      //       (left rows with value % 2 != 0 are filtered per [[PushDownLeftSemiAntiJoin]])
+      // right: (2, 2L), (4, 4L)

Review comment:
       Probably better to leave a similar explanation for the right state, like you do for the left state.
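
A toy model of the state bookkeeping asserted in the windowed left semi join test above may help when reading the `assertNumStateRows` expectations. This is only a sketch: `LeftSemiSim` and its methods are hypothetical names, it assumes distinct integer keys, and it ignores windows and watermark eviction entirely.

```scala
import scala.collection.mutable

// Toy model only; NOT Spark's StreamingSymmetricHashJoinExec.
// Assumes distinct Int keys and no watermark eviction.
final class LeftSemiSim {
  // left key -> has this buffered left row already been emitted?
  private val leftState = mutable.LinkedHashMap.empty[Int, Boolean]
  private val rightState = mutable.LinkedHashSet.empty[Int]

  // New left rows: emit immediately on a match (without buffering),
  // otherwise buffer the row so a later right row can match it.
  def addLeft(keys: Int*): Seq[Int] = keys.flatMap { k =>
    if (rightState.contains(k)) Some(k)
    else { leftState(k) = false; None }
  }

  // New right rows: always buffered; emit a buffered left row only the
  // first time it is matched, then mark it as matched.
  def addRight(keys: Int*): Seq[Int] = keys.flatMap { k =>
    rightState += k
    leftState.get(k) match {
      case Some(false) => leftState(k) = true; Some(k)
      case _ => None
    }
  }

  def numStateRows: Int = leftState.size + rightState.size
}
```

In this model, `addLeft(1, 2, 3, 4, 5)` followed by `addRight(3, 4, 5, 6, 7)` emits 3, 4, 5 and leaves 10 buffered rows, while a later left row that finds its match in the right state is emitted without growing the state, which is the behaviour the test comments describe.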

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param excludeRowsAlreadyMatched Do not join with rows already matched previously.
+   *                                  This is used for right side of left semi join in
+   *                                  [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      excludeRowsAlreadyMatched: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
-    keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
+    keyWithIndexToValue.getAll(key, numValues).filterNot { keyIdxToValue =>

Review comment:
       I thought of this first but did not propose it, because the current predicate cannot check the condition. We could still do this by adjusting the type of the predicate a bit, but I guess the follow-up PR will try to separate the left semi case for performance, which would let us revert the change here. For that reason I prefer the small change for now.
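
For reference, the shape of the change under discussion can be sketched with plain collections. This is an assumption-laden miniature, not the actual `SymmetricHashJoinStateManager` code: `ValueEntry` and the `Int` rows are stand-ins, and only the `excludeRowsAlreadyMatched` flag name is taken from the diff.

```scala
// Stand-in for a stored value plus its "matched" flag; hypothetical type.
final case class ValueEntry(value: Int, var matched: Boolean)

// Miniature of getJoinedRows: `stored` plays the role of the values
// returned for one join key by the state store.
def getJoinedRows(
    stored: Seq[ValueEntry],
    predicate: Int => Boolean,
    excludeRowsAlreadyMatched: Boolean = false): Iterator[Int] = {
  stored.iterator
    // Skip rows matched in an earlier call (left semi: emit each row once).
    .filterNot(e => excludeRowsAlreadyMatched && e.matched)
    // Apply the join condition.
    .filter(e => predicate(e.value))
    // Mark the row as matched, then emit it.
    .map { e => e.matched = true; e.value }
}
```

Because the flag check sits in a separate `filterNot`, the predicate type does not need to change, which seems to be the "small change" preferred in the comment above.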

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1088,39 +1088,79 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
     testStream(joined)(
       MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
       CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4, 5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
       MultiAddData(leftInput, 21)(rightInput, 22),
-      // Watermark = 11, should remove rows having window=[0,10]
+      // Watermark = 11, should remove rows having window=[0,10].
       CheckNewAnswer(),
-      assertNumStateRows(total = 2, updated = 12),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4, 5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
       AddData(leftInput, 22),
       CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, when a left input row matches a right input row,
+      // we don't buffer the matched left input row in the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
       assertNumStateRows(total = 2, updated = 0),
       StopStream,
       StartStream(),
 
       AddData(leftInput, 1),
-      // Row not add as 1 < state key watermark = 12
+      // Row not added as 1 < state key watermark = 12.
       CheckNewAnswer(),
-      AddData(rightInput, 11),
-      // Row not add as 11 < state key watermark = 12
-      CheckNewAnswer()
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not added as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
     )
   }
 
   test("left semi early state exclusion on left") {
     val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
 
     testStream(joined)(
-      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
-      // The left rows with leftValue <= 4 should not generate their semi join row and
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),

Review comment:
       Ah OK, thanks for the explanation. I agree it's probably less confusing to just remove them from the right side instead of having them affected by the optimizer.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714203992








[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713922731


   **[Test build #130110 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130110/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716238344


   Kubernetes integration test status failure
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34846/
   




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716234041


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34846/
   




[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713324398


   **[Test build #130072 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130072/testReport)** for PR 30076 at commit [`765a233`](https://github.com/apache/spark/commit/765a233237e309f112058e73cef4f4c516f39a8d).




[GitHub] [spark] c21 commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716315794


   Thank you @HeartSaVioR , @xuanyuanking and @viirya for review!




[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508128997



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left ousemiter joins: stream-stream allowed with join on watermark attribute

Review comment:
       @HeartSaVioR - yeah, totally understand. Appreciate your careful review here, thanks.






[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508876329



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",

Review comment:
       @viirya - updated for all places.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -246,37 +249,43 @@ case class StreamingSymmetricHashJoinExec(
 
     //  Join one side input using the other side's buffered/state rows. Here is how it is done.
     //
-    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)` generates all rows from
-    //    matching new left input with stored right input, and also stores all the left input
+    //  - `leftSideJoiner.storeAndJoinWithOtherSide(rightSideJoiner)`
+    //    - inner, left outer, right outer join: generates all rows from matching new left input

Review comment:
       @viirya - updated.

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",

Review comment:
       @viirya - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a

Review comment:
       @viirya - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       @HeartSaVioR - after more thought, I feel early eviction is more complicated and needs more consideration.
   
   For left semi join state store eviction:
   * [if the watermark predicate is on value](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L542), we can add a method in `SymmetricHashJoinStateManager` or change the current method `SymmetricHashJoinStateManager.removeByValueCondition()` to remove matched values while iterating over all values.
   
   * [if the watermark predicate is on key](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L540), only the values of keys that pass the predicate are iterated, not all values in the state store. So we need a more efficient way to evict matched values here; otherwise we would need to iterate over all keys and values to find the matched values.
   
   So I think this piece of code may still be needed (not removed completely). Maybe we can store the matched rows in some other data structure after getting all matched rows here. I feel a follow-up PR is appropriate, WDYT? @HeartSaVioR 
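
The difference between the two eviction paths described above can be illustrated with a small in-memory model. This is a hedged sketch, not the state manager's implementation: the function names echo `removeByKeyCondition` / `removeByValueCondition`, but `StoredRow`, the `Map`-based state, and the `alsoEvictMatched` parameter are hypothetical.

```scala
// Hypothetical buffered row: event time plus the "matched" flag.
final case class StoredRow(eventTime: Long, matched: Boolean)

// Key-based eviction: only keys are inspected, and all rows under an
// evicted key are dropped together. Rows under surviving keys are never
// visited, so matched rows there cannot be evicted on this path.
def removeByKeyCondition(
    state: Map[Int, Vector[StoredRow]],
    evictKey: Int => Boolean): Map[Int, Vector[StoredRow]] =
  state.filterNot { case (key, _) => evictKey(key) }

// Value-based eviction: every stored row is visited anyway, so an extra
// "already matched" check can piggyback on the same scan.
def removeByValueCondition(
    state: Map[Int, Vector[StoredRow]],
    evictRow: StoredRow => Boolean,
    alsoEvictMatched: Boolean): Map[Int, Vector[StoredRow]] =
  state
    .map { case (key, rows) =>
      key -> rows.filterNot(r => evictRow(r) || (alsoEvictMatched && r.matched))
    }
    .filter { case (_, rows) => rows.nonEmpty }
```

In this model, early eviction of matched rows comes for free on the value path, while the key path would need either a full scan or a side structure tracking matched rows, which is the trade-off raised in the comment.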

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,22 +360,28 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
-        // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        // For inner and left semi joins, we have to remove unnecessary state rows from both sides
+        // if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.

Review comment:
       @viirya - removed.






[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714203437


   **[Test build #130110 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130110/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714087252








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712354760


   **[Test build #130010 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130010/testReport)** for PR 30076 at commit [`3918727`](https://github.com/apache/spark/commit/3918727a08c8d0d4c65ccc8ea902f77051b78b1d).




[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r507955252



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    s"left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left ousemiter joins: stream-stream allowed with join on watermark attribute

Review comment:
       @HeartSaVioR - sorry, updated.

##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed

Review comment:
       @HeartSaVioR - sure, will create a followup JIRA.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -347,24 +361,29 @@ case class StreamingSymmetricHashJoinExec(
 
       // Processing time between inner output completion and here comes from the outer portion of a
       // join, and thus counts as removal time as we remove old state from one side while iterating.
-      if (innerOutputCompletionTimeNs != 0) {
+      if (hashJoinOutputCompletionTimeNs != 0) {
         allRemovalsTimeMs +=
-          math.max(NANOSECONDS.toMillis(System.nanoTime - innerOutputCompletionTimeNs), 0)
+          math.max(NANOSECONDS.toMillis(System.nanoTime - hashJoinOutputCompletionTimeNs), 0)
       }
 
       allRemovalsTimeMs += timeTakenMs {
         // Remove any remaining state rows which aren't needed because they're below the watermark.
         //
         // For inner joins, we have to remove unnecessary state rows from both sides if possible.
+        //
         // For outer joins, we have already removed unnecessary state rows from the outer side
         // (e.g., left side for left outer join) while generating the outer "null" outputs. Now, we
         // have to remove unnecessary state rows from the other side (e.g., right side for the left
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
+        //
+        // For left semi joins, we have to remove unnecessary state rows from both sides if
+        // possible.
         val cleanupIter = joinType match {
           case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
           case LeftOuter => rightSideJoiner.removeOldState()
           case RightOuter => leftSideJoiner.removeOldState()
+          case LeftSemi => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()

Review comment:
       @HeartSaVioR - updated.
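
As a reading aid for the cleanup dispatch in the diff above, the per-join-type rule can be written as a tiny standalone function. This is a sketch under assumptions: the `JoinType` objects here are local stand-ins for Catalyst's, `sidesToClean` is a hypothetical helper, and grouping `Inner` with `LeftSemi` in one case is one possible way to express it rather than necessarily what was merged.

```scala
// Local stand-ins for the Catalyst join types; hypothetical definitions.
sealed trait JoinType
case object Inner extends JoinType
case object LeftOuter extends JoinType
case object RightOuter extends JoinType
case object LeftSemi extends JoinType

// Which sides still need removeOldState() in the final cleanup step.
def sidesToClean(joinType: JoinType): Set[String] = joinType match {
  // Neither side was cleaned while producing output, so clean both.
  case Inner | LeftSemi => Set("left", "right")
  // The outer side was already cleaned while emitting null-padded rows.
  case LeftOuter => Set("right")
  case RightOuter => Set("left")
}
```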

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -330,11 +338,17 @@ case class StreamingSymmetricHashJoinExec(
           }
         }.map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
 
-        innerOutputIter ++ outerOutputIter
+        hashJoinOutputIter ++ outerOutputIter
+      case LeftSemi =>

Review comment:
       @HeartSaVioR - updated.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,15 +99,22 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.
+   *                                    This is used for right side of left semi join in
+   *                                    [[StreamingSymmetricHashJoinExec]] only.
    */
   def getJoinedRows(
       key: UnsafeRow,
       generateJoinedRow: InternalRow => JoinedRow,
-      predicate: JoinedRow => Boolean): Iterator[JoinedRow] = {
+      predicate: JoinedRow => Boolean,
+      joinOnlyFirstTimeMatchedRow: Boolean = false): Iterator[JoinedRow] = {
     val numValues = keyToNumValues.get(key)
     keyWithIndexToValue.getAll(key, numValues).map { keyIdxToValue =>
       val joinedRow = generateJoinedRow(keyIdxToValue.value)
-      if (predicate(joinedRow)) {
+      if (joinOnlyFirstTimeMatchedRow && keyIdxToValue.matched) {

Review comment:
       @HeartSaVioR - makes sense to me, though the cost of `generateJoinedRow` is minor. Updated.
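
For readers following the thread, the behavior discussed in this hunk can be sketched in plain Scala as below. This is a simplified model under stated assumptions, not Spark's implementation: `SkipMatchedSketch`, `StateRow`, and `skipPreviouslyMatched` are hypothetical names (the hunk itself calls the flag `joinOnlyFirstTimeMatchedRow`), and rows are modeled as plain `Int` values instead of `UnsafeRow`/`JoinedRow`. The sketch checks the matched flag before doing any other per-row work, which appears to be the point of the review suggestion.

```scala
// Simplified, hypothetical model of the join state; not Spark's actual classes.
object SkipMatchedSketch {
  // One buffered row together with the "matched" flag kept in state.
  final case class StateRow(value: Int, var matched: Boolean = false)

  // Returns the (buffered, input) pairs that satisfy `predicate`, marking the
  // buffered rows as matched. When skipPreviouslyMatched is true, rows that are
  // already flagged are skipped before the predicate is evaluated, so each
  // buffered row can contribute to the output at most once.
  def getJoinedRows(
      buffered: Seq[StateRow],
      input: Int,
      predicate: (Int, Int) => Boolean,
      skipPreviouslyMatched: Boolean = false): Seq[(Int, Int)] = {
    buffered.flatMap { row =>
      if (skipPreviouslyMatched && row.matched) {
        None // already joined in an earlier call; a semi join must not emit it again
      } else if (predicate(row.value, input)) {
        row.matched = true
        Some((row.value, input))
      } else {
        None
      }
    }
  }
}
```

With the flag set, a second call with the same input returns no rows for a buffered row that already matched; with the default `false`, the row is returned again, which is the behavior the other join types rely on.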

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
   }
 
-  if (stateFormatVersion < 2 && joinType != Inner) {
+  if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) {

Review comment:
       updated.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716309131








[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710764302


   Test FAILed.
   Refer to this link for build results (access rights to CI server needed): 
   https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/129934/




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710748251








[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716242211


   **[Test build #130248 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130248/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714019114








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714083774


   **[Test build #130106 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130106/testReport)** for PR 30076 at commit [`9cd222f`](https://github.com/apache/spark/commit/9cd222f833cc47cd7880028f42fd13d8396e2414).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714203992








[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713518327








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710735984


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34536/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716309131








[GitHub] [spark] viirya commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508012673



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       `skipMatchedRowsPreviously`? `first-time matched row` reads a bit confusingly to me. Can you rephrase it to say that we only join with rows that were not matched previously?






[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716257746


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34848/
   




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710751008


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34539/
   




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713431994








[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714087252








[GitHub] [spark] HeartSaVioR commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714243009


   I'll leave the PR open for around 2 days to see whether others have additional comments. If no further comments are provided, I'll probably merge this over the weekend. cc. @viirya @xuanyuanking 




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713324398


   **[Test build #130072 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130072/testReport)** for PR 30076 at commit [`765a233`](https://github.com/apache/spark/commit/765a233237e309f112058e73cef4f4c516f39a8d).




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710742615


   **[Test build #129934 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129934/testReport)** for PR 30076 at commit [`ee16690`](https://github.com/apache/spark/commit/ee166901bbaa3500a9fa8704f2a5598a8ce4b6d7).




[GitHub] [spark] viirya commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710731815


   cc @HeartSaVioR 




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710748357


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34539/
   




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710748095


   **[Test build #129931 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129931/testReport)** for PR 30076 at commit [`e5af8e1`](https://github.com/apache/spark/commit/e5af8e12112533150321a292a261489b3523e23a).
    * This patch **fails Spark unit tests**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712486951








[GitHub] [spark] gaborgsomogyi commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
gaborgsomogyi commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712050432


   I've just picked this up, and from a high-level perspective I see at least two things in the PR:
   * Test code deduplication (which is good to do)
   * Left semi join itself
   
   I suggest splitting them up by creating a separate JIRA for the test code deduplication.
   That would make the PR more concise and easier to review.
   




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713518327








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713357125


   **[Test build #130072 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130072/testReport)** for PR 30076 at commit [`765a233`](https://github.com/apache/spark/commit/765a233237e309f112058e73cef4f4c516f39a8d).
    * This patch **fails due to an unknown error code, -9**.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716238353








[GitHub] [spark] xuanyuanking commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
xuanyuanking commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509907626



##########
File path: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
##########
@@ -490,15 +490,77 @@ class UnsupportedOperationsSuite extends SparkFunSuite {
     _.join(_, joinType = LeftSemi),
     streamStreamSupported = false,
     batchStreamSupported = false,
-    expectedMsg = "left semi/anti joins")
+    expectedMsg = "LeftSemi join")
+
+  // Left semi joins: update and complete mode not allowed
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and update mode",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Update(),
+    Seq("is not supported in Update output mode"))
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and complete mode",
+    Aggregate(Nil, aggExprs("d"), streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute))),
+    OutputMode.Complete(),
+    Seq("is not supported in Complete output mode"))
+
+  // Left semi joins: stream-stream allowed with join on watermark attribute
+  // Note that the attribute need not be watermarked on both sides.
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with left watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attributeWithWatermark === attribute)),
+    OutputMode.Append())
+  assertSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on attribute with right watermark",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attributeWithWatermark)),
+    OutputMode.Append())
+  assertNotSupportedInStreamingPlan(
+    "left semi join with stream-stream relations and join on non-watermarked attribute",
+    streamRelation.join(streamRelation, joinType = LeftSemi,
+      condition = Some(attribute === attribute)),
+    OutputMode.Append(),
+    Seq("watermark in the join keys"))

Review comment:
       Makes sense.






[GitHub] [spark] HeartSaVioR closed pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #30076:
URL: https://github.com/apache/spark/pull/30076


   




[GitHub] [spark] HeartSaVioR commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-711753933


   cc @tdas @zsxwing @jose-torres @gaborgsomogyi 




[GitHub] [spark] viirya commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
viirya commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r508012673



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
##########
@@ -99,13 +99,20 @@ class SymmetricHashJoinStateManager(
   /**
    * Get all the matched values for given join condition, with marking matched.
    * This method is designed to mark joined rows properly without exposing internal index of row.
+   *
+   * @param joinOnlyFirstTimeMatchedRow Only join with first-time matched row.

Review comment:
       `skipMatchedRowsPreviously`? `first-time matched row` reads a bit confusingly to me. Can you rephrase it to say that we only join with rows that were not matched previously?






[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716242211


   **[Test build #130248 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130248/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).




[GitHub] [spark] c21 commented on pull request #30076: [SPARK-32862][SQL] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710728381


   cc @cloud-fan and @sameeragarwal if you guys have time to take a look, thanks.




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716257756








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716228578


   **[Test build #130246 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130246/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).




[GitHub] [spark] AmplabJenkins removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-712486951








[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716308498


   **[Test build #130248 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130248/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).
    * This patch passes all tests.
    * This patch merges cleanly.
    * This patch adds no public classes.




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-714019050


   Kubernetes integration test status success
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34719/
   




[GitHub] [spark] SparkQA commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713417560


   Kubernetes integration test starting
   URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/34691/
   




[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710742615


   **[Test build #129934 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/129934/testReport)** for PR 30076 at commit [`ee16690`](https://github.com/apache/spark/commit/ee166901bbaa3500a9fa8704f2a5598a8ce4b6d7).




[GitHub] [spark] c21 commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713373315


   retest this please




[GitHub] [spark] c21 commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716207490


   @viirya - gentle ping again, any more comments before merging? Thanks.




[GitHub] [spark] AmplabJenkins commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-710738846








[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-713376913


   **[Test build #130082 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130082/testReport)** for PR 30076 at commit [`765a233`](https://github.com/apache/spark/commit/765a233237e309f112058e73cef4f4c516f39a8d).




[GitHub] [spark] HeartSaVioR commented on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716227637


   retest this, please




[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r507946803



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -151,7 +151,7 @@ case class StreamingSymmetricHashJoinExec(
       stateWatermarkPredicates = JoinStateWatermarkPredicates(), stateFormatVersion, left, right)
   }
 
-  if (stateFormatVersion < 2 && joinType != Inner) {
+  if (stateFormatVersion < 2 && (joinType == LeftOuter || joinType == RightOuter)) {

Review comment:
       @HeartSaVioR - sorry, I was under the impression that state format version 1 is no longer allowed after 3.0, but I had not looked closely at the relevant code. It turns out there is a config, [spark.sql.streaming.join.stateFormatVersion](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L1345), that still allows users to select version 1.
   
   I will add a check here to fail left semi join queries with state format version 1, the same as for outer joins. Thanks.
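
As a rough illustration of the check described above, the sketch below rejects outer and left semi joins when the state format version is below 2. It is only a sketch under stated assumptions: the join types are simplified stand-ins rather than Spark's Catalyst classes, and `validate` and its error message are hypothetical, not the code that ended up in `StreamingSymmetricHashJoinExec`.

```scala
object StateFormatCheck {
  // Simplified stand-ins for the join types involved; not the real Catalyst classes.
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object LeftSemi extends JoinType

  /**
   * Hypothetical validation: join types that rely on the "matched" flag in state
   * (outer and left semi) are rejected on state format version 1.
   */
  def validate(stateFormatVersion: Int, joinType: JoinType): Either[String, Unit] =
    joinType match {
      case LeftOuter | RightOuter | LeftSemi if stateFormatVersion < 2 =>
        Left(s"$joinType stream-stream join requires state format version 2 or later, " +
          s"but got version $stateFormatVersion")
      case _ =>
        Right(())
    }
}
```

Returning an `Either` keeps the sketch testable; a real operator would more likely throw at construction time, as the existing check in the diff does.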






[GitHub] [spark] c21 commented on a change in pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
c21 commented on a change in pull request #30076:
URL: https://github.com/apache/spark/pull/30076#discussion_r509089331



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1088,39 +1088,79 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite {
     testStream(joined)(
       MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
       CheckNewAnswer(Row(3, 10, 6), Row(4, 10, 8), Row(5, 10, 10)),
+      // states
+      // left: 1, 2, 3, 4 ,5
+      // right: 3, 4, 5, 6, 7
+      assertNumStateRows(total = 10, updated = 10),
       MultiAddData(leftInput, 21)(rightInput, 22),
-      // Watermark = 11, should remove rows having window=[0,10]
+      // Watermark = 11, should remove rows having window=[0,10].
       CheckNewAnswer(),
-      assertNumStateRows(total = 2, updated = 12),
+      // states
+      // left: 21
+      // right: 22
+      //
+      // states evicted
+      // left: 1, 2, 3, 4 ,5 (below watermark)
+      // right: 3, 4, 5, 6, 7 (below watermark)
+      assertNumStateRows(total = 2, updated = 2),
       AddData(leftInput, 22),
       CheckNewAnswer(Row(22, 30, 44)),
+      // Unlike inner/outer joins, given left input row matches with right input row,
+      // we don't buffer the matched left input row to the state store.
+      //
+      // states
+      // left: 21
+      // right: 22
       assertNumStateRows(total = 2, updated = 0),
       StopStream,
       StartStream(),
 
       AddData(leftInput, 1),
-      // Row not add as 1 < state key watermark = 12
+      // Row not add as 1 < state key watermark = 12.
       CheckNewAnswer(),
-      AddData(rightInput, 11),
-      // Row not add as 11 < state key watermark = 12
-      CheckNewAnswer()
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1),
+      AddData(rightInput, 5),
+      // Row not add as 5 < state key watermark = 12.
+      CheckNewAnswer(),
+      // states
+      // left: 21
+      // right: 22
+      assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
     )
   }
 
   test("left semi early state exclusion on left") {
     val (leftInput, rightInput, joined) = setupWindowedJoinWithLeftCondition("left_semi")
 
     testStream(joined)(
-      MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3, 4, 5),
-      // The left rows with leftValue <= 4 should not generate their semi join row and
+      MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),

Review comment:
       The join condition includes `leftValue > 4` on the left side. For a left semi join, a logical plan optimization rule (`PushPredicateThroughJoin`) pushes the condition down from the join, so there is a filter operator on the right side as well, which filters out rows 1 and 2 before the join. The join result and the state store contents are therefore the same with or without rows 1 and 2, because they are filtered out before the join.
   
   Left semi join physical plan:
   
   ```
   *(4) Project [key#3, cast(precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType) as bigint) AS end#46L, leftValue#5]
   +- StreamingSymmetricHashJoin [key#3, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType))], [key#12, window#23-T10000ms], LeftSemi, condition = [ leftOnly = null, rightOnly = null, both = null, full = null ], state info [ checkpoint = <unknown>, runId = 92a31d09-4275-4ee6-8bba-03e1973b4298, opId = 0, ver = 0, numPartitions = 5], 0, state cleanup [ left key predicate: (input[1, struct<start:timestamp,end:timestamp>, false].end <= 0), right key predicate: (input[1, struct<start:timestamp,end:timestamp>, false].end <= 0) ], 2
      :- Exchange hashpartitioning(key#3, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(leftTime#4-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)), 5), true, [id=#54]
      :  +- EventTimeWatermark leftTime#4: timestamp, 10 seconds
      :     +- *(1) Project [value#1 AS key#3, timestamp_seconds(value#1) AS leftTime#4, (value#1 * 2) AS leftValue#5]
      :        +- *(1) Filter ((value#1 * 2) > 4)
      :           +- StreamingRelation memory, [value#1]
      +- Exchange hashpartitioning(key#12, window#23-T10000ms, 5), true, [id=#63]
         +- *(3) Project [key#12, named_struct(start, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 0), LongType, TimestampType), end, precisetimestampconversion(((((CASE WHEN (cast(CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) as double) = (cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) THEN (CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) + 1) ELSE CEIL((cast((precisetimestampconversion(rightTime#13-T10000ms, TimestampType, LongType) - 0) as double) / 1.0E7)) END + 0) - 1) * 10000000) + 10000000), LongType, TimestampType)) AS window#23-T10000ms]
            +- EventTimeWatermark rightTime#13: timestamp, 10 seconds
               +- *(2) Project [value#10 AS key#12, timestamp_seconds(value#10) AS rightTime#13]
                  +- *(2) Filter ((value#10 * 2) > 4)
                     +- StreamingRelation memory, [value#10]
   ```
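
The effect of the two `Filter ((value * 2) > 4)` nodes in the plan can be checked with plain Scala collections. This is a deliberately simplified sketch: it ignores windows, watermarks and the state store, treats the input value as the join key, and the object and method names are made up for illustration.

```scala
object SemiJoinFilterSketch {
  // In the test setup leftValue = value * 2, so `leftValue > 4` keeps values greater than 2.
  def keep(value: Int): Boolean = value * 2 > 4

  /** Rows that survive the pushed-down filter, i.e. the only rows that can reach the join. */
  def afterFilter(input: Seq[Int]): Seq[Int] = input.filter(keep)

  /** Left semi join on key equality: a left row is emitted if any right row has the same key. */
  def leftSemi(left: Seq[Int], right: Seq[Int]): Seq[Int] = {
    val rightKeys = right.toSet
    left.filter(rightKeys.contains)
  }

  /** Applies the same filter on both sides before joining, as in the plan above. */
  def run(left: Seq[Int], right: Seq[Int]): Seq[Int] =
    leftSemi(afterFilter(left), afterFilter(right))
}
```

Under these assumptions, `run(Seq(1, 2, 3), Seq(1, 2, 3, 4, 5))` and `run(Seq(1, 2, 3), Seq(3, 4, 5))` both return `Seq(3)`, and `afterFilter` yields the same right-side rows (3, 4, 5) in both cases, which is why dropping 1 and 2 from the right input does not change the test's results or state row counts.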






[GitHub] [spark] SparkQA removed a comment on pull request #30076: [SPARK-32862][SS] Left semi stream-stream join

Posted by GitBox <gi...@apache.org>.
SparkQA removed a comment on pull request #30076:
URL: https://github.com/apache/spark/pull/30076#issuecomment-716228578


   **[Test build #130246 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/130246/testReport)** for PR 30076 at commit [`14871d9`](https://github.com/apache/spark/commit/14871d9d2be6b751687e78dd4d17c2e249b8f205).

