You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2020/07/09 07:40:47 UTC

[spark] branch branch-3.0 updated: [SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 83f0423  [SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row
83f0423 is described below

commit 83f0423ed91388518345cc4e66a552195bcf6f7a
Author: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
AuthorDate: Thu Jul 9 07:37:06 2020 +0000

    [SPARK-32148][SS] Fix stream-stream join issue on missing to copy reused unsafe row
    
    ### What changes were proposed in this pull request?
    
    This patch fixes the odd join result being occurred from stream-stream join for state store format V2.
    
    There're some spots on V2 path which leverage UnsafeProjection. As the result row is reused, the row should be copied to avoid changing value during reading (or make sure the caller doesn't affect by such behavior) but `SymmetricHashJoinStateManager.removeByValueCondition` violates the case.
    
    This patch makes `KeyWithIndexToValueRowConverterV2.convertValue` copy the row by itself so that callers don't need to take care about it. This patch doesn't change the behavior of `KeyWithIndexToValueRowConverterV2.convertToValueRow` to avoid double-copying, as the caller is expected to store the row which the implementation of state store will call `copy()`.
    
    This patch adds such behavior into each method doc in `KeyWithIndexToValueRowConverter`, so that further contributors can read through and make sure the change / new addition doesn't break the contract.
    
    ### Why are the changes needed?
    
    Stream-stream join with state store format V2 (newly added in Spark 3.0.0) has a serious correctness bug which brings indeterministic result.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, some of Spark 3.0.0 users using stream-stream join from the new checkpoint (as the bug exists to only v2 format path) may encounter wrong join result. This patch will fix it.
    
    ### How was this patch tested?
    
    Reported case is converted to the new UT, and confirmed UT passed. All UTs in StreamingInnerJoinSuite and StreamingOuterJoinSuite passed as well
    
    Closes #28975 from HeartSaVioR/SPARK-32148.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <ka...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 526cb2d1ba2b4c07e10d7011367fdef24a40a927)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../streaming/StreamingSymmetricHashJoinExec.scala |  4 ++
 .../state/SymmetricHashJoinStateManager.scala      | 17 ++++++++-
 .../spark/sql/streaming/StreamingJoinSuite.scala   | 44 ++++++++++++++++++++++
 3 files changed, 64 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index 198e17d..57e62dc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -295,6 +295,10 @@ case class StreamingSymmetricHashJoinExec(
             postJoinFilter(joinedRow.withLeft(leftKeyValue.value).withRight(rightValue))
           }
         }
+
+        // NOTE: we need to make sure `outerOutputIter` is evaluated "after" exhausting all of
+        // elements in `innerOutputIter`, because evaluation of `innerOutputIter` may update
+        // the match flag which the logic for outer join is relying on.
         val removedRowIter = leftSideJoiner.removeOldState()
         val outerOutputIter = removedRowIter.filterNot { kv =>
           stateFormatVersion match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
index 1a0a43c..1a5b50d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/SymmetricHashJoinStateManager.scala
@@ -451,10 +451,25 @@ class SymmetricHashJoinStateManager(
   }
 
   private trait KeyWithIndexToValueRowConverter {
+    /** Defines the schema of the value row (the value side of K-V in state store). */
     def valueAttributes: Seq[Attribute]
 
+    /**
+     * Convert the value row to (actual value, match) pair.
+     *
+     * NOTE: implementations should ensure the result row is NOT reused during execution, so
+     * that caller can safely read the value in any time.
+     */
     def convertValue(value: UnsafeRow): ValueAndMatchPair
 
+    /**
+     * Build the value row from (actual value, match) pair. This is expected to be called just
+     * before storing to the state store.
+     *
+     * NOTE: depending on the implementation, the result row "may" be reused during execution
+     * (to avoid initialization of object), so the caller should ensure that the logic doesn't
+     * affect by such behavior. Call copy() against the result row if needed.
+     */
     def convertToValueRow(value: UnsafeRow, matched: Boolean): UnsafeRow
   }
 
@@ -493,7 +508,7 @@ class SymmetricHashJoinStateManager(
 
     override def convertValue(value: UnsafeRow): ValueAndMatchPair = {
       if (value != null) {
-        ValueAndMatchPair(valueRowGenerator(value),
+        ValueAndMatchPair(valueRowGenerator(value).copy(),
           value.getBoolean(indexOrdinalInValueWithMatchedRow))
       } else {
         null
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index 3f218c9..7837b20 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import java.io.File
+import java.sql.Timestamp
 import java.util.{Locale, UUID}
 
 import scala.util.Random
@@ -991,4 +992,47 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       )
     }
   }
+
+  test("SPARK-32148 stream-stream join regression on Spark 3.0.0") {
+    val input1 = MemoryStream[(Timestamp, String, String)]
+    val df1 = input1.toDF
+      .selectExpr("_1 as eventTime", "_2 as id", "_3 as comment")
+      .withWatermark(s"eventTime", "2 minutes")
+
+    val input2 = MemoryStream[(Timestamp, String, String)]
+    val df2 = input2.toDF
+      .selectExpr("_1 as eventTime", "_2 as id", "_3 as name")
+      .withWatermark(s"eventTime", "4 minutes")
+
+    val joined = df1.as("left")
+      .join(df2.as("right"),
+        expr("""
+               |left.id = right.id AND left.eventTime BETWEEN
+               |  right.eventTime - INTERVAL 30 seconds AND
+               |  right.eventTime + INTERVAL 30 seconds
+             """.stripMargin),
+        joinType = "leftOuter")
+
+    val inputDataForInput1 = Seq(
+      (Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner"),
+      (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A"),
+      (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B"))
+
+    val inputDataForInput2 = Seq(
+      (Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
+      (Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"),
+      (Timestamp.valueOf("2020-01-02 02:00:00"), "abc", "C"))
+
+    val expectedOutput = Seq(
+      (Timestamp.valueOf("2020-01-01 00:00:00"), "abc", "has no join partner", null, null, null),
+      (Timestamp.valueOf("2020-01-02 00:00:00"), "abc", "joined with A",
+        Timestamp.valueOf("2020-01-02 00:00:10"), "abc", "A"),
+      (Timestamp.valueOf("2020-01-02 01:00:00"), "abc", "joined with B",
+        Timestamp.valueOf("2020-01-02 00:59:59"), "abc", "B"))
+
+    testStream(joined)(
+      MultiAddData((input1, inputDataForInput1), (input2, inputDataForInput2)),
+      CheckNewAnswer(expectedOutput.head, expectedOutput.tail: _*)
+    )
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org