Posted to reviews@spark.apache.org by "chaoqin-li1123 (via GitHub)" <gi...@apache.org> on 2023/11/13 19:12:55 UTC

[PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

chaoqin-li1123 opened a new pull request, #43788:
URL: https://github.com/apache/spark/pull/43788

   
   ### What changes were proposed in this pull request?
   Currently, reading state for the session window aggregation operator is not supported because the number of prefix key columns (`numColsPrefixKey`) is unknown. We can read the operator state metadata introduced in SPARK-45558 to determine the number of prefix key columns and load the session window state correctly.
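
   The lookup can be sketched roughly as follows. This is a minimal, Spark-free illustration of the filtering logic in the diff, not the actual implementation: `MetadataEntry` and `PrefixKeyLookup` are hypothetical names standing in for `StateMetadataTableEntry` and the code inside `StatePartitionReader`, and only the three fields used here are modeled.

   ```scala
   // Hypothetical stand-in for Spark's StateMetadataTableEntry (assumption: only these fields matter here).
   final case class MetadataEntry(operatorId: Long, stateStoreName: String, numColsPrefixKey: Int)

   object PrefixKeyLookup {
     // Returns the number of prefix key columns recorded for one state store.
     // Falls back to 0 when no metadata entry matches, e.g. a checkpoint
     // written before operator state metadata existed.
     def numColsPrefixKey(entries: Seq[MetadataEntry], operatorId: Long, storeName: String): Int = {
       val matched = entries.filter { e =>
         e.operatorId == operatorId && e.stateStoreName == storeName
       }
       if (matched.isEmpty) {
         0
       } else {
         // Exactly one entry is expected per (operatorId, storeName) pair.
         require(matched.length == 1, s"expected one metadata entry, found ${matched.length}")
         matched.head.numColsPrefixKey
       }
     }
   }
   ```

   With the fallback value of 0, a checkpoint without metadata is still read as before, which is why the session window case cannot be handled for such checkpoints.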
   
   
   ### Why are the changes needed?
   To support reading state for session window aggregation.
   
   
   ### Does this PR introduce _any_ user-facing change?
   No.
   
   
   ### How was this patch tested?
   Added an integration test.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #43788: [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation
URL: https://github.com/apache/spark/pull/43788




Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43788:
URL: https://github.com/apache/spark/pull/43788#discussion_r1393179190


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -57,12 +58,31 @@ class StatePartitionReader(
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
 
+    val allStateStoreMetadata = new StateMetadataPartitionReader(
+      partition.sourceOptions.stateCheckpointLocation.getParent.toString, hadoopConf)
+      .stateMetadata.toArray
+
+    val stateStoreMetadata = allStateStoreMetadata.filter(entry =>

Review Comment:
   fixed.





Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on PR #43788:
URL: https://github.com/apache/spark/pull/43788#issuecomment-1809278646

   @HeartSaVioR 




Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43788:
URL: https://github.com/apache/spark/pull/43788#discussion_r1393178895


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -195,7 +195,7 @@ class StateMetadataPartitionReader(
     }
   }
 
-  private lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
+  private[state] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {

Review Comment:
   Yes. I use this class to get the number of prefix key columns, but I don't want to access the internal row because it is untyped. So I need to expose an API that returns the metadata as a case class.
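
   To illustrate the difference, here is a minimal sketch. The `Entry` case class and the row layout (prefix key count at index 2) are hypothetical and are not Spark's actual classes or schema.

   ```scala
   // Hypothetical typed entry; Spark's real class is StateMetadataTableEntry.
   final case class Entry(operatorId: Long, stateStoreName: String, numColsPrefixKey: Int)

   object TypedVsUntyped {
     // Untyped access: the column position and the cast are only checked at runtime.
     def prefixKeyFromRow(row: Seq[Any]): Int = row(2).asInstanceOf[Int]

     // Typed access: the field name and its type are checked by the compiler.
     def prefixKeyFromEntry(entry: Entry): Int = entry.numColsPrefixKey
   }
   ```

   A wrong index or type in the untyped version fails only when the code runs, while the typed version fails at compile time.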



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -57,12 +58,31 @@ class StatePartitionReader(
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
 
+    val allStateStoreMetadata = new StateMetadataPartitionReader(
+      partition.sourceOptions.stateCheckpointLocation.getParent.toString, hadoopConf)
+      .stateMetadata.toArray
+
+    val stateStoreMetadata = allStateStoreMetadata.filter(entry =>
+      entry.operatorId == partition.sourceOptions.operatorId
+        && entry.stateStoreName == partition.sourceOptions.storeName
+    )
+    val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
+      logWarning("Metadata for state store not found, possible cause is this checkpoint " +
+        "is created by older version of spark. The state of session window aggregation can't be " +
+        "read correctly without state metadata and runtime exception will be thrown. " +
+        "Run the streaming query in newer spark version to generate state metadata.")
+      0
+    } else {
+      require(stateStoreMetadata.length == 1)
+      stateStoreMetadata.head.numColsPrefixKey
+    }
+
     // TODO: This does not handle the case of session window aggregation; we don't have an

Review Comment:
   Removed.





Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43788:
URL: https://github.com/apache/spark/pull/43788#issuecomment-1812628737

   Thanks! Merging to master.




Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43788:
URL: https://github.com/apache/spark/pull/43788#discussion_r1393180207


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
     }
   }
 
+  test("Session window aggregation") {
+    withTempDir { checkpointDir =>
+      val input = MemoryStream[(String, Long)]
+      val sessionWindow = session_window($"eventTime", "10 seconds")
+
+      val events = input.toDF()
+        .select($"_1".as("value"), $"_2".as("timestamp"))
+        .withColumn("eventTime", $"timestamp".cast("timestamp"))
+        .withWatermark("eventTime", "30 seconds")
+        .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
+
+      val streamingDf = events
+        .groupBy(sessionWindow as Symbol("session"), $"sessionId")
+        .agg(count("*").as("numEvents"))
+        .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
+          "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
+          "numEvents")
+
+      testStream(streamingDf, OutputMode.Complete())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(input,
+          ("hello world spark streaming", 40L),
+          ("world hello structured streaming", 41L)
+        ),
+        CheckNewAnswer(
+          ("hello", 40, 51, 11, 2),
+          ("world", 40, 51, 11, 2),
+          ("streaming", 40, 51, 11, 2),
+          ("spark", 40, 50, 10, 1),
+          ("structured", 41, 51, 10, 1)
+        ),
+        StopStream
+      )
+
+
+      val df = spark.read.format("statestore").load(checkpointDir.toString)
+        .selectExpr("key.sessionId as sessionId", "value.count as count")

Review Comment:
   Changed the query to validate all the fields.





Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #43788:
URL: https://github.com/apache/spark/pull/43788#discussion_r1392113944


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -57,12 +58,31 @@ class StatePartitionReader(
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
 
+    val allStateStoreMetadata = new StateMetadataPartitionReader(
+      partition.sourceOptions.stateCheckpointLocation.getParent.toString, hadoopConf)
+      .stateMetadata.toArray
+
+    val stateStoreMetadata = allStateStoreMetadata.filter(entry =>
+      entry.operatorId == partition.sourceOptions.operatorId
+        && entry.stateStoreName == partition.sourceOptions.storeName
+    )
+    val numColsPrefixKey = if (stateStoreMetadata.isEmpty) {
+      logWarning("Metadata for state store not found, possible cause is this checkpoint " +
+        "is created by older version of spark. The state of session window aggregation can't be " +
+        "read correctly without state metadata and runtime exception will be thrown. " +
+        "Run the streaming query in newer spark version to generate state metadata.")
+      0
+    } else {
+      require(stateStoreMetadata.length == 1)
+      stateStoreMetadata.head.numColsPrefixKey
+    }
+
     // TODO: This does not handle the case of session window aggregation; we don't have an

Review Comment:
   Let's remove this TODO comment as you're addressing this.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
     }
   }
 
+  test("Session window aggregation") {
+    withTempDir { checkpointDir =>
+      val input = MemoryStream[(String, Long)]
+      val sessionWindow = session_window($"eventTime", "10 seconds")
+
+      val events = input.toDF()
+        .select($"_1".as("value"), $"_2".as("timestamp"))
+        .withColumn("eventTime", $"timestamp".cast("timestamp"))
+        .withWatermark("eventTime", "30 seconds")
+        .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
+
+      val streamingDf = events
+        .groupBy(sessionWindow as Symbol("session"), $"sessionId")
+        .agg(count("*").as("numEvents"))
+        .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
+          "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
+          "numEvents")
+
+      testStream(streamingDf, OutputMode.Complete())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(input,
+          ("hello world spark streaming", 40L),
+          ("world hello structured streaming", 41L)
+        ),
+        CheckNewAnswer(
+          ("hello", 40, 51, 11, 2),
+          ("world", 40, 51, 11, 2),
+          ("streaming", 40, 51, 11, 2),
+          ("spark", 40, 50, 10, 1),
+          ("structured", 41, 51, 10, 1)
+        ),
+        StopStream
+      )
+

Review Comment:
   nit: single empty line



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/metadata/StateMetadataSource.scala:
##########
@@ -195,7 +195,7 @@ class StateMetadataPartitionReader(
     }
   }
 
-  private lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {
+  private[state] lazy val stateMetadata: Iterator[StateMetadataTableEntry] = {

Review Comment:
   Is this change necessary?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
     }
   }
 
+  test("Session window aggregation") {
+    withTempDir { checkpointDir =>
+      val input = MemoryStream[(String, Long)]

Review Comment:
   Shall we follow the pattern we use for StateDataSourceReadSuite vs. StateDataSourceTestBase? The main reason I put the query execution part in StateDataSourceTestBase is that we are likely to reuse the query between the tests for read and the tests for write.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StatePartitionReader.scala:
##########
@@ -57,12 +58,31 @@ class StatePartitionReader(
       partition.sourceOptions.operatorId, partition.partition, partition.sourceOptions.storeName)
     val stateStoreProviderId = StateStoreProviderId(stateStoreId, partition.queryId)
 
+    val allStateStoreMetadata = new StateMetadataPartitionReader(
+      partition.sourceOptions.stateCheckpointLocation.getParent.toString, hadoopConf)
+      .stateMetadata.toArray
+
+    val stateStoreMetadata = allStateStoreMetadata.filter(entry =>

Review Comment:
   nit: `(entry =>` to ` { entry =>` (note the space around `{`)



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
     }
   }
 
+  test("Session window aggregation") {
+    withTempDir { checkpointDir =>
+      val input = MemoryStream[(String, Long)]
+      val sessionWindow = session_window($"eventTime", "10 seconds")
+
+      val events = input.toDF()
+        .select($"_1".as("value"), $"_2".as("timestamp"))
+        .withColumn("eventTime", $"timestamp".cast("timestamp"))
+        .withWatermark("eventTime", "30 seconds")
+        .selectExpr("explode(split(value, ' ')) AS sessionId", "eventTime")
+
+      val streamingDf = events
+        .groupBy(sessionWindow as Symbol("session"), $"sessionId")
+        .agg(count("*").as("numEvents"))
+        .selectExpr("sessionId", "CAST(session.start AS LONG)", "CAST(session.end AS LONG)",
+          "CAST(session.end AS LONG) - CAST(session.start AS LONG) AS durationMs",
+          "numEvents")
+
+      testStream(streamingDf, OutputMode.Complete())(
+        StartStream(checkpointLocation = checkpointDir.toString),
+        AddData(input,
+          ("hello world spark streaming", 40L),
+          ("world hello structured streaming", 41L)
+        ),
+        CheckNewAnswer(
+          ("hello", 40, 51, 11, 2),
+          ("world", 40, 51, 11, 2),
+          ("streaming", 40, 51, 11, 2),
+          ("spark", 40, 50, 10, 1),
+          ("structured", 41, 51, 10, 1)
+        ),
+        StopStream
+      )
+
+
+      val df = spark.read.format("statestore").load(checkpointDir.toString)
+        .selectExpr("key.sessionId as sessionId", "value.count as count")

Review Comment:
   Shall we validate the full schema of the state, including the part of session window?





Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "chaoqin-li1123 (via GitHub)" <gi...@apache.org>.
chaoqin-li1123 commented on code in PR #43788:
URL: https://github.com/apache/spark/pull/43788#discussion_r1393176718


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/v2/state/StateDataSourceReadSuite.scala:
##########
@@ -494,6 +497,48 @@ abstract class StateDataSourceReadSuite extends StateDataSourceTestBase with Ass
     }
   }
 
+  test("Session window aggregation") {
+    withTempDir { checkpointDir =>
+      val input = MemoryStream[(String, Long)]

Review Comment:
   Moved the query running code to StateDataSourceTestBase.





Re: [PR] [SPARK-45747][SS] Use prefix key information in state metadata to handle reading state for session window aggregation [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #43788:
URL: https://github.com/apache/spark/pull/43788#issuecomment-1812633610

   @chaoqin-li1123 Could you please rebase your change onto the latest master branch? The merge script mistakenly treats me as the main author because of my commits listed here.

