Posted to reviews@spark.apache.org by "chaoqin-li1123 (via GitHub)" <gi...@apache.org> on 2023/10/17 04:05:47 UTC

[PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

chaoqin-li1123 opened a new pull request, #43393:
URL: https://github.com/apache/spark/pull/43393

   
   ### What changes were proposed in this pull request?
   Introduce a metadata file for streaming stateful operators; the metadata is written for each stateful operator during query planning.
   The information to store in the metadata file:
   - operator name (not required to be unique among the stateful operators in the query)
   - state store name
   - numColumnsPrefixKey: > 0 if prefix scan is enabled, 0 otherwise
   The body of the metadata file will be in JSON format (a hedged sketch follows below). The metadata file will be versioned, just like the other streaming metadata files, to be future-proof.
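   
   A sketch of what a serialized v1 metadata body might look like. The field names are assumptions inferred from the OperatorStateMetadataV1 / OperatorInfoV1 / StateStoreMetadataV1 classes exercised in the tests below, not the exact on-disk layout:
   
       {
         "operatorInfo" : {
           "operatorId" : 0,
           "operatorName" : "stateStoreSave"
         },
         "stateStoreInfo" : [ {
           "storeName" : "default",
           "numColumnsPrefixKey" : 0,
           "numPartitions" : 200
         } ]
       }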
   
   ### Why are the changes needed?
   The metadata file exposes more information about the state stores, improves debuggability, and facilitates the development of state-related features such as state reading/writing and state repartitioning.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   
   ### How was this patch tested?
   Added unit tests and integration tests.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   




Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1362969534


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -145,14 +142,37 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
       StopStream
     )
 
-    val statePath = new Path(checkpointDir.toString, "state/0")
-    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
-      .asInstanceOf[OperatorStateMetadataV1]
-
     val expectedMetadata = OperatorStateMetadataV1(
       OperatorInfoV1(0, "sessionWindowStateStoreSaveExec"),
       Array(StateStoreMetadataV1("default", 1, spark.sessionState.conf.numShufflePartitions))
     )
-    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+    checkOperatorStateMetadata(0, expectedMetadata)
+  }
+
+  test("Stateful operator metadata for multiple operators.") {

Review Comment:
   nit: remove `.` at the end for consistency



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -32,30 +32,37 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
 
   private lazy val hadoopConf = spark.sessionState.newHadoopConf()
 
+  private var checkpointDir = Utils.createTempDir()

Review Comment:
   Our best practice is to use withTempDir in each test. Could we please follow that pattern and remove this and beforeEach?
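   
   For reference, a minimal sketch of the withTempDir pattern being suggested, assuming the suite inherits withTempDir from SQLTestUtils (via SharedSparkSession); the body is adapted from the aggregation test in this suite:
   
       test("Stateful operator metadata for streaming aggregation") {
         // withTempDir creates a fresh directory and recursively deletes it once the
         // body finishes, so no beforeEach/afterEach cleanup is needed.
         withTempDir { checkpointDir =>
           val inputData = MemoryStream[Int]
           val aggregated = inputData.toDF().groupBy($"value").agg(count("*"))
   
           testStream(aggregated, Complete)(
             StartStream(checkpointLocation = checkpointDir.toString),
             AddData(inputData, 3),
             CheckLastBatch((3, 1)),
             StopStream
           )
         }
       }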



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -32,30 +32,37 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
 
   private lazy val hadoopConf = spark.sessionState.newHadoopConf()
 
+  private var checkpointDir = Utils.createTempDir()
+
   private def numShufflePartitions = spark.sessionState.conf.numShufflePartitions
 
-  private def sameOperatorStateMetadata(
-      operatorMetadata1: OperatorStateMetadataV1,
-      operatorMetadata2: OperatorStateMetadataV1): Boolean = {
-    operatorMetadata1.operatorInfo == operatorMetadata2.operatorInfo &&
-      operatorMetadata1.stateStoreInfo.sameElements(operatorMetadata2.stateStoreInfo)
+  override def beforeEach(): Unit = {
+    Utils.deleteRecursively(checkpointDir)
+    checkpointDir = Utils.createTempDir()
+  }
+
+  private def checkOperatorStateMetadata(
+      operatorId: Int,
+      expectedMetadata: OperatorStateMetadataV1): Unit = {
+    val statePath = new Path(checkpointDir.toString, s"state/$operatorId")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+    // println("doodoo: " + operatorMetadata.stateStoreInfo)

Review Comment:
   nit: should be removed





Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1362899909


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -177,6 +183,20 @@ class IncrementalExecution(
     }
   }
 
+  object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
+    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      case stateStoreWriter: StateStoreWriter =>
+        if (isFirstBatch) {
+          val metadata = stateStoreWriter.operatorStateMetadata()
+          val metadataWriter = new OperatorStateMetadataWriter(new Path(
+            checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
+          metadataWriter.write(metadata)
+        }
+        stateStoreWriter
+      case plan: SparkPlan => plan

Review Comment:
   Thanks for the reminder, we can simplify the code a lot.





Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1362790940


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########


Review Comment:
   class doc added.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########


Review Comment:
   class doc added





Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1362790709


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.{count, session_window}
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.OutputMode.Complete
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
+  import testImplicits._
+
+  private lazy val hadoopConf = spark.sessionState.newHadoopConf()
+
+  private def numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+
+  private def sameOperatorStateMetadata(
+      operatorMetadata1: OperatorStateMetadataV1,
+      operatorMetadata2: OperatorStateMetadataV1): Boolean = {
+    operatorMetadata1.operatorInfo == operatorMetadata2.operatorInfo &&
+      operatorMetadata1.stateStoreInfo.sameElements(operatorMetadata2.stateStoreInfo)
+  }
+
+  test("Serialize and deserialize stateful operator metadata") {
+    val stateDir = Utils.createTempDir()
+    val statePath = new Path(stateDir.toString)
+    val stateStoreInfo = (1 to 4).map(i => StateStoreMetadataV1(s"store$i", 1, 200))
+    val operatorInfo = OperatorInfoV1(1, "Join")
+    val operatorMetadata = OperatorStateMetadataV1(operatorInfo, stateStoreInfo.toArray)
+    new OperatorStateMetadataWriter(statePath, hadoopConf).write(operatorMetadata)
+    val operatorMetadata1 = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+    assert(sameOperatorStateMetadata(operatorMetadata, operatorMetadata1))
+  }
+
+  test("Stateful operator metadata for streaming aggregation") {
+    val inputData = MemoryStream[Int]
+    val checkpointDir = Utils.createTempDir()
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .as[(Int, Long)]
+
+    testStream(aggregated, Complete)(
+      StartStream(checkpointLocation = checkpointDir.toString),
+      AddData(inputData, 3),
+      CheckLastBatch((3, 1)),
+      StopStream
+    )
+
+    val statePath = new Path(checkpointDir.getCanonicalPath, "state/0")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+    val expectedMetadata = OperatorStateMetadataV1(OperatorInfoV1(0, "stateStoreSave"),
+      Array(StateStoreMetadataV1("default", 0, numShufflePartitions)))
+    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+  }
+
+  test("Stateful operator metadata for streaming join") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    val df1 = input1.toDF.select($"value" as "key", ($"value" * 2) as "leftValue")
+    val df2 = input2.toDF.select($"value" as "key", ($"value" * 3) as "rightValue")
+    val joined = df1.join(df2, "key")
+
+    val checkpointDir = Utils.createTempDir()
+    testStream(joined)(
+      StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+      AddData(input1, 1),
+      CheckAnswer(),
+      AddData(input2, 1, 10),       // 1 arrived on input1 first, then input2, should join
+      CheckNewAnswer((1, 2, 3)),
+      StopStream
+    )
+
+    val statePath = new Path(checkpointDir.toString, "state/0")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+
+    val expectedStateStoreInfo = Array(
+      StateStoreMetadataV1("left-keyToNumValues", 0, numShufflePartitions),
+      StateStoreMetadataV1("left-keyWithIndexToValue", 0, numShufflePartitions),
+      StateStoreMetadataV1("right-keyToNumValues", 0, numShufflePartitions),
+      StateStoreMetadataV1("right-keyWithIndexToValue", 0, numShufflePartitions))
+
+    val expectedMetadata = OperatorStateMetadataV1(
+      OperatorInfoV1(0, "symmetricHashJoin"), expectedStateStoreInfo)
+    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+  }
+
+  test("Stateful operator metadata for streaming session window") {
+    val input = MemoryStream[(String, Long)]
+    val sessionWindow: Column = session_window($"eventTime", "10 seconds")
+
+    val checkpointDir = Utils.createTempDir()
+
+    val events = input.toDF()
+      .select($"_1".as("value"), $"_2".as("timestamp"))
+      .withColumn("eventTime", $"timestamp".cast("timestamp"))
+      .withWatermark("eventTime", "30 seconds")
+      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
+
+    val streamingDf = events
+      .groupBy(sessionWindow as Symbol("session"), $"sessionId")
+      .agg(count("*").as("numEvents"))
+      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
+        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
+        "numEvents")
+
+    testStream(streamingDf, OutputMode.Complete())(
+      StartStream(checkpointLocation = checkpointDir.toString),
+      AddData(input,
+        ("hello world spark streaming", 40L),
+        ("world hello structured streaming", 41L)
+      ),
+      CheckNewAnswer(
+        ("hello", 40, 51, 11, 2),
+        ("world", 40, 51, 11, 2),
+        ("streaming", 40, 51, 11, 2),
+        ("spark", 40, 50, 10, 1),
+        ("structured", 41, 51, 10, 1)
+      ),
+      StopStream
+    )
+
+    val statePath = new Path(checkpointDir.toString, "state/0")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+
+    val expectedMetadata = OperatorStateMetadataV1(
+      OperatorInfoV1(0, "sessionWindowStateStoreSaveExec"),
+      Array(StateStoreMetadataV1("default", 1, spark.sessionState.conf.numShufflePartitions))
+    )
+    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+  }

Review Comment:
   Multiple operators test added.





Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1363131430


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -32,30 +32,37 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
 
   private lazy val hadoopConf = spark.sessionState.newHadoopConf()
 
+  private var checkpointDir = Utils.createTempDir()
+
   private def numShufflePartitions = spark.sessionState.conf.numShufflePartitions
 
-  private def sameOperatorStateMetadata(
-      operatorMetadata1: OperatorStateMetadataV1,
-      operatorMetadata2: OperatorStateMetadataV1): Boolean = {
-    operatorMetadata1.operatorInfo == operatorMetadata2.operatorInfo &&
-      operatorMetadata1.stateStoreInfo.sameElements(operatorMetadata2.stateStoreInfo)
+  override def beforeEach(): Unit = {
+    Utils.deleteRecursively(checkpointDir)
+    checkpointDir = Utils.createTempDir()
+  }
+
+  private def checkOperatorStateMetadata(
+      operatorId: Int,
+      expectedMetadata: OperatorStateMetadataV1): Unit = {
+    val statePath = new Path(checkpointDir.toString, s"state/$operatorId")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+    // println("doodoo: " + operatorMetadata.stateStoreInfo)

Review Comment:
   Sorry, forgot to delete the debug print.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -32,30 +32,37 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
 
   private lazy val hadoopConf = spark.sessionState.newHadoopConf()
 
+  private var checkpointDir = Utils.createTempDir()

Review Comment:
   Refactored.





Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43393:
URL: https://github.com/apache/spark/pull/43393#issuecomment-1767782043

   Thanks! Merging to master.




Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on PR #43393:
URL: https://github.com/apache/spark/pull/43393#issuecomment-1765706556

   @HeartSaVioR 




Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1363132266


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -145,14 +142,37 @@ class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
       StopStream
     )
 
-    val statePath = new Path(checkpointDir.toString, "state/0")
-    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
-      .asInstanceOf[OperatorStateMetadataV1]
-
     val expectedMetadata = OperatorStateMetadataV1(
       OperatorInfoV1(0, "sessionWindowStateStoreSaveExec"),
       Array(StateStoreMetadataV1("default", 1, spark.sessionState.conf.numShufflePartitions))
     )
-    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+    checkOperatorStateMetadata(0, expectedMetadata)
+  }
+
+  test("Stateful operator metadata for multiple operators.") {

Review Comment:
   Removed.





Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #43393:
URL: https://github.com/apache/spark/pull/43393#discussion_r1362048469


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadata.scala:
##########


Review Comment:
   General comment: we'd like to have a class doc for each trait/class.
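   
   A hedged sketch of the kind of class doc being requested; the wording, and the trait signature itself, are illustrative rather than the doc that was actually committed:
   
       /**
        * Metadata of a stateful operator, written to the checkpoint location so that
        * state can be inspected offline without replaying the query.
        */
       trait OperatorStateMetadata {
         def version: Int
       }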



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -177,6 +183,20 @@ class IncrementalExecution(
     }
   }
 
+  object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
+    override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+      case stateStoreWriter: StateStoreWriter =>
+        if (isFirstBatch) {
+          val metadata = stateStoreWriter.operatorStateMetadata()
+          val metadataWriter = new OperatorStateMetadataWriter(new Path(
+            checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString), hadoopConf)
+          metadataWriter.write(metadata)
+        }
+        stateStoreWriter
+      case plan: SparkPlan => plan

Review Comment:
   nit: Does this require all cases to be matched? I guess it should be OK to construct a "partial" match since we are creating PartialFunction. This line may not be needed.
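   
   A sketch of the simplification being suggested: since `rule` is a PartialFunction, the catch-all case can simply be dropped, and plans that do not match are left untouched by the caller:
   
       object WriteStatefulOperatorMetadataRule extends SparkPlanPartialRule {
         override val rule: PartialFunction[SparkPlan, SparkPlan] = {
           case stateStoreWriter: StateStoreWriter =>
             if (isFirstBatch) {
               val metadata = stateStoreWriter.operatorStateMetadata()
               val metadataWriter = new OperatorStateMetadataWriter(new Path(
                 checkpointLocation, stateStoreWriter.getStateInfo.operatorId.toString),
                 hadoopConf)
               metadataWriter.write(metadata)
             }
             stateStoreWriter
         }
       }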



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/OperatorStateMetadataSuite.scala:
##########
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.state
+
+import org.apache.hadoop.fs.Path
+
+import org.apache.spark.sql.Column
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions.{count, session_window}
+import org.apache.spark.sql.streaming.{OutputMode, StreamTest}
+import org.apache.spark.sql.streaming.OutputMode.Complete
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.util.Utils
+
+class OperatorStateMetadataSuite extends StreamTest with SharedSparkSession {
+  import testImplicits._
+
+  private lazy val hadoopConf = spark.sessionState.newHadoopConf()
+
+  private def numShufflePartitions = spark.sessionState.conf.numShufflePartitions
+
+  private def sameOperatorStateMetadata(
+      operatorMetadata1: OperatorStateMetadataV1,
+      operatorMetadata2: OperatorStateMetadataV1): Boolean = {
+    operatorMetadata1.operatorInfo == operatorMetadata2.operatorInfo &&
+      operatorMetadata1.stateStoreInfo.sameElements(operatorMetadata2.stateStoreInfo)
+  }
+
+  test("Serialize and deserialize stateful operator metadata") {
+    val stateDir = Utils.createTempDir()
+    val statePath = new Path(stateDir.toString)
+    val stateStoreInfo = (1 to 4).map(i => StateStoreMetadataV1(s"store$i", 1, 200))
+    val operatorInfo = OperatorInfoV1(1, "Join")
+    val operatorMetadata = OperatorStateMetadataV1(operatorInfo, stateStoreInfo.toArray)
+    new OperatorStateMetadataWriter(statePath, hadoopConf).write(operatorMetadata)
+    val operatorMetadata1 = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+    assert(sameOperatorStateMetadata(operatorMetadata, operatorMetadata1))
+  }
+
+  test("Stateful operator metadata for streaming aggregation") {
+    val inputData = MemoryStream[Int]
+    val checkpointDir = Utils.createTempDir()
+    val aggregated =
+      inputData.toDF()
+        .groupBy($"value")
+        .agg(count("*"))
+        .as[(Int, Long)]
+
+    testStream(aggregated, Complete)(
+      StartStream(checkpointLocation = checkpointDir.toString),
+      AddData(inputData, 3),
+      CheckLastBatch((3, 1)),
+      StopStream
+    )
+
+    val statePath = new Path(checkpointDir.getCanonicalPath, "state/0")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+    val expectedMetadata = OperatorStateMetadataV1(OperatorInfoV1(0, "stateStoreSave"),
+      Array(StateStoreMetadataV1("default", 0, numShufflePartitions)))
+    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+  }
+
+  test("Stateful operator metadata for streaming join") {
+    val input1 = MemoryStream[Int]
+    val input2 = MemoryStream[Int]
+
+    val df1 = input1.toDF.select($"value" as "key", ($"value" * 2) as "leftValue")
+    val df2 = input2.toDF.select($"value" as "key", ($"value" * 3) as "rightValue")
+    val joined = df1.join(df2, "key")
+
+    val checkpointDir = Utils.createTempDir()
+    testStream(joined)(
+      StartStream(checkpointLocation = checkpointDir.getCanonicalPath),
+      AddData(input1, 1),
+      CheckAnswer(),
+      AddData(input2, 1, 10),       // 1 arrived on input1 first, then input2, should join
+      CheckNewAnswer((1, 2, 3)),
+      StopStream
+    )
+
+    val statePath = new Path(checkpointDir.toString, "state/0")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+
+    val expectedStateStoreInfo = Array(
+      StateStoreMetadataV1("left-keyToNumValues", 0, numShufflePartitions),
+      StateStoreMetadataV1("left-keyWithIndexToValue", 0, numShufflePartitions),
+      StateStoreMetadataV1("right-keyToNumValues", 0, numShufflePartitions),
+      StateStoreMetadataV1("right-keyWithIndexToValue", 0, numShufflePartitions))
+
+    val expectedMetadata = OperatorStateMetadataV1(
+      OperatorInfoV1(0, "symmetricHashJoin"), expectedStateStoreInfo)
+    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+  }
+
+  test("Stateful operator metadata for streaming session window") {
+    val input = MemoryStream[(String, Long)]
+    val sessionWindow: Column = session_window($"eventTime", "10 seconds")
+
+    val checkpointDir = Utils.createTempDir()
+
+    val events = input.toDF()
+      .select($"_1".as("value"), $"_2".as("timestamp"))
+      .withColumn("eventTime", $"timestamp".cast("timestamp"))
+      .withWatermark("eventTime", "30 seconds")
+      .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
+
+    val streamingDf = events
+      .groupBy(sessionWindow as Symbol("session"), $"sessionId")
+      .agg(count("*").as("numEvents"))
+      .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
+        "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
+        "numEvents")
+
+    testStream(streamingDf, OutputMode.Complete())(
+      StartStream(checkpointLocation = checkpointDir.toString),
+      AddData(input,
+        ("hello world spark streaming", 40L),
+        ("world hello structured streaming", 41L)
+      ),
+      CheckNewAnswer(
+        ("hello", 40, 51, 11, 2),
+        ("world", 40, 51, 11, 2),
+        ("streaming", 40, 51, 11, 2),
+        ("spark", 40, 50, 10, 1),
+        ("structured", 41, 51, 10, 1)
+      ),
+      StopStream
+    )
+
+    val statePath = new Path(checkpointDir.toString, "state/0")
+    val operatorMetadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
+      .asInstanceOf[OperatorStateMetadataV1]
+
+    val expectedMetadata = OperatorStateMetadataV1(
+      OperatorInfoV1(0, "sessionWindowStateStoreSaveExec"),
+      Array(StateStoreMetadataV1("default", 1, spark.sessionState.conf.numShufflePartitions))
+    )
+    assert(sameOperatorStateMetadata(operatorMetadata, expectedMetadata))
+  }

Review Comment:
   Let's add another test to verify multiple stateful operators (opId being 0, 1, 2, etc.).
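   
   A hedged sketch of what such a test might look like, chaining deduplication and aggregation so the query plans two stateful operators (ids 0 and 1); the query shape, operator expectations, and assertions are assumptions, not the test that was eventually merged:
   
       test("Stateful operator metadata for multiple operators") {
         val checkpointDir = Utils.createTempDir()
         val inputData = MemoryStream[Int]
         // dropDuplicates and the aggregation are two distinct stateful operators,
         // so one metadata file should be written per operator id.
         val result = inputData.toDF()
           .dropDuplicates("value")
           .groupBy($"value")
           .agg(count("*"))
   
         testStream(result, Complete)(
           StartStream(checkpointLocation = checkpointDir.toString),
           AddData(inputData, 1, 1, 2),
           CheckLastBatch((1, 1), (2, 1)),
           StopStream
         )
   
         Seq(0, 1).foreach { operatorId =>
           val statePath = new Path(checkpointDir.toString, s"state/$operatorId")
           val metadata = new OperatorStateMetadataReader(statePath, hadoopConf).read()
             .asInstanceOf[OperatorStateMetadataV1]
           assert(metadata.operatorInfo.operatorId == operatorId)
         }
       }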





Re: [PR] [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #43393: [SPARK-45558][SS] Introduce a metadata file for streaming stateful operator
URL: https://github.com/apache/spark/pull/43393

