Posted to reviews@spark.apache.org by "jingz-db (via GitHub)" <gi...@apache.org> on 2024/03/11 21:21:23 UTC

[PR] WIP [spark]

jingz-db opened a new pull request, #45467:
URL: https://github.com/apache/spark/pull/45467

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'common/utils/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   




Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530925161


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+case class InputRowForInitialState(
+    value: Double, entries: List[Double], mapping: Map[Double, Int])
+
+abstract class StatefulProcessorWithInitialStateTestClass[V]
+    extends StatefulProcessorWithInitialState[
+        String, InitInputRow, (String, String, Double), (String, V)] {
+  @transient var _valState: ValueState[Double] = _
+  @transient var _listState: ListState[Double] = _
+  @transient var _mapState: MapState[Double, Int] = _
+
+  override def init(outputMode: OutputMode, timeoutMode: TimeoutMode): Unit = {
+    _valState = getHandle.getValueState[Double]("testValueInit", Encoders.scalaDouble)
+    _listState = getHandle.getListState[Double]("testListInit", Encoders.scalaDouble)
+    _mapState = getHandle.getMapState[Double, Int](
+      "testMapInit", Encoders.scalaDouble, Encoders.scalaInt)
+  }
+
+  override def close(): Unit = {}
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[InitInputRow],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String, Double)] = {
+    var output = List[(String, String, Double)]()
+    for (row <- inputRows) {
+      if (row.action == "getOption") {
+        output = (key, row.action, _valState.getOption().getOrElse(-1.0)) :: output
+      } else if (row.action == "update") {
+        _valState.update(row.value)
+      } else if (row.action == "remove") {
+        _valState.clear()
+      } else if (row.action == "getList") {
+        _listState.get().foreach { element =>
+          output = (key, row.action, element) :: output
+        }
+      } else if (row.action == "appendList") {
+        _listState.appendValue(row.value)
+      } else if (row.action == "clearList") {
+        _listState.clear()
+      } else if (row.action == "getCount") {
+        val count =
+          if (!_mapState.containsKey(row.value)) 0
+          else _mapState.getValue(row.value)
+        output = (key, row.action, count.toDouble) :: output
+      } else if (row.action == "incCount") {
+        val count =
+          if (!_mapState.containsKey(row.value)) 0
+          else _mapState.getValue(row.value)
+        _mapState.updateValue(row.value, count + 1)
+      } else if (row.action == "clearCount") {
+        _mapState.removeKey(row.value)
+      }
+    }
+    output.iterator
+  }
+}
+
+class AccumulateStatefulProcessorWithInitState
+    extends StatefulProcessorWithInitialStateTestClass[Double] {
+  override def handleInitialState(
+      key: String,
+      initialState: (String, Double)): Unit = {
+    _valState.update(initialState._2)
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[InitInputRow],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String, Double)] = {
+    var output = List[(String, String, Double)]()
+    for (row <- inputRows) {
+      if (row.action == "getOption") {
+        output = (key, row.action, _valState.getOption().getOrElse(0.0)) :: output
+      } else if (row.action == "add") {
+        // Update state variable as accumulative sum
+        val accumulateSum = _valState.getOption().getOrElse(0.0) + row.value
+        _valState.update(accumulateSum)
+      } else if (row.action == "remove") {
+        _valState.clear()
+      }
+    }
+    output.iterator
+  }
+}
+
+class InitialStateInMemoryTestClass
+  extends StatefulProcessorWithInitialStateTestClass[InputRowForInitialState] {
+  override def handleInitialState(
+      key: String,
+      initialState: (String, InputRowForInitialState)): Unit = {
+    _valState.update(initialState._2.value)
+    _listState.appendList(initialState._2.entries.toArray)
+    val inMemoryMap = initialState._2.mapping
+    inMemoryMap.foreach { kvPair =>
+      _mapState.updateValue(kvPair._1, kvPair._2)
+    }
+  }
+}
+
+/**
+ * Class that adds tests for transformWithState stateful
+ * streaming operator with user-defined initial state
+ */
+class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
+  with AlsoTestWithChangelogCheckpointingEnabled {
+
+  import testImplicits._
+
+  private def createInitialDfForTest: KeyValueGroupedDataset[String, (String, Double)] = {
+    Seq(("init_1", 40.0), ("init_2", 100.0)).toDS()
+      .groupByKey(x => x._1)
+      .mapValues(x => x)
+  }
+
+  test("transformWithStateWithInitialState - correctness test, " +
+    "run with multiple state variables - in-memory type") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InitInputRow]
+      val kvDataSet = inputData.toDS()
+        .groupByKey(x => x.key)
+      val initStateDf =
+        Seq(("init_1", InputRowForInitialState(40.0, List(40.0), Map(40.0 -> 1))),
+          ("init_2", InputRowForInitialState(100.0, List(100.0), Map(100.0 -> 1))))
+          .toDS().groupByKey(x => x._1).mapValues(x => x)
+      val query = kvDataSet.transformWithState(new InitialStateInMemoryTestClass(),
+        TimeoutMode.NoTimeouts(), OutputMode.Append(), initStateDf)
+
+      testStream(query, OutputMode.Update())(
+        // non-existent key test
+        AddData(inputData, InitInputRow("k1", "update", 37.0)),
+        AddData(inputData, InitInputRow("k2", "update", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)),
+        CheckNewAnswer(("non-exist", "getOption", -1.0)),
+        AddData(inputData, InitInputRow("k1", "appendList", 37.0)),
+        AddData(inputData, InitInputRow("k2", "appendList", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getList", -1.0)),
+        CheckNewAnswer(),
+
+        AddData(inputData, InitInputRow("k1", "incCount", 37.0)),
+        AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getCount", -1.0)),
+        CheckNewAnswer(("non-exist", "getCount", 0.0)),
+        AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("k2", "getCount", 40.0)),
+        CheckNewAnswer(("k2", "getCount", 2.0)),
+
+        // test that every row in the initial state is processed
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", 40.0)),
+        AddData(inputData, InitInputRow("init_2", "getOption", -1.0)),
+        CheckNewAnswer(("init_2", "getOption", 100.0)),
+
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer(("init_1", "getList", 40.0)),
+        AddData(inputData, InitInputRow("init_2", "getList", -1.0)),
+        CheckNewAnswer(("init_2", "getList", 100.0)),
+
+        AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
+        CheckNewAnswer(("init_1", "getCount", 1.0)),
+        AddData(inputData, InitInputRow("init_2", "getCount", 100.0)),
+        CheckNewAnswer(("init_2", "getCount", 1.0)),
+
+        // Updating a row whose key exists in the initial state should work
+        AddData(inputData, InitInputRow("init_1", "update", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "remove", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", -1.0)),
+
+        AddData(inputData, InitInputRow("init_1", "appendList", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer(("init_1", "getList", 50.0), ("init_1", "getList", 40.0)),
+
+        AddData(inputData, InitInputRow("init_1", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
+        CheckNewAnswer(("init_1", "getCount", 2.0)),
+
+        // test remove
+        AddData(inputData, InitInputRow("k1", "remove", -1.0)),
+        AddData(inputData, InitInputRow("k1", "getOption", -1.0)),
+        CheckNewAnswer(("k1", "getOption", -1.0)),
+
+        AddData(inputData, InitInputRow("init_1", "clearCount", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getCount", -1.0)),
+        CheckNewAnswer(("init_1", "getCount", 0.0)),
+
+        AddData(inputData, InitInputRow("init_1", "clearList", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer()
+      )
+    }
+  }
+
+  test("transformWithStateWithInitialState -" +
+    " correctness test, processInitialState should only run once") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val initStateDf = createInitialDfForTest
+      val inputData = MemoryStream[InitInputRow]
+      val query = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new AccumulateStatefulProcessorWithInitState(),
+          TimeoutMode.NoTimeouts(), OutputMode.Append(), initStateDf
+        )
+      testStream(query, OutputMode.Update())(
+        AddData(inputData, InitInputRow("init_1", "add", 50.0)),
+        AddData(inputData, InitInputRow("init_2", "add", 60.0)),
+        AddData(inputData, InitInputRow("init_1", "add", 50.0)),
+        // If processInitialState ran multiple times,
+        // the following checks would fail
+        AddData(inputData,
+          InitInputRow("init_1", "getOption", -1.0), InitInputRow("init_2", "getOption", -1.0)),
+        CheckNewAnswer(("init_2", "getOption", 160.0), ("init_1", "getOption", 140.0))
+      )
+    }
+  }
+
+  test("transformWithStateWithInitialState - batch should succeed") {
+    val inputData = Seq(InitInputRow("k1", "add", 37.0), InitInputRow("k1", "getOption", -1.0))
+    val result = inputData.toDS()
+      .groupByKey(x => x.key)
+      .transformWithState(new AccumulateStatefulProcessorWithInitState(),
+        TimeoutMode.NoTimeouts(),
+        OutputMode.Append(),
+        createInitialDfForTest)
+
+    val df = result.toDF()
+    checkAnswer(df, Seq(("k1", "getOption", 37.0)).toDF())
+  }
+}
+
+class TransformWithStateInitialStateValidationSuite extends StateStoreMetricsTest {

Review Comment:
   Moved to `TransformWithStateSuite`





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1533102287


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast = sparkContext.broadcast(

Review Comment:
Why do we need to do this?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530889290


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -341,8 +475,67 @@ case class TransformWithStateExec(
     processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
     processDataWithPartition(singleIterator, store, processorHandle)
   }
+
+  private def processDataWithInitialState(
+      store: StateStore,
+      childDataIterator: Iterator[InternalRow],
+      initStateIterator: Iterator[InternalRow]):
+    CompletionIterator[InternalRow, Iterator[InternalRow]] = {
+    val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
+      keyEncoder, timeoutMode, isStreaming)
+    assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
+    statefulProcessor.setHandle(processorHandle)
+    statefulProcessor.init(outputMode, timeoutMode)
+    processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+
+    // Check if is first batch
+    // Only process initial states for first batch
+    if (processorHandle.getQueryInfo().getBatchId == 0) {
+      // If the user provided initial state, we need to have the initial state and the
+      // data in the same partition so that we can still have just one commit at the end.
+      val groupedInitialStateIter =
+        GroupedIterator(initStateIterator,
+          initialStateGroupingAttrs, initialState.output)
+      groupedInitialStateIter.foreach {
+        case (keyRow, valueRowIter) =>
+          processInitialStateRows(keyRow.asInstanceOf[UnsafeRow],
+            valueRowIter)
+      }
+    }
+
+    processDataWithPartition(childDataIterator, store, processorHandle, Option(initStateIterator))
+  }
+
+  /** This class zip two RDDs together into the same partition, and returns partition id */
+  class ZipPartitionsWithIndexRDD[A: ClassTag, B: ClassTag, V: ClassTag](

Review Comment:
Could we reuse `stateStoreAwareZipPartitions`?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1523675926


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+
+class StatefulProcessorWithInitialStateTestClass extends StatefulProcessorWithInitialState[
+    String, InitInputRow, (String, String, Double), (String, Double)] {
+  @transient var _valState: ValueState[Double] = _
+  @transient var _listState: ListState[Double] = _
+  @transient var _mapState: MapState[Double, Int] = _
+
+  override def handleInitialState(
+      key: String,
+      initialState: (String, Double)): Unit = {
+    val initStateVal = initialState._2
+    _valState.update(initStateVal)

Review Comment:
Can we simulate an actual case class for the initial state that stores a list/map, and/or iterators for list values and for map key-value pairs?
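   
   For illustration, a minimal sketch of the kind of initial-state case class being asked for here (the name is hypothetical; the suite later adds a similar `InputRowForInitialState`):
   
   ```scala
   // Hypothetical initial-state type carrying scalar, list, and map payloads so
   // that handleInitialState can populate ValueState, ListState, and MapState.
   case class RichInitialState(
       value: Double,
       entries: List[Double],
       mapping: Map[Double, Int])
   ```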





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1531068752


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -127,13 +152,53 @@ case class TransformWithStateExec(
     mappedIterator
   }
 
+  private def processInitialStateRows(
+      keyRow: UnsafeRow,
+      initStateIter: Iterator[InternalRow]): Unit = {
+    val getKeyObj =
+      ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
+
+    val getStateValueObj =
+      ObjectOperator.deserializeRowToObject(initialStateDeserializer, initialStateDataAttrs)
+
+    val keyObj = getKeyObj(keyRow) // convert key to objects
+    ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+    val initStateObjIter = initStateIter.map(getStateValueObj.apply)
+
+    initStateObjIter.foreach { initState =>
+      statefulProcessor
+        .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]]
+        .handleInitialState(keyObj, initState)
+    }
+    ImplicitGroupingKeyTracker.removeImplicitKey()
+  }
+
   private def processNewData(dataIter: Iterator[InternalRow]): Iterator[InternalRow] = {
     val groupedIter = GroupedIterator(dataIter, groupingAttributes, child.output)
     groupedIter.flatMap { case (keyRow, valueRowIter) =>
       val keyUnsafeRow = keyRow.asInstanceOf[UnsafeRow]
       handleInputRows(keyUnsafeRow, valueRowIter)
     }
   }
+  // TODO: double check this
+  private def processNewDataWithInitialState(
+      dataIter: Iterator[InternalRow],
+      initStateIter: Iterator[InternalRow]): Iterator[InternalRow] = {
+
+    val groupedChildDataIter = GroupedIterator(dataIter, groupingAttributes, child.output)
+    val groupedInitialStateIter =
+      GroupedIterator(initStateIter, initialStateGroupingAttrs, initialState.output)
+
+    // Create a CoGroupedIterator that will group the two iterators together for every key group.
+    new CoGroupedIterator(
+      groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap {
+      case (keyRow, valueRowIter, initialStateRowIter) =>
+        // TODO in design doc: trying to re-initialize state for the same
+        // grouping key will result in an error?

Review Comment:
Not sure if I understand the scenario correctly: does it mean the user tries to reassign state variable values in the user-defined function `handleInitialState()`?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530885800


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -127,13 +152,53 @@ case class TransformWithStateExec(
     mappedIterator
   }
 
+  private def processInitialStateRows(
+      keyRow: UnsafeRow,
+      initStateIter: Iterator[InternalRow]): Unit = {
+    val getKeyObj =
+      ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
+
+    val getStateValueObj =
+      ObjectOperator.deserializeRowToObject(initialStateDeserializer, initialStateDataAttrs)
+
+    val keyObj = getKeyObj(keyRow) // convert key to objects
+    ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+    val initStateObjIter = initStateIter.map(getStateValueObj.apply)
+
+    initStateObjIter.foreach { initState =>
+      statefulProcessor
+        .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]]
+        .handleInitialState(keyObj, initState)
+    }
+    ImplicitGroupingKeyTracker.removeImplicitKey()
+  }
+
   private def processNewData(dataIter: Iterator[InternalRow]): Iterator[InternalRow] = {
     val groupedIter = GroupedIterator(dataIter, groupingAttributes, child.output)
     groupedIter.flatMap { case (keyRow, valueRowIter) =>
       val keyUnsafeRow = keyRow.asInstanceOf[UnsafeRow]
       handleInputRows(keyUnsafeRow, valueRowIter)
     }
   }
+  // TODO: double check this
+  private def processNewDataWithInitialState(
+      dataIter: Iterator[InternalRow],
+      initStateIter: Iterator[InternalRow]): Iterator[InternalRow] = {
+
+    val groupedChildDataIter = GroupedIterator(dataIter, groupingAttributes, child.output)
+    val groupedInitialStateIter =
+      GroupedIterator(initStateIter, initialStateGroupingAttrs, initialState.output)
+
+    // Create a CoGroupedIterator that will group the two iterators together for every key group.
+    new CoGroupedIterator(
+      groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap {
+      case (keyRow, valueRowIter, initialStateRowIter) =>
+        // TODO in design doc: trying to re-initialize state for the same
+        // grouping key will result in an error?

Review Comment:
Yes, can we throw an error here if the user tries to re-initialize state for the same grouping key?
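   
   For illustration, a minimal sketch of such a guard (the helper name is hypothetical, not the PR's API); the real check would live where initial-state rows are grouped per key:
   
   ```scala
   import org.apache.spark.sql.catalyst.InternalRow
   
   // Fail fast when more than one initial-state row arrives for one grouping key.
   def applyInitialStateOnce[K](
       key: K,
       initialStateRows: Iterator[InternalRow])(
       handleRow: (K, InternalRow) => Unit): Unit = {
     var seen = false
     initialStateRows.foreach { row =>
       if (seen) {
         throw new IllegalStateException(
           s"Cannot re-initialize state for the same grouping key: $key")
       }
       seen = true
       handleRow(key, row)
     }
   }
   ```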





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1534358386


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast = sparkContext.broadcast(

Review Comment:
I am not 100% sure, but this will distribute the read-only variable `hadoopConf` to all executors, similar to what is done here: https://github.com/apache/spark/blob/74a9c6cfc6ea937031fe1ca5db539139322339a5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/state/StateScanBuilder.scala#L55-L57
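   
   For reference, a minimal sketch of the broadcast pattern being described, assuming an active `SparkContext` (note that `SerializableConfiguration` is `private[spark]`, so this only compiles inside Spark's own packages):
   
   ```scala
   import org.apache.hadoop.conf.Configuration
   import org.apache.spark.SparkContext
   import org.apache.spark.broadcast.Broadcast
   import org.apache.spark.util.SerializableConfiguration
   
   // Hadoop's Configuration is not Java-serializable on its own, so it is wrapped
   // and broadcast once; every task then reads the same read-only copy instead of
   // re-serializing the conf into each task closure.
   def broadcastHadoopConf(
       sc: SparkContext,
       hadoopConf: Configuration): Broadcast[SerializableConfiguration] = {
     sc.broadcast(new SerializableConfiguration(hadoopConf))
   }
   ```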





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1523675926


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+
+class StatefulProcessorWithInitialStateTestClass extends StatefulProcessorWithInitialState[
+    String, InitInputRow, (String, String, Double), (String, Double)] {
+  @transient var _valState: ValueState[Double] = _
+  @transient var _listState: ListState[Double] = _
+  @transient var _mapState: MapState[Double, Int] = _
+
+  override def handleInitialState(
+      key: String,
+      initialState: (String, Double)): Unit = {
+    val initStateVal = initialState._2
+    _valState.update(initStateVal)

Review Comment:
Can we simulate an actual case class that stores a list/map, and/or iterators for list values and for map key-value pairs?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530873761


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+case class InputRowForInitialState(
+    value: Double, entries: List[Double], mapping: Map[Double, Int])
+
+abstract class StatefulProcessorWithInitialStateTestClass[V]
+    extends StatefulProcessorWithInitialState[
+        String, InitInputRow, (String, String, Double), (String, V)] {

Review Comment:
Why not just make the `key` part of the initial state case class itself?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1534360102


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast =

Review Comment:
I mean, this was only needed for the batch support part, right?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1540460865


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -676,6 +676,42 @@ class KeyValueGroupedDataset[K, V] private[sql](
     )
   }
 
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state API v2.
+   * Functions the same as the method above, but with an additional initial state.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
+   * @tparam S The type of initial state objects. Must be encodable to Spark SQL types.
+   * @param statefulProcessor Instance of StatefulProcessorWithInitialState whose functions will
+   *                                          be invoked by the operator.
+   * @param timeoutMode       The timeout mode of the stateful processor.
+   * @param outputMode        The output mode of the stateful processor. Defaults to APPEND mode.
+   * @param initialState      User-provided initial state that will be used to initialize state
+   *                          for the query in the first batch.
+   *
+   */
+  private[sql] def transformWithState[U: Encoder, S: Encoder](
+      statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
+      timeoutMode: TimeoutMode,
+      outputMode: OutputMode,
+      initialState: KeyValueGroupedDataset[K, S]): Dataset[U] = {
+    Dataset[U](
+      sparkSession,
+      TransformWithState[K, V, U, S](
+        groupingAttributes,
+        dataAttributes,
+        statefulProcessor,
+        timeoutMode,
+        outputMode,
+        child = logicalPlan,
+        initialState.groupingAttributes,
+        initialState.dataAttributes,
+        initialState.queryExecution.logical

Review Comment:
Shall we follow the practice we did in flatMapGroupsWithState, for safety's sake?
   
   `initialState.queryExecution.analyzed`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -268,11 +268,13 @@ class IncrementalExecution(
         )
 
       case t: TransformWithStateExec =>
+        val hasInitialState = (isFirstBatch && t.hasInitialState)

Review Comment:
I don't think we want to allow adding state in the middle of the query lifecycle. Here `isFirstBatch` does not mean batch ID = 0; it means this is the first batch of this query run.
   
   This should follow the logic above for FlatMapGroupsWithStateExec: `currentBatchId == 0L`.
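   
   A tiny sketch of the suggested gating (assuming the `currentBatchId` available in `IncrementalExecution`):
   
   ```scala
   // Initial state should be applied only on the very first microbatch of the
   // query's lifetime (batch ID 0), not on the first batch after every restart.
   def shouldApplyInitialState(currentBatchId: Long, hasInitialState: Boolean): Boolean =
     currentBatchId == 0L && hasInitialState
   ```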



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##########
@@ -268,11 +268,13 @@ class IncrementalExecution(
         )
 
       case t: TransformWithStateExec =>
+        val hasInitialState = (isFirstBatch && t.hasInitialState)

Review Comment:
Please let me know if this is different functionality from what we had in flatMapGroupsWithState.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast = sparkContext.broadcast(

Review Comment:
Yeah, there is a code comment. The practice seems to be that it's better to use a broadcast rather than task serialization, since the configuration could be huge.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast = sparkContext.broadcast(
+        new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+      child.execute().stateStoreAwareZipPartitions(
+        initialState.execute(),
         getStateInfo,
-        schemaForKeyRow,
-        schemaForValueRow,
-        NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
-        session.sqlContext.sessionState,
-        Some(session.sqlContext.streams.stateStoreCoordinator),
-        useColumnFamilies = true,
-        useMultipleValuesPerKey = true
-      ) {
-        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-          processData(store, singleIterator)
+        storeNames = Seq(),
+        session.sqlContext.streams.stateStoreCoordinator) {
+        // The state store aware zip partitions will provide us with two iterators,
+        // child data iterator and the initial state iterator per partition.
+        case (partitionId, childDataIterator, initStateIterator) =>
+          if (isStreaming) {
+            val stateStoreId = StateStoreId(stateInfo.get.checkpointLocation,
+              stateInfo.get.operatorId, partitionId)
+            val storeProviderId = StateStoreProviderId(stateStoreId, stateInfo.get.queryRunId)
+            val store = StateStore.get(
+              storeProviderId = storeProviderId,
+              keySchema = schemaForKeyRow,
+              valueSchema = schemaForValueRow,
+              NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+              version = stateInfo.get.storeVersion,
+              useColumnFamilies = true,
+              storeConf = storeConf,
+              hadoopConf = hadoopConfBroadcast.value.value
+            )
+
+            processDataWithInitialState(store, childDataIterator, initStateIterator)
+          } else {
+            val providerId = {
+              val tempDirPath = Utils.createTempDir().getAbsolutePath
+              new StateStoreProviderId(
+                StateStoreId(tempDirPath, 0, partitionId), getStateInfo.queryRunId)
+            }
+            val sqlConf = new SQLConf()
+            sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+              classOf[RocksDBStateStoreProvider].getName)
+
+            // Create StateStoreProvider for this partition
+            val stateStoreProvider = StateStoreProvider.createAndInit(
+              providerId,
+              schemaForKeyRow,
+              schemaForValueRow,
+              NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+              useColumnFamilies = true,
+              storeConf = new StateStoreConf(sqlConf),
+              hadoopConf = hadoopConfBroadcast.value.value,
+              useMultipleValuesPerKey = true)
+            val store = stateStoreProvider.getStore(0)
+
+            processDataWithInitialState(store, childDataIterator, initStateIterator)

Review Comment:
We close the state store and state store provider in the batch codepath (see below). Shall we do that here as well?
   
   Also, this is a good sign that we have duplicated code: the two batch parts are similar in spinning up a state store provider and state store, and in closing them both. That could be extracted out.
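   
   A hedged sketch of that extraction (the helper name is hypothetical; the calls mirror the quoted diff, and `schemaForKeyRow`, `schemaForValueRow`, and `hadoopConfBroadcast` are assumed from the operator's scope):
   
   ```scala
   // Consolidate the two batch codepaths: spin up a temporary RocksDB-backed
   // store for one partition and guarantee the provider is closed afterwards.
   private def withBatchStateStore[T](partitionId: Int)(f: StateStore => T): T = {
     val tempDirPath = Utils.createTempDir().getAbsolutePath
     val providerId = new StateStoreProviderId(
       StateStoreId(tempDirPath, 0, partitionId), getStateInfo.queryRunId)
     val sqlConf = new SQLConf()
     sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
       classOf[RocksDBStateStoreProvider].getName)
     val provider = StateStoreProvider.createAndInit(
       providerId,
       schemaForKeyRow,
       schemaForValueRow,
       NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
       useColumnFamilies = true,
       storeConf = new StateStoreConf(sqlConf),
       hadoopConf = hadoopConfBroadcast.value.value,
       useMultipleValuesPerKey = true)
     val store = provider.getStore(0)
     try f(store) finally provider.close()
   }
   ```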



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast = sparkContext.broadcast(
+        new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+      child.execute().stateStoreAwareZipPartitions(
+        initialState.execute(),
         getStateInfo,
-        schemaForKeyRow,
-        schemaForValueRow,
-        NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
-        session.sqlContext.sessionState,
-        Some(session.sqlContext.streams.stateStoreCoordinator),
-        useColumnFamilies = true,
-        useMultipleValuesPerKey = true
-      ) {
-        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-          processData(store, singleIterator)
+        storeNames = Seq(),
+        session.sqlContext.streams.stateStoreCoordinator) {
+        // The state store aware zip partitions will provide us with two iterators,
+        // child data iterator and the initial state iterator per partition.
+        case (partitionId, childDataIterator, initStateIterator) =>
+          if (isStreaming) {
+            val stateStoreId = StateStoreId(stateInfo.get.checkpointLocation,
+              stateInfo.get.operatorId, partitionId)
+            val storeProviderId = StateStoreProviderId(stateStoreId, stateInfo.get.queryRunId)
+            val store = StateStore.get(
+              storeProviderId = storeProviderId,
+              keySchema = schemaForKeyRow,
+              valueSchema = schemaForValueRow,
+              NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+              version = stateInfo.get.storeVersion,
+              useColumnFamilies = true,
+              storeConf = storeConf,
+              hadoopConf = hadoopConfBroadcast.value.value
+            )
+
+            processDataWithInitialState(store, childDataIterator, initStateIterator)
+          } else {
+            val providerId = {
+              val tempDirPath = Utils.createTempDir().getAbsolutePath
+              new StateStoreProviderId(
+                StateStoreId(tempDirPath, 0, partitionId), getStateInfo.queryRunId)
+            }
+            val sqlConf = new SQLConf()
+            sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+              classOf[RocksDBStateStoreProvider].getName)
+
+            // Create StateStoreProvider for this partition
+            val stateStoreProvider = StateStoreProvider.createAndInit(
+              providerId,
+              schemaForKeyRow,
+              schemaForValueRow,
+              NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+              useColumnFamilies = true,
+              storeConf = new StateStoreConf(sqlConf),
+              hadoopConf = hadoopConfBroadcast.value.value,
+              useMultipleValuesPerKey = true)
+            val store = stateStoreProvider.getStore(0)
+
+            processDataWithInitialState(store, childDataIterator, initStateIterator)
+          }
       }
     } else {
-      // If the query is running in batch mode, we need to create a new StateStore and instantiate
-      // a temp directory on the executors in mapPartitionsWithIndex.
-      val broadcastedHadoopConf =
+      if (isStreaming) {
+        child.execute().mapPartitionsWithStateStore[InternalRow](
+          getStateInfo,
+          schemaForKeyRow,
+          schemaForValueRow,
+          NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+          session.sqlContext.sessionState,
+          Some(session.sqlContext.streams.stateStoreCoordinator),
+          useColumnFamilies = true
+        ) {
+          case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
+            processData(store, singleIterator)
+        }
+      } else {
+        // If the query is running in batch mode, we need to create a new StateStore and instantiate

Review Comment:
nit: apply the same practice (broadcast) while we are here?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -341,8 +444,37 @@ case class TransformWithStateExec(
     processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
     processDataWithPartition(singleIterator, store, processorHandle)
   }
+
+  private def processDataWithInitialState(
+      store: StateStore,
+      childDataIterator: Iterator[InternalRow],
+      initStateIterator: Iterator[InternalRow]):
+    CompletionIterator[InternalRow, Iterator[InternalRow]] = {
+    val processorHandle = new StatefulProcessorHandleImpl(store, getStateInfo.queryRunId,
+      keyEncoder, timeoutMode, isStreaming)
+    assert(processorHandle.getHandleState == StatefulProcessorHandleState.CREATED)
+    statefulProcessor.setHandle(processorHandle)
+    statefulProcessor.init(outputMode, timeoutMode)
+    processorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+
+    // Check if is first batch
+    // Only process initial states for first batch
+    if (processorHandle.getQueryInfo().getBatchId == 0) {

Review Comment:
OK, I see we have multiple checks. Still, it is better to change the condition in IncrementalExecution, since a reader could misread this as an inconsistency between flatMapGroupsWithState and transformWithState.





Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45467: [SPARK-47363][SS] Initial State without state reader implementation for State API v2.
URL: https://github.com/apache/spark/pull/45467




Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1523663013


##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessor.scala:
##########
@@ -85,3 +85,21 @@ private[sql] trait StatefulProcessor[K, I, O] extends Serializable {
     statefulProcessorHandle
   }
 }
+
+/**
+ * Similar usage as StatefulProcessor. Represents the arbitrary stateful logic that needs to

Review Comment:
Maybe reword this: `Stateful processor with support for specifying initial state. Accepts a user-defined type as initial state to be initialized in the first batch. This can be used for starting a new streaming query with existing state from a previous streaming query`?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530881452


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +340,122 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
-        getStateInfo,
-        schemaForKeyRow,
-        schemaForValueRow,
-        numColsPrefixKey = 0,
-        session.sqlContext.sessionState,
-        Some(session.sqlContext.streams.stateStoreCoordinator),
-        useColumnFamilies = true,
-        useMultipleValuesPerKey = true
-      ) {
-        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-          processData(store, singleIterator)
+    if (hasInitialState) {
+      if (isStreaming) {
+        val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+        val hadoopConfBroadcast = sparkContext.broadcast(
+          new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+        child.execute().stateStoreAwareZipPartitions(
+          initialState.execute(),
+          getStateInfo,
+          storeNames = Seq(),
+          session.sqlContext.streams.stateStoreCoordinator) {
+          // The state store aware zip partitions will provide us with two iterators,
+          // child data iterator and the initial state iterator per partition.
+          case (partitionId, childDataIterator, initStateIterator) =>
+            val stateStoreId = StateStoreId(stateInfo.get.checkpointLocation,
+              stateInfo.get.operatorId, partitionId)
+            val storeProviderId = StateStoreProviderId(stateStoreId, stateInfo.get.queryRunId)
+            val store = StateStore.get(
+              storeProviderId,
+              schemaForKeyRow,
+              schemaForValueRow,
+              0,

Review Comment:
Let's mark this explicitly, i.e. `numColsPrefixKey = 0`?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530882509


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +340,122 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
-        getStateInfo,
-        schemaForKeyRow,
-        schemaForValueRow,
-        numColsPrefixKey = 0,
-        session.sqlContext.sessionState,
-        Some(session.sqlContext.streams.stateStoreCoordinator),
-        useColumnFamilies = true,
-        useMultipleValuesPerKey = true
-      ) {
-        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-          processData(store, singleIterator)
+    if (hasInitialState) {
+      if (isStreaming) {
+        val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+        val hadoopConfBroadcast = sparkContext.broadcast(
+          new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+        child.execute().stateStoreAwareZipPartitions(
+          initialState.execute(),
+          getStateInfo,
+          storeNames = Seq(),
+          session.sqlContext.streams.stateStoreCoordinator) {
+          // The state store aware zip partitions will provide us with two iterators,
+          // child data iterator and the initial state iterator per partition.
+          case (partitionId, childDataIterator, initStateIterator) =>
+            val stateStoreId = StateStoreId(stateInfo.get.checkpointLocation,
+              stateInfo.get.operatorId, partitionId)
+            val storeProviderId = StateStoreProviderId(stateStoreId, stateInfo.get.queryRunId)
+            val store = StateStore.get(
+              storeProviderId,
+              schemaForKeyRow,
+              schemaForValueRow,
+              0,
+              stateInfo.get.storeVersion,
+              useColumnFamilies = true,
+              storeConf, hadoopConfBroadcast.value.value
+            )
+
+            processDataWithInitialState(store, childDataIterator, initStateIterator)
+        }
+      } else {
+        // If the query is running in batch mode, we need to create a new StateStore and instantiate
+        // a temp directory on the executors in zipPartitionsWithIndex.
+        val broadcastedHadoopConf =
+          new SerializableConfiguration(session.sessionState.newHadoopConf())
+        child.execute().zipPartitionsWithIndex(
+          initialState.execute()

Review Comment:
   nit: move to the line above? Same for the line below.





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1524250080


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -676,6 +676,42 @@ class KeyValueGroupedDataset[K, V] private[sql](
     )
   }
 
+  /**
+   * (Scala-specific)
+   * Invokes methods defined in the stateful processor used in arbitrary state API v2.
+   * Functions as the function above, but with additional initial state.
+   *
+   * @tparam U The type of the output objects. Must be encodable to Spark SQL types.
+   * @tparam S The type of initial state objects. Must be encodable to Spark SQL types.
+   * @param statefulProcessor Instance of statefulProcessor whose functions will be invoked by the
+   *                          operator.
+   * @param timeoutMode       The timeout mode of the stateful processor.
+   * @param outputMode        The output mode of the stateful processor. Defaults to APPEND mode.
+   * @param initialState      User provided initial state that will be used to initiate state for
+   *                          the query in the first batch.
+   *
+   */
+  def transformWithState[U: Encoder, S: Encoder](

Review Comment:
   `private[sql]`
   
   We want to defer exposing the API to the public until we complete the work.
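
   A sketch of the suggested change applied to the definition under review; the parameter list is reconstructed from the scaladoc above and the MIMA output later in this thread, so it may differ from the final PR:

   ```scala
   private[sql] def transformWithState[U: Encoder, S: Encoder](
       statefulProcessor: StatefulProcessorWithInitialState[K, V, U, S],
       timeoutMode: TimeoutMode,
       outputMode: OutputMode,
       initialState: KeyValueGroupedDataset[K, S]): Dataset[U]
   ```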





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530876659


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -85,23 +94,39 @@ case class TransformWithStateExec(
     }
   }
 
-  override protected def withNewChildInternal(
-    newChild: SparkPlan): TransformWithStateExec = copy(child = newChild)
+  override def left: SparkPlan = child
+
+  override def right: SparkPlan = initialState
+
+  override protected def withNewChildrenInternal(
+      newLeft: SparkPlan, newRight: SparkPlan): TransformWithStateExec =
+    copy(child = newLeft, initialState = newRight)
 
   override def keyExpressions: Seq[Attribute] = groupingAttributes
 
   protected val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
 
   protected val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
 
+  /**
+   * Distribute by grouping attributes - We need the underlying data and the initial state data
+   * to have the same grouping so that the data are co-lacated on the same task.

Review Comment:
   Nit: `co-located on`





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1523797687


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+
+class StatefulProcessorWithInitialStateTestClass extends StatefulProcessorWithInitialState[
+    String, InitInputRow, (String, String, Double), (String, Double)] {
+  @transient var _valState: ValueState[Double] = _
+  @transient var _listState: ListState[Double] = _
+  @transient var _mapState: MapState[Double, Int] = _
+
+  override def handleInitialState(
+      key: String,
+      initialState: (String, Double)): Unit = {
+    val initStateVal = initialState._2
+    _valState.update(initStateVal)

Review Comment:
   Not sure if I understand you correctly: do you mean we should have a test case where `initialState` is a case class, and inside `handleInitialState` we update the values for the listState/mapState variables?





Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1542128879


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast = sparkContext.broadcast(
+        new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))
+      child.execute().stateStoreAwareZipPartitions(
+        initialState.execute(),
         getStateInfo,
-        schemaForKeyRow,
-        schemaForValueRow,
-        NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
-        session.sqlContext.sessionState,
-        Some(session.sqlContext.streams.stateStoreCoordinator),
-        useColumnFamilies = true,
-        useMultipleValuesPerKey = true
-      ) {
-        case (store: StateStore, singleIterator: Iterator[InternalRow]) =>
-          processData(store, singleIterator)
+        storeNames = Seq(),
+        session.sqlContext.streams.stateStoreCoordinator) {
+        // The state store aware zip partitions will provide us with two iterators,
+        // child data iterator and the initial state iterator per partition.
+        case (partitionId, childDataIterator, initStateIterator) =>
+          if (isStreaming) {
+            val stateStoreId = StateStoreId(stateInfo.get.checkpointLocation,
+              stateInfo.get.operatorId, partitionId)
+            val storeProviderId = StateStoreProviderId(stateStoreId, stateInfo.get.queryRunId)
+            val store = StateStore.get(
+              storeProviderId = storeProviderId,
+              keySchema = schemaForKeyRow,
+              valueSchema = schemaForValueRow,
+              NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+              version = stateInfo.get.storeVersion,
+              useColumnFamilies = true,
+              storeConf = storeConf,
+              hadoopConf = hadoopConfBroadcast.value.value
+            )
+
+            processDataWithInitialState(store, childDataIterator, initStateIterator)
+          } else {
+            val providerId = {
+              val tempDirPath = Utils.createTempDir().getAbsolutePath
+              new StateStoreProviderId(
+                StateStoreId(tempDirPath, 0, partitionId), getStateInfo.queryRunId)
+            }
+            val sqlConf = new SQLConf()
+            sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
+              classOf[RocksDBStateStoreProvider].getName)
+
+            // Create StateStoreProvider for this partition
+            val stateStoreProvider = StateStoreProvider.createAndInit(
+              providerId,
+              schemaForKeyRow,
+              schemaForValueRow,
+              NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
+              useColumnFamilies = true,
+              storeConf = new StateStoreConf(sqlConf),
+              hadoopConf = hadoopConfBroadcast.value.value,
+              useMultipleValuesPerKey = true)
+            val store = stateStoreProvider.getStore(0)
+
+            processDataWithInitialState(store, childDataIterator, initStateIterator)

Review Comment:
   Good advice! Refactored the duplicated code into `initNewStateStoreAndProcessData()`.
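
   For readers of the thread, a sketch of what such a helper might look like; the name comes from the comment and the body is assembled from the duplicated batch branch in the diff above, so it is illustrative rather than the exact final code:

   ```scala
   // Illustrative only: shape inferred from the duplicated branch above.
   private def initNewStateStoreAndProcessData(
       partitionId: Int,
       hadoopConfBroadcast: Broadcast[SerializableConfiguration])(
       processData: StateStore => Iterator[InternalRow]): Iterator[InternalRow] = {
     // Batch mode has no existing checkpoint, so stand up a provider over a
     // temp directory with a local RocksDB state store.
     val tempDirPath = Utils.createTempDir().getAbsolutePath
     val providerId = new StateStoreProviderId(
       StateStoreId(tempDirPath, 0, partitionId), getStateInfo.queryRunId)
     val sqlConf = new SQLConf()
     sqlConf.setConfString(SQLConf.STATE_STORE_PROVIDER_CLASS.key,
       classOf[RocksDBStateStoreProvider].getName)

     val stateStoreProvider = StateStoreProvider.createAndInit(
       providerId,
       schemaForKeyRow,
       schemaForValueRow,
       NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
       useColumnFamilies = true,
       storeConf = new StateStoreConf(sqlConf),
       hadoopConf = hadoopConfBroadcast.value.value,
       useMultipleValuesPerKey = true)

     processData(stateStoreProvider.getStore(0))
   }
   ```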





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530875269


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+case class InputRowForInitialState(
+    value: Double, entries: List[Double], mapping: Map[Double, Int])
+
+abstract class StatefulProcessorWithInitialStateTestClass[V]
+    extends StatefulProcessorWithInitialState[
+        String, InitInputRow, (String, String, Double), (String, V)] {
+  @transient var _valState: ValueState[Double] = _
+  @transient var _listState: ListState[Double] = _
+  @transient var _mapState: MapState[Double, Int] = _
+
+  override def init(outputMode: OutputMode, timeoutMode: TimeoutMode): Unit = {
+    _valState = getHandle.getValueState[Double]("testValueInit", Encoders.scalaDouble)
+    _listState = getHandle.getListState[Double]("testListInit", Encoders.scalaDouble)
+    _mapState = getHandle.getMapState[Double, Int](
+      "testMapInit", Encoders.scalaDouble, Encoders.scalaInt)
+  }
+
+  override def close(): Unit = {}
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[InitInputRow],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String, Double)] = {
+    var output = List[(String, String, Double)]()
+    for (row <- inputRows) {
+      if (row.action == "getOption") {
+        output = (key, row.action, _valState.getOption().getOrElse(-1.0)) :: output
+      } else if (row.action == "update") {
+        _valState.update(row.value)
+      } else if (row.action == "remove") {
+        _valState.clear()
+      } else if (row.action == "getList") {
+        _listState.get().foreach { element =>
+          output = (key, row.action, element) :: output
+        }
+      } else if (row.action == "appendList") {
+        _listState.appendValue(row.value)
+      } else if (row.action == "clearList") {
+        _listState.clear()
+      } else if (row.action == "getCount") {
+        val count =
+          if (!_mapState.containsKey(row.value)) 0
+          else _mapState.getValue(row.value)
+        output = (key, row.action, count.toDouble) :: output
+      } else if (row.action == "incCount") {
+        val count =
+          if (!_mapState.containsKey(row.value)) 0
+          else _mapState.getValue(row.value)
+        _mapState.updateValue(row.value, count + 1)
+      } else if (row.action == "clearCount") {
+        _mapState.removeKey(row.value)
+      }
+    }
+    output.iterator
+  }
+}
+
+class AccumulateStatefulProcessorWithInitState
+    extends StatefulProcessorWithInitialStateTestClass[Double] {
+  override def handleInitialState(
+      key: String,
+      initialState: (String, Double)): Unit = {
+    _valState.update(initialState._2)
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[InitInputRow],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String, Double)] = {
+    var output = List[(String, String, Double)]()
+    for (row <- inputRows) {
+      if (row.action == "getOption") {
+        output = (key, row.action, _valState.getOption().getOrElse(0.0)) :: output
+      } else if (row.action == "add") {
+        // Update state variable as accumulative sum
+        val accumulateSum = _valState.getOption().getOrElse(0.0) + row.value
+        _valState.update(accumulateSum)
+      } else if (row.action == "remove") {
+        _valState.clear()
+      }
+    }
+    output.iterator
+  }
+}
+
+class InitialStateInMemoryTestClass
+  extends StatefulProcessorWithInitialStateTestClass[InputRowForInitialState] {
+  override def handleInitialState(
+      key: String,
+      initialState: (String, InputRowForInitialState)): Unit = {
+    _valState.update(initialState._2.value)
+    _listState.appendList(initialState._2.entries.toArray)
+    val inMemoryMap = initialState._2.mapping
+    inMemoryMap.foreach { kvPair =>
+      _mapState.updateValue(kvPair._1, kvPair._2)
+    }
+  }
+}
+
+/**
+ * Class that adds tests for transformWithState stateful
+ * streaming operator with user-defined initial state
+ */
+class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
+  with AlsoTestWithChangelogCheckpointingEnabled {
+
+  import testImplicits._
+
+  private def createInitialDfForTest: KeyValueGroupedDataset[String, (String, Double)] = {
+    Seq(("init_1", 40.0), ("init_2", 100.0)).toDS()
+      .groupByKey(x => x._1)
+      .mapValues(x => x)
+  }
+
+
+  test("transformWithStateWithInitialState - correctness test, " +
+    "run with multiple state variables - in-memory type") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InitInputRow]
+      val kvDataSet = inputData.toDS()
+        .groupByKey(x => x.key)
+      val initStateDf =
+        Seq(("init_1", InputRowForInitialState(40.0, List(40.0), Map(40.0 -> 1))),
+          ("init_2", InputRowForInitialState(100.0, List(100.0), Map(100.0 -> 1))))
+          .toDS().groupByKey(x => x._1).mapValues(x => x)
+      val query = kvDataSet.transformWithState(new InitialStateInMemoryTestClass(),
+            TimeoutMode.NoTimeouts (), OutputMode.Append (), initStateDf)
+
+      testStream(query, OutputMode.Update())(
+        // non-exist key test
+        AddData(inputData, InitInputRow("k1", "update", 37.0)),
+        AddData(inputData, InitInputRow("k2", "update", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)),
+        CheckNewAnswer(("non-exist", "getOption", -1.0)),
+        AddData(inputData, InitInputRow("k1", "appendList", 37.0)),
+        AddData(inputData, InitInputRow("k2", "appendList", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getList", -1.0)),
+        CheckNewAnswer(),
+
+        AddData(inputData, InitInputRow("k1", "incCount", 37.0)),
+        AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getCount", -1.0)),
+        CheckNewAnswer(("non-exist", "getCount", 0.0)),
+        AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("k2", "getCount", 40.0)),
+        CheckNewAnswer(("k2", "getCount", 2.0)),
+
+        // test every row in initial State is processed
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", 40.0)),
+        AddData(inputData, InitInputRow("init_2", "getOption", -1.0)),
+        CheckNewAnswer(("init_2", "getOption", 100.0)),
+
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer(("init_1", "getList", 40.0)),
+        AddData(inputData, InitInputRow("init_2", "getList", -1.0)),
+        CheckNewAnswer(("init_2", "getList", 100.0)),
+
+        AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
+        CheckNewAnswer(("init_1", "getCount", 1.0)),
+        AddData(inputData, InitInputRow("init_2", "getCount", 100.0)),
+        CheckNewAnswer(("init_2", "getCount", 1.0)),
+
+        // Update row with key in initial row will work
+        AddData(inputData, InitInputRow("init_1", "update", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "remove", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", -1.0)),
+
+        AddData(inputData, InitInputRow("init_1", "appendList", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer(("init_1", "getList", 50.0), ("init_1", "getList", 40.0)),
+
+        AddData(inputData, InitInputRow("init_1", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
+        CheckNewAnswer(("init_1", "getCount", 2.0)),
+
+        // test remove
+        AddData(inputData, InitInputRow("k1", "remove", -1.0)),
+        AddData(inputData, InitInputRow("k1", "getOption", -1.0)),
+        CheckNewAnswer(("k1", "getOption", -1.0)),
+
+        AddData(inputData, InitInputRow("init_1", "clearCount", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getCount", -1.0)),
+        CheckNewAnswer(("init_1", "getCount", 0.0)),
+
+        AddData(inputData, InitInputRow("init_1", "clearList", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer()
+      )
+    }
+  }
+
+
+  test("transformWithStateWithInitialState -" +
+    " correctness test, processInitialState should only run once") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val initStateDf = createInitialDfForTest
+      val inputData = MemoryStream[InitInputRow]
+      val query = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new AccumulateStatefulProcessorWithInitState(),
+          TimeoutMode.NoTimeouts(), OutputMode.Append(), initStateDf
+        )
+      testStream(query, OutputMode.Update())(
+        AddData(inputData, InitInputRow("init_1", "add", 50.0)),
+        AddData(inputData, InitInputRow("init_2", "add", 60.0)),
+        AddData(inputData, InitInputRow("init_1", "add", 50.0)),
+        // If processInitialState was processed multiple times,
+        // following checks will fail
+        AddData(inputData,
+          InitInputRow("init_1", "getOption", -1.0), InitInputRow("init_2", "getOption", -1.0)),
+        CheckNewAnswer(("init_2", "getOption", 160.0), ("init_1", "getOption", 140.0))
+      )
+    }
+  }
+
+  test("transformWithStateWithInitialState - batch should succeed") {
+    val inputData = Seq(InitInputRow("k1", "add", 37.0), InitInputRow("k1", "getOption", -1.0))
+    val result = inputData.toDS()
+      .groupByKey(x => x.key)
+      .transformWithState(new AccumulateStatefulProcessorWithInitState(),
+        TimeoutMode.NoTimeouts(),
+        OutputMode.Append(),
+        createInitialDfForTest)
+
+    val df = result.toDF()
+    checkAnswer(df, Seq(("k1", "getOption", 37.0)).toDF())
+  }
+}
+
+class TransformWithStateInitialStateValidationSuite extends StateStoreMetricsTest {

Review Comment:
   We can just move this test to the validation suite we have in `TransformWithStateSuite`? Or we can move the validation tests out into a separate file and move this test there?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530874118


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,286 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+case class InputRowForInitialState(
+    value: Double, entries: List[Double], mapping: Map[Double, Int])
+
+abstract class StatefulProcessorWithInitialStateTestClass[V]
+    extends StatefulProcessorWithInitialState[
+        String, InitInputRow, (String, String, Double), (String, V)] {
+  @transient var _valState: ValueState[Double] = _
+  @transient var _listState: ListState[Double] = _
+  @transient var _mapState: MapState[Double, Int] = _
+
+  override def init(outputMode: OutputMode, timeoutMode: TimeoutMode): Unit = {
+    _valState = getHandle.getValueState[Double]("testValueInit", Encoders.scalaDouble)
+    _listState = getHandle.getListState[Double]("testListInit", Encoders.scalaDouble)
+    _mapState = getHandle.getMapState[Double, Int](
+      "testMapInit", Encoders.scalaDouble, Encoders.scalaInt)
+  }
+
+  override def close(): Unit = {}
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[InitInputRow],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String, Double)] = {
+    var output = List[(String, String, Double)]()
+    for (row <- inputRows) {
+      if (row.action == "getOption") {
+        output = (key, row.action, _valState.getOption().getOrElse(-1.0)) :: output
+      } else if (row.action == "update") {
+        _valState.update(row.value)
+      } else if (row.action == "remove") {
+        _valState.clear()
+      } else if (row.action == "getList") {
+        _listState.get().foreach { element =>
+          output = (key, row.action, element) :: output
+        }
+      } else if (row.action == "appendList") {
+        _listState.appendValue(row.value)
+      } else if (row.action == "clearList") {
+        _listState.clear()
+      } else if (row.action == "getCount") {
+        val count =
+          if (!_mapState.containsKey(row.value)) 0
+          else _mapState.getValue(row.value)
+        output = (key, row.action, count.toDouble) :: output
+      } else if (row.action == "incCount") {
+        val count =
+          if (!_mapState.containsKey(row.value)) 0
+          else _mapState.getValue(row.value)
+        _mapState.updateValue(row.value, count + 1)
+      } else if (row.action == "clearCount") {
+        _mapState.removeKey(row.value)
+      }
+    }
+    output.iterator
+  }
+}
+
+class AccumulateStatefulProcessorWithInitState
+    extends StatefulProcessorWithInitialStateTestClass[Double] {
+  override def handleInitialState(
+      key: String,
+      initialState: (String, Double)): Unit = {
+    _valState.update(initialState._2)
+  }
+
+  override def handleInputRows(
+      key: String,
+      inputRows: Iterator[InitInputRow],
+      timerValues: TimerValues,
+      expiredTimerInfo: ExpiredTimerInfo): Iterator[(String, String, Double)] = {
+    var output = List[(String, String, Double)]()
+    for (row <- inputRows) {
+      if (row.action == "getOption") {
+        output = (key, row.action, _valState.getOption().getOrElse(0.0)) :: output
+      } else if (row.action == "add") {
+        // Update state variable as accumulative sum
+        val accumulateSum = _valState.getOption().getOrElse(0.0) + row.value
+        _valState.update(accumulateSum)
+      } else if (row.action == "remove") {
+        _valState.clear()
+      }
+    }
+    output.iterator
+  }
+}
+
+class InitialStateInMemoryTestClass
+  extends StatefulProcessorWithInitialStateTestClass[InputRowForInitialState] {
+  override def handleInitialState(
+      key: String,
+      initialState: (String, InputRowForInitialState)): Unit = {
+    _valState.update(initialState._2.value)
+    _listState.appendList(initialState._2.entries.toArray)
+    val inMemoryMap = initialState._2.mapping
+    inMemoryMap.foreach { kvPair =>
+      _mapState.updateValue(kvPair._1, kvPair._2)
+    }
+  }
+}
+
+/**
+ * Class that adds tests for transformWithState stateful
+ * streaming operator with user-defined initial state
+ */
+class TransformWithStateInitialStateSuite extends StateStoreMetricsTest
+  with AlsoTestWithChangelogCheckpointingEnabled {
+
+  import testImplicits._
+
+  private def createInitialDfForTest: KeyValueGroupedDataset[String, (String, Double)] = {
+    Seq(("init_1", 40.0), ("init_2", 100.0)).toDS()
+      .groupByKey(x => x._1)
+      .mapValues(x => x)
+  }
+
+
+  test("transformWithStateWithInitialState - correctness test, " +
+    "run with multiple state variables - in-memory type") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InitInputRow]
+      val kvDataSet = inputData.toDS()
+        .groupByKey(x => x.key)
+      val initStateDf =
+        Seq(("init_1", InputRowForInitialState(40.0, List(40.0), Map(40.0 -> 1))),
+          ("init_2", InputRowForInitialState(100.0, List(100.0), Map(100.0 -> 1))))
+          .toDS().groupByKey(x => x._1).mapValues(x => x)
+      val query = kvDataSet.transformWithState(new InitialStateInMemoryTestClass(),
+            TimeoutMode.NoTimeouts (), OutputMode.Append (), initStateDf)
+
+      testStream(query, OutputMode.Update())(
+        // non-exist key test
+        AddData(inputData, InitInputRow("k1", "update", 37.0)),
+        AddData(inputData, InitInputRow("k2", "update", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getOption", -1.0)),
+        CheckNewAnswer(("non-exist", "getOption", -1.0)),
+        AddData(inputData, InitInputRow("k1", "appendList", 37.0)),
+        AddData(inputData, InitInputRow("k2", "appendList", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getList", -1.0)),
+        CheckNewAnswer(),
+
+        AddData(inputData, InitInputRow("k1", "incCount", 37.0)),
+        AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("non-exist", "getCount", -1.0)),
+        CheckNewAnswer(("non-exist", "getCount", 0.0)),
+        AddData(inputData, InitInputRow("k2", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("k2", "getCount", 40.0)),
+        CheckNewAnswer(("k2", "getCount", 2.0)),
+
+        // test every row in initial State is processed
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", 40.0)),
+        AddData(inputData, InitInputRow("init_2", "getOption", -1.0)),
+        CheckNewAnswer(("init_2", "getOption", 100.0)),
+
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer(("init_1", "getList", 40.0)),
+        AddData(inputData, InitInputRow("init_2", "getList", -1.0)),
+        CheckNewAnswer(("init_2", "getList", 100.0)),
+
+        AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
+        CheckNewAnswer(("init_1", "getCount", 1.0)),
+        AddData(inputData, InitInputRow("init_2", "getCount", 100.0)),
+        CheckNewAnswer(("init_2", "getCount", 1.0)),
+
+        // Update row with key in initial row will work
+        AddData(inputData, InitInputRow("init_1", "update", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "remove", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getOption", -1.0)),
+        CheckNewAnswer(("init_1", "getOption", -1.0)),
+
+        AddData(inputData, InitInputRow("init_1", "appendList", 50.0)),
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer(("init_1", "getList", 50.0), ("init_1", "getList", 40.0)),
+
+        AddData(inputData, InitInputRow("init_1", "incCount", 40.0)),
+        AddData(inputData, InitInputRow("init_1", "getCount", 40.0)),
+        CheckNewAnswer(("init_1", "getCount", 2.0)),
+
+        // test remove
+        AddData(inputData, InitInputRow("k1", "remove", -1.0)),
+        AddData(inputData, InitInputRow("k1", "getOption", -1.0)),
+        CheckNewAnswer(("k1", "getOption", -1.0)),
+
+        AddData(inputData, InitInputRow("init_1", "clearCount", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getCount", -1.0)),
+        CheckNewAnswer(("init_1", "getCount", 0.0)),
+
+        AddData(inputData, InitInputRow("init_1", "clearList", -1.0)),
+        AddData(inputData, InitInputRow("init_1", "getList", -1.0)),
+        CheckNewAnswer()
+      )
+    }
+  }
+
+

Review Comment:
   Nit: extra newline





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530876311


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -85,23 +94,39 @@ case class TransformWithStateExec(
     }
   }
 
-  override protected def withNewChildInternal(
-    newChild: SparkPlan): TransformWithStateExec = copy(child = newChild)
+  override def left: SparkPlan = child
+
+  override def right: SparkPlan = initialState
+
+  override protected def withNewChildrenInternal(
+      newLeft: SparkPlan, newRight: SparkPlan): TransformWithStateExec =

Review Comment:
   nit: indent? Not sure if this is correct/expected.





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1534431954


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -271,57 +320,111 @@ case class TransformWithStateExec(
       case _ =>
     }
 
-    if (isStreaming) {
-      child.execute().mapPartitionsWithStateStore[InternalRow](
+    if (hasInitialState) {
+      val storeConf = new StateStoreConf(session.sqlContext.sessionState.conf)
+      val hadoopConfBroadcast =

Review Comment:
   We will also need this for `StateStore.get` here: https://github.com/apache/spark/blob/40465b6760fb120c9cc3ac1a4ee42a82843f4bc5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala#L347
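
   For context, the pattern in question: the Hadoop configuration is wrapped in a `SerializableConfiguration` and broadcast once on the driver, then unwrapped inside the executor-side partition closure where `StateStore.get` runs. Condensed from the diffs earlier in this thread:

   ```scala
   // Driver side: make the Hadoop conf serializable and broadcast it once.
   val hadoopConfBroadcast = sparkContext.broadcast(
     new SerializableConfiguration(session.sqlContext.sessionState.newHadoopConf()))

   // Executor side, inside the per-partition closure: unwrap the broadcast
   // (.value) and the SerializableConfiguration (.value) for StateStore.get.
   val store = StateStore.get(
     storeProviderId = storeProviderId,
     keySchema = schemaForKeyRow,
     valueSchema = schemaForValueRow,
     NoPrefixKeyStateEncoderSpec(schemaForKeyRow),
     version = stateInfo.get.storeVersion,
     useColumnFamilies = true,
     storeConf = storeConf,
     hadoopConf = hadoopConfBroadcast.value.value)
   ```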





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1523668147


##########
sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala:
##########
@@ -665,7 +665,8 @@ class KeyValueGroupedDataset[K, V] private[sql](
       outputMode: OutputMode = OutputMode.Append()): Dataset[U] = {
     Dataset[U](
       sparkSession,
-      TransformWithState[K, V, U](
+      // The last K type is only to silence compiler error

Review Comment:
   Any way to avoid this?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1523670747


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -64,31 +68,52 @@ case class TransformWithStateExec(
     eventTimeWatermarkForLateEvents: Option[Long],
     eventTimeWatermarkForEviction: Option[Long],
     child: SparkPlan,
-    isStreaming: Boolean = true)
-  extends UnaryExecNode with StateStoreWriter with WatermarkSupport with ObjectProducerExec {
+    isStreaming: Boolean = true,
+    hasInitialState: Boolean = false,
+    initialStateGroupingAttrs: Seq[Attribute],
+    initialStateDataAttrs: Seq[Attribute],
+    initialStateDeserializer: Expression,
+    initialState: SparkPlan)
+  extends BinaryExecNode with StateStoreWriter with WatermarkSupport with ObjectProducerExec {
 
   override def shortName: String = "transformWithStateExec"
 
   // TODO: update this to run no-data batches when timer support is added
   override def shouldRunAnotherBatch(newInputWatermark: Long): Boolean = false
 
-  override protected def withNewChildInternal(
-    newChild: SparkPlan): TransformWithStateExec = copy(child = newChild)
+  override def left: SparkPlan = child
+
+  override def right: SparkPlan = initialState
+
+  override protected def withNewChildrenInternal(
+      newLeft: SparkPlan, newRight: SparkPlan): TransformWithStateExec =
+    copy(child = newLeft, initialState = newRight)
 
   override def keyExpressions: Seq[Attribute] = groupingAttributes
 
   protected val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
 
   protected val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
 
+  /**
+   * Distribute by grouping attributes - We need the underlying data and the initial state data
+   * to have the same grouping so that the data are co-lacated on the same task.
+   */
   override def requiredChildDistribution: Seq[Distribution] = {
-    StatefulOperatorPartitioning.getCompatibleDistribution(groupingAttributes,
-      getStateInfo, conf) ::
+    StatefulOperatorPartitioning.getCompatibleDistribution(
+      groupingAttributes, getStateInfo, conf) ::
+      StatefulOperatorPartitioning.getCompatibleDistribution(

Review Comment:
   nit: indent?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -111,6 +136,26 @@ case class TransformWithStateExec(
     mappedIterator
   }
 
+  private def processInitialStateRows(keyRow: UnsafeRow, initStateIter: Iterator[InternalRow]):
+  Unit = {

Review Comment:
   indent?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45467:
URL: https://github.com/apache/spark/pull/45467#issuecomment-1995127603

   The error seems relevant to the MIMA checks - 
   ```
   problems with Sql module: 
   method transformWithState(org.apache.spark.sql.streaming.StatefulProcessorWithInitialState,org.apache.spark.sql.streaming.TimeoutMode,org.apache.spark.sql.streaming.OutputMode,org.apache.spark.sql.KeyValueGroupedDataset,org.apache.spark.sql.Encoder,org.apache.spark.sql.Encoder)org.apache.spark.sql.Dataset in class org.apache.spark.sql.KeyValueGroupedDataset does not have a correspondent in client version
   ```
   
   we probably need to update the Connect variants as well




Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530883632


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -127,13 +152,53 @@ case class TransformWithStateExec(
     mappedIterator
   }
 
+  private def processInitialStateRows(
+      keyRow: UnsafeRow,
+      initStateIter: Iterator[InternalRow]): Unit = {
+    val getKeyObj =
+      ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
+
+    val getStateValueObj =
+      ObjectOperator.deserializeRowToObject(initialStateDeserializer, initialStateDataAttrs)
+
+    val keyObj = getKeyObj(keyRow) // convert key to objects
+    ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+    val initStateObjIter = initStateIter.map(getStateValueObj.apply)
+
+    initStateObjIter.foreach { initState =>
+      statefulProcessor
+        .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]]
+        .handleInitialState(keyObj, initState)
+    }
+    ImplicitGroupingKeyTracker.removeImplicitKey()
+  }
+
   private def processNewData(dataIter: Iterator[InternalRow]): Iterator[InternalRow] = {
     val groupedIter = GroupedIterator(dataIter, groupingAttributes, child.output)
     groupedIter.flatMap { case (keyRow, valueRowIter) =>
       val keyUnsafeRow = keyRow.asInstanceOf[UnsafeRow]
       handleInputRows(keyUnsafeRow, valueRowIter)
     }
   }
+// TODO double check this

Review Comment:
   nit: what's pending here?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1530878089


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -127,13 +152,53 @@ case class TransformWithStateExec(
     mappedIterator
   }
 
+  private def processInitialStateRows(
+      keyRow: UnsafeRow,
+      initStateIter: Iterator[InternalRow]): Unit = {
+    val getKeyObj =
+      ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
+
+    val getStateValueObj =

Review Comment:
   `getInitialStateValueObj` ?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1531068752


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##########
@@ -127,13 +152,53 @@ case class TransformWithStateExec(
     mappedIterator
   }
 
+  private def processInitialStateRows(
+      keyRow: UnsafeRow,
+      initStateIter: Iterator[InternalRow]): Unit = {
+    val getKeyObj =
+      ObjectOperator.deserializeRowToObject(keyDeserializer, groupingAttributes)
+
+    val getStateValueObj =
+      ObjectOperator.deserializeRowToObject(initialStateDeserializer, initialStateDataAttrs)
+
+    val keyObj = getKeyObj(keyRow) // convert key to objects
+    ImplicitGroupingKeyTracker.setImplicitKey(keyObj)
+    val initStateObjIter = initStateIter.map(getStateValueObj.apply)
+
+    initStateObjIter.foreach { initState =>
+      statefulProcessor
+        .asInstanceOf[StatefulProcessorWithInitialState[Any, Any, Any, Any]]
+        .handleInitialState(keyObj, initState)
+    }
+    ImplicitGroupingKeyTracker.removeImplicitKey()
+  }
+
   private def processNewData(dataIter: Iterator[InternalRow]): Iterator[InternalRow] = {
     val groupedIter = GroupedIterator(dataIter, groupingAttributes, child.output)
     groupedIter.flatMap { case (keyRow, valueRowIter) =>
       val keyUnsafeRow = keyRow.asInstanceOf[UnsafeRow]
       handleInputRows(keyUnsafeRow, valueRowIter)
     }
   }
+// TODO double check this
+  private def processNewDataWithInitialState(
+      dataIter: Iterator[InternalRow],
+      initStateIter: Iterator[InternalRow]): Iterator[InternalRow] = {
+
+    val groupedChildDataIter = GroupedIterator(dataIter, groupingAttributes, child.output)
+    val groupedInitialStateIter =
+      GroupedIterator(initStateIter, initialStateGroupingAttrs, initialState.output)
+
+    // Create a CoGroupedIterator that will group the two iterators together for every key group.
+    new CoGroupedIterator(
+      groupedChildDataIter, groupedInitialStateIter, groupingAttributes).flatMap {
+      case (keyRow, valueRowIter, initialStateRowIter) =>
+        // TODO in design doc: trying to re-initialize state for the same
+        // grouping key will result in an error?

Review Comment:
   Not sure if I understand the scenario correctly: does it mean the user tries to reassign state variable values in the user-defined function `handleInitialState()`?





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1533098209


##########
common/utils/src/main/resources/error/error-classes.json:
##########
@@ -3503,6 +3503,12 @@
     ],
     "sqlState" : "42802"
   },
+  "STATEFUL_PROCESSOR_CANNOT_REINITIALIZE_STATE_ON_KEY" : {
+    "message" : [
+      "Cannot re-initialize state on the same grouping key during initial state handling for stateful processor. Invalid grouping key = <groupingKey>."

Review Comment:
   nit: remove the spaces before/after the equals sign
   
   ```
   Invalid grouping key=<groupingKey>
   ```





Re: [PR] [SS][SPARK-47363] Initial State without state reader implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45467:
URL: https://github.com/apache/spark/pull/45467#discussion_r1523797687


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInitialStateSuite.scala:
##########
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.{Encoders, KeyValueGroupedDataset}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.{AlsoTestWithChangelogCheckpointingEnabled, RocksDBStateStoreProvider, StateStoreMultipleColumnFamiliesNotSupportedException}
+import org.apache.spark.sql.internal.SQLConf
+
+case class InitInputRow(key: String, action: String, value: Double)
+
+class StatefulProcessorWithInitialStateTestClass extends StatefulProcessorWithInitialState[
+    String, InitInputRow, (String, String, Double), (String, Double)] {
+  @transient var _valState: ValueState[Double] = _
+  @transient var _listState: ListState[Double] = _
+  @transient var _mapState: MapState[Double, Int] = _
+
+  override def handleInitialState(
+      key: String,
+      initialState: (String, Double)): Unit = {
+    val initStateVal = initialState._2
+    _valState.update(initStateVal)

Review Comment:
   Not sure if I understand you correctly. Do you mean we should have a test case where `initialState` is a case class, and inside `handleInitialState` we update the values of the listState/mapState variables?
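
   If helpful, a minimal sketch of the logic such a test could exercise, written as a standalone helper so the snippet stays self-contained (in a real test this body would live in `handleInitialState` of a processor whose initial state type is the case class below; all names here are illustrative):

   ```
   import org.apache.spark.sql.streaming.{ListState, MapState, ValueState}

   // Hypothetical case-class initial state carrying seeds for all three
   // state variable types.
   case class CompositeInitState(
       value: Double, listInit: Seq[Double], mapInit: Map[Double, Int])

   def initializeFromCaseClass(
       valState: ValueState[Double],
       listState: ListState[Double],
       mapState: MapState[Double, Int],
       init: CompositeInitState): Unit = {
     valState.update(init.value)                  // seed the value state
     init.listInit.foreach(listState.appendValue) // seed the list state
     init.mapInit.foreach { case (k, v) =>        // seed the map state
       mapState.updateValue(k, v)
     }
   }
   ```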





Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45467:
URL: https://github.com/apache/spark/pull/45467#issuecomment-2024453733

   Thanks! Merging to master.




Re: [PR] [SPARK-47363][SS] Initial State without state reader implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45467:
URL: https://github.com/apache/spark/pull/45467#issuecomment-2024453641

   The CI failure isn't related; only pyspark-connect failed.

