Posted to reviews@spark.apache.org by "ericm-db (via GitHub)" <gi...@apache.org> on 2024/01/25 18:46:42 UTC

[PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

ericm-db opened a new pull request, #44884:
URL: https://github.com/apache/spark/pull/44884

   
   ### What changes were proposed in this pull request?
   This PR allows batch queries to define and use the `TransformWithState` operator, which was originally introduced for streaming queries.
   
   ### Why are the changes needed?
   This is needed to maintain parity between the streaming and batch APIs: everything supported in streaming should also be supported in batch.
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   Added unit tests that use the TransformWithState operator with a batch query.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1466824277


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -891,6 +892,12 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           initialStateGroupAttrs, data, initialStateDataAttrs, output, timeout,
           hasInitialState, planLater(initialState), planLater(child)
         ) :: Nil
+      case logical.TransformWithState(keyDeserializer, valueDeserializer, groupingAttributes,
+      dataAttributes, statefulProcessor, timeoutMode, outputMode, outputObjAttr, child) =>

Review Comment:
   Needs indentation, maybe?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1467180436


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   We can't expect the same path to exist on both the driver and the executors. If we want to use a temp dir, the full path should be obtained on the executor.
   
   Also, for flatMapGroupsWithState, we simply mapped the batch version to flatMapGroups. I'd guess it's no longer that simple here, since we allow users to initialize multiple states, but it would be great if we could fake the state instance (or the state store implementation) rather than going through the full lifecycle of the state store, including coordination.
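
To make the "fake state instance" idea concrete, here is a minimal sketch in plain Scala, with no Spark dependency. Everything in it is an assumption for illustration: `SimpleValueState`, `InMemoryStateBackend` and `FakeStateDemo` are hypothetical names, not Spark's `ValueState` or state store API. It only shows that several named states per grouping key can live in a task-local map, with no provider, coordinator or checkpoint directory involved.

```scala
import scala.collection.mutable

// Hypothetical minimal state interface; NOT Spark's ValueState API.
trait SimpleValueState[S] {
  def get(): Option[S]
  def update(value: S): Unit
  def clear(): Unit
}

// Hypothetical in-memory stand-in for a state store: one mutable map per task,
// keyed by (state name, grouping key) so that a processor can register several states.
final class InMemoryStateBackend {
  private val values = mutable.Map.empty[(String, Any), Any]

  def valueState[S](stateName: String, groupingKey: Any): SimpleValueState[S] =
    new SimpleValueState[S] {
      private val slot = (stateName, groupingKey)
      def get(): Option[S] = values.get(slot).map(_.asInstanceOf[S])
      def update(value: S): Unit = { values.update(slot, value); () }
      def clear(): Unit = { values.remove(slot); () }
    }
}

object FakeStateDemo {
  // Counts rows per key over a finite batch, using only the in-memory state.
  def runningCounts(rows: Seq[(String, Int)]): Map[String, Long] = {
    val backend = new InMemoryStateBackend
    rows.groupBy(_._1).map { case (key, group) =>
      val count = backend.valueState[Long]("count", key)
      group.foreach(_ => count.update(count.get().getOrElse(0L) + 1L))
      key -> count.get().getOrElse(0L)
    }
  }
}
```

Because a batch query sees each group exactly once, the state never has to outlive the task, which is what makes a throwaway in-memory backend plausible here.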





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480841542


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Please see the question again - my question was "will this be evaluated once per executor **after serde**?". If so, this is a good optimization; otherwise the lazy val does nothing and just confuses people.
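
The scoping question can be checked without Spark. The sketch below is an illustration under stated assumptions: `LazyValScopeDemo`, `expensiveInit` and the fake path are hypothetical stand-ins for the temp-dir creation in the diff. It contrasts a `lazy val` declared inside a function body, which is a fresh local on every call, with a `lazy val` on a singleton `object`, which is initialized at most once per JVM (strictly, once per class loader).

```scala
import java.util.concurrent.atomic.AtomicInteger

object LazyValScopeDemo {
  // Counts how many times the "expensive" initializer actually runs.
  val initCount = new AtomicInteger(0)

  // Hypothetical stand-in for creating a temp directory; returns a fake path.
  private def expensiveInit(): String = s"/tmp/fake-dir-${initCount.incrementAndGet()}"

  // Local lazy val: a new one exists for every invocation of the function body,
  // so the initializer runs once per call, exactly like a plain val would.
  val perCall: Int => String = { partitionIndex =>
    lazy val dir = expensiveInit()
    s"$dir/$partitionIndex"
  }

  // lazy val on a singleton object: initialized on first access and then reused
  // by every later call in the same JVM.
  object PerJvm {
    lazy val dir: String = expensiveInit()
  }

  val shared: Int => String = partitionIndex => s"${PerJvm.dir}/$partitionIndex"
}
```

Calling `perCall` for two partitions runs the initializer twice, while calling `shared` for two partitions runs it once. A singleton object is resolved statically on each JVM rather than shipped inside the closure, so this pattern is one plausible way to get per-executor initialization; whether sharing a directory across partitions is desirable for the state store is a separate question.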








Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473325333


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,116 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    // If the query is running in batch mode, we need to create a new StateStore and instantiate
+    // a temp directory on the executors in mapPartitions
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,
+              i, 0), getStateInfo.queryRunId)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(
+            providerId,
+            schemaForKeyRow,
+            schemaForValueRow,
+            numColsPrefixKey = 0,
+            useColumnFamilies = true,
+            storeConf = StateStoreConf(getDefaultStateStoreSQLConf),

Review Comment:
   Right, refactored





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473332965


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -16,17 +16,22 @@
  */
 package org.apache.spark.sql.execution.streaming
 
+

Review Comment:
   Nit: extra newline?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475161938


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +162,109 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath
+            new StateStoreProviderId(
+              StateStoreId(tempDirPath, 0, i), getStateInfo.queryRunId)
+          }
+
+          val sqlConf = new SQLConf()
+          sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+            classOf[RocksDBStateStoreProvider].getName)
+          val storeConf = new StateStoreConf(sqlConf)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(

Review Comment:
   If the logic is implemented correctly, the CompletionIterator returned by processData will close the state store instance. You need to wrap it in another CompletionIterator to add a callback that runs once the iterator has been fully read, and close the provider instance there. You may also want to add a task completion listener that closes the provider instance.
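
The nested-wrapper idea can be sketched in plain Scala. `OnCompletionIterator` below is a hypothetical stand-in written for this example, not Spark's internal `CompletionIterator`, and the "store" and "provider" are represented only by log entries. The point is the ordering: the inner callback (store) fires first, then the outer one (provider).

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a completion iterator: runs `onComplete` exactly once,
// the first time the underlying iterator reports that it is exhausted.
final class OnCompletionIterator[A](underlying: Iterator[A], onComplete: () => Unit)
    extends Iterator[A] {
  private var completed = false

  override def hasNext: Boolean = {
    val more = underlying.hasNext
    if (!more && !completed) {
      completed = true
      onComplete()
    }
    more
  }

  override def next(): A = underlying.next()
}

object CompletionOrderDemo {
  def run(): (List[Int], List[String]) = {
    val events = ListBuffer.empty[String]
    // Inner wrapper: plays the role of the iterator returned by processData.
    val storeScoped = new OnCompletionIterator[Int](
      Iterator(1, 2, 3), () => { events += "store closed"; () })
    // Outer wrapper: adds the provider cleanup on top of the inner one.
    val providerScoped = new OnCompletionIterator[Int](
      storeScoped, () => { events += "provider closed"; () })
    val rows = providerScoped.toList
    (rows, events.toList)
  }
}
```

A callback of this kind only fires when the iterator is read to the end. If the consumer stops early, nothing runs, which is why a task completion listener (in Spark, registered through `TaskContext.get().addTaskCompletionListener`) is suggested as a second safety net; the close logic then has to tolerate being called twice.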





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1466949855


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -68,28 +68,34 @@ class QueryInfoImpl(
  * track of valid transitions as various functions are invoked to track object lifecycle.
  * @param store - instance of state store

Review Comment:
   Can you update the `@param` docs here too?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1911036480

   @HeartSaVioR - could you PTAL, thx ?




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1471710039


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   @HeartSaVioR @anishshri-db 
   
   1. Do we see a use case where the user would want to read the state files (from DFS) after the query completes?
   2. I think implementing a memory-based state store is likely a larger effort. For the time being, we could create the temp directory on the executor node (inside mapPartitionsWithState) and discard the directory after evaluation (inside the completion iterator). I agree that this is inefficient, and a memory-based store would be better in the long run.
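
The create-then-discard lifecycle from point 2 might look roughly like the following plain-Scala sketch. It is an illustration only: `TempDirPerPartition`, `processPartition` and the `scratch.bin` file are hypothetical, and the output is materialized eagerly so that a simple `try`/`finally` is enough. With a lazily consumed iterator, the deletion would instead have to run from a completion callback.

```scala
import java.io.File
import java.nio.file.Files

object TempDirPerPartition {
  // Deletes a file or directory tree, children before parents.
  def deleteRecursively(f: File): Unit = {
    // listFiles() returns null for non-directories, hence the Option wrapper.
    Option(f.listFiles()).foreach(_.foreach(deleteRecursively))
    f.delete()
    ()
  }

  // Hypothetical per-partition function: the directory is created where the
  // function runs (the executor, in a Spark job) and removed before returning.
  def processPartition(index: Int, rows: Iterator[String]): (File, List[String]) = {
    val dir = Files.createTempDirectory(s"batch-state-$index-").toFile
    try {
      // Pretend the state store wrote something into its working directory.
      new File(dir, "scratch.bin").createNewFile()
      val out = rows.map(_.toUpperCase).toList
      (dir, out)
    } finally {
      deleteRecursively(dir)
    }
  }
}
```

Creating the directory inside the partition function avoids the driver-versus-executor path mismatch, at the cost of one directory creation and deletion per partition.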










Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475196950


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -723,7 +723,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Strategy to convert [[TransformWithState]] logical operator to physical operator
    * in streaming plans.
    */
-  object TransformWithStateStrategy extends Strategy {
+  object TransformWithStateStreamingSrategy extends Strategy {

Review Comment:
   Thanks for catching, refactored



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -60,6 +66,7 @@ case class TransformWithStateExec(
     batchTimestampMs: Option[Long],
     eventTimeWatermarkForLateEvents: Option[Long],
     eventTimeWatermarkForEviction: Option[Long],
+    isStreaming: Boolean = true,

Review Comment:
   refactored










Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473308137


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,116 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    // If the query is running in batch mode, we need to create a new StateStore and instantiate
+    // a temp directory on the executors in mapPartitions
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,
+              i, 0), getStateInfo.queryRunId)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(
+            providerId,
+            schemaForKeyRow,
+            schemaForValueRow,
+            numColsPrefixKey = 0,
+            useColumnFamilies = true,
+            storeConf = StateStoreConf(getDefaultStateStoreSQLConf),

Review Comment:
   That way we can avoid changes to `StateStoreConf.scala`





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480850291


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Ah ok, gotcha, thx! Yeah, agreed: if it's not evaluating once, we might as well remove the `lazy` portion. Maybe we should just remove it anyway; I feel it will be easier to read.





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1933298214

   Thanks, merging to master!




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480600742


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   I don't think this will evaluate once. But do we need it to evaluate once? I thought we were fine passing a tmp path to each executor here, given that the store instance is not tracked/checkpointed?
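   
   A minimal sketch of the scoping behaviour in question, without Spark: a `lazy val` declared inside a function body is re-created on every call, so it is initialized once per call (here, once per simulated partition), not once per JVM. The names `LazyValScope`, `perPartition` and `run` are hypothetical stand-ins for the closure passed to `mapPartitionsWithIndex`.
   
   ```scala
   import java.util.concurrent.atomic.AtomicInteger

   object LazyValScope {
     // Counts how many times the "temp dir" initializer actually runs.
     private val initCount = new AtomicInteger(0)

     // Hypothetical stand-in for the per-partition closure.
     def perPartition(index: Int): String = {
       // A local lazy val belongs to this invocation only; the next call gets a fresh one.
       lazy val tempDirPath = {
         initCount.incrementAndGet()
         s"/tmp/fake-dir-$index"
       }
       tempDirPath
     }

     // Invokes the closure once per simulated partition and reports the initializer count.
     def run(numPartitions: Int): Int = {
       initCount.set(0)
       (0 until numPartitions).foreach(perPartition)
       initCount.get()
     }
   }
   ```
   
   Since the value is read exactly once per call, the `lazy` modifier makes no observable difference compared to a plain `val` in this shape.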





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1467180436


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   We can't expect the path to exist on both the driver and the executor. If we want to leverage a temp dir, the full path should be retrieved on the executor.
   
   Also, in flatMapGroupsWithState, we just mapped the batch version of flatMapGroupsWithState to flatMapGroups. I'd guess it's no longer that simple since we allow users to initialize multiple states, but it would be great if we could fake the state instance (or the state store implementation) rather than initiating the full lifecycle of a state store.
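   
   On the temp dir point, a rough sketch of creating the directory inside the per-partition function, so that the path is created on whichever node runs the task. This is Spark-free and uses the JDK's `Files.createTempDirectory` in place of Spark's internal `Utils.createTempDir`; `ExecutorTempDir`, `createStoreDir` and `runPartitions` are hypothetical names, and `runPartitions` only simulates the partitions locally.
   
   ```scala
   import java.nio.file.{Files, Path}

   object ExecutorTempDir {
     // Meant to be called from inside the per-partition function, so it executes
     // on the node that runs the task rather than on the driver.
     def createStoreDir(partitionIndex: Int): Path =
       Files.createTempDirectory(s"batch-state-$partitionIndex-")

     // Local stand-in for running the per-partition function over every partition.
     // Note: the directories are not deleted here; cleanup is left to the caller.
     def runPartitions(numPartitions: Int): Seq[Path] =
       (0 until numPartitions).map(createStoreDir)
   }
   ```
   
   In an actual job the call would sit inside the function passed to `mapPartitionsWithIndex`, which means one directory per partition rather than one per executor.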





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473703052


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,104 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,

Review Comment:
   I did try that, but that didn't work





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473614718


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,104 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,

Review Comment:
   Hm, actually, if we move this temp directory initialization outside of where it is now, I don't see how we can ensure that this path exists on the executor. We run into the same exact issue @HeartSaVioR pointed out, correct?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473621671


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,104 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,

Review Comment:
   I tried to lazily initialize this tmpDir within the loop. This should work, I think





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475193738


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -44,23 +48,25 @@ import org.apache.spark.util.CompletionIterator
  * @param batchTimestampMs processing timestamp of the current batch.
  * @param eventTimeWatermarkForLateEvents event time watermark for filtering late events
  * @param eventTimeWatermarkForEviction event time watermark for state eviction
+ * @param isStreaming defines whether the query is streaming or batch
  * @param child the physical plan for the underlying data
  */
 case class TransformWithStateExec(
-    keyDeserializer: Expression,
-    valueDeserializer: Expression,
-    groupingAttributes: Seq[Attribute],
-    dataAttributes: Seq[Attribute],
-    statefulProcessor: StatefulProcessor[Any, Any, Any],
-    timeoutMode: TimeoutMode,
-    outputMode: OutputMode,
-    keyEncoder: ExpressionEncoder[Any],
-    outputObjAttr: Attribute,
-    stateInfo: Option[StatefulOperatorStateInfo],
-    batchTimestampMs: Option[Long],
-    eventTimeWatermarkForLateEvents: Option[Long],
-    eventTimeWatermarkForEviction: Option[Long],
-    child: SparkPlan)
+     keyDeserializer: Expression,

Review Comment:
   Is the indent change intentional?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473307363


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,116 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    // If the query is running in batch mode, we need to create a new StateStore and instantiate
+    // a temp directory on the executors in mapPartitions
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,
+              i, 0), getStateInfo.queryRunId)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(
+            providerId,
+            schemaForKeyRow,
+            schemaForValueRow,
+            numColsPrefixKey = 0,
+            useColumnFamilies = true,
+            storeConf = StateStoreConf(getDefaultStateStoreSQLConf),

Review Comment:
   Instead of trying to copy the conf here, could we do one of the following:
   
   - Create the RocksDB provider explicitly and then call init. Something like this
   
   ```
       val provider = new RocksDBStateStoreProvider()
       val sqlConf = new SQLConf()
       val storeConf = new StateStoreConf(sqlConf)
   
       provider.init(
         storeId, keySchema, valueSchema, 0, useColumnFamilies = true,
         storeConf, new Configuration)
   ```
   
   - Pass the SQL conf setting and then call createAndInit
   ```
       val sqlConf = new SQLConf()
       sqlConf.setConfString("<provider class>", "<RocksDBStateStoreProvider>")
       val storeConf = new StateStoreConf(sqlConf)
   
       StateStoreProvider.createAndInit(
         storeId, keySchema, valueSchema, 0, useColumnFamilies = true,
         storeConf, new Configuration)
   ```
   





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473270336


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,116 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    // If the query is running in batch mode, we need to create a new StateStore and instantiate

Review Comment:
   Can we move this comment to the `else` section below ?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475138458


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +162,109 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath
+            new StateStoreProviderId(
+              StateStoreId(tempDirPath, 0, i), getStateInfo.queryRunId)
+          }
+
+          val sqlConf = new SQLConf()
+          sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+            classOf[RocksDBStateStoreProvider].getName)
+          val storeConf = new StateStoreConf(sqlConf)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(

Review Comment:
   You need to clean up the provider yourself if you create it yourself.
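
   A minimal sketch of one way to do that cleanup, in plain Scala with no Spark dependency. `FakeProvider` and `ClosingIterator` are hypothetical stand-ins, not Spark classes; the idea resembles the `CompletionIterator` that this file already imports. Because the partition iterator is consumed lazily, the provider has to be closed when the iterator is exhausted, not when the closure returns.

   ```scala
   // Hypothetical stand-in for a state store provider; not Spark's actual API.
   final class FakeProvider {
     var closed = false
     def close(): Unit = { closed = true }
   }

   // Wraps an iterator so that `onComplete` runs exactly once,
   // the first time the underlying iterator reports it is exhausted.
   final class ClosingIterator[A](underlying: Iterator[A], onComplete: () => Unit)
     extends Iterator[A] {
     private var completed = false

     override def hasNext: Boolean = {
       val more = underlying.hasNext
       if (!more && !completed) {
         completed = true
         onComplete()
       }
       more
     }

     override def next(): A = underlying.next()
   }

   object ProviderCleanupSketch {
     // Stand-in for the per-partition function: output is produced lazily,
     // and the provider is closed only after the last row has been consumed.
     def processPartition(rows: Iterator[Int], provider: FakeProvider): Iterator[Int] =
       new ClosingIterator(rows.map(_ * 2), () => provider.close())
   }
   ```

   This only covers normal completion. For tasks that fail midway, a task completion listener (`TaskContext.addTaskCompletionListener`) would presumably be needed as well.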



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -723,7 +723,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
    * Strategy to convert [[TransformWithState]] logical operator to physical operator
    * in streaming plans.
    */
-  object TransformWithStateStrategy extends Strategy {
+  object TransformWithStateStreamingSrategy extends Strategy {

Review Comment:
   nit: TransformWithStateStreamingSrategy -> TransformWithStateStreamingS`t`rategy



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -60,6 +66,7 @@ case class TransformWithStateExec(
     batchTimestampMs: Option[Long],
     eventTimeWatermarkForLateEvents: Option[Long],
     eventTimeWatermarkForEviction: Option[Long],
+    isStreaming: Boolean = true,

Review Comment:
   Is this allowed? Parameters with default values should be placed at the end.
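
   For what it's worth, Scala does accept a default-valued parameter in the middle of a parameter list; the cost falls on call sites, which must pass the later parameters by name to pick up the default. A small sketch with a hypothetical `ExecSketch` case class (not the real `TransformWithStateExec` signature):

   ```scala
   // Hypothetical, trimmed-down stand-in for a physical operator's parameter list.
   final case class ExecSketch(
       eventTimeWatermarkForEviction: Option[Long],
       isStreaming: Boolean = true,
       child: String)

   object DefaultParamSketch {
     // Skipping the middle parameter requires naming the ones after it.
     val viaNamed: ExecSketch = ExecSketch(None, child = "scan")

     // Positional calls still compile, but must spell out every argument.
     val viaPositional: ExecSketch = ExecSketch(None, false, "scan")
   }
   ```

   So the placement is legal; whether it is desirable is a style question, since moving the defaulted parameter to the end lets positional callers omit it.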



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475214606


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -73,7 +73,7 @@ class IncrementalExecution(
       StreamingRelationStrategy ::
       StreamingDeduplicationStrategy ::
       StreamingGlobalLimitStrategy(outputMode) ::
-      TransformWithStateStrategy :: Nil
+      TransformWithStateStreamingStrategy :: Nil

Review Comment:
   Maybe name it `StreamingTransformWithStateStrategy` to stay consistent with most of the strategies above?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1470068681


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   @HeartSaVioR - the temp dir creation is based on the `java.io.tmpdir` setting, right? That should be OS dependent. Are you saying it's not reliable when the driver and executors run different OS types/versions?
   
   Unlike FMGWS (flatMapGroupsWithState), we no longer have a batch equivalent like mapGroupsExec that we can reuse. I feel that implementing a fake state store would be a fair amount of work (we would also have to ensure that the store supports composite types like ListState, MapState, etc. in the future).
   
   IIUC, the only reason we really need the checkpoint location for the state store is to keep track of the committed state across batches (and potentially also within the stateStoreId). Do you think it's OK to do any of the following:
   - use a local path on the executor (either way this is a dummy path for batch queries, so I guess it should be safe to use)
   - skip using the checkpoint location / pass a None option
   - avoid registering the instance with the state store coordinator?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1479190035


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath
+            new StateStoreProviderId(
+              StateStoreId(tempDirPath, 0, i), getStateInfo.queryRunId)
+          }
+
+          val sqlConf = new SQLConf()
+          sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+            classOf[RocksDBStateStoreProvider].getName)
+          val storeConf = new StateStoreConf(sqlConf)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(
+            providerId,
+            schemaForKeyRow,
+            schemaForValueRow,
+            numColsPrefixKey = 0,
+            useColumnFamilies = true,
+            storeConf = storeConf,
+            hadoopConf = new Configuration())

Review Comment:
   We push key-value pairs from the SQL conf to the Hadoop Configuration; if you simply create a Hadoop Configuration via its constructor, all Hadoop configurations passed through the Spark conf will be missing.
   
   ```
     // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
     private val hadoopConfBroadcast = session.sparkContext.broadcast(
       new SerializableConfiguration(session.sessionState.newHadoopConf()))
   ```
   
   This is the logic we use to pass the Hadoop Configuration from the driver to the executors, via broadcast. Note that you'll need to test this against a real cluster, preferably multi-node. Please verify by reading the Spark conf value from the hadoopConf you get in the executor-side code.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Out of curiosity - is it really evaluated only once? This part would be serialized on the driver and deserialized on the executor. I see multiple tasks being executed in the same executor, but I don't know whether tasks can share the singleton, even when scoped within the initialization of providerId.
   
   If you want to confirm this, add logging to the initialization of the lazy val and run it on a real cluster. Again, the behavior could differ between a single JVM and multiple JVMs.
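
   One part of this can be checked without a cluster: a `lazy val` declared inside a function body is local to each invocation, so it is initialized once per call rather than once per JVM. The sketch below is plain Scala; `processPartition` is a hypothetical stand-in for the closure passed to `mapPartitionsWithIndex`, and it says nothing about how closure serialization behaves across executors.

   ```scala
   import java.nio.file.Files
   import java.util.concurrent.atomic.AtomicInteger

   object LazyValScopeSketch {
     // Counts how many times the temp-dir initializer actually runs.
     val initCount = new AtomicInteger(0)

     // Hypothetical stand-in for the per-partition closure.
     def processPartition(index: Int): String = {
       lazy val tempDirPath = {
         initCount.incrementAndGet()
         Files.createTempDirectory(s"state-$index-").toString
       }
       // Two references within one invocation trigger a single initialization.
       val first = tempDirPath
       val second = tempDirPath
       require(first == second)
       first
     }
   }
   ```

   Calling `processPartition` for three partitions runs the initializer three times and yields three distinct directories, so in this shape the `lazy` modifier behaves like a plain `val` that is read at least once.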



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateSuite.scala:
##########
@@ -197,6 +196,18 @@ class TransformWithStateSuite extends StateStoreMetricsTest
     }
   }
 
+  test("transformWithState - batch should succeed") {
+    val inputData = Seq("a", "a", "b")
+    val result = inputData.toDS()
+      .groupByKey(x => x)
+      .transformWithState(new RunningCountStatefulProcessor(),
+        TimeoutMode.NoTimeouts(),
+        OutputMode.Append())
+
+    val df = result.toDF()
+    checkAnswer(df, Seq(("a", "1"), ("b", "1")).toDF())

Review Comment:
   Why does "a" count to 1 rather than 2?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480989833


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +163,114 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    val broadcastedHadoopConf =

Review Comment:
   +1





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1467180436


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   We can't expect this path to be available on both the driver and the executors. If we want to leverage a temp dir, the full path should be retrieved on the executor. This likely does not work on a real cluster, or at least not with specific cluster setups.
   
   Also, for flatMapGroupsWithState, we just mapped the batch version to flatMapGroups. I'd guess it's no longer that simple since we allow users to initialize multiple states, but it would be great if we could fake the state instance (or the state store implementation) rather than initiating the full lifecycle of a state store, including coordination.
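
   To make the "fake state instance" idea concrete, here is a rough sketch in plain Scala. Everything in it is hypothetical: `ValueStateSketch` is a deliberately tiny interface, not Spark's state API, and the per-row counting logic is only an illustration of a processor that reads and updates state. It shows state that lives in memory for the duration of one group, with no store, checkpoint path, or coordinator involved.

   ```scala
   // Hypothetical minimal state interface; Spark's real state API is richer.
   trait ValueStateSketch[S] {
     def exists(): Boolean
     def get(): S
     def update(newState: S): Unit
     def remove(): Unit
   }

   // In-memory fake, scoped to one grouping key within one batch run.
   final class InMemoryValueState[S] extends ValueStateSketch[S] {
     private var value: Option[S] = None

     override def exists(): Boolean = value.isDefined
     override def get(): S =
       value.getOrElse(throw new NoSuchElementException("state not set"))
     override def update(newState: S): Unit = { value = Some(newState) }
     override def remove(): Unit = { value = None }
   }

   object FakeStateBatchSketch {
     // Groups rows by key and folds each group through a fresh in-memory state,
     // incrementing the stored count once per row.
     def runningCounts(rows: Seq[String]): Map[String, Long] =
       rows.groupBy(identity).map { case (key, values) =>
         val state = new InMemoryValueState[Long]
         values.foreach { _ =>
           val current = if (state.exists()) state.get() else 0L
           state.update(current + 1L)
         }
         key -> state.get()
       }
   }
   ```

   Supporting several named states per processor, or list and map state, would need more than this, which is the extra work the discussion is weighing.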





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473304602


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,116 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    // If the query is running in batch mode, we need to create a new StateStore and instantiate

Review Comment:
   Done





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1920038483

   cc @HeartSaVioR 




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "sahnib (via GitHub)" <gi...@apache.org>.
sahnib commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473582927


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##########
@@ -67,29 +67,37 @@ class QueryInfoImpl(
  * Class that provides a concrete implementation of a StatefulProcessorHandle. Note that we keep
  * track of valid transitions as various functions are invoked to track object lifecycle.
  * @param store - instance of state store
+ * @param runId - unique id for the current run
+ * @param isStreaming - defines whether the query is streaming or batch
  */
-class StatefulProcessorHandleImpl(store: StateStore, runId: UUID)
+class StatefulProcessorHandleImpl(store: StateStore, runId: UUID, isStreaming: Boolean = true)
   extends StatefulProcessorHandle with Logging {
   import StatefulProcessorHandleState._
 
   private def buildQueryInfo(): QueryInfo = {
-    val taskCtxOpt = Option(TaskContext.get())
-    // Task context is not available in tests, so we generate a random query id and batch id here
-    val queryId = if (taskCtxOpt.isDefined) {
-      taskCtxOpt.get.getLocalProperty(StreamExecution.QUERY_ID_KEY)
-    } else {
-      assert(Utils.isTesting, "Failed to find query id in task context")
-      UUID.randomUUID().toString
-    }
 
-    val batchId = if (taskCtxOpt.isDefined) {
-      taskCtxOpt.get.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY).toLong
+    if (!isStreaming) {
+      val queryId = "00000000-0000-0000-0000-000000000000"
+      val batchId = 0L
+      new QueryInfoImpl(UUID.fromString(queryId), runId, batchId)
     } else {
-      assert(Utils.isTesting, "Failed to find batch id in task context")
-      0
+      val taskCtxOpt = Option(TaskContext.get())
+      // Task context is not available in tests, so we generate a random query id and batch id here
+      val queryId = if (taskCtxOpt.isDefined) {
+        taskCtxOpt.get.getLocalProperty(StreamExecution.QUERY_ID_KEY)
+      } else {
+        assert(Utils.isTesting, "Failed to find query id in task context")
+        UUID.randomUUID().toString
+      }
+
+      val batchId = if (taskCtxOpt.isDefined) {
+        taskCtxOpt.get.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY).toLong
+      } else {
+        assert(Utils.isTesting, "Failed to find batch id in task context")
+        0
+      }

Review Comment:
   [nit] We can simplify this to the code below.
   
   ```
   private val BATCH_QUERY_ID = "00000000-0000-0000-0000-000000000000"
   
   ....
   
       val (queryId, batchId) = if (!isStreaming) {
         (BATCH_QUERY_ID, 0L)
       } else if (taskCtxOpt.isDefined) {
         (taskCtxOpt.get.getLocalProperty(StreamExecution.QUERY_ID_KEY),
           taskCtxOpt.get.getLocalProperty(MicroBatchExecution.BATCH_ID_KEY).toLong)
       } else {
         assert(Utils.isTesting, "Failed to find query id/batch id in task context")
         (UUID.randomUUID().toString, 0L)
       }
   ```



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala:
##########
@@ -742,6 +742,7 @@ abstract class SparkStrategies extends QueryPlanner[SparkPlan] {
           batchTimestampMs = None,
           eventTimeWatermarkForLateEvents = None,
           eventTimeWatermarkForEviction = None,
+          isStreaming = true,

Review Comment:
   [nit] Should we now rename this class to `TransformWithStateStreamingSrategy` to make clear that it's only for streaming workloads?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,104 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,

Review Comment:
   Do we need a temporary path per operator? We can create the temp directory once and reuse it. StateStore should create sub-directories inside it for operators.
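
   A sketch of what "create once and reuse" could look like, in plain Scala using only `java.nio.file`. The `<root>/<operatorId>/<partitionId>` layout and the names `SharedTempRootSketch` and `storeDir` are assumptions made for illustration, not taken from the PR.

   ```scala
   import java.nio.file.{Files, Path}

   object SharedTempRootSketch {
     // Object-level lazy val: initialized at most once per JVM, on first access.
     lazy val root: Path = Files.createTempDirectory("batch-state-")

     // Returns (creating if needed) a sub-directory for one operator partition.
     def storeDir(operatorId: Long, partitionId: Int): Path =
       Files.createDirectories(
         root.resolve(operatorId.toString).resolve(partitionId.toString))
   }
   ```

   Each executor JVM would still end up with its own root, since the object is initialized separately in every JVM that touches it.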





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475150246


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +162,109 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath
+            new StateStoreProviderId(
+              StateStoreId(tempDirPath, 0, i), getStateInfo.queryRunId)
+          }
+
+          val sqlConf = new SQLConf()
+          sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+            classOf[RocksDBStateStoreProvider].getName)
+          val storeConf = new StateStoreConf(sqlConf)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(

Review Comment:
   Where do we delete it?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475199869


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -44,23 +48,25 @@ import org.apache.spark.util.CompletionIterator
  * @param batchTimestampMs processing timestamp of the current batch.
  * @param eventTimeWatermarkForLateEvents event time watermark for filtering late events
  * @param eventTimeWatermarkForEviction event time watermark for state eviction
+ * @param isStreaming defines whether the query is streaming or batch
  * @param child the physical plan for the underlying data
  */
 case class TransformWithStateExec(
-    keyDeserializer: Expression,
-    valueDeserializer: Expression,
-    groupingAttributes: Seq[Attribute],
-    dataAttributes: Seq[Attribute],
-    statefulProcessor: StatefulProcessor[Any, Any, Any],
-    timeoutMode: TimeoutMode,
-    outputMode: OutputMode,
-    keyEncoder: ExpressionEncoder[Any],
-    outputObjAttr: Attribute,
-    stateInfo: Option[StatefulOperatorStateInfo],
-    batchTimestampMs: Option[Long],
-    eventTimeWatermarkForLateEvents: Option[Long],
-    eventTimeWatermarkForEviction: Option[Long],
-    child: SparkPlan)
+     keyDeserializer: Expression,

Review Comment:
   Was trying to follow this:
   
   > For method declarations, use 4 space indentation for their parameters and put each in each line when the parameters don't fit in two lines. Return types can be either on the same line as the last parameter, or start a new line with 2 space indent.
   
   https://github.com/databricks/scala-style-guide?tab=readme-ov-file#indent





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1470068681


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   @HeartSaVioR - the temp dir creation is based on the `java.io.tmpdir` setting, right? That should be OS dependent. Are you saying that it's not reliable if we are running different OS types/versions across the driver and executors?
   
   Unlike flatMapGroupsWithState (FMGWS), we no longer have a batch equivalent like mapGroupsExec that we can use. Implementing a fake state store would be a fair amount of work, I feel (we would also have to ensure that the store supports composite types like ListState, MapState, etc. in the future).
   
   IIUC, the only reason we really need the checkpoint location for the state store is to keep track of the committed state across batches (and potentially also within the stateStoreId). Do you think it's OK to do some combination of the following:
   - use a local path on the executor (either way, this is a dummy path for batch queries, so I guess it should be safe to use)
   - skip using the checkpoint location, or pass a None option
   - avoid registering the instance with the state store coordinator?
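
For illustration, a minimal sketch of how a temp directory resolves against the `java.io.tmpdir` setting of the JVM that creates it. It uses only standard JDK calls rather than Spark's `Utils.createTempDir`; `TmpDirExample` and the `state-store-` prefix are hypothetical names chosen for this example.

```scala
import java.nio.file.{Files, Path}

object TmpDirExample {
  // The default temp location of the current JVM. It can be overridden per
  // process, e.g. with -Djava.io.tmpdir=/some/other/dir on the command line.
  def jvmTmpDir: String = System.getProperty("java.io.tmpdir")

  // Creates a fresh directory under the default temp location of *this* JVM.
  // The prefix is an arbitrary choice for the example.
  def createLocalTempDir(): Path = Files.createTempDirectory("state-store-")
}
```

Because the property is read by whichever process makes the call, a path created in one JVM is only known to exist on that machine.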





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480916290


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +163,114 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    val broadcastedHadoopConf =

Review Comment:
   This is not needed for streaming - let's do this just before L184.





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480916745


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +163,114 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    val broadcastedHadoopConf =
+      new SerializableConfiguration(session.sessionState.newHadoopConf())
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once

Review Comment:
   nit: remove the comment to be in sync





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1466826107


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,6 +154,7 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
+    // populate stateInfo if this is a streaming query

Review Comment:
   Should we remove this, since it's populated elsewhere?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1467180436


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   We can't expect the path to exist on both the driver and the executor. If we want to leverage a temp dir, the full path should be retrieved on the executor. This likely won't work on a real cluster, or at least won't work with certain cluster setups.
   
   Also, for flatMapGroupsWithState, we just mapped the batch version to flatMapGroups. I'd guess it's no longer that simple since we allow users to initialize multiple states, but it would be great if we could fake the state instance (or the state store implementation) rather than initiating the full lifecycle of a state store, including coordination.
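
For illustration, a minimal sketch in plain Scala (no Spark involved) of the difference between creating the directory where a closure is built and creating it where the closure runs. `DeferredTempDir` and both method names are hypothetical; in a real job the first location would be the driver and the second an executor.

```scala
import java.nio.file.{Files, Path}

object DeferredTempDir {
  // Eager: the directory is created where the closure is *built*, and only
  // the resulting path is captured. Every call returns that same path.
  def eagerTask(): Int => Path = {
    val dir = Files.createTempDirectory("eager-")
    (partitionId: Int) => dir
  }

  // Deferred: the directory is created where the closure *runs*, so the
  // path is always local to the process executing the task.
  def deferredTask(): Int => Path =
    (partitionId: Int) => Files.createTempDirectory(s"partition-$partitionId-")
}
```

With the eager variant, a remote process would receive a path string that may not exist on its own file system; the deferred variant avoids that by construction.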





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473313760


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,116 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,
+              i, 0), getStateInfo.queryRunId)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(
+            providerId,
+            schemaForKeyRow,
+            schemaForValueRow,
+            numColsPrefixKey = 0,
+            useColumnFamilies = true,
+            storeConf = StateStoreConf(getDefaultStateStoreSQLConf),
+            hadoopConf = new Configuration())
+
+          val store = stateStoreProvider.getStore(0)
+          processData(store, iter)
+        }
+      )
     }
   }
+
+  /**
+   * Process the data in the partition using the state store and the stateful processor.
+   * @param store The state store to use
+   * @param singleIterator The iterator of rows to process
+   * @return An iterator of rows that are the result of processing the input rows
+   */
+  private def processData(store: StateStore, singleIterator: Iterator[InternalRow]):
+    CompletionIterator[InternalRow, Iterator[InternalRow]] = {
+    val processorHandle = new StatefulProcessorHandleImpl(
+      store, getStateInfo.queryRunId, isStreaming)
+    assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
+    statefulProcessor.init(processorHandle, outputMode)
+    processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+    val result = processDataWithPartition(singleIterator, store, processorHandle)
+    result
+  }
+
+  /**
+   * Set the default SQLConf values for the State Store.
+   * This is used for the batch operator, since these SQLConfs are not
+   * automatically populated for batch queries.
+   * @return SQLConf with default values with RocksDBStateStoreProvider
+   */
+  private def getDefaultStateStoreSQLConf: SQLConf = {
+      val sqlConf = new SQLConf()
+      StateStoreConf.sqlConfKeys.foreach {
+        case conf@SQLConf.STATE_STORE_PROVIDER_CLASS =>
+          sqlConf.setConfString(conf.key, classOf[RocksDBStateStoreProvider].getName)
+        case conf => sqlConf.setConfString(conf.key, conf.defaultValueString)
+      }
+      sqlConf
+  }
+}
+
+

Review Comment:
   nit: extra newline?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475211689


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +162,109 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath
+            new StateStoreProviderId(
+              StateStoreId(tempDirPath, 0, i), getStateInfo.queryRunId)
+          }
+
+          val sqlConf = new SQLConf()
+          sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+            classOf[RocksDBStateStoreProvider].getName)
+          val storeConf = new StateStoreConf(sqlConf)
+
+          // Create StateStoreProvider for this partition
+          val stateStoreProvider = StateStoreProvider.createAndInit(

Review Comment:
   Done





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473467963


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   > Do we see a use case where the user would want to read the State files (from DFS) post query?
   
   There could be, but let's keep the common UX for batch queries.





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480908781


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   Removed





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1931304594

   cc @HeartSaVioR 




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1480608319


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -155,23 +161,112 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
-          keyEncoder)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = {
+            // lazy creation to initialize tempDirPath once
+            lazy val tempDirPath = Utils.createTempDir().getAbsolutePath

Review Comment:
   I thought it was good enough as long as it wasn't initialized per partition, and that once per executor was okay.
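
For illustration, a minimal sketch in plain Scala (no Spark involved) of how the scope of a `lazy val` decides how often its initializer runs. `LazyValScope` and its members are hypothetical names for this example. A `lazy val` declared inside a function body is a fresh local on each call, whereas one declared on an `object` is initialized at most once per JVM.

```scala
object LazyValScope {
  var localInits = 0
  var sharedInits = 0

  // A lazy val on an object is initialized at most once per JVM.
  object Shared {
    lazy val path: String = { sharedInits += 1; "shared-dir" }
  }

  // A lazy val declared inside a function body is a new local on every call,
  // so its initializer runs once per call that touches it.
  def perCall(partitionId: Int): String = {
    lazy val path: String = { localInits += 1; "dir-for-call" }
    s"$path/$partitionId"
  }

  def perJvm(partitionId: Int): String = s"${Shared.path}/$partitionId"
}
```

Under that reading, a `lazy val` placed inside the per-partition function body would be evaluated once for each partition that uses it, not once per process.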





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1932703827

   Addressed feedback, cc @HeartSaVioR 




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1475204518


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -44,23 +48,25 @@ import org.apache.spark.util.CompletionIterator
  * @param batchTimestampMs processing timestamp of the current batch.
  * @param eventTimeWatermarkForLateEvents event time watermark for filtering late events
  * @param eventTimeWatermarkForEviction event time watermark for state eviction
+ * @param isStreaming defines whether the query is streaming or batch
  * @param child the physical plan for the underlying data
  */
 case class TransformWithStateExec(
-    keyDeserializer: Expression,
-    valueDeserializer: Expression,
-    groupingAttributes: Seq[Attribute],
-    dataAttributes: Seq[Attribute],
-    statefulProcessor: StatefulProcessor[Any, Any, Any],
-    timeoutMode: TimeoutMode,
-    outputMode: OutputMode,
-    keyEncoder: ExpressionEncoder[Any],
-    outputObjAttr: Attribute,
-    stateInfo: Option[StatefulOperatorStateInfo],
-    batchTimestampMs: Option[Long],
-    eventTimeWatermarkForLateEvents: Option[Long],
-    eventTimeWatermarkForEviction: Option[Long],
-    child: SparkPlan)
+     keyDeserializer: Expression,

Review Comment:
   Didn't realize I had deviated from this, thanks for catching it.





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1470538842


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   The code should work with any resource manager (standalone, YARN, K8s, etc.) and with any custom configuration applied by operators or cloud vendors. The temp dir can also be overridden per JVM process (`-Djava.io.tmpdir`). It is too naive to assume that the driver and executor nodes are set up exactly the same.
   
   Maybe I wasn't clear - my ask was mostly about faking the state instance, not the state store implementation (that would mostly mean rewriting the HDFS-backed state store provider, and we should just use that provider). I mean abstracting the layer as we did for FMGWS (GroupStateImpl), so that we can provide an in-memory value for the state. Yes, that may require us to create such a fake instance per state type, but it won't be hard, as Java/Scala have native types for the composite types we are going to support.
   
   If it's non-trivial to inject a layer, I'm OK with spinning up the RocksDB state store provider and leveraging it. I'm pretty sure it would be even better if we could avoid registration with the state store coordinator.
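
As an illustration of the "fake state instance" idea above, here is a minimal sketch in plain Scala. Every name in it (`SimpleValueState`, `InMemoryValueState`, `InMemoryListState`, `FakeStateDemo`) is hypothetical and is not a Spark API; it only assumes that the processor talks to a small state-handle interface, so a batch query can swap in an implementation backed by native Scala types instead of a state store.

```scala
// Hypothetical names throughout; none of these are Spark APIs.

// Minimal state-handle abstraction that processor code would program against.
trait SimpleValueState[S] {
  def exists(): Boolean
  def getOption(): Option[S]
  def update(newState: S): Unit
  def clear(): Unit
}

// Batch-mode fake: the value lives in a plain field, with no state store,
// checkpoint directory, or coordinator involved.
final class InMemoryValueState[S] extends SimpleValueState[S] {
  private var value: Option[S] = None
  override def exists(): Boolean = value.isDefined
  override def getOption(): Option[S] = value
  override def update(newState: S): Unit = { value = Some(newState) }
  override def clear(): Unit = { value = None }
}

// A composite state type backed by a native Scala collection.
final class InMemoryListState[S] {
  private val buffer = scala.collection.mutable.ArrayBuffer.empty[S]
  def append(v: S): Unit = { buffer += v }
  def get(): Iterator[S] = buffer.iterator
  def clear(): Unit = buffer.clear()
}

object FakeStateDemo {
  // Counts rows per key, creating one fresh fake state per grouping key.
  def countPerKey(rows: Seq[(String, Int)]): Map[String, Long] =
    rows.groupBy(_._1).map { case (key, group) =>
      val state = new InMemoryValueState[Long]
      group.foreach { _ => state.update(state.getOption().getOrElse(0L) + 1L) }
      key -> state.getOption().getOrElse(0L)
    }
}
```

Because a batch query sees each key exactly once, a fresh in-memory instance per key is enough in this sketch; nothing has to survive across runs, which is why no temp dir or coordinator registration appears here.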



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #44884: [SPARK-46865][SS] Add Batch Support for TransformWithState Operator
URL: https://github.com/apache/spark/pull/44884




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1466826476


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+                                        keyDeserializer: Expression,

Review Comment:
   The indentation needs to be fixed.





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1466949417


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -57,6 +58,7 @@ case class TransformWithStateExec(
     batchTimestampMs: Option[Long],
     eventTimeWatermarkForLateEvents: Option[Long],
     eventTimeWatermarkForEviction: Option[Long],
+    isStreaming: Boolean = true,

Review Comment:
   Can you add the param to the case class comment above too?





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1467180436


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -171,3 +175,44 @@ case class TransformWithStateExec(
     }
   }
 }
+
+
+object TransformWithStateExec {
+
+  // Plan logical transformWithState for batch queries
+  def generateSparkPlanForBatchQueries(
+      keyDeserializer: Expression,
+      valueDeserializer: Expression,
+      groupingAttributes: Seq[Attribute],
+      dataAttributes: Seq[Attribute],
+      statefulProcessor: StatefulProcessor[Any, Any, Any],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      outputObjAttr: Attribute,
+      child: SparkPlan): SparkPlan = {
+    val shufflePartitions = child.session.sessionState.conf.numShufflePartitions
+    val statefulOperatorStateInfo = StatefulOperatorStateInfo(
+      Utils.createTempDir().getAbsolutePath,

Review Comment:
   We can't expect this path to exist on both the driver and the executors. If we want to leverage a temp dir, the full path should be retrieved on the executor. As written, this likely won't work on a real cluster, or at least not with certain cluster setups.
   
   Also, in flatMapGroupsWithState, we just mapped the batch version of flatMapGroupsWithState to flatMapGroups. I'd guess it's no longer that simple since we allow users to initialize multiple states, but it would be great if we could fake the state instance (or state store implementation) rather than initiating the full lifecycle of the state store, including coordination.





Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1922469723

   cc @HeartSaVioR 




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "ericm-db (via GitHub)" <gi...@apache.org>.
ericm-db commented on PR #44884:
URL: https://github.com/apache/spark/pull/44884#issuecomment-1920038295

   cc @HeartSaVioR 




Re: [PR] [SPARK-46865][SS] Add Batch Support for TransformWithState Operator [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #44884:
URL: https://github.com/apache/spark/pull/44884#discussion_r1473701291


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -152,22 +159,104 @@ case class TransformWithStateExec(
   override protected def doExecute(): RDD[InternalRow] = {
     metrics // force lazy init at driver
 
-    child.execute().mapPartitionsWithStateStore[InternalRow](
-      getStateInfo,
-      schemaForKeyRow,
-      schemaForValueRow,
-      numColsPrefixKey = 0,
-      session.sqlContext.sessionState,
-      Some(session.sqlContext.streams.stateStoreCoordinator),
-      useColumnFamilies = true
-    ) {
-      case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-        val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId)
-        assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
-        statefulProcessor.init(processorHandle, outputMode)
-        processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
-        val result = processDataWithPartition(singleIterator, store, processorHandle)
-        result
+    if (isStreaming) {
+      child.execute().mapPartitionsWithStateStore[InternalRow](
+        getStateInfo,
+        schemaForKeyRow,
+        schemaForValueRow,
+        numColsPrefixKey = 0,
+        session.sqlContext.sessionState,
+        Some(session.sqlContext.streams.stateStoreCoordinator),
+        useColumnFamilies = true
+      ) {
+        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+          processData(store, singleIterator)
+      }
+    } else {
+      // If the query is running in batch mode, we need to create a new StateStore and instantiate
+      // a temp directory on the executors in mapPartitionsWithIndex.
+      child.execute().mapPartitionsWithIndex[InternalRow](
+        (i, iter) => {
+          val providerId = new StateStoreProviderId(
+            StateStoreId(Utils.createTempDir().getAbsolutePath,

Review Comment:
   Yeah, in this case the temp dir creation has to be within the `mapPartitions` block. @ericm-db, did you try passing an empty checkpoint location? Does that work?
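
The placement issue discussed here can be illustrated without Spark. The sketch below is an assumption-laden stand-in: it uses `java.nio.file.Files.createTempDirectory` rather than Spark's `Utils.createTempDir()`, and the object and method names (`TempDirPlacement`, `eagerPathTask`, `lazyPathTask`) are hypothetical. A function value plays the role of the closure passed to `mapPartitionsWithIndex`.

```scala
import java.nio.file.Files

// Hypothetical names; a plain function stands in for the per-partition closure.
object TempDirPlacement {
  // Anti-pattern: the path is computed where the plan is built (the driver),
  // and the resulting string is captured by the closure shipped to executors.
  def eagerPathTask(): Int => String = {
    val driverDir = Files.createTempDirectory("state-").toAbsolutePath.toString
    (_: Int) => driverDir
  }

  // Preferred: the directory is created when the task body runs, so it is
  // resolved against the java.io.tmpdir of the JVM executing the partition.
  def lazyPathTask(): Int => String =
    (partitionId: Int) =>
      Files.createTempDirectory(s"state-$partitionId-").toAbsolutePath.toString
}
```

With the first variant every partition receives the same driver-side path, which may not exist on the executor host. With the second, each invocation creates its own directory on whichever machine runs it.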


