You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by td...@apache.org on 2018/05/16 21:55:07 UTC
spark git commit: [SPARK-24158][SS] Enable no-data batches for
streaming joins
Repository: spark
Updated Branches:
refs/heads/master 8e60a16b7 -> 991726f31
[SPARK-24158][SS] Enable no-data batches for streaming joins
## What changes were proposed in this pull request?
This is a continuation of the larger task of enabling zero-data batches for more eager state cleanup. This PR enables it for stream-stream joins.
## How was this patch tested?
- Updated join tests. Additionally, updated them to not use `CheckLastBatch` anywhere, to set a good precedent for the future.
Author: Tathagata Das <ta...@gmail.com>
Closes #21253 from tdas/SPARK-24158.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/991726f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/991726f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/991726f3
Branch: refs/heads/master
Commit: 991726f31a8d182ed6d5b0e59185d97c0c5c532f
Parents: 8e60a16
Author: Tathagata Das <ta...@gmail.com>
Authored: Wed May 16 14:55:02 2018 -0700
Committer: Tathagata Das <ta...@gmail.com>
Committed: Wed May 16 14:55:02 2018 -0700
----------------------------------------------------------------------
.../spark/sql/execution/SparkStrategies.scala | 2 +-
.../StreamingSymmetricHashJoinExec.scala | 14 +-
.../apache/spark/sql/streaming/StreamTest.scala | 15 +-
.../sql/streaming/StreamingJoinSuite.scala | 217 ++++++++++---------
4 files changed, 130 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
index 82b4eb9..37a0b9d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
@@ -361,7 +361,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
case Join(left, right, _, _) if left.isStreaming && right.isStreaming =>
throw new AnalysisException(
- "Stream stream joins without equality predicate is not supported", plan = Some(plan))
+ "Stream-stream join without equality predicate is not supported", plan = Some(plan))
case _ => Nil
}
http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
index fa7c8ee..afa664e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala
@@ -187,6 +187,17 @@ case class StreamingSymmetricHashJoinExec(
s"${getClass.getSimpleName} should not take $x as the JoinType")
}
+ override def shouldRunAnotherBatch(newMetadata: OffsetSeqMetadata): Boolean = {
+ val watermarkUsedForStateCleanup =
+ stateWatermarkPredicates.left.nonEmpty || stateWatermarkPredicates.right.nonEmpty
+
+ // Latest watermark value is more than that used in this previous executed plan
+ val watermarkHasChanged =
+ eventTimeWatermark.isDefined && newMetadata.batchWatermarkMs > eventTimeWatermark.get
+
+ watermarkUsedForStateCleanup && watermarkHasChanged
+ }
+
protected override def doExecute(): RDD[InternalRow] = {
val stateStoreCoord = sqlContext.sessionState.streamingQueryManager.stateStoreCoordinator
val stateStoreNames = SymmetricHashJoinStateManager.allStateStoreNames(LeftSide, RightSide)
@@ -319,8 +330,7 @@ case class StreamingSymmetricHashJoinExec(
// outer join) if possible. In all cases, nothing needs to be outputted, hence the removal
// needs to be done greedily by immediately consuming the returned iterator.
val cleanupIter = joinType match {
- case Inner =>
- leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
+ case Inner => leftSideJoiner.removeOldState() ++ rightSideJoiner.removeOldState()
case LeftOuter => rightSideJoiner.removeOldState()
case RightOuter => leftSideJoiner.removeOldState()
case _ => throwBadJoinTypeException()
http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index 9d139a9..f348dac 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -199,15 +199,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
case class CheckAnswerRowsByFunc(
globalCheckFunction: Seq[Row] => Unit,
lastOnly: Boolean) extends StreamAction with StreamMustBeRunning {
- override def toString: String = s"$operatorName"
- private def operatorName = if (lastOnly) "CheckLastBatchByFunc" else "CheckAnswerByFunc"
+ override def toString: String = if (lastOnly) "CheckLastBatchByFunc" else "CheckAnswerByFunc"
}
case class CheckNewAnswerRows(expectedAnswer: Seq[Row])
extends StreamAction with StreamMustBeRunning {
- override def toString: String = s"$operatorName: ${expectedAnswer.mkString(",")}"
-
- private def operatorName = "CheckNewAnswer"
+ override def toString: String = s"CheckNewAnswer: ${expectedAnswer.mkString(",")}"
}
object CheckNewAnswer {
@@ -218,6 +215,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
val toExternalRow = RowEncoder(encoder.schema).resolveAndBind()
CheckNewAnswerRows((data +: moreData).map(d => toExternalRow.fromRow(encoder.toRow(d))))
}
+
+ def apply(rows: Row*): CheckNewAnswerRows = CheckNewAnswerRows(rows)
}
/** Stops the stream. It must currently be running. */
@@ -747,7 +746,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
error => failTest(error)
}
}
- pos += 1
}
try {
@@ -761,8 +759,11 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
currentStream.asInstanceOf[MicroBatchExecution].withProgressLocked {
actns.foreach(executeAction)
}
+ pos += 1
- case action: StreamAction => executeAction(action)
+ case action: StreamAction =>
+ executeAction(action)
+ pos += 1
}
if (streamThreadDeathCause != null) {
failTest("Stream Thread Died", streamThreadDeathCause)
http://git-wip-us.apache.org/repos/asf/spark/blob/991726f3/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index da8f960..1f62357 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -62,20 +62,20 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(input1, 1),
CheckAnswer(),
AddData(input2, 1, 10), // 1 arrived on input1 first, then input2, should join
- CheckLastBatch((1, 2, 3)),
+ CheckNewAnswer((1, 2, 3)),
AddData(input1, 10), // 10 arrived on input2 first, then input1, should join
- CheckLastBatch((10, 20, 30)),
+ CheckNewAnswer((10, 20, 30)),
AddData(input2, 1), // another 1 in input2 should join with 1 input1
- CheckLastBatch((1, 2, 3)),
+ CheckNewAnswer((1, 2, 3)),
StopStream,
StartStream(),
AddData(input1, 1), // multiple 1s should be kept in state causing multiple (1, 2, 3)
- CheckLastBatch((1, 2, 3), (1, 2, 3)),
+ CheckNewAnswer((1, 2, 3), (1, 2, 3)),
StopStream,
StartStream(),
AddData(input1, 100),
AddData(input2, 100),
- CheckLastBatch((100, 200, 300))
+ CheckNewAnswer((100, 200, 300))
)
}
@@ -97,25 +97,25 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
testStream(joined)(
AddData(input1, 1),
- CheckLastBatch(),
+ CheckNewAnswer(),
AddData(input2, 1),
- CheckLastBatch((1, 10, 2, 3)),
+ CheckNewAnswer((1, 10, 2, 3)),
StopStream,
StartStream(),
AddData(input1, 25),
- CheckLastBatch(),
+ CheckNewAnswer(),
StopStream,
StartStream(),
AddData(input2, 25),
- CheckLastBatch((25, 30, 50, 75)),
+ CheckNewAnswer((25, 30, 50, 75)),
AddData(input1, 1),
- CheckLastBatch((1, 10, 2, 3)), // State for 1 still around as there is no watermark
+ CheckNewAnswer((1, 10, 2, 3)), // State for 1 still around as there is no watermark
StopStream,
StartStream(),
AddData(input1, 5),
- CheckLastBatch(),
+ CheckNewAnswer(),
AddData(input2, 5),
- CheckLastBatch((5, 10, 10, 15)) // No filter by any watermark
+ CheckNewAnswer((5, 10, 10, 15)) // No filter by any watermark
)
}
@@ -142,27 +142,27 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
assertNumStateRows(total = 1, updated = 1),
AddData(input2, 1),
- CheckLastBatch((1, 10, 2, 3)),
+ CheckAnswer((1, 10, 2, 3)),
assertNumStateRows(total = 2, updated = 1),
StopStream,
StartStream(),
AddData(input1, 25),
- CheckLastBatch(), // since there is only 1 watermark operator, the watermark should be 15
- assertNumStateRows(total = 3, updated = 1),
+ CheckNewAnswer(), // watermark = 15, no-data-batch should remove 2 rows having window=[0,10]
+ assertNumStateRows(total = 1, updated = 1),
AddData(input2, 25),
- CheckLastBatch((25, 30, 50, 75)), // watermark = 15 should remove 2 rows having window=[0,10]
+ CheckNewAnswer((25, 30, 50, 75)),
assertNumStateRows(total = 2, updated = 1),
StopStream,
StartStream(),
AddData(input2, 1),
- CheckLastBatch(), // Should not join as < 15 removed
- assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15
+ CheckNewAnswer(), // Should not join as < 15 removed
+ assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15
AddData(input1, 5),
- CheckLastBatch(), // Should not join or add to state as < 15 got filtered by watermark
+ CheckNewAnswer(), // Same reason as above
assertNumStateRows(total = 2, updated = 0)
)
}
@@ -189,42 +189,39 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(leftInput, (1, 5)),
CheckAnswer(),
AddData(rightInput, (1, 11)),
- CheckLastBatch((1, 5, 11)),
+ CheckNewAnswer((1, 5, 11)),
AddData(rightInput, (1, 10)),
- CheckLastBatch(), // no match as neither 5, nor 10 from leftTime is less than rightTime 10 - 5
+ CheckNewAnswer(), // no match as leftTime 5 is not < rightTime 10 - 5
assertNumStateRows(total = 3, updated = 3),
// Increase event time watermark to 20s by adding data with time = 30s on both inputs
AddData(leftInput, (1, 3), (1, 30)),
- CheckLastBatch((1, 3, 10), (1, 3, 11)),
+ CheckNewAnswer((1, 3, 10), (1, 3, 11)),
assertNumStateRows(total = 5, updated = 2),
AddData(rightInput, (0, 30)),
- CheckLastBatch(),
- assertNumStateRows(total = 6, updated = 1),
+ CheckNewAnswer(),
// event time watermark: max event time - 10 ==> 30 - 10 = 20
+ // so left side going to only receive data where leftTime > 20
// right side state constraint: 20 < leftTime < rightTime - 5 ==> rightTime > 25
-
- // Run another batch with event time = 25 to clear right state where rightTime <= 25
- AddData(rightInput, (0, 30)),
- CheckLastBatch(),
- assertNumStateRows(total = 5, updated = 1), // removed (1, 11) and (1, 10), added (0, 30)
+ // right state where rightTime <= 25 will be cleared, (1, 11) and (1, 10) removed
+ assertNumStateRows(total = 4, updated = 1),
// New data to right input should match with left side (1, 3) and (1, 5), as left state should
// not be cleared. But rows rightTime <= 20 should be filtered due to event time watermark and
// state rows with rightTime <= 25 should be removed from state.
// (1, 20) ==> filtered by event time watermark = 20
// (1, 21) ==> passed filter, matched with left (1, 3) and (1, 5), not added to state
- // as state watermark = 25
+ // as 21 < state watermark = 25
// (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
AddData(rightInput, (1, 20), (1, 21), (1, 28)),
- CheckLastBatch((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
- assertNumStateRows(total = 6, updated = 1),
+ CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
+ assertNumStateRows(total = 5, updated = 1),
// New data to left input with leftTime <= 20 should be filtered due to event time watermark
AddData(leftInput, (1, 20), (1, 21)),
- CheckLastBatch((1, 21, 28)),
- assertNumStateRows(total = 7, updated = 1)
+ CheckNewAnswer((1, 21, 28)),
+ assertNumStateRows(total = 6, updated = 1)
)
}
@@ -275,38 +272,39 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(leftInput, (1, 20)),
CheckAnswer(),
AddData(rightInput, (1, 14), (1, 15), (1, 25), (1, 26), (1, 30), (1, 31)),
- CheckLastBatch((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)),
+ CheckNewAnswer((1, 20, 15), (1, 20, 25), (1, 20, 26), (1, 20, 30)),
assertNumStateRows(total = 7, updated = 7),
// If rightTime = 60, then it matches only leftTime = [50, 65]
AddData(rightInput, (1, 60)),
- CheckLastBatch(), // matches with nothing on the left
+ CheckNewAnswer(), // matches with nothing on the left
AddData(leftInput, (1, 49), (1, 50), (1, 65), (1, 66)),
- CheckLastBatch((1, 50, 60), (1, 65, 60)),
- assertNumStateRows(total = 12, updated = 5),
+ CheckNewAnswer((1, 50, 60), (1, 65, 60)),
// Event time watermark = min(left: 66 - delay 20 = 46, right: 60 - delay 30 = 30) = 30
// Left state value watermark = 30 - 10 = slightly less than 20 (since condition has <=)
// Should drop < 20 from left, i.e., none
// Right state value watermark = 30 - 5 = slightly less than 25 (since condition has <=)
// Should drop < 25 from the right, i.e., 14 and 15
- AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to stat
- CheckLastBatch((1, 31, 26), (1, 31, 30), (1, 31, 31)),
- assertNumStateRows(total = 11, updated = 1), // 12 - 2 removed + 1 added
+ assertNumStateRows(total = 10, updated = 5), // 12 - 2 removed
+
+ AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
+ CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
+ assertNumStateRows(total = 11, updated = 1), // only 31 added
// Advance the watermark
AddData(rightInput, (1, 80)),
- CheckLastBatch(),
- assertNumStateRows(total = 12, updated = 1),
-
+ CheckNewAnswer(),
// Event time watermark = min(left: 66 - delay 20 = 46, right: 80 - delay 30 = 50) = 46
// Left state value watermark = 46 - 10 = slightly less than 36 (since condition has <=)
// Should drop < 36 from left, i.e., 20, 31 (30 was not added)
// Right state value watermark = 46 - 5 = slightly less than 41 (since condition has <=)
// Should drop < 41 from the right, i.e., 25, 26, 30, 31
- AddData(rightInput, (1, 50)),
- CheckLastBatch((1, 49, 50), (1, 50, 50)),
- assertNumStateRows(total = 7, updated = 1) // 12 - 6 removed + 1 added
+ assertNumStateRows(total = 6, updated = 1), // 12 - 6 removed
+
+ AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
+ CheckNewAnswer((1, 49, 50), (1, 50, 50)),
+ assertNumStateRows(total = 7, updated = 1) // 50 added
)
}
@@ -322,7 +320,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
input1.addData(1)
q.awaitTermination(10000)
}
- assert(e.toString.contains("Stream stream joins without equality predicate is not supported"))
+ assert(e.toString.contains("Stream-stream join without equality predicate is not supported"))
}
test("stream stream self join") {
@@ -404,10 +402,11 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(input1, 1, 5),
AddData(input2, 1, 5, 10),
AddData(input3, 5, 10),
- CheckLastBatch((5, 10, 5, 15, 5, 25)))
+ CheckNewAnswer((5, 10, 5, 15, 5, 25)))
}
}
+
class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with BeforeAndAfter {
import testImplicits._
@@ -465,13 +464,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
// The left rows with leftValue <= 4 should generate their outer join row now and
// not get added to the state.
- CheckLastBatch(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)),
+ CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, 2, null), Row(2, 10, 4, null)),
assertNumStateRows(total = 4, updated = 4),
// We shouldn't get more outer join rows when the watermark advances.
MultiAddData(leftInput, 20)(rightInput, 21),
- CheckLastBatch(),
+ CheckNewAnswer(),
AddData(rightInput, 20),
- CheckLastBatch((20, 30, 40, "60"))
+ CheckNewAnswer((20, 30, 40, "60"))
)
}
@@ -492,15 +491,15 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
testStream(joined)(
MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
- // The right rows with value <= 7 should never be added to the state.
- CheckLastBatch(Row(3, 10, 6, "9")),
+ // The right rows with rightValue <= 7 should never be added to the state.
+ CheckNewAnswer(Row(3, 10, 6, "9")), // rightValue = 9 > 7 hence joined and added to state
assertNumStateRows(total = 4, updated = 4),
// When the watermark advances, we get the outer join rows just as we would if they
// were added but didn't match the full join condition.
- MultiAddData(leftInput, 20)(rightInput, 21),
- CheckLastBatch(),
+ MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch computes nulls
+ CheckNewAnswer(Row(4, 10, 8, null), Row(5, 10, 10, null)),
AddData(rightInput, 20),
- CheckLastBatch(Row(20, 30, 40, "60"), Row(4, 10, 8, null), Row(5, 10, 10, null))
+ CheckNewAnswer(Row(20, 30, 40, "60"))
)
}
@@ -521,15 +520,15 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
testStream(joined)(
MultiAddData(leftInput, 1, 2, 3)(rightInput, 3, 4, 5),
- // The left rows with value <= 4 should never be added to the state.
- CheckLastBatch(Row(3, 10, 6, "9")),
+ // The left rows with leftValue <= 4 should never be added to the state.
+ CheckNewAnswer(Row(3, 10, 6, "9")), // leftValue = 7 > 4 hence joined and added to state
assertNumStateRows(total = 4, updated = 4),
// When the watermark advances, we get the outer join rows just as we would if they
// were added but didn't match the full join condition.
- MultiAddData(leftInput, 20)(rightInput, 21),
- CheckLastBatch(),
+ MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch computes nulls
+ CheckNewAnswer(Row(4, 10, null, "12"), Row(5, 10, null, "15")),
AddData(rightInput, 20),
- CheckLastBatch(Row(20, 30, 40, "60"), Row(4, 10, null, "12"), Row(5, 10, null, "15"))
+ CheckNewAnswer(Row(20, 30, 40, "60"))
)
}
@@ -552,13 +551,13 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
MultiAddData(leftInput, 3, 4, 5)(rightInput, 1, 2, 3),
// The right rows with rightValue <= 7 should generate their outer join row now and
// not get added to the state.
- CheckLastBatch(Row(3, 10, 6, "9"), Row(1, 10, null, "3"), Row(2, 10, null, "6")),
+ CheckNewAnswer(Row(3, 10, 6, "9"), Row(1, 10, null, "3"), Row(2, 10, null, "6")),
assertNumStateRows(total = 4, updated = 4),
// We shouldn't get more outer join rows when the watermark advances.
MultiAddData(leftInput, 20)(rightInput, 21),
- CheckLastBatch(),
+ CheckNewAnswer(),
AddData(rightInput, 20),
- CheckLastBatch((20, 30, 40, "60"))
+ CheckNewAnswer((20, 30, 40, "60"))
)
}
@@ -568,14 +567,14 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
testStream(joined)(
// Test inner part of the join.
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
- CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
- // Old state doesn't get dropped until the batch *after* it gets introduced, so the
- // nulls won't show up until the next batch after the watermark advances.
- MultiAddData(leftInput, 21)(rightInput, 22),
- CheckLastBatch(),
- assertNumStateRows(total = 12, updated = 12),
+ CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+
+ MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
+ CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null)),
+ assertNumStateRows(total = 2, updated = 12),
+
AddData(leftInput, 22),
- CheckLastBatch(Row(22, 30, 44, 66), Row(1, 10, 2, null), Row(2, 10, 4, null)),
+ CheckNewAnswer(Row(22, 30, 44, 66)),
assertNumStateRows(total = 3, updated = 1)
)
}
@@ -586,14 +585,14 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
testStream(joined)(
// Test inner part of the join.
MultiAddData(leftInput, 1, 2, 3, 4, 5)(rightInput, 3, 4, 5, 6, 7),
- CheckLastBatch((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
- // Old state doesn't get dropped until the batch *after* it gets introduced, so the
- // nulls won't show up until the next batch after the watermark advances.
- MultiAddData(leftInput, 21)(rightInput, 22),
- CheckLastBatch(),
- assertNumStateRows(total = 12, updated = 12),
+ CheckNewAnswer((3, 10, 6, 9), (4, 10, 8, 12), (5, 10, 10, 15)),
+
+ MultiAddData(leftInput, 21)(rightInput, 22), // watermark = 11, no-data-batch computes nulls
+ CheckNewAnswer(Row(6, 10, null, 18), Row(7, 10, null, 21)),
+ assertNumStateRows(total = 2, updated = 12),
+
AddData(leftInput, 22),
- CheckLastBatch(Row(22, 30, 44, 66), Row(6, 10, null, 18), Row(7, 10, null, 21)),
+ CheckNewAnswer(Row(22, 30, 44, 66)),
assertNumStateRows(total = 3, updated = 1)
)
}
@@ -627,21 +626,18 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(leftInput, (1, 5), (3, 5)),
CheckAnswer(),
AddData(rightInput, (1, 10), (2, 5)),
- CheckLastBatch((1, 1, 5, 10)),
+ CheckNewAnswer((1, 1, 5, 10)),
AddData(rightInput, (1, 11)),
- CheckLastBatch(), // no match as left time is too low
+ CheckNewAnswer(), // no match as left time is too low
assertNumStateRows(total = 5, updated = 5),
// Increase event time watermark to 20s by adding data with time = 30s on both inputs
AddData(leftInput, (1, 7), (1, 30)),
- CheckLastBatch((1, 1, 7, 10), (1, 1, 7, 11)),
+ CheckNewAnswer((1, 1, 7, 10), (1, 1, 7, 11)),
assertNumStateRows(total = 7, updated = 2),
- AddData(rightInput, (0, 30)),
- CheckLastBatch(),
- assertNumStateRows(total = 8, updated = 1),
- AddData(rightInput, (0, 30)),
- CheckLastBatch(outerResult),
- assertNumStateRows(total = 3, updated = 1)
+ AddData(rightInput, (0, 30)), // watermark = 30 - 10 = 20, no-data-batch computes nulls
+ CheckNewAnswer(outerResult),
+ assertNumStateRows(total = 2, updated = 1)
)
}
}
@@ -665,36 +661,41 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest with
testStream(joined)(
// leftValue <= 10 should generate outer join rows even though it matches right keys
MultiAddData(leftInput, 1, 2, 3)(rightInput, 1, 2, 3),
- CheckLastBatch(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
- MultiAddData(leftInput, 20)(rightInput, 21),
- CheckLastBatch(),
- assertNumStateRows(total = 5, updated = 5), // 1...3 added, but 20 and 21 not added
+ CheckNewAnswer(Row(1, 10, 2, null), Row(2, 10, 4, null), Row(3, 10, 6, null)),
+ assertNumStateRows(total = 3, updated = 3), // only right 1, 2, 3 added
+
+ MultiAddData(leftInput, 20)(rightInput, 21), // watermark = 10, no-data-batch cleared < 10
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 2), // only 20 and 21 left in state
+
AddData(rightInput, 20),
- CheckLastBatch(
- Row(20, 30, 40, 60)),
+ CheckNewAnswer(Row(20, 30, 40, 60)),
assertNumStateRows(total = 3, updated = 1),
+
// leftValue and rightValue both satisfying condition should not generate outer join rows
- MultiAddData(leftInput, 40, 41)(rightInput, 40, 41),
- CheckLastBatch((40, 50, 80, 120), (41, 50, 82, 123)),
- MultiAddData(leftInput, 70)(rightInput, 71),
- CheckLastBatch(),
- assertNumStateRows(total = 6, updated = 6), // all inputs added since last check
+ MultiAddData(leftInput, 40, 41)(rightInput, 40, 41), // watermark = 31
+ CheckNewAnswer((40, 50, 80, 120), (41, 50, 82, 123)),
+ assertNumStateRows(total = 4, updated = 4), // only left 40, 41 + right 40,41 left in state
+
+ MultiAddData(leftInput, 70)(rightInput, 71), // watermark = 60
+ CheckNewAnswer(),
+ assertNumStateRows(total = 2, updated = 2), // only 70, 71 left in state
+
AddData(rightInput, 70),
- CheckLastBatch((70, 80, 140, 210)),
+ CheckNewAnswer((70, 80, 140, 210)),
assertNumStateRows(total = 3, updated = 1),
+
// rightValue between 300 and 1000 should generate outer join rows even though it matches left
- MultiAddData(leftInput, 101, 102, 103)(rightInput, 101, 102, 103),
- CheckLastBatch(),
+ MultiAddData(leftInput, 101, 102, 103)(rightInput, 101, 102, 103), // watermark = 91
+ CheckNewAnswer(),
+ assertNumStateRows(total = 6, updated = 3), // only 101 - 103 left in state
+
MultiAddData(leftInput, 1000)(rightInput, 1001),
- CheckLastBatch(),
- assertNumStateRows(total = 8, updated = 5), // 101...103 added, but 1000 and 1001 not added
- AddData(rightInput, 1000),
- CheckLastBatch(
- Row(1000, 1010, 2000, 3000),
+ CheckNewAnswer(
Row(101, 110, 202, null),
Row(102, 110, 204, null),
Row(103, 110, 206, null)),
- assertNumStateRows(total = 3, updated = 1)
+ assertNumStateRows(total = 2, updated = 2)
)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org