You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2024/02/06 03:29:08 UTC

(spark) branch master updated: [SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5100b2e5d1aa [SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator
5100b2e5d1aa is described below

commit 5100b2e5d1aab081e6c5ac9cb3d9a46f5b2b6353
Author: Eric Marnadi <er...@databricks.com>
AuthorDate: Tue Feb 6 12:28:46 2024 +0900

    [SPARK-46960][SS] Testing Multiple Input Streams with TransformWithState operator
    
    Add unit tests that feed multiple unioned input streams into the TransformWithState operator.
    ### What changes were proposed in this pull request?
    
    Added unit tests in TransformWithStateSuite
    ### Why are the changes needed?
    
    These changes are needed to ensure that multiple input streams can be unioned and fed into the TransformWithState (TWS) operator, just as with any other stateful operator.
    ### Does this PR introduce _any_ user-facing change?
    
    No
    ### How was this patch tested?
    
    This change is just adding tests. No further tests needed.
    ### Was this patch authored or co-authored using generative AI tooling?
    
    No
    
    Closes #45004 from ericm-db/multiple-input-streams.
    
    Authored-by: Eric Marnadi <er...@databricks.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/streaming/TransformWithStateSuite.scala    | 96 ++++++++++++++++++++++
 1 file changed, 96 insertions(+)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
index 7a6c3f00fc7a..3efef3b37000 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala
@@ -233,6 +233,102 @@ class TransformWithStateSuite extends StateStoreMetricsTest
       }
     }
   }
+
+  test("transformWithState - two input streams") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+      val inputData1 = MemoryStream[String]
+      val inputData2 = MemoryStream[String]
+      // Both sources share one keyed state, so counts accumulate across them
+      val result = inputData1.toDS()
+        .union(inputData2.toDS())
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData1, "a"),
+        CheckNewAnswer(("a", "1")),
+        AddData(inputData2, "a", "b"),
+        CheckNewAnswer(("a", "2"), ("b", "1")),
+        AddData(inputData1, "a", "b"), // 3rd "a": state for "a" is removed, nothing emitted
+        CheckNewAnswer(("b", "2")),
+        AddData(inputData1, "d", "e"), // checked together with the next AddData below
+        AddData(inputData2, "a", "c"), // should recreate state for "a" and return count as 1
+        CheckNewAnswer(("a", "1"), ("c", "1"), ("d", "1"), ("e", "1")),
+        StopStream
+      )
+    }
+  }
+
+  test("transformWithState - three input streams") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+      val inputData1 = MemoryStream[String]
+      val inputData2 = MemoryStream[String]
+      val inputData3 = MemoryStream[String]
+
+      // union all three MemoryStreams into one Dataset feeding a single TWS operator
+      val result = inputData1.toDS()
+        .union(inputData2.toDS())
+        .union(inputData3.toDS())
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData1, "a"),
+        CheckNewAnswer(("a", "1")),
+        AddData(inputData2, "a", "b"),
+        CheckNewAnswer(("a", "2"), ("b", "1")),
+        AddData(inputData3, "a", "b"), // 3rd "a": state for "a" is removed, nothing emitted
+        CheckNewAnswer(("b", "2")),
+        AddData(inputData1, "d", "e"), // checked together with the next AddData below
+        AddData(inputData2, "a", "c"), // should recreate state for "a" and return count as 1
+        CheckNewAnswer(("a", "1"), ("c", "1"), ("d", "1"), ("e", "1")),
+        AddData(inputData3, "a", "c", "d", "e"),
+        CheckNewAnswer(("a", "2"), ("c", "2"), ("d", "2"), ("e", "2")),
+        StopStream
+      )
+    }
+  }
+
+  test("transformWithState - two input streams, different key type") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName,
+      SQLConf.SHUFFLE_PARTITIONS.key ->
+        TransformWithStateSuiteUtils.NUM_SHUFFLE_PARTITIONS.toString) {
+      val inputData1 = MemoryStream[String]
+      val inputData2 = MemoryStream[Long]
+      // NOTE(review): both sides are keyed by String; only the input element types differ
+      val result = inputData1.toDS()
+        // convert Long values with toString so both union sides are Dataset[String]
+        .union(inputData2.toDS().map(_.toString))
+        .groupByKey(x => x)
+        .transformWithState(new RunningCountStatefulProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData1, "1"),
+        CheckNewAnswer(("1", "1")),
+        AddData(inputData2, 1L, 2L),
+        CheckNewAnswer(("1", "2"), ("2", "1")),
+        AddData(inputData1, "1", "2"), // 3rd "1": state for "1" is removed, nothing emitted
+        CheckNewAnswer(("2", "2")),
+        AddData(inputData1, "4", "5"), // checked together with the next AddData below
+        AddData(inputData2, 1L, 3L), // should recreate state for "1" and return count as 1
+        CheckNewAnswer(("1", "1"), ("3", "1"), ("4", "1"), ("5", "1")),
+        StopStream
+      )
+    }
+  }
 }
 
 class TransformWithStateValidationSuite extends StateStoreMetricsTest {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org