You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/03/29 11:46:00 UTC

[GitHub] [spark] HeartSaVioR commented on a change in pull request #36002: [SPARK-38684][SS] Fix correctness issue on stream-stream outer join with RocksDB state store provider

HeartSaVioR commented on a change in pull request #36002:
URL: https://github.com/apache/spark/pull/36002#discussion_r837377919



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
##########
@@ -1353,6 +1353,67 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite {
     ).select(Symbol("leftKey1"), Symbol("rightKey1"), Symbol("leftKey2"), Symbol("rightKey2"),
       $"leftWindow.end".cast("long"), Symbol("leftValue"), Symbol("rightValue"))
   }
+
+  test("SPARK-38684: outer join works correctly even if processing input rows and " +
+    "evicting state rows for same grouping key happens in the same micro-batch") {
+
+    // The test is to demonstrate the correctness issue in outer join before SPARK-38684.
+    withSQLConf(
+      SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> "false",
+      SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName) {
+
+      val input1 = MemoryStream[(Timestamp, String, String)]
+      val df1 = input1.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val input2 = MemoryStream[(Timestamp, String, String)]
+      val df2 = input2.toDF
+        .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+        .withWatermark("eventTime", "0 second")
+
+      val joined = df1.as("left")
+        .join(df2.as("right"),
+          expr("""
+                 |left.id = right.id AND left.eventTime BETWEEN
+                 |  right.eventTime - INTERVAL 30 seconds AND
+                 |  right.eventTime + INTERVAL 30 seconds
+             """.stripMargin),
+          joinType = "leftOuter")
+
+      testStream(joined)(
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left in batch 1"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 00:01:00"), "abc", "right in batch 1")))
+        ),
+        CheckNewAnswer(),
+        MultiAddData(
+          (input1, Seq((Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left in batch 2"))),
+          (input2, Seq((Timestamp.valueOf("2020-01-02 01:01:00"), "abc", "right in batch 2")))
+        ),
+        // watermark advanced to "2020-01-02 00:00:00"
+        CheckNewAnswer(),
+        AddData(input1, (Timestamp.valueOf("2020-01-02 01:30:00"), "abc", "left in batch 3")),
+        // watermark advanced to "2020-01-02 01:00:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "left in batch 1", null, null, null)
+        ),
+        // left side state should still contain "left in batch 2" and "left in batch 3"
+        // we should see both rows in the left side since
+        // - "left in batch 2" is going to be evicted in this batch
+        // - "left in batch 3" is going to be matched with new row in right side
+        AddData(input2,
+          (Timestamp.valueOf("2020-01-02 01:30:10"), "abc", "match with left in batch 3")),
+        // watermark advanced to "2020-01-02 01:01:00"
+        CheckNewAnswer(
+          (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "left in batch 2",

Review comment:
       Without this fix, Spark only produces one row instead of two.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org