You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/05/16 21:55:07 UTC

spark git commit: [SPARK-24158][SS] Enable no-data batches for streaming joins

Repository: spark
Updated Branches:
  refs/heads/master 8e60a16b7 -> 991726f31


[SPARK-24158][SS] Enable no-data batches for streaming joins

## What changes were proposed in this pull request?

This is a continuation of the larger task of enabling zero-data batches for more eager state cleanup. This PR enables it for stream-stream joins.

## How was this patch tested?
- Updated join tests. Additionally, updated them to not use `CheckLastBatch` anywhere, to set a good precedent for future tests.

Author: Tathagata Das <ta...@gmail.com>

Closes #21253 from tdas/SPARK-24158.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/991726f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/991726f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/991726f3

Branch: refs/heads/master
Commit: 991726f31a8d182ed6d5b0e59185d97c0c5c532f
Parents: 8e60a16
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed May 16 14:55:02 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed May 16 14:55:02 2018 -0700

----------------------------------------------------------------------
 .../spark/sql/execution/SparkStrategies.scala   |   2 +-
 .../StreamingSymmetricHashJoinExec.scala        |  14 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  15 +-
 .../sql/streaming/StreamingJoinSuite.scala      | 217 ++++++++++---------
 4 files changed, 130 insertions(+), 118 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 82b4eb9..37a0b9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -361,7 +361,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
 
         case Join(left, right, _, _) if left.isStreaming && right.isStreaming =>
           throw new AnalysisException(
-            "Stream stream joins without equality predicate is not supported", plan = Some(plan))
+            "Stream-stream join without equality predicate is not supported", plan = Some(plan))
 
         case _ => Nil
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index fa7c8ee..afa664e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -187,6 +187,17 @@ case class StreamingSymmetricHashJoinExec(
         s"${getClass.getSimpleName} should not take $x as the JoinType")
   }
 
+  override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+    val watermarkUsedForStateCleanup =
+      stateWatermarkPredicates.left.nonEmpty || stateWatermarkPredicates.right.nonEmpty
+
+    // Latest watermark value is more than that used in this previous executed plan
+    val watermarkHasChanged =
+      eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get
+
+    watermarkUsedForStateCleanup && watermarkHasChanged
+  }
+
   protected override def doExecute(): RDD[InternalRow] = {
     val stateStoreCoord = sqlContext.sessionState.streamingQueryManager.stateStoreCoordinator
     val stateStoreNames = SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
@@ -319,8 +330,7 @@ case class StreamingSymmetricHashJoinExec(
         // outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
         // needs to be done greedily by immediately consuming the returned iterator.
         val cleanupIter = joinType match {
-          case Inner =>
-            leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+          case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
           case LeftOuter => rightSideJoiner.removeOldState()
           case RightOuter => leftSideJoiner.removeOldState()
           case _ => throwBadJoinTypeException()

http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 9d139a9..f348dac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -199,15 +199,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
   case class CheckAnswerRowsByFunc(
       globalCheckFunction: Seq[Row] => Unit,
       lastOnly: Boolean) extends StreamAction with StreamMustBeRunning {
-    override def toString: String = s"$operatorName"
-    private def operatorName = if (lastOnly) "CheckLastBatchByFunc" else "CheckAnswerByFunc"
+    override def toString: String = if (lastOnly) "CheckLastBatchByFunc" else "CheckAnswerByFunc"
   }
 
   case class CheckNewAnswerRows(expectedAnswer: Seq[Row])
     extends StreamAction with StreamMustBeRunning {
-    override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
-
-    private def operatorName = "CheckNewAnswer"
+    override def toString: String = s"CheckNewAnswer: ${expectedAnswer.mkString(",")}"
   }
 
   object CheckNewAnswer {
@@ -218,6 +215,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
       val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
       CheckNewAnswerRows((data +: moreData).map(d => toExternalRow.fromRow(encoder.toRow(d))))
     }
+
+    def apply(rows: Row*): CheckNewAnswerRows = CheckNewAnswerRows(rows)
   }
 
   /** Stops the stream. It must currently be running. */
@@ -747,7 +746,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
             error => failTest(error)
           }
       }
-      pos += 1
     }
 
     try {
@@ -761,8 +759,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
           currentStream.asInstanceOf[MicroBatchExecution].withProgressLocked {
             actns.foreach(executeAction)
           }
+          pos += 1
 
-        case action: StreamAction => executeAction(action)
+        case action: StreamAction =>
+          executeAction(action)
+          pos += 1
       }
       if (streamThreadDeathCause != null) {
         failTest("Stream Thread Died", streamThreadDeathCause)

http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index da8f960..1f62357 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -62,20 +62,20 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(input1, 1),
       CheckAnswer(),
       AddData(input2, 1, 10),       // 1 arrived on input1 first, then input2, should join
-      CheckLastBatch((1, 2, 3)),
+      CheckNewAnswer((1, 2, 3)),
       AddData(input1, 10),          // 10 arrived on input2 first, then input1, should join
-      CheckLastBatch((10, 20, 30)),
+      CheckNewAnswer((10, 20, 30)),
       AddData(input2, 1),           // another 1 in input2 should join with 1 input1
-      CheckLastBatch((1, 2, 3)),
+      CheckNewAnswer((1, 2, 3)),
       StopStream,
       StartStream(),
       AddData(input1, 1), // multiple 1s should be kept in state causing multiple (1, 2, 3)
-      CheckLastBatch((1, 2, 3), (1, 2, 3)),
+      CheckNewAnswer((1, 2, 3), (1, 2, 3)),
       StopStream,
       StartStream(),
       AddData(input1, 100),
       AddData(input2, 100),
-      CheckLastBatch((100, 200, 300))
+      CheckNewAnswer((100, 200, 300))
     )
   }
 
@@ -97,25 +97,25 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
 
     testStream(joined)(
       AddData(input1, 1),
-      CheckLastBatch(),
+      CheckNewAnswer(),
       AddData(input2, 1),
-      CheckLastBatch((1, 10, 2, 3)),
+      CheckNewAnswer((1, 10, 2, 3)),
       StopStream,
       StartStream(),
       AddData(input1, 25),
-      CheckLastBatch(),
+      CheckNewAnswer(),
       StopStream,
       StartStream(),
       AddData(input2, 25),
-      CheckLastBatch((25, 30, 50, 75)),
+      CheckNewAnswer((25, 30, 50, 75)),
       AddData(input1, 1),
-      CheckLastBatch((1, 10, 2, 3)),      // State for 1 still around as there is no watermark
+      CheckNewAnswer((1, 10, 2, 3)),      // State for 1 still around as there is no watermark
       StopStream,
       StartStream(),
       AddData(input1, 5),
-      CheckLastBatch(),
+      CheckNewAnswer(),
       AddData(input2, 5),
-      CheckLastBatch((5, 10, 10, 15))     // No filter by any watermark
+      CheckNewAnswer((5, 10, 10, 15))     // No filter by any watermark
     )
   }
 
@@ -142,27 +142,27 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       assertNumStateRows(total = 1, updated = 1),
 
       AddData(input2, 1),
-      CheckLastBatch((1, 10, 2, 3)),
+      CheckAnswer((1, 10, 2, 3)),
       assertNumStateRows(total = 2, updated = 1),
       StopStream,
       StartStream(),
 
       AddData(input1, 25),
-      CheckLastBatch(), // since there is only 1 watermark operator, the watermark should be 15
-      assertNumStateRows(total = 3, updated = 1),
+      CheckNewAnswer(),   // watermark = 15, no-data-batch should remove 2 rows having window=[0,10]
+      assertNumStateRows(total = 1, updated = 1),
 
       AddData(input2, 25),
-      CheckLastBatch((25, 30, 50, 75)), // watermark = 15 should remove 2 rows having window=[0,10]
+      CheckNewAnswer((25, 30, 50, 75)),
       assertNumStateRows(total = 2, updated = 1),
       StopStream,
       StartStream(),
 
       AddData(input2, 1),
-      CheckLastBatch(),       // Should not join as < 15 removed
-      assertNumStateRows(total = 2, updated = 0),  // row not add as 1 < state key watermark = 15
+      CheckNewAnswer(),                             // Should not join as < 15 removed
+      assertNumStateRows(total = 2, updated = 0),   // row not add as 1 < state key watermark = 15
 
       AddData(input1, 5),
-      CheckLastBatch(),       // Should not join or add to state as < 15 got filtered by watermark
+      CheckNewAnswer(),                             // Same reason as above
       assertNumStateRows(total = 2, updated = 0)
     )
   }
@@ -189,42 +189,39 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(leftInput, (1, 5)),
       CheckAnswer(),
       AddData(rightInput, (1, 11)),
-      CheckLastBatch((1, 5, 11)),
+      CheckNewAnswer((1, 5, 11)),
       AddData(rightInput, (1, 10)),
-      CheckLastBatch(), // no match as neither 5, nor 10 from leftTime is less than rightTime 10 - 5
+      CheckNewAnswer(), // no match as leftTime 5 is not < rightTime 10 - 5
       assertNumStateRows(total = 3, updated = 3),
 
       // Increase event time watermark to 20s by adding data with time = 30s on both inputs
       AddData(leftInput, (1, 3), (1, 30)),
-      CheckLastBatch((1, 3, 10), (1, 3, 11)),
+      CheckNewAnswer((1, 3, 10), (1, 3, 11)),
       assertNumStateRows(total = 5, updated = 2),
       AddData(rightInput, (0, 30)),
-      CheckLastBatch(),
-      assertNumStateRows(total = 6, updated = 1),
+      CheckNewAnswer(),
 
       // event time watermark:    max event time - 10   ==>   30 - 10 = 20
+      // so left side going to only receive data where leftTime > 20
       // right side state constraint:    20 < leftTime < rightTime - 5   ==>   rightTime > 25
-
-      // Run another batch with event time = 25 to clear right state where rightTime <= 25
-      AddData(rightInput, (0, 30)),
-      CheckLastBatch(),
-      assertNumStateRows(total = 5, updated = 1),  // removed (1, 11) and (1, 10), added (0, 30)
+      // right state where rightTime <= 25 will be cleared, (1, 11) and (1, 10) removed
+      assertNumStateRows(total = 4, updated = 1),
 
       // New data to right input should match with left side (1, 3) and (1, 5), as left state should
       // not be cleared. But rows rightTime <= 20 should be filtered due to event time watermark and
       // state rows with rightTime <= 25 should be removed from state.
       // (1, 20) ==> filtered by event time watermark = 20
       // (1, 21) ==> passed filter, matched with left (1, 3) and (1, 5), not added to state
-      //             as state watermark = 25
+      //             as 21 < state watermark = 25
       // (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
       AddData(rightInput, (1, 20), (1, 21), (1, 28)),
-      CheckLastBatch((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
-      assertNumStateRows(total = 6, updated = 1),
+      CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
+      assertNumStateRows(total = 5, updated = 1),
 
       // New data to left input with leftTime <= 20 should be filtered due to event time watermark
       AddData(leftInput, (1, 20), (1, 21)),
-      CheckLastBatch((1, 21, 28)),
-      assertNumStateRows(total = 7, updated = 1)
+      CheckNewAnswer((1, 21, 28)),
+      assertNumStateRows(total = 6, updated = 1)
     )
   }
 
@@ -275,38 +272,39 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(leftInput, (1, 20)),
       CheckAnswer(),
       AddData(rightInput, (1, 14), (1, 15), (1, 25), (1, 26), (1, 30), (1, 31)),
-      CheckLastBatch((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)),
+      CheckNewAnswer((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)),
       assertNumStateRows(total = 7, updated = 7),
 
       // If rightTime = 60, then it matches only leftTime = [50, 65]
       AddData(rightInput, (1, 60)),
-      CheckLastBatch(),                // matches with nothing on the left
+      CheckNewAnswer(),                // matches with nothing on the left
       AddData(leftInput, (1, 49), (1, 50), (1, 65), (1, 66)),
-      CheckLastBatch((1, 50, 60), (1, 65, 60)),
-      assertNumStateRows(total = 12, updated = 5),
+      CheckNewAnswer((1, 50, 60), (1, 65, 60)),
 
       // Event time watermark = min(left: 66 - delay 20 = 46, right: 60 - delay 30 = 30) = 30
       // Left state value watermark = 30 - 10 = slightly less than 20 (since condition has <=)
       //    Should drop < 20 from left, i.e., none
       // Right state value watermark = 30 - 5 = slightly less than 25 (since condition has <=)
       //    Should drop < 25 from the right, i.e., 14 and 15
-      AddData(leftInput, (1, 30), (1, 31)),     // 30 should not be processed or added to stat
-      CheckLastBatch((1, 31, 26), (1, 31, 30), (1, 31, 31)),
-      assertNumStateRows(total = 11, updated = 1),  // 12 - 2 removed + 1 added
+      assertNumStateRows(total = 10, updated = 5), // 12 - 2 removed
+
+      AddData(leftInput, (1, 30), (1, 31)),     // 30 should not be processed or added to state
+      CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
+      assertNumStateRows(total = 11, updated = 1),  // only 31 added
 
       // Advance the watermark
       AddData(rightInput, (1, 80)),
-      CheckLastBatch(),
-      assertNumStateRows(total = 12, updated = 1),
-
+      CheckNewAnswer(),
       // Event time watermark = min(left: 66 - delay 20 = 46, right: 80 - delay 30 = 50) = 46
       // Left state value watermark = 46 - 10 = slightly less than 36 (since condition has <=)
       //    Should drop < 36 from left, i.e., 20, 31 (30 was not added)
       // Right state value watermark = 46 - 5 = slightly less than 41 (since condition has <=)
       //    Should drop < 41 from the right, i.e., 25, 26, 30, 31
-      AddData(rightInput, (1, 50)),
-      CheckLastBatch((1, 49, 50), (1, 50, 50)),
-      assertNumStateRows(total = 7, updated = 1)  // 12 - 6 removed + 1 added
+      assertNumStateRows(total = 6, updated = 1),  // 12 - 6 removed
+
+      AddData(rightInput, (1, 46), (1, 50)),     // 46 should not be processed or added to state
+      CheckNewAnswer((1, 49, 50), (1, 50, 50)),
+      assertNumStateRows(total = 7, updated = 1)   // 50 added
     )
   }
 
@@ -322,7 +320,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       input1.addData(1)
       q.awaitTermination(10000)
     }
-    assert(e.toString.contains("Stream stream joins without equality predicate is not supported"))
+    assert(e.toString.contains("Stream-stream join without equality predicate is not supported"))
   }
 
   test("stream stream self join") {
@@ -404,10 +402,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
       AddData(input1, 1, 5),
       AddData(input2, 1, 5, 10),
       AddData(input3, 5, 10),
-      CheckLastBatch((5, 10, 5, 15, 5, 25)))
+      CheckNewAnswer((5, 10, 5, 15, 5, 25)))
   }
 }
 
+
 class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
 
   import testImplicits._
@@ -465,13 +464,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
       // The left rows with leftValue <= 4 should generate their outer join row now and
       // not get added to the state.
-      CheckLastBatch(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)),
+      CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)),
       assertNumStateRows(total = 4, updated = 4),
       // We shouldn't get more outer join rows when the watermark advances.
       MultiAddData(leftInput, 20)(rightInput, 21),
-      CheckLastBatch(),
+      CheckNewAnswer(),
       AddData(rightInput, 20),
-      CheckLastBatch((20, 30, 40, "60"))
+      CheckNewAnswer((20, 30, 40, "60"))
     )
   }
 
@@ -492,15 +491,15 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
 
     testStream(joined)(
       MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
-      // The right rows with value <= 7 should never be added to the state.
-      CheckLastBatch(Row(3, 10, 6, "9")),
+      // The right rows with rightValue <= 7 should never be added to the state.
+      CheckNewAnswer(Row(3, 10, 6, "9")),     // rightValue = 9 > 7 hence joined and added to state
       assertNumStateRows(total = 4, updated = 4),
       // When the watermark advances, we get the outer join rows just as we would if they
       // were added but didn't match the full join condition.
-      MultiAddData(leftInput, 20)(rightInput, 21),
-      CheckLastBatch(),
+      MultiAddData(leftInput, 20)(rightInput, 21),  // watermark = 10, no-data-batch computes nulls
+      CheckNewAnswer(Row(4, 10, 8, null), Row(5, 10, 10, null)),
       AddData(rightInput, 20),
-      CheckLastBatch(Row(20, 30, 40, "60"), Row(4, 10, 8, null), Row(5, 10, 10, null))
+      CheckNewAnswer(Row(20, 30, 40, "60"))
     )
   }
 
@@ -521,15 +520,15 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
 
     testStream(joined)(
       MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
-      // The left rows with value <= 4 should never be added to the state.
-      CheckLastBatch(Row(3, 10, 6, "9")),
+      // The left rows with leftValue <= 4 should never be added to the state.
+      CheckNewAnswer(Row(3, 10, 6, "9")),     // leftValue = 7 > 4 hence joined and added to state
       assertNumStateRows(total = 4, updated = 4),
       // When the watermark advances, we get the outer join rows just as we would if they
       // were added but didn't match the full join condition.
-      MultiAddData(leftInput, 20)(rightInput, 21),
-      CheckLastBatch(),
+      MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch computes nulls
+      CheckNewAnswer(Row(4, 10, null, "12"), Row(5, 10, null, "15")),
       AddData(rightInput, 20),
-      CheckLastBatch(Row(20, 30, 40, "60"), Row(4, 10, null, "12"), Row(5, 10, null, "15"))
+      CheckNewAnswer(Row(20, 30, 40, "60"))
     )
   }
 
@@ -552,13 +551,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
       MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
       // The right rows with rightValue <= 7 should generate their outer join row now and
       // not get added to the state.
-      CheckLastBatch(Row(3, 10, 6, "9"), Row(1, 10, null, "3"), Row(2, 10, null, "6")),
+      CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, null, "3"), Row(2, 10, null, "6")),
       assertNumStateRows(total = 4, updated = 4),
       // We shouldn't get more outer join rows when the watermark advances.
       MultiAddData(leftInput, 20)(rightInput, 21),
-      CheckLastBatch(),
+      CheckNewAnswer(),
       AddData(rightInput, 20),
-      CheckLastBatch((20, 30, 40, "60"))
+      CheckNewAnswer((20, 30, 40, "60"))
     )
   }
 
@@ -568,14 +567,14 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     testStream(joined)(
       // Test inner part of the join.
       MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
-      CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
-      // Old state doesn't get dropped until the batch *after* it gets introduced, so the
-      // nulls won't show up until the next batch after the watermark advances.
-      MultiAddData(leftInput, 21)(rightInput, 22),
-      CheckLastBatch(),
-      assertNumStateRows(total = 12, updated = 12),
+      CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+
+      MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
+      CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
+      assertNumStateRows(total = 2, updated = 12),
+
       AddData(leftInput, 22),
-      CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 4, null)),
+      CheckNewAnswer(Row(22, 30, 44, 66)),
       assertNumStateRows(total = 3, updated = 1)
     )
   }
@@ -586,14 +585,14 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     testStream(joined)(
       // Test inner part of the join.
       MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
-      CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
-      // Old state doesn't get dropped until the batch *after* it gets introduced, so the
-      // nulls won't show up until the next batch after the watermark advances.
-      MultiAddData(leftInput, 21)(rightInput, 22),
-      CheckLastBatch(),
-      assertNumStateRows(total = 12, updated = 12),
+      CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+
+      MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
+      CheckNewAnswer(Row(6, 10, null, 18), Row(7, 10, null, 21)),
+      assertNumStateRows(total = 2, updated = 12),
+
       AddData(leftInput, 22),
-      CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, null, 21)),
+      CheckNewAnswer(Row(22, 30, 44, 66)),
       assertNumStateRows(total = 3, updated = 1)
     )
   }
@@ -627,21 +626,18 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
         AddData(leftInput, (1, 5), (3, 5)),
         CheckAnswer(),
         AddData(rightInput, (1, 10), (2, 5)),
-        CheckLastBatch((1, 1, 5, 10)),
+        CheckNewAnswer((1, 1, 5, 10)),
         AddData(rightInput, (1, 11)),
-        CheckLastBatch(), // no match as left time is too low
+        CheckNewAnswer(), // no match as left time is too low
         assertNumStateRows(total = 5, updated = 5),
 
         // Increase event time watermark to 20s by adding data with time = 30s on both inputs
         AddData(leftInput, (1, 7), (1, 30)),
-        CheckLastBatch((1, 1, 7, 10), (1, 1, 7, 11)),
+        CheckNewAnswer((1, 1, 7, 10), (1, 1, 7, 11)),
         assertNumStateRows(total = 7, updated = 2),
-        AddData(rightInput, (0, 30)),
-        CheckLastBatch(),
-        assertNumStateRows(total = 8, updated = 1),
-        AddData(rightInput, (0, 30)),
-        CheckLastBatch(outerResult),
-        assertNumStateRows(total = 3, updated = 1)
+        AddData(rightInput, (0, 30)), // watermark = 30 - 10 = 20, no-data-batch computes nulls
+        CheckNewAnswer(outerResult),
+        assertNumStateRows(total = 2, updated = 1)
       )
     }
   }
@@ -665,36 +661,41 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
     testStream(joined)(
       // leftValue <= 10 should generate outer join rows even though it matches right keys
       MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3),
-      CheckLastBatch(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
-      MultiAddData(leftInput, 20)(rightInput, 21),
-      CheckLastBatch(),
-      assertNumStateRows(total = 5, updated = 5),  // 1...3 added, but 20 and 21 not added
+      CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
+      assertNumStateRows(total = 3, updated = 3), // only right 1, 2, 3 added
+
+      MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch cleared < 10
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 2),  // only 20 and 21 left in state
+
       AddData(rightInput, 20),
-      CheckLastBatch(
-        Row(20, 30, 40, 60)),
+      CheckNewAnswer(Row(20, 30, 40, 60)),
       assertNumStateRows(total = 3, updated = 1),
+
       // leftValue and rightValue both satisfying condition should not generate outer join rows
-      MultiAddData(leftInput, 40, 41)(rightInput, 40, 41),
-      CheckLastBatch((40, 50, 80, 120), (41, 50, 82, 123)),
-      MultiAddData(leftInput, 70)(rightInput, 71),
-      CheckLastBatch(),
-      assertNumStateRows(total = 6, updated = 6),  // all inputs added since last check
+      MultiAddData(leftInput, 40, 41)(rightInput, 40, 41), // watermark = 31
+      CheckNewAnswer((40, 50, 80, 120), (41, 50, 82, 123)),
+      assertNumStateRows(total = 4, updated = 4),   // only left 40, 41 + right 40,41 left in state
+
+      MultiAddData(leftInput, 70)(rightInput, 71), // watermark = 60
+      CheckNewAnswer(),
+      assertNumStateRows(total = 2, updated = 2), // only 70, 71 left in state
+
       AddData(rightInput, 70),
-      CheckLastBatch((70, 80, 140, 210)),
+      CheckNewAnswer((70, 80, 140, 210)),
       assertNumStateRows(total = 3, updated = 1),
+
       // rightValue between 300 and 1000 should generate outer join rows even though it matches left
-      MultiAddData(leftInput, 101, 102, 103)(rightInput, 101, 102, 103),
-      CheckLastBatch(),
+      MultiAddData(leftInput, 101, 102, 103)(rightInput, 101, 102, 103), // watermark = 91
+      CheckNewAnswer(),
+      assertNumStateRows(total = 6, updated = 3), // only 101 - 103 left in state
+
       MultiAddData(leftInput, 1000)(rightInput, 1001),
-      CheckLastBatch(),
-      assertNumStateRows(total = 8, updated = 5),  // 101...103 added, but 1000 and 1001 not added
-      AddData(rightInput, 1000),
-      CheckLastBatch(
-        Row(1000, 1010, 2000, 3000),
+      CheckNewAnswer(
         Row(101, 110, 202, null),
         Row(102, 110, 204, null),
         Row(103, 110, 206, null)),
-      assertNumStateRows(total = 3, updated = 1)
+      assertNumStateRows(total = 2, updated = 2)
     )
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org