You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2024/02/06 03:29:08 UTC
(spark) branch master updated: [SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 5100b2e5d1aa [SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator
5100b2e5d1aa is described below
commit 5100b2e5d1aab081e6c5ac9cb3d9a46f5b2b6353
Author: Eric Marnadi <er...@databricks.com>
AuthorDate: Tue Feb 6 12:28:46 2024 +0900
[SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator
Add unit tests that exercise the TransformWithState operator with multiple input streams.
### What changes were proposed in this pull request?
Added unit tests in TransformWithStateSuite
### Why are the changes needed?
These tests ensure that multiple input streams can be unioned and fed into the TransformWithState (TWS) operator, just as with any other stateful operator.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
This change only adds tests; no further testing is needed.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45004 from ericm-db/multiple-input-streams.
Authored-by: Eric Marnadi <er...@databricks.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../sql/streaming/TransformWithStateSuite.scala | 96 ++++++++++++++++++++++
1 file changed, 96 insertions(+)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 7a6c3f00fc7a..3efef3b37000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -233,6 +233,102 @@ class TransformWithStateSuite extends StateStoreMetricsTest
}
}
}
+
+ test("transformWithState - two input streams") { // both sources share one per-key running count
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+ val inputData1 = MemoryStream[String]
+ val inputData2 = MemoryStream[String]
+
+ val result = inputData1.toDS()
+ .union(inputData2.toDS())
+ .groupByKey(x => x) // each element is its own grouping key
+ .transformWithState(new RunningCountStatefulProcessor(),
+ TimeoutMode.NoTimeouts(),
+ OutputMode.Update())
+
+ testStream(result, OutputMode.Update())(
+ AddData(inputData1, "a"),
+ CheckNewAnswer(("a", "1")),
+ AddData(inputData2, "a", "b"), // rows from the 2nd source update the same state for "a"
+ CheckNewAnswer(("a", "2"), ("b", "1")),
+ AddData(inputData1, "a", "b"), // 3rd time "a" arrives: its state is removed, nothing emitted for "a"
+ CheckNewAnswer(("b", "2")), // NOTE(review): presumably the processor clears a key at count 3; verify
+ AddData(inputData1, "d", "e"),
+ AddData(inputData2, "a", "c"), // "a" starts over from fresh state, so its count is 1 again
+ CheckNewAnswer(("a", "1"), ("c", "1"), ("d", "1"), ("e", "1")), // each key occurs once across both AddData calls, so this holds for one or two micro-batches
+ StopStream
+ )
+ }
+ }
+
+ test("transformWithState - three input streams") { // all three sources share one per-key running count
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+ val inputData1 = MemoryStream[String]
+ val inputData2 = MemoryStream[String]
+ val inputData3 = MemoryStream[String]
+
+ // union all three String sources into one Dataset before grouping
+ val result = inputData1.toDS()
+ .union(inputData2.toDS())
+ .union(inputData3.toDS())
+ .groupByKey(x => x) // each element is its own grouping key
+ .transformWithState(new RunningCountStatefulProcessor(),
+ TimeoutMode.NoTimeouts(),
+ OutputMode.Update())
+
+ testStream(result, OutputMode.Update())(
+ AddData(inputData1, "a"),
+ CheckNewAnswer(("a", "1")),
+ AddData(inputData2, "a", "b"), // rows from the 2nd source update the same state for "a"
+ CheckNewAnswer(("a", "2"), ("b", "1")),
+ AddData(inputData3, "a", "b"), // 3rd time "a" arrives: its state is removed, nothing emitted for "a"
+ CheckNewAnswer(("b", "2")), // NOTE(review): presumably the processor clears a key at count 3; verify
+ AddData(inputData1, "d", "e"),
+ AddData(inputData2, "a", "c"), // "a" starts over from fresh state, so its count is 1 again
+ CheckNewAnswer(("a", "1"), ("c", "1"), ("d", "1"), ("e", "1")), // each key occurs once across both AddData calls, so this holds for one or two micro-batches
+ AddData(inputData3, "a", "c", "d", "e"), // every key here already has state with count 1
+ CheckNewAnswer(("a", "2"), ("c", "2"), ("d", "2"), ("e", "2")), // so each count advances to 2
+ StopStream
+ )
+ }
+ }
+
+ test("transformWithState - two input streams, different key type") { // Long source is mapped to String before the union
+ withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+ classOf[RocksDBStateStoreProvider].getName,
+ SQLConf.SHUFFLE_PARTITIONS.key ->
+ TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+ val inputData1 = MemoryStream[String]
+ val inputData2 = MemoryStream[Long]
+
+ val result = inputData1.toDS()
+ // convert inputData2's Longs to Strings with map(_.toString) so both union sides have the same type
+ .union(inputData2.toDS().map(_.toString))
+ .groupByKey(x => x) // each element is its own grouping key
+ .transformWithState(new RunningCountStatefulProcessor(),
+ TimeoutMode.NoTimeouts(),
+ OutputMode.Update())
+
+ testStream(result, OutputMode.Update())(
+ AddData(inputData1, "1"),
+ CheckNewAnswer(("1", "1")),
+ AddData(inputData2, 1L, 2L), // 1L becomes key "1" and shares state with the String source's "1"
+ CheckNewAnswer(("1", "2"), ("2", "1")),
+ AddData(inputData1, "1", "2"), // 3rd time "1" arrives: its state is removed, nothing emitted for "1"
+ CheckNewAnswer(("2", "2")), // NOTE(review): presumably the processor clears a key at count 3; verify
+ AddData(inputData1, "4", "5"),
+ AddData(inputData2, 1L, 3L), // "1" starts over from fresh state, so its count is 1 again
+ CheckNewAnswer(("1", "1"), ("3", "1"), ("4", "1"), ("5", "1")), // each key occurs once across both AddData calls, so this holds for one or two micro-batches
+ StopStream
+ )
+ }
+ }
}
class TransformWithStateValidationSuite extends StateStoreMetricsTest {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org