Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/29 10:47:04 UTC

[GitHub] [spark] HeartSaVioR opened a new pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

HeartSaVioR opened a new pull request #36002:
URL: https://github.com/apache/spark/pull/36002


   ### What changes were proposed in this pull request?
   
   This PR fixes a correctness issue in stream-stream outer join with the RocksDB state store provider, which can occur under conditions like the following:
   
   * stream-stream time interval outer join
     * left outer join has the issue on the left side, right outer join has the issue on the right side, full outer join has the issue on both sides
   * At batch N, non-late row(s) are produced on the problematic side
   * In the same batch (batch N), some row(s) on the problematic side are evicted by the watermark condition
   
   https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L327-L339
   
   https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L619-L627
   
   https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L195-L201
   
   https://github.com/apache/spark/blob/ca7200b0008dc6101a252020e6c34ef7b72d81d6/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala#L208-L223
   
   The root cause is the same as [SPARK-38320](https://issues.apache.org/jira/browse/SPARK-38320) - weak read consistency on the iterator, especially with the RocksDB state store provider. (Quoting from SPARK-38320: The problem is due to the StateStore.iterator not reflecting StateStore changes made after its creation.)
   
   More specifically, if processing input rows updates the number of values for a grouping key, the update is not seen in SymmetricHashJoinStateManager.removeByValueCondition, and the method performs the eviction with an out-of-sync number of values.
   
   Even worse, if the method performs the eviction and updates the number of values for the grouping key, it "overwrites" the number of values, effectively dropping all rows inserted in the same batch.
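   
   As a rough illustration of the overwrite described above, here is a toy model (not Spark code). `SnapshotStore` and `StaleCountDemo` are hypothetical names, and the copying iterator is an assumption standing in for an iterator that does not reflect changes made after its creation.
   
   ```scala
   import scala.collection.mutable

   // Toy stand-in for a store whose iterator is a snapshot taken at creation time.
   // Hypothetical names; this is not Spark's StateStore API.
   final class SnapshotStore[K, V] {
     private val data = mutable.LinkedHashMap.empty[K, V]
     def put(k: K, v: V): Unit = data.update(k, v)
     def get(k: K): Option[V] = data.get(k)
     // Copies the current contents, so later puts are invisible to this iterator.
     def iterator: Iterator[(K, V)] = data.toList.iterator
   }

   object StaleCountDemo {
     def run(): (Int, Int) = {
       // grouping key -> number of values stored for that key
       val numValues = new SnapshotStore[String, Int]
       numValues.put("abc", 1) // one row left over from earlier batches

       // Eviction iterator created eagerly, before input rows are processed.
       val evictionIter = numValues.iterator

       // Processing input rows appends a new row for the same grouping key.
       numValues.put("abc", numValues.get("abc").getOrElse(0) + 1)
       val countAfterInput = numValues.get("abc").getOrElse(0)

       // Eviction sees the stale count, removes one row, and writes the result back,
       // overwriting the count that included the row appended in this batch.
       evictionIter.foreach { case (key, staleCount) =>
         numValues.put(key, staleCount - 1)
       }
       (countAfterInput, numValues.get("abc").getOrElse(0))
     }
   }
   ```
   
   In this toy model `StaleCountDemo.run()` returns `(2, 0)`: the count is 2 after input processing, but the eviction writes back 0 because it worked from the stale value 1.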
   
   This PR makes the outer iterators lazily evaluated, to ensure all updates made while processing input rows are reflected "before" the outer iterators are initialized.
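   
   A minimal sketch of the late evaluation idea, assuming a generic element type (the class in this PR is typed to `JoinedRow`). `LazilyInitializingIterator` and `LazyIterDemo` are illustrative names, and the buffer stands in for join state.
   
   ```scala
   import scala.collection.mutable

   // Defers building the underlying iterator until the first hasNext/next call.
   // Generic, hypothetical variant for illustration only.
   final class LazilyInitializingIterator[A](initFn: () => Iterator[A]) extends Iterator[A] {
     private lazy val iter: Iterator[A] = initFn()
     override def hasNext: Boolean = iter.hasNext
     override def next(): A = iter.next()
   }

   object LazyIterDemo {
     def run(): (List[String], List[String]) = {
       val state = mutable.ArrayBuffer("left in batch 2")

       // Eager: the snapshot is taken now, before input rows are processed.
       val eager: Iterator[String] = state.toList.iterator
       // Deferred: the snapshot is taken only when the iterator is first consumed.
       val deferred = new LazilyInitializingIterator(() => state.toList.iterator)

       // Input processing runs first and mutates the state as a side effect.
       val inputOutput = Iterator("left in batch 3").map { row => state += row; row }
       inputOutput.foreach(_ => ())

       (eager.toList, deferred.toList)
     }
   }
   ```
   
   Here the eager iterator yields only `left in batch 2`, while the deferred one yields both rows because its snapshot is taken after input processing has finished.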
   
   ### Why are the changes needed?
   
   The bug is described in the section above.
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Added a new unit test.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] HeartSaVioR closed pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR closed pull request #36002:
URL: https://github.com/apache/spark/pull/36002


   




[GitHub] [spark] anishshri-db commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
anishshri-db commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r840105969



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -324,17 +324,21 @@ case class StreamingSymmetricHashJoinExec(
           }
         }
 
+        val initIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot { kv =>
+            stateFormatVersion match {
+              case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+              case 2 => kv.matched
+              case _ => throwBadStateFormatVersionException()
+            }
+          }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
         // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-        // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+        // elements in `hashJoinOutputIter`, because evaluation of `hashJoinOutputIter` may update

Review comment:
       Nit: Do you think we should tag/mention the Spark JIRA ticket (https://issues.apache.org/jira/browse/SPARK-38684) in the comment here and below? Also, should we include some of the PR description you added, to elaborate on the issue further:
   
   ```
   More specifically, if updates are performed during processing input rows and somehow updates the number of values for grouping key, the update is not seen in SymmetricHashJoinStateManager.removeByValueCondition, and the method does the eviction with the number of values in out of sync.
   
   Making it more worse, if the method performs the eviction and updates the number of values for grouping key, it "overwrites" the number of value, effectively drop all rows being inserted in the same batch.
   ```






[GitHub] [spark] HeartSaVioR commented on pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #36002:
URL: https://github.com/apache/spark/pull/36002#issuecomment-1085297235


   cc. @tdas @zsxwing @viirya @xuanyuanking Kind reminder.




[GitHub] [spark] HeartSaVioR commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r837377919



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1353,6 +1353,67 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
     ).select(Symbol("leftKey1"), Symbol("rightKey1"), Symbol("leftKey2"), Symbol("rightKey2"),
       $"leftWindow.end".cast("long"), Symbol("leftValue"), Symbol("rightValue"))
   }
+
+  test("SPARK-38684: outer join works correctly even if processing input rows and " +
+    "evicting state rows for same grouping key happens in the same micro-batch") {
+
+    // The test is to demonstrate the correctness issue in outer join before SPARK-38684.
+    withSQLConf(
+      SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> "false",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+
+      val input1 = MemoryStream[(Timestamp, String, String)]
+      val df1 = input1.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val input2 = MemoryStream[(Timestamp, String, String)]
+      val df2 = input2.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val joined = df1.as("left")
+        .join(df2.as("right"),
+          expr("""
+                 |left.id = right.id AND left.eventTime BETWEEN
+                 |  right.eventTime - INTERVAL 30 seconds AND
+                 |  right.eventTime + INTERVAL 30 seconds
+             """.stripMargin),
+          joinType = "leftOuter")
+
+      testStream(joined)(
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left in batch 1"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 00:01:00"), "abc", "right in batch 1")))
+        ),
+        CheckNewAnswer(),
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left in batch 2"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 01:01:00"), "abc", "right in batch 2")))
+        ),
+        // watermark advanced to "2020-01-02 00:00:00"
+        CheckNewAnswer(),
+        AddData(input1, (Timestamp.valueOf("2020-01-02 01:30:00"), "abc", "left in batch 3")),
+        // watermark advanced to "2020-01-02 01:00:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left in batch 1", null, null, null)
+        ),
+        // left side state should still contain "left in batch 2" and "left in batch 3"
+        // we should see both rows in the left side since
+        // - "left in batch 2" is going to be evicted in this batch
+        // - "left in batch 3" is going to be matched with new row in right side
+        AddData(input2,
+          (Timestamp.valueOf("2020-01-02 01:30:10"), "abc", "match with left in batch 3")),
+        // watermark advanced to "2020-01-02 01:01:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left in batch 2",

Review comment:
       Without this fix, Spark only produces one row instead of two.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r840143781



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -324,17 +324,21 @@ case class StreamingSymmetricHashJoinExec(
           }
         }
 
+        val initIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot { kv =>
+            stateFormatVersion match {
+              case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+              case 2 => kv.matched
+              case _ => throwBadStateFormatVersionException()
+            }
+          }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
         // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-        // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+        // elements in `hashJoinOutputIter`, because evaluation of `hashJoinOutputIter` may update

Review comment:
       FYI, we also recently updated the method doc of StateStore.iterator - with the updated contract for StateStore.iterator, the logic before this fix is "unsafe" by interface contract.






[GitHub] [spark] HeartSaVioR commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r840133149



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -324,17 +324,21 @@ case class StreamingSymmetricHashJoinExec(
           }
         }
 
+        val initIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot { kv =>
+            stateFormatVersion match {
+              case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+              case 2 => kv.matched
+              case _ => throwBadStateFormatVersionException()
+            }
+          }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
         // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-        // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+        // elements in `hashJoinOutputIter`, because evaluation of `hashJoinOutputIter` may update

Review comment:
       Thanks for pointing this out! Yeah the comment is out-of-date now - technically it solves multiple issues.
   
   We may not want to put all of the details here, since that would be redundant with the commit description, PR description, etc. We could probably just add ", otherwise it may lead to correctness issues. Please refer to SPARK-38684 for more details."








[GitHub] [spark] HeartSaVioR commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r837960636



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -638,4 +665,13 @@ case class StreamingSymmetricHashJoinExec(
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan, newRight: SparkPlan): StreamingSymmetricHashJoinExec =
     copy(left = newLeft, right = newRight)
+
+  private class LazilyInitializingJoinedRowIterator(
+      initFn: () => Iterator[JoinedRow]) extends Iterator[JoinedRow] {
+    private lazy val iter: Iterator[JoinedRow] = initFn()
+

Review comment:
       Yeah, agreed, since all of them are single-line methods. Will reflect. Thanks!






[GitHub] [spark] HeartSaVioR commented on pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #36002:
URL: https://github.com/apache/spark/pull/36002#issuecomment-1085713403


   I merged the PR into master/3.3/3.2.




[GitHub] [spark] HeartSaVioR commented on pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #36002:
URL: https://github.com/apache/spark/pull/36002#issuecomment-1081737758


   cc. @tdas @zsxwing @viirya @xuanyuanking @alex-balikov @anishshri-db 
   Please take a look at the change. Thanks in advance!






[GitHub] [spark] anishshri-db commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
anishshri-db commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r840144732



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -324,17 +324,21 @@ case class StreamingSymmetricHashJoinExec(
           }
         }
 
+        val initIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot { kv =>
+            stateFormatVersion match {
+              case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+              case 2 => kv.matched
+              case _ => throwBadStateFormatVersionException()
+            }
+          }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
         // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-        // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+        // elements in `hashJoinOutputIter`, because evaluation of `hashJoinOutputIter` may update

Review comment:
       > We probably could mention simply as ", otherwise it may lead to correctness issues. Please refer [SPARK-38684](https://issues.apache.org/jira/browse/SPARK-38684) for more details."
   
   Yes, sounds good 👍 






[GitHub] [spark] alex-balikov commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
alex-balikov commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r837942632



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -638,4 +665,13 @@ case class StreamingSymmetricHashJoinExec(
   override protected def withNewChildrenInternal(
       newLeft: SparkPlan, newRight: SparkPlan): StreamingSymmetricHashJoinExec =
     copy(left = newLeft, right = newRight)
+
+  private class LazilyInitializingJoinedRowIterator(
+      initFn: () => Iterator[JoinedRow]) extends Iterator[JoinedRow] {
+    private lazy val iter: Iterator[JoinedRow] = initFn()
+

Review comment:
       Probably no need for blank lines between methods.






   > **Legend** - [Click here to learn more](https://docs.codecov.io/docs/codecov-delta?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   > `Δ = absolute <relative> (impact)`, `ø = not affected`, `? = missing data`
   > Powered by [Codecov](https://codecov.io/gh/apache/spark/pull/36002?src=pr&el=footer&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Last update [768ab55...4383739](https://codecov.io/gh/apache/spark/pull/36002?src=pr&el=lastupdated&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation). Read the [comment docs](https://docs.codecov.io/docs/pull-request-comments?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] anishshri-db commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
anishshri-db commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r840149856



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -360,10 +374,25 @@ case class StreamingSymmetricHashJoinExec(
             case 2 => kv.matched
             case _ => throwBadStateFormatVersionException()
           }
-        val leftSideOutputIter = leftSideJoiner.removeOldState().filterNot(
-          isKeyToValuePairMatched).map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
-        val rightSideOutputIter = rightSideJoiner.removeOldState().filterNot(
-          isKeyToValuePairMatched).map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+
+        val leftSideInitIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot(isKeyToValuePairMatched)
+            .map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
+        val rightSideInitIterFn = { () =>
+          val removedRowIter = rightSideJoiner.removeOldState()
+          removedRowIter.filterNot(isKeyToValuePairMatched)
+            .map(pair => joinedRow.withLeft(nullLeft).withRight(pair.value))
+        }
+
+        // NOTE: we need to make sure both `leftSideOutputIter` and `rightSideOutputIter` are
+        // evaluated "after" exhausting all of elements in `hashJoinOutputIter`, otherwise it may
+        // lead to out of sync according to the interface contract on StateStore.iterator and
+        // end up with correctness issue. Please refer SPARK-38684 for more details.

Review comment:
       Should we add the comment here as well?






[GitHub] [spark] HeartSaVioR commented on pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on pull request #36002:
URL: https://github.com/apache/spark/pull/36002#issuecomment-1085667238


   Thanks all for reviewing! Merging to master/3.3/3.2.




[GitHub] [spark] HeartSaVioR commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

Posted by GitBox <gi...@apache.org>.
HeartSaVioR commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r840143781



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
##########
@@ -324,17 +324,21 @@ case class StreamingSymmetricHashJoinExec(
           }
         }
 
+        val initIterFn = { () =>
+          val removedRowIter = leftSideJoiner.removeOldState()
+          removedRowIter.filterNot { kv =>
+            stateFormatVersion match {
+              case 1 => matchesWithRightSideState(new UnsafeRowPair(kv.key, kv.value))
+              case 2 => kv.matched
+              case _ => throwBadStateFormatVersionException()
+            }
+          }.map(pair => joinedRow.withLeft(pair.value).withRight(nullRight))
+        }
+
         // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
-        // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+        // elements in `hashJoinOutputIter`, because evaluation of `hashJoinOutputIter` may update

Review comment:
       FYI, we also recently updated the method doc of StateStore.iterator. Under the updated contract for StateStore.iterator, the logic before this fix is "unsafe".
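
To illustrate why the ordering matters, here is a minimal sketch. It is not the Spark code: `SnapshotStore`, `concatDeferred`, `innerPass` and `outerPass` are hypothetical names, and the store is assumed to hand out an iterator that is a snapshot taken at creation time, which is one way an iterator can fail to reflect later changes. If the outer-side iterator is created before the inner pass has run, it sees stale `matched` flags and emits a row that was already joined. Wrapping its creation in a function and calling it only after the first iterator is exhausted avoids that.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a state store whose iterator is a snapshot
// taken at creation time (it does not see later put() calls).
final class SnapshotStore {
  private val rows = mutable.LinkedHashMap.empty[String, Boolean] // key -> matched flag
  def put(key: String, matched: Boolean): Unit = rows(key) = matched
  def iterator: Iterator[(String, Boolean)] = rows.toList.iterator
}

object LazyOuterIterDemo {
  // Hypothetical helper: calls `second` only once `first` has no more elements.
  def concatDeferred[A](first: Iterator[A], second: () => Iterator[A]): Iterator[A] =
    new Iterator[A] {
      private lazy val rest = second()
      def hasNext: Boolean = first.hasNext || rest.hasNext
      def next(): A = if (first.hasNext) first.next() else rest.next()
    }

  // Marks each key as matched while it is consumed, like a join pass updating state.
  def innerPass(store: SnapshotStore, keys: Seq[String]): Iterator[String] =
    keys.iterator.map { k => store.put(k, matched = true); s"joined($k)" }

  // Emits the keys still flagged as unmatched, like the null-padded outer output.
  def outerPass(store: SnapshotStore): Iterator[String] =
    store.iterator.collect { case (k, false) => s"outer($k)" }

  private def freshStore(): SnapshotStore = {
    val s = new SnapshotStore
    s.put("a", matched = false)
    s.put("b", matched = false)
    s
  }

  // Outer iterator created up front: its snapshot predates the inner pass.
  def eager(): List[String] = {
    val store = freshStore()
    val inner = innerPass(store, Seq("a"))
    val outer = outerPass(store)
    concatDeferred(inner, () => outer).toList
  }

  // Outer iterator created only after the inner pass has been fully consumed.
  def deferred(): List[String] = {
    val store = freshStore()
    val inner = innerPass(store, Seq("a"))
    concatDeferred(inner, () => outerPass(store)).toList
  }
}
```

In this sketch `eager()` returns `List("joined(a)", "outer(a)", "outer(b)")`, where `a` is wrongly emitted as an unmatched row, while `deferred()` returns `List("joined(a)", "outer(b)")`.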



