You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "jingz-db (via GitHub)" <gi...@apache.org> on 2024/02/29 23:12:37 UTC

[PR] [SS] Add MapState implementation for State API v2. [spark]

jingz-db opened a new pull request, #45341:
URL: https://github.com/apache/spark/pull/45341

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://spark.apache.org/contributing.html
     2. Ensure you have added or run the appropriate tests for your PR: https://spark.apache.org/developer-tools.html
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP][SPARK-XXXX] Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
     7. If you want to add a new configuration, please read the guideline first for naming configurations in
        'core/src/main/scala/org/apache/spark/internal/config/ConfigEntry.scala'.
     8. If you want to add or modify an error type or message, please read the guideline first in
        'core/src/main/resources/error/README.md'.
   -->
   
   ### What changes were proposed in this pull request?
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue. 
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     2. If you fix some SQL features, you can provide some references of other DBMSes.
     3. If there is design documentation, please add the link.
     4. If there is a discussion in the mailing list, please add the link.
   -->
   This PR adds changes for MapState implementation in State Api v2. This implementation adds a new encoder/decoder to encode grouping key and user key into a composite key to be put into RocksDB so that we could retrieve key-value pair by user specified user key by one rocksdb get.
   
   ### Why are the changes needed?
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   These changes are needed to support map values in the State Store. The changes are part of the work around adding new stateful streaming operator for arbitrary state mgmt that provides a bunch of new features listed in the SPIP JIRA here - https://issues.apache.org/jira/browse/SPARK-45939
   
   ### Does this PR introduce _any_ user-facing change?
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released Spark versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   Yes
   This PR introduces a new state type (MapState) that users can use in their Spark streaming queries.
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   If benchmark tests were added, please run the benchmarks in GitHub Actions for the consistent environment, and the instructions could accord to: https://spark.apache.org/developer-tools.html#github-workflow-benchmarks.
   -->
   Unit tests in `TransforWithMapStateSuite`.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   <!--
   If generative AI tooling has been used in the process of authoring this patch, please include the
   phrase: 'Generated-by: ' followed by the name of the tool and its version.
   If no, write 'No'.
   Please refer to the [ASF Generative Tooling Guidance](https://www.apache.org/legal/generative-tooling.html) for details.
   -->
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1512013529


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -29,6 +29,14 @@ sealed trait RocksDBKeyStateEncoder {
   def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+
+  def encodeCompositeKey(groupingKeyRow: UnsafeRow, userKeyRow: UnsafeRow): Array[Byte] = {

Review Comment:
   Yes - could we please do this ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513480414


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -66,6 +68,38 @@ class StateTypesEncoder[GK](
     keyRow
   }
 
+  def encodeCompositeKey[K](userKey: K, userKeyEnc: Encoder[K]): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // get grouping key byte array
+    val keyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
+    // get user key byte array
+    val userKeySerializer = encoderFor(userKeyEnc).createSerializer()

Review Comment:
   Could we reuse this instead of creating new one each time ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -66,6 +68,38 @@ class StateTypesEncoder[GK](
     keyRow
   }
 
+  def encodeCompositeKey[K](userKey: K, userKeyEnc: Encoder[K]): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // get grouping key byte array
+    val keyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
+    // get user key byte array
+    val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+    val userKeyBytesArr = userKeySerializer.apply(userKey).asInstanceOf[UnsafeRow].getBytes()
+
+    val schemaForCompositeKeyRow: StructType =

Review Comment:
   Same here



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -66,6 +68,38 @@ class StateTypesEncoder[GK](
     keyRow
   }
 
+  def encodeCompositeKey[K](userKey: K, userKeyEnc: Encoder[K]): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // get grouping key byte array
+    val keyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
+    // get user key byte array
+    val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+    val userKeyBytesArr = userKeySerializer.apply(userKey).asInstanceOf[UnsafeRow].getBytes()
+
+    val schemaForCompositeKeyRow: StructType =
+      new StructType()
+        .add("key", BinaryType)
+        .add("userKey", BinaryType)
+
+    val compositeKeyProjection = UnsafeProjection.create(schemaForCompositeKeyRow)

Review Comment:
   Same here as well



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on PR #45341:
URL: https://github.com/apache/spark/pull/45341#issuecomment-1977336528

   Thanks Eric for reviews on my old PR. I've resolved them and incorporated in this one already.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513480749


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -66,6 +68,38 @@ class StateTypesEncoder[GK](
     keyRow
   }
 
+  def encodeCompositeKey[K](userKey: K, userKeyEnc: Encoder[K]): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // get grouping key byte array
+    val keyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
+    // get user key byte array
+    val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+    val userKeyBytesArr = userKeySerializer.apply(userKey).asInstanceOf[UnsafeRow].getBytes()
+
+    val schemaForCompositeKeyRow: StructType =
+      new StructType()
+        .add("key", BinaryType)
+        .add("userKey", BinaryType)
+
+    val compositeKeyProjection = UnsafeProjection.create(schemaForCompositeKeyRow)
+    val compositeKeyRow = compositeKeyProjection(InternalRow(keyByteArr, userKeyBytesArr))
+    compositeKeyRow
+  }
+
+  def decodeCompositeKey[K](row: UnsafeRow, userKeyEnc: Encoder[K]): K = {
+    val bytes = row.getBinary(1)
+    val reuseRow = new UnsafeRow(userKeyEnc.schema.fields.length)
+    reuseRow.pointTo(bytes, bytes.length)
+    val valExpressionEnc = encoderFor(userKeyEnc)
+    val rowToObjDeserializer = valExpressionEnc.resolveAndBind().createDeserializer()

Review Comment:
   Same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45341:
URL: https://github.com/apache/spark/pull/45341#issuecomment-1981548558

   @jingz-db - test failure seems related ? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1511693823


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -60,13 +60,25 @@ trait ReadStateStore {
   /** Version of the data in this store before committing updates. */
   def version: Long
 
+  /** Whether composite key is used for state store. */
+  var useCompositeKey: Boolean = false

Review Comment:
   This is an ugly implementation. We may consider use similar ways as `useMultipleValuesPerKey`. But we may get lost in `TransformWithStateExec` in calling `createAndInit`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1511697862


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -29,6 +29,14 @@ sealed trait RocksDBKeyStateEncoder {
   def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+
+  def encodeCompositeKey(groupingKeyRow: UnsafeRow, userKeyRow: UnsafeRow): Array[Byte] = {

Review Comment:
   We can also change the key schema - let key schema column 1 = grouping key, column 2 = user key, and pass this new key schema to `PrefixKeyScanStateEncoder`, and remove all these compositeKey interfaces to reuse Jungteak's prefix functions.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513618200


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.util.Random
+
+import org.apache.hadoop.conf.Configuration
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, MemoryStream, StatefulProcessorHandleImpl}
+import org.apache.spark.sql.execution.streaming.state.{RocksDBStateStoreProvider, StateStore, StateStoreConf, StateStoreId, StateStoreProvider, StateStoreTestsHelper}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(outputMode: OutputMode): Unit = {
+    _mapState = getHandle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "clear") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+/** Pure unit tests for MapState */
+class MapStateSuite extends SharedSparkSession

Review Comment:
   Could we move this to a new file please ? Thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513544356


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor

Review Comment:
   Could we also add pure unit tests similar to ValueStateSuite - https://github.com/apache/spark/blob/master/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala ? Thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520713399


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -73,24 +74,31 @@ class MapStateImpl[K, V](
   }
 
   /** Get the map associated with grouping key */
-  override def getMap(): Map[K, V] = {
+  override def getMap(): Iterator[(K, V)] = {
     val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
-    store.prefixScan(encodedGroupingKey, stateName)
-      .map {
-        case iter: UnsafeRowPair =>
-          (stateTypesEncoder.decodeCompositeKey(iter.key),
-            stateTypesEncoder.decodeValue(iter.value))
-      }.toMap
+    val pairsIterator = store.prefixScan(encodedGroupingKey, stateName)
+
+    new Iterator[(K, V)] {

Review Comment:
   It's OK to defer the change of method name in follow up PR if we want to have a time to figure out the best name. The first one is something good to do this before merging the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on PR #45341:
URL: https://github.com/apache/spark/pull/45341#issuecomment-1992613952

   Test suite failure seems unrelated in pyspark.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1521842235


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -73,24 +74,31 @@ class MapStateImpl[K, V](
   }
 
   /** Get the map associated with grouping key */
-  override def getMap(): Map[K, V] = {
+  override def getMap(): Iterator[(K, V)] = {
     val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
-    store.prefixScan(encodedGroupingKey, stateName)
-      .map {
-        case iter: UnsafeRowPair =>
-          (stateTypesEncoder.decodeCompositeKey(iter.key),
-            stateTypesEncoder.decodeValue(iter.value))
-      }.toMap
+    val pairsIterator = store.prefixScan(encodedGroupingKey, stateName)
+
+    new Iterator[(K, V)] {

Review Comment:
   Thanks Jungtaek! As discussed, I changed `getMap()` name to `iterator` and pair it with `getKeys()`, `getValues()` to `keys()` and `values()` accordingly.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513485319


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _processorHandle: StatefulProcessorHandle = _
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): Unit = {
+    _processorHandle = handle
+    _mapState = handle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "remove") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+class TransformWithMapStateSuite extends StreamTest {
+  import testImplicits._
+
+  private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, inputMapRow),
+        ExpectFailure[SparkException](e => {
+          assert(e.getMessage.contains("User key cannot be null"))
+        })
+      )
+    }
+  }
+
+  test("Test retrieving value with non-exist user key") {

Review Comment:
   Nit: `non-existing` or `absent`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1514975454


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(outputMode: OutputMode): Unit = {
+    _mapState = getHandle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "clear") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+class TransformWithMapStateSuite extends StreamTest {
+  import testImplicits._
+
+  private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, inputMapRow),
+        ExpectFailure[SparkIllegalArgumentException](e => {
+          assert(e.getMessage.contains("ILLEGAL_STATE_STORE_VALUE.NULL_VALUE"))
+        })
+      )
+    }
+  }
+
+  test("Test retrieving value with non-existing user key") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, InputMapRow("k1", "getValue", ("v1", ""))),
+        CheckAnswer(("k1", "v1", null))
+      )
+    }
+  }
+
+  Seq("getValue", "containsKey", "updateValue", "removeKey").foreach { mapImplFunc =>
+    test(s"Test $mapImplFunc with null user key") {
+      testMapStateWithNullUserKey(InputMapRow("k1", mapImplFunc, (null, "")))
+    }
+  }
+
+  test("Test put value with null value") {

Review Comment:
   Can we add a test for batch query using `mapState` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45341:
URL: https://github.com/apache/spark/pull/45341#issuecomment-1992673425

   Yeah, doesn't look to be related. I'm ignoring the CI error.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on PR #45341:
URL: https://github.com/apache/spark/pull/45341#issuecomment-1992674919

   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR closed pull request #45341: [SPARK-47272][SS] Add MapState implementation for State API v2. 
URL: https://github.com/apache/spark/pull/45341


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1511697862


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -29,6 +29,14 @@ sealed trait RocksDBKeyStateEncoder {
   def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+
+  def encodeCompositeKey(groupingKeyRow: UnsafeRow, userKeyRow: UnsafeRow): Array[Byte] = {

Review Comment:
   We can also pass in a two-column key schema to `PrefixKeyScanStateEncoder`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1512002599


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any]) extends MapState[K, V] with Logging {
+
+  store.setUseCompositeKey()
+
+  private val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)

Review Comment:
   Can we not encode the grouping key and user key together here ?



##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+@Experimental
+@Evolving
+/**
+ * Interface used for arbitrary stateful operations with the v2 API to capture
+ * map value state.
+ */
+trait MapState[K, V] extends Serializable {
+  /** Whether state exists or not. */
+  def exists(): Boolean
+
+  /** Get the state value if it exists */
+  def getValue(key: K): V
+
+  /** Check if the user key is contained in the map */
+  def containsKey(key: K): Boolean
+
+  /** Update value for given user key */
+  def updateValue(key: K, value: V) : Unit
+
+  /** Get the map associated with grouping key */
+  def getMap(): Map[K, V]
+
+  /** Get the list of keys present in map associated with grouping key */
+  def getKeys(): Iterator[K]
+
+  /** Get the list of values present in map associated with grouping key */
+  def getValues(): Iterator[V]
+
+  /** Remove user key from map state */
+  def removeKey(key: K): Unit
+
+  /** Remove this state. */
+  def remove(): Unit

Review Comment:
   Can we rename this to `clear` ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -66,6 +66,14 @@ class StateTypesEncoder[GK](
     keyRow
   }
 
+  def encodeUserKey[K](userKey: K): UnsafeRow = {
+    val schemaForKeyRow: StructType = new StructType().add("userKey", BinaryType)
+    val keyByteArr = SerializationUtils.serialize(userKey.asInstanceOf[Serializable])

Review Comment:
   Lets serialize using Spark SQL here ?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -84,6 +84,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       map.iterator()
     }
 
+    override def getWithCompositeKey(

Review Comment:
   Not sure I understand why we need to update the store providers themselves ? Can't we just use an updated schema that encodes the required information within the key



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1511697862


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -29,6 +29,14 @@ sealed trait RocksDBKeyStateEncoder {
   def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+
+  def encodeCompositeKey(groupingKeyRow: UnsafeRow, userKeyRow: UnsafeRow): Array[Byte] = {

Review Comment:
   We can also pass in a two-column key schema to `PrefixKeyScanStateEncoder`, and remove all these compositeKey interfaces.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1511697862


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -29,6 +29,14 @@ sealed trait RocksDBKeyStateEncoder {
   def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+
+  def encodeCompositeKey(groupingKeyRow: UnsafeRow, userKeyRow: UnsafeRow): Array[Byte] = {

Review Comment:
   We can also change the key schema to `PrefixKeyScanStateEncoder`, and remove all these compositeKey interfaces.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on PR #45341:
URL: https://github.com/apache/spark/pull/45341#issuecomment-1982166978

   @jingz-db - could you please fix this style error ?
   
   ```
   [info] compiling 1 Scala source to /home/runner/work/spark/spark/tools/target/scala-2.13/classes ...
   [error] /home/runner/work/spark/spark/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:195:0: Whitespace at end of line
   [info]   Compilation completed in 14.316s.
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1514974624


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))

Review Comment:
   Could we add a class level comment describing what this test suite does ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1511697862


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateEncoder.scala:
##########
@@ -29,6 +29,14 @@ sealed trait RocksDBKeyStateEncoder {
   def extractPrefixKey(key: UnsafeRow): UnsafeRow
   def encodeKey(row: UnsafeRow): Array[Byte]
   def decodeKey(keyBytes: Array[Byte]): UnsafeRow
+
+  def encodeCompositeKey(groupingKeyRow: UnsafeRow, userKeyRow: UnsafeRow): Array[Byte] = {

Review Comment:
   We can also change the key schema - let key schema column 1 = grouping key, column 2 = user key, and pass this new key schema to `PrefixKeyScanStateEncoder`, and remove all these compositeKey interfaces.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513485844


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _processorHandle: StatefulProcessorHandle = _
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): Unit = {

Review Comment:
   You probably need to merge back from master to update this ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513559241


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any],
+    userKeyExprEnc: Encoder[K]) extends MapState[K, V] with Logging {
+
+  // Pack grouping key and user key together as a prefixed composite key
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+    .add("key", BinaryType)
+    .add("userKey", BinaryType)
+  private val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  private val keySerializer = keyExprEnc.createSerializer()
+  private val stateTypesEncoder = CompositeKeyStateEncoder(keySerializer, stateName, userKeyExprEnc)
+
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow)
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    !store.prefixScan(stateTypesEncoder.encodeGroupingKey(), stateName).isEmpty
+  }
+
+  /** Get the state value if it exists */
+  override def getValue(key: K): V = {
+    // TODO do we want to reuse this function,
+    // or create a new error for null user key?

Review Comment:
   Its ok to reuse I think. As far as the user is concerned, they are writing a value out. So should be ok to reuse



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1515298710


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -86,3 +88,53 @@ object StateTypesEncoder {
     new StateTypesEncoder[GK](keySerializer, stateName)
   }
 }
+
+class CompositeKeyStateEncoder[GK, K](
+    keySerializer: Serializer[GK],
+    stateName: String,
+    userKeyEnc: Encoder[K])
+  extends StateTypesEncoder[GK](keySerializer: Serializer[GK], stateName: String) {
+
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+      .add("key", BinaryType)
+      .add("userKey", BinaryType)
+  private val compositeKeyProjection = UnsafeProjection.create(schemaForCompositeKeyRow)
+  private val reuseRow = new UnsafeRow(userKeyEnc.schema.fields.length)
+  private val userKeyExpressionEnc = encoderFor(userKeyEnc)
+
+  private val userKeyRowToObjDeserializer =
+    userKeyExpressionEnc.resolveAndBind().createDeserializer()
+  private val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+
+  def encodeCompositeKey(userKey: K): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // generate grouping key byte array
+    val groupingKeyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
+    // generate user key byte array
+    val userKeyBytesArr = userKeySerializer.apply(userKey).asInstanceOf[UnsafeRow].getBytes()
+
+    val compositeKeyRow = compositeKeyProjection(InternalRow(groupingKeyByteArr, userKeyBytesArr))
+    compositeKeyRow
+  }
+
+  def decodeCompositeKey(row: UnsafeRow): K = {
+    val bytes = row.getBinary(1)
+    reuseRow.pointTo(bytes, bytes.length)
+    val value = userKeyRowToObjDeserializer.apply(reuseRow)
+    value
+  }
+}
+
+object CompositeKeyStateEncoder {

Review Comment:
   I was following Bhuwan's style in the base class. Maybe I am missing something but did not find anything useful in the [style guide](https://github.com/databricks/scala-style-guide).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520711461


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -73,24 +74,31 @@ class MapStateImpl[K, V](
   }
 
   /** Get the map associated with grouping key */
-  override def getMap(): Map[K, V] = {
+  override def getMap(): Iterator[(K, V)] = {
     val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
-    store.prefixScan(encodedGroupingKey, stateName)
-      .map {
-        case iter: UnsafeRowPair =>
-          (stateTypesEncoder.decodeCompositeKey(iter.key),
-            stateTypesEncoder.decodeValue(iter.value))
-      }.toMap
+    val pairsIterator = store.prefixScan(encodedGroupingKey, stateName)
+
+    new Iterator[(K, V)] {

Review Comment:
   maybe final nit: your previous code just works and it seems much simpler. We just don't call `.toMap` and done. 
   
   Also maybe the method name to remove the map from the name? 
   
   * `getIterator` - consistent with getKeys/getValues
   * `iterator` - consistent with Map collection. need to change other methods as well, e.g. getKeys to keys, getValues to values
   * `get()` - consistent with other type of state. The value of origin type is retrieved with get() consistently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520688468


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala:
##########
@@ -67,6 +67,7 @@ class MapStateSuite extends StateVariableSuiteBase {
       assert(!testState.exists())
       assert(testState.getMap().hasNext === false)
     }
+    ImplicitGroupingKeyTracker.removeImplicitKey()

Review Comment:
   Maybe just do this in setup (before) or teardown (after) in StateVariableSuiteBase, to ensure the cleanup is guaranteed to be done - as failure in test A won't clean up the thread local and introduce another failure. We do like to avoid cascading failures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520256936


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any],
+    userKeyExprEnc: Encoder[K]) extends MapState[K, V] with Logging {
+
+  // Pack grouping key and user key together as a prefixed composite key
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+    .add("key", BinaryType)
+    .add("userKey", BinaryType)
+  private val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  private val keySerializer = keyExprEnc.createSerializer()
+  private val stateTypesEncoder = CompositeKeyStateEncoder(
+    keySerializer, schemaForCompositeKeyRow, stateName, userKeyExprEnc)
+
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow)
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    !store.prefixScan(stateTypesEncoder.encodeGroupingKey(), stateName).isEmpty
+  }
+
+  /** Get the state value if it exists */
+  override def getValue(key: K): V = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    val unsafeRowValue = store.get(encodedCompositeKey, stateName)
+
+    if (unsafeRowValue == null) return null.asInstanceOf[V]
+    stateTypesEncoder.decodeValue(unsafeRowValue)
+  }
+
+  /** Check if the user key is contained in the map */
+  override def containsKey(key: K): Boolean = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    getValue(key) != null
+  }
+
+  /** Update value for given user key */
+  override def updateValue(key: K, value: V): Unit = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    StateStoreErrors.requireNonNullStateValue(value, stateName)
+    val encodedValue = stateTypesEncoder.encodeValue(value)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    store.put(encodedCompositeKey, encodedValue, stateName)
+  }
+
+  /** Get the map associated with grouping key */
+  override def getMap(): Map[K, V] = {
+    val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
+    store.prefixScan(encodedGroupingKey, stateName)
+      .map {
+        case iter: UnsafeRowPair =>
+          (stateTypesEncoder.decodeCompositeKey(iter.key),

Review Comment:
   Thanks for the input! Not sure if I understand you correctly, are you trying to say that: we want to return`Iterator` instead of `Map` to reduce the copy, and we need to use different reused rows for key/value in `StateTypesEncoder`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1511693823


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##########
@@ -60,13 +60,25 @@ trait ReadStateStore {
   /** Version of the data in this store before committing updates. */
   def version: Long
 
+  /** Whether composite key is used for state store. */
+  var useCompositeKey: Boolean = false

Review Comment:
   This is an ugly implementation. We may consider use similar ways as `useMultipleValuesPerKey`. But we may get lost in `TransformWithStateExec` in calling `createAndInit`? We will need to distinguish between `ListState` and `MapState`, because in `TransformWithStateExec`, `useMultipleValuesPerKey` is by default true. If we want to do similar things and set `useCompositeKey` to true, we might get messed up for calling the right keyEncoder.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on PR #45341:
URL: https://github.com/apache/spark/pull/45341#issuecomment-1981833646

   > @jingz-db - test failure seems related ?
   
   Weirdly is passing locally. Let me resolve your comments and retrigger the CI and see if it still fails. Thanks for the review!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513621273


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -86,3 +88,53 @@ object StateTypesEncoder {
     new StateTypesEncoder[GK](keySerializer, stateName)
   }
 }
+
+class CompositeKeyStateEncoder[GK, K](
+    keySerializer: Serializer[GK],
+    stateName: String,
+    userKeyEnc: Encoder[K])
+  extends StateTypesEncoder[GK](keySerializer: Serializer[GK], stateName: String) {
+
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+      .add("key", BinaryType)
+      .add("userKey", BinaryType)
+  private val compositeKeyProjection = UnsafeProjection.create(schemaForCompositeKeyRow)
+  private val reuseRow = new UnsafeRow(userKeyEnc.schema.fields.length)
+  private val userKeyExpressionEnc = encoderFor(userKeyEnc)
+
+  private val userKeyRowToObjDeserializer =
+    userKeyExpressionEnc.resolveAndBind().createDeserializer()
+  private val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+
+  def encodeCompositeKey(userKey: K): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // generate grouping key byte array
+    val groupingKeyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()

Review Comment:
   Compiler will complain in the next line:
   `keySerializer.apply(groupingKey)` where groupingKey will be of `Any` type if we directly call `keyOption.get`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513570178


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -86,3 +88,53 @@ object StateTypesEncoder {
     new StateTypesEncoder[GK](keySerializer, stateName)
   }
 }
+
+class CompositeKeyStateEncoder[GK, K](
+    keySerializer: Serializer[GK],
+    stateName: String,
+    userKeyEnc: Encoder[K])
+  extends StateTypesEncoder[GK](keySerializer: Serializer[GK], stateName: String) {
+
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+      .add("key", BinaryType)
+      .add("userKey", BinaryType)
+  private val compositeKeyProjection = UnsafeProjection.create(schemaForCompositeKeyRow)
+  private val reuseRow = new UnsafeRow(userKeyEnc.schema.fields.length)
+  private val userKeyExpressionEnc = encoderFor(userKeyEnc)
+
+  private val userKeyRowToObjDeserializer =
+    userKeyExpressionEnc.resolveAndBind().createDeserializer()
+  private val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+
+  def encodeCompositeKey(userKey: K): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // generate grouping key byte array
+    val groupingKeyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
+    // generate user key byte array
+    val userKeyBytesArr = userKeySerializer.apply(userKey).asInstanceOf[UnsafeRow].getBytes()
+
+    val compositeKeyRow = compositeKeyProjection(InternalRow(groupingKeyByteArr, userKeyBytesArr))
+    compositeKeyRow
+  }
+
+  def decodeCompositeKey(row: UnsafeRow): K = {
+    val bytes = row.getBinary(1)
+    reuseRow.pointTo(bytes, bytes.length)
+    val value = userKeyRowToObjDeserializer.apply(reuseRow)
+    value
+  }
+}
+
+object CompositeKeyStateEncoder {

Review Comment:
   Hmm - do we really need these singleton objects ? Could we just call `new CompositeKeyStateEncoder` instead ?
   cc - @HeartSaVioR - is this a preferred pattern for Spark ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513570476


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -86,3 +88,53 @@ object StateTypesEncoder {
     new StateTypesEncoder[GK](keySerializer, stateName)
   }
 }
+
+class CompositeKeyStateEncoder[GK, K](
+    keySerializer: Serializer[GK],
+    stateName: String,
+    userKeyEnc: Encoder[K])
+  extends StateTypesEncoder[GK](keySerializer: Serializer[GK], stateName: String) {
+
+  private val schemaForCompositeKeyRow: StructType =

Review Comment:
   Lets pass the schema as an argument to make this more generic ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513559752


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any],
+    userKeyExprEnc: Encoder[K]) extends MapState[K, V] with Logging {
+
+  // Pack grouping key and user key together as a prefixed composite key
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+    .add("key", BinaryType)
+    .add("userKey", BinaryType)
+  private val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  private val keySerializer = keyExprEnc.createSerializer()
+  private val stateTypesEncoder = CompositeKeyStateEncoder(keySerializer, stateName, userKeyExprEnc)
+
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow)
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    !store.prefixScan(stateTypesEncoder.encodeGroupingKey(), stateName).isEmpty
+  }
+
+  /** Get the state value if it exists */
+  override def getValue(key: K): V = {
+    // TODO do we want to reuse this function,
+    // or create a new error for null user key?
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    val unsafeRowValue = store.get(encodedCompositeKey, stateName)
+
+    if (unsafeRowValue == null) return null.asInstanceOf[V]
+    stateTypesEncoder.decodeValue(unsafeRowValue)
+  }
+
+  /** Check if the user key is contained in the map */
+  override def containsKey(key: K): Boolean = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    try {
+      getValue(key) != null

Review Comment:
   I guess we can remove the `try catch` block now ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513544356


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,201 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor

Review Comment:
   Could we also add pure unit tests similar to ValueStateSuite - https://src.dev.databricks.com/databricks/runtime@master/-/blob/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala?subtree=true ? Thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520613982


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any],
+    userKeyExprEnc: Encoder[K]) extends MapState[K, V] with Logging {
+
+  // Pack grouping key and user key together as a prefixed composite key
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+    .add("key", BinaryType)
+    .add("userKey", BinaryType)
+  private val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  private val keySerializer = keyExprEnc.createSerializer()
+  private val stateTypesEncoder = CompositeKeyStateEncoder(
+    keySerializer, schemaForCompositeKeyRow, stateName, userKeyExprEnc)
+
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow)
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    !store.prefixScan(stateTypesEncoder.encodeGroupingKey(), stateName).isEmpty
+  }
+
+  /** Get the state value if it exists */
+  override def getValue(key: K): V = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    val unsafeRowValue = store.get(encodedCompositeKey, stateName)
+
+    if (unsafeRowValue == null) return null.asInstanceOf[V]
+    stateTypesEncoder.decodeValue(unsafeRowValue)
+  }
+
+  /** Check if the user key is contained in the map */
+  override def containsKey(key: K): Boolean = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    getValue(key) != null
+  }
+
+  /** Update value for given user key */
+  override def updateValue(key: K, value: V): Unit = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    StateStoreErrors.requireNonNullStateValue(value, stateName)
+    val encodedValue = stateTypesEncoder.encodeValue(value)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    store.put(encodedCompositeKey, encodedValue, stateName)
+  }
+
+  /** Get the map associated with grouping key */
+  override def getMap(): Map[K, V] = {
+    val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
+    store.prefixScan(encodedGroupingKey, stateName)
+      .map {
+        case iter: UnsafeRowPair =>
+          (stateTypesEncoder.decodeCompositeKey(iter.key),

Review Comment:
   Hi @anishshri-db, need your input on this: Do we want to return `Map` type or `Iterator` type for `getMap` function?
   Talked with Jungtaek on Slack, if we decide to return `Map` type, we'll probably need to materialize the map and copy everything in map into memory (because we reuse UnsafeRow in StateTypeEncoder). So Jungtaek is concerning about the case where we have a large map. I also feel like returning Iterator type makes more sense, because for ListState we also return Iterator for get list function.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520688468


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala:
##########
@@ -67,6 +67,7 @@ class MapStateSuite extends StateVariableSuiteBase {
       assert(!testState.exists())
       assert(testState.getMap().hasNext === false)
     }
+    ImplicitGroupingKeyTracker.removeImplicitKey()

Review Comment:
   Maybe just do this in setup (before) in StateVariableSuiteBase, to ensure the cleanup is guaranteed to be done - as failure in test A won't clean up the thread local and introduce another failure. We do like to avoid cascading failures.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala:
##########
@@ -67,6 +67,7 @@ class MapStateSuite extends StateVariableSuiteBase {
       assert(!testState.exists())
       assert(testState.getMap().hasNext === false)
     }
+    ImplicitGroupingKeyTracker.removeImplicitKey()

Review Comment:
   Maybe just do this in setup (before) or teardown (after) in StateVariableSuiteBase, to ensure the cleanup is guaranteed to be done - as failure in test A won't clean up the thread local and introduce another failure. We do like to avoid cascading failures.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1514973964


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -218,3 +173,56 @@ class ValueStateSuite extends SharedSparkSession
     )
   }
 }
+
+/**
+ * Abstract Base Class that provides test utilities for different state variable
+ * types (ValueState, ListState, MapState) used in arbitrary stateful operators.
+ */
+abstract class StateVariableSuiteBase extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  protected def newStoreProviderWithValueState(useColumnFamilies: Boolean):
+  RocksDBStateStoreProvider = {
+    newStoreProviderWithValueState(StateStoreId(newDir(), Random.nextInt(), 0),
+      numColsPrefixKey = 0,
+      useColumnFamilies = useColumnFamilies)
+  }
+
+  protected def newStoreProviderWithValueState(

Review Comment:
   Can we just rename this function to be generic ? i guess anyway the specific state variable schema is being managed by the individual type information right ? 



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/ValueStateSuite.scala:
##########
@@ -218,3 +173,56 @@ class ValueStateSuite extends SharedSparkSession
     )
   }
 }
+
+/**
+ * Abstract Base Class that provides test utilities for different state variable
+ * types (ValueState, ListState, MapState) used in arbitrary stateful operators.
+ */
+abstract class StateVariableSuiteBase extends SharedSparkSession
+  with BeforeAndAfter {
+
+  before {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  after {
+    StateStore.stop()
+    require(!StateStore.isMaintenanceRunning)
+  }
+
+  import StateStoreTestsHelper._
+
+  val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)
+  val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  protected def newStoreProviderWithValueState(useColumnFamilies: Boolean):

Review Comment:
   Same here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513484718


##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _processorHandle: StatefulProcessorHandle = _
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(handle: StatefulProcessorHandle, outputMode: OutputMode): Unit = {
+    _processorHandle = handle
+    _mapState = handle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "remove") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+class TransformWithMapStateSuite extends StreamTest {
+  import testImplicits._
+
+  private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, inputMapRow),
+        ExpectFailure[SparkException](e => {
+          assert(e.getMessage.contains("User key cannot be null"))
+        })
+      )
+    }
+  }
+
+  test("Test retrieving value with non-exist user key") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, InputMapRow("k1", "getValue", ("v1", ""))),
+        ExpectFailure[SparkException](e => {
+          assert(e.getMessage.contains(
+            "No value found for given grouping key and user key in the map."))
+        })
+      )
+    }
+  }
+
+  Seq("getValue", "containsKey", "updateValue", "removeKey").foreach { mapImplFunc =>
+    test(s"Test $mapImplFunc with null user key") {
+      testMapStateWithNullUserKey(InputMapRow("k1", mapImplFunc, (null, "")))
+    }
+  }
+
+  test("Test put value with null value") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, InputMapRow("k1", "updateValue", ("k1", null))),
+        ExpectFailure[SparkException](e => {
+          assert(e.getMessage.contains("Value put to map cannot be null."))
+        })
+      )
+    }
+  }
+
+  test("Test map state correctness") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Append())
+      testStream(result, OutputMode.Append())(
+        // Test exists()
+        AddData(inputData, InputMapRow("k1", "updateValue", ("v1", "10"))),
+        AddData(inputData, InputMapRow("k1", "exists", ("", ""))),
+        AddData(inputData, InputMapRow("k2", "exists", ("", ""))),
+        CheckNewAnswer(("k1", "exists", "true"), ("k2", "exists", "false")),
+
+        // Test get and put with composite key
+        AddData(inputData, InputMapRow("k1", "updateValue", ("v2", "5"))),
+
+        AddData(inputData, InputMapRow("k2", "updateValue", ("v2", "3"))),
+        AddData(inputData, InputMapRow("k2", "updateValue", ("v2", "12"))),
+        AddData(inputData, InputMapRow("k2", "updateValue", ("v4", "1"))),
+
+        // Different grouping key, same user key
+        AddData(inputData, InputMapRow("k1", "getValue", ("v2", ""))),
+        CheckNewAnswer(("k1", "v2", "5")),
+        // Same grouping key, same user key, update value should reflect
+        AddData(inputData, InputMapRow("k2", "getValue", ("v2", ""))),
+        CheckNewAnswer(("k2", "v2", "12")),
+
+        // Test get full map for a given grouping key - prefixScan
+        AddData(inputData, InputMapRow("k2", "getMap", ("", ""))),
+        CheckNewAnswer(("k2", "v2", "12"), ("k2", "v4", "1")),
+
+        AddData(inputData, InputMapRow("k2", "getKeys", ("", ""))),
+        CheckNewAnswer(("k2", "v2", ""), ("k2", "v4", "")),
+
+        AddData(inputData, InputMapRow("k2", "getValues", ("", ""))),
+        CheckNewAnswer(("k2", "", "12"), ("k2", "", "1")),
+
+        // Test remove functionalities
+        AddData(inputData, InputMapRow("k1", "removeKey", ("v2", ""))),
+        AddData(inputData, InputMapRow("k1", "containsKey", ("v2", ""))),
+        CheckNewAnswer(("k1", "v2", "false")),
+
+        AddData(inputData, InputMapRow("k2", "remove", ("", ""))),

Review Comment:
   Can we test for `clear` too ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513483233


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any],
+    userKeyExprEnc: Encoder[K]) extends MapState[K, V] with Logging {
+
+  private val schemaForKeyRow: StructType =
+    new StructType()
+    .add("key", BinaryType)
+    .add("userKey", BinaryType)
+  private val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+
+  private val keySerializer = keyExprEnc.createSerializer()
+  private val stateTypesEncoder = StateTypesEncoder(keySerializer, stateName)
+
+  store.createColFamilyIfAbsent(stateName, schemaForKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow)
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    !store.prefixScan(stateTypesEncoder.encodeGroupingKey(), stateName).isEmpty
+  }
+
+  /** Get the state value if it exists */
+  override def getValue(key: K): V = {
+    require(key != null, "User key cannot be null.")
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key, userKeyExprEnc)
+    val unsafeRowValue = store.get(encodedCompositeKey, stateName)
+    if (unsafeRowValue == null) {

Review Comment:
   To be consistent, I think we can return `null` here similar to other state types
   
   ```
   null.asInstanceOf[V]
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1513568839


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -86,3 +88,53 @@ object StateTypesEncoder {
     new StateTypesEncoder[GK](keySerializer, stateName)
   }
 }
+
+class CompositeKeyStateEncoder[GK, K](
+    keySerializer: Serializer[GK],
+    stateName: String,
+    userKeyEnc: Encoder[K])
+  extends StateTypesEncoder[GK](keySerializer: Serializer[GK], stateName: String) {
+
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+      .add("key", BinaryType)
+      .add("userKey", BinaryType)
+  private val compositeKeyProjection = UnsafeProjection.create(schemaForCompositeKeyRow)
+  private val reuseRow = new UnsafeRow(userKeyEnc.schema.fields.length)
+  private val userKeyExpressionEnc = encoderFor(userKeyEnc)
+
+  private val userKeyRowToObjDeserializer =
+    userKeyExpressionEnc.resolveAndBind().createDeserializer()
+  private val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+
+  def encodeCompositeKey(userKey: K): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // generate grouping key byte array
+    val groupingKeyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()

Review Comment:
   I guess you can just directly call `keyOption.get` here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1519035053


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any],
+    userKeyExprEnc: Encoder[K]) extends MapState[K, V] with Logging {
+
+  // Pack grouping key and user key together as a prefixed composite key
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+    .add("key", BinaryType)
+    .add("userKey", BinaryType)
+  private val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  private val keySerializer = keyExprEnc.createSerializer()
+  private val stateTypesEncoder = CompositeKeyStateEncoder(
+    keySerializer, schemaForCompositeKeyRow, stateName, userKeyExprEnc)
+
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow)
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    !store.prefixScan(stateTypesEncoder.encodeGroupingKey(), stateName).isEmpty
+  }
+
+  /** Get the state value if it exists */
+  override def getValue(key: K): V = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    val unsafeRowValue = store.get(encodedCompositeKey, stateName)
+
+    if (unsafeRowValue == null) return null.asInstanceOf[V]
+    stateTypesEncoder.decodeValue(unsafeRowValue)
+  }
+
+  /** Check if the user key is contained in the map */
+  override def containsKey(key: K): Boolean = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    getValue(key) != null
+  }
+
+  /** Update value for given user key */
+  override def updateValue(key: K, value: V): Unit = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    StateStoreErrors.requireNonNullStateValue(value, stateName)
+    val encodedValue = stateTypesEncoder.encodeValue(value)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    store.put(encodedCompositeKey, encodedValue, stateName)
+  }
+
+  /** Get the map associated with grouping key */
+  override def getMap(): Map[K, V] = {
+    val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
+    store.prefixScan(encodedGroupingKey, stateName)
+      .map {
+        case iter: UnsafeRowPair =>
+          (stateTypesEncoder.decodeCompositeKey(iter.key),

Review Comment:
   Note: As you will rebase with #45447, UnsafeProjection will reuse the row instance, so we can't store the row persistently unless copying it. If we do copy, we probably want to reduce the scope for key-value vs key vs value.
   
   Maybe good to have a private method decoding key and value in iterator but not creating a map. Each method can get the iterator from the new method, and pick key / value / both, and then copy rows.



##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/StatefulProcessorHandle.scala:
##########
@@ -48,6 +49,18 @@ private[sql] trait StatefulProcessorHandle extends Serializable {
    */
   def getListState[T](stateName: String): ListState[T]
 
+  /**
+   * Creates new or returns existing map state associated with stateName.
+   * The MapState persists Key-Value pairs of type [K, V].
+   *
+   * @param stateName  - name of the state variable
+   * @param userKeyEnc  - spark sql encoder for the map key
+   * @tparam K - type of key for map state variable
+   * @tparam V - type of value for map state variable
+   * @return - instance of MapState of type [K,V] that can be used to store state persistently
+   */
+  def getMapState[K, V](stateName: String, userKeyEnc: Encoder[K]): MapState[K, V]

Review Comment:
   Just a friendly reminder: I expect value encoder will be in the parameter as well once this is rebased.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(outputMode: OutputMode): Unit = {
+    _mapState = getHandle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,

Review Comment:
   nit: indentation



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -86,3 +88,53 @@ object StateTypesEncoder {
     new StateTypesEncoder[GK](keySerializer, stateName)
   }
 }
+
+class CompositeKeyStateEncoder[GK, K](
+    keySerializer: Serializer[GK],
+    stateName: String,
+    userKeyEnc: Encoder[K])
+  extends StateTypesEncoder[GK](keySerializer: Serializer[GK], stateName: String) {
+
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+      .add("key", BinaryType)
+      .add("userKey", BinaryType)
+  private val compositeKeyProjection = UnsafeProjection.create(schemaForCompositeKeyRow)
+  private val reuseRow = new UnsafeRow(userKeyEnc.schema.fields.length)
+  private val userKeyExpressionEnc = encoderFor(userKeyEnc)
+
+  private val userKeyRowToObjDeserializer =
+    userKeyExpressionEnc.resolveAndBind().createDeserializer()
+  private val userKeySerializer = encoderFor(userKeyEnc).createSerializer()
+
+  def encodeCompositeKey(userKey: K): UnsafeRow = {
+    val keyOption = ImplicitGroupingKeyTracker.getImplicitKeyOption
+    if (keyOption.isEmpty) {
+      throw StateStoreErrors.implicitKeyNotFound(stateName)
+    }
+    val groupingKey = keyOption.get.asInstanceOf[GK]
+    // generate grouping key byte array
+    val groupingKeyByteArr = keySerializer.apply(groupingKey).asInstanceOf[UnsafeRow].getBytes()
+    // generate user key byte array
+    val userKeyBytesArr = userKeySerializer.apply(userKey).asInstanceOf[UnsafeRow].getBytes()
+
+    val compositeKeyRow = compositeKeyProjection(InternalRow(groupingKeyByteArr, userKeyBytesArr))
+    compositeKeyRow
+  }
+
+  def decodeCompositeKey(row: UnsafeRow): K = {
+    val bytes = row.getBinary(1)
+    reuseRow.pointTo(bytes, bytes.length)
+    val value = userKeyRowToObjDeserializer.apply(reuseRow)
+    value
+  }
+}
+
+object CompositeKeyStateEncoder {

Review Comment:
   If the list of parameters are exactly the same with default constructor, let's just use new.



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(outputMode: OutputMode): Unit = {
+    _mapState = getHandle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "clear") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+/**
+ * Class that adds integration tests for MapState types used in arbitrary stateful
+ * operators such as transformWithState.
+ */
+class TransformWithMapStateSuite extends StreamTest {
+  import testImplicits._
+
+  private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, inputMapRow),
+        ExpectFailure[SparkIllegalArgumentException](e => {
+          assert(e.getMessage.contains("ILLEGAL_STATE_STORE_VALUE.NULL_VALUE"))

Review Comment:
   Could we verify the exception with checkError?



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(outputMode: OutputMode): Unit = {
+    _mapState = getHandle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "clear") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+/**
+ * Class that adds integration tests for MapState types used in arbitrary stateful
+ * operators such as transformWithState.
+ */
+class TransformWithMapStateSuite extends StreamTest {
+  import testImplicits._
+
+  private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, inputMapRow),
+        ExpectFailure[SparkIllegalArgumentException](e => {

Review Comment:
   nit: `{ e =>` rather than `(e => {`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -86,3 +88,60 @@ object StateTypesEncoder {
     new StateTypesEncoder[GK](keySerializer, stateName)
   }
 }
+
+class CompositeKeyStateEncoder[GK, K](
+    keySerializer: Serializer[GK],
+    schemaForCompositeKeyRow: StructType,
+    stateName: String,
+    userKeyEnc: Encoder[K])
+  extends StateTypesEncoder[GK](keySerializer: Serializer[GK], stateName: String) {

Review Comment:
   nit: redundant type information for parameters in this line?



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(outputMode: OutputMode): Unit = {
+    _mapState = getHandle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "clear") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+/**
+ * Class that adds integration tests for MapState types used in arbitrary stateful
+ * operators such as transformWithState.
+ */
+class TransformWithMapStateSuite extends StreamTest {
+  import testImplicits._
+
+  private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, inputMapRow),
+        ExpectFailure[SparkIllegalArgumentException](e => {
+          assert(e.getMessage.contains("ILLEGAL_STATE_STORE_VALUE.NULL_VALUE"))
+        })
+      )
+    }
+  }
+
+  test("Test retrieving value with non-existing user key") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, InputMapRow("k1", "getValue", ("v1", ""))),
+        CheckAnswer(("k1", "v1", null))
+      )
+    }
+  }
+
+  Seq("getValue", "containsKey", "updateValue", "removeKey").foreach { mapImplFunc =>
+    test(s"Test $mapImplFunc with null user key") {
+      testMapStateWithNullUserKey(InputMapRow("k1", mapImplFunc, (null, "")))
+    }
+  }
+
+  test("Test put value with null value") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, InputMapRow("k1", "updateValue", ("k1", null))),
+        ExpectFailure[SparkIllegalArgumentException](e => {
+          assert(e.getMessage.contains("ILLEGAL_STATE_STORE_VALUE.NULL_VALUE"))

Review Comment:
   ditto



##########
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithMapStateSuite.scala:
##########
@@ -0,0 +1,217 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.Encoders
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider
+import org.apache.spark.sql.internal.SQLConf
+
+case class InputMapRow(key: String, action: String, value: (String, String))
+
+class TestMapStateProcessor
+  extends StatefulProcessor[String, InputMapRow, (String, String, String)] {
+
+  @transient var _mapState: MapState[String, String] = _
+
+  override def init(outputMode: OutputMode): Unit = {
+    _mapState = getHandle.getMapState("sessionState", Encoders.STRING)
+  }
+
+  override def handleInputRows(
+    key: String,
+    inputRows: Iterator[InputMapRow],
+    timerValues: TimerValues): Iterator[(String, String, String)] = {
+
+    var output = List[(String, String, String)]()
+
+    for (row <- inputRows) {
+      if (row.action == "exists") {
+        output = (key, "exists", _mapState.exists().toString) :: output
+      } else if (row.action == "getValue") {
+        output = (key, row.value._1, _mapState.getValue(row.value._1)) :: output
+      } else if (row.action == "containsKey") {
+        output = (key, row.value._1,
+          if (_mapState.containsKey(row.value._1)) "true" else "false") :: output
+      } else if (row.action == "updateValue") {
+        _mapState.updateValue(row.value._1, row.value._2)
+      } else if (row.action == "getMap") {
+        val res = _mapState.getMap()
+        res.foreach { pair =>
+          output = (key, pair._1, pair._2) :: output
+        }
+      } else if (row.action == "getKeys") {
+        _mapState.getKeys().foreach { key =>
+          output = (row.key, key, row.value._2) :: output
+        }
+      } else if (row.action == "getValues") {
+        _mapState.getValues().foreach { value =>
+          output = (row.key, row.value._1, value) :: output
+        }
+      } else if (row.action == "removeKey") {
+        _mapState.removeKey(row.value._1)
+      } else if (row.action == "clear") {
+        _mapState.clear()
+      }
+    }
+    output.iterator
+  }
+
+  override def close(): Unit = {}
+}
+
+/**
+ * Class that adds integration tests for MapState types used in arbitrary stateful
+ * operators such as transformWithState.
+ */
+class TransformWithMapStateSuite extends StreamTest {
+  import testImplicits._
+
+  private def testMapStateWithNullUserKey(inputMapRow: InputMapRow): Unit = {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, inputMapRow),
+        ExpectFailure[SparkIllegalArgumentException](e => {
+          assert(e.getMessage.contains("ILLEGAL_STATE_STORE_VALUE.NULL_VALUE"))
+        })
+      )
+    }
+  }
+
+  test("Test retrieving value with non-existing user key") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, InputMapRow("k1", "getValue", ("v1", ""))),
+        CheckAnswer(("k1", "v1", null))
+      )
+    }
+  }
+
+  Seq("getValue", "containsKey", "updateValue", "removeKey").foreach { mapImplFunc =>
+    test(s"Test $mapImplFunc with null user key") {
+      testMapStateWithNullUserKey(InputMapRow("k1", mapImplFunc, (null, "")))
+    }
+  }
+
+  test("Test put value with null value") {
+    withSQLConf(SQLConf.STATE_STORE_PROVIDER_CLASS.key ->
+      classOf[RocksDBStateStoreProvider].getName) {
+
+      val inputData = MemoryStream[InputMapRow]
+      val result = inputData.toDS()
+        .groupByKey(x => x.key)
+        .transformWithState(new TestMapStateProcessor(),
+          TimeoutMode.NoTimeouts(),
+          OutputMode.Update())
+
+      testStream(result, OutputMode.Update())(
+        AddData(inputData, InputMapRow("k1", "updateValue", ("k1", null))),
+        ExpectFailure[SparkIllegalArgumentException](e => {

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "anishshri-db (via GitHub)" <gi...@apache.org>.
anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520618069


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Encoder
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreErrors, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any],
+    userKeyExprEnc: Encoder[K]) extends MapState[K, V] with Logging {
+
+  // Pack grouping key and user key together as a prefixed composite key
+  private val schemaForCompositeKeyRow: StructType =
+    new StructType()
+    .add("key", BinaryType)
+    .add("userKey", BinaryType)
+  private val schemaForValueRow: StructType = new StructType().add("value", BinaryType)
+  private val keySerializer = keyExprEnc.createSerializer()
+  private val stateTypesEncoder = CompositeKeyStateEncoder(
+    keySerializer, schemaForCompositeKeyRow, stateName, userKeyExprEnc)
+
+  store.createColFamilyIfAbsent(stateName, schemaForCompositeKeyRow, numColsPrefixKey = 1,
+    schemaForValueRow)
+
+  /** Whether state exists or not. */
+  override def exists(): Boolean = {
+    !store.prefixScan(stateTypesEncoder.encodeGroupingKey(), stateName).isEmpty
+  }
+
+  /** Get the state value if it exists */
+  override def getValue(key: K): V = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    val unsafeRowValue = store.get(encodedCompositeKey, stateName)
+
+    if (unsafeRowValue == null) return null.asInstanceOf[V]
+    stateTypesEncoder.decodeValue(unsafeRowValue)
+  }
+
+  /** Check if the user key is contained in the map */
+  override def containsKey(key: K): Boolean = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    getValue(key) != null
+  }
+
+  /** Update value for given user key */
+  override def updateValue(key: K, value: V): Unit = {
+    StateStoreErrors.requireNonNullStateValue(key, stateName)
+    StateStoreErrors.requireNonNullStateValue(value, stateName)
+    val encodedValue = stateTypesEncoder.encodeValue(value)
+    val encodedCompositeKey = stateTypesEncoder.encodeCompositeKey(key)
+    store.put(encodedCompositeKey, encodedValue, stateName)
+  }
+
+  /** Get the map associated with grouping key */
+  override def getMap(): Map[K, V] = {
+    val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
+    store.prefixScan(encodedGroupingKey, stateName)
+      .map {
+        case iter: UnsafeRowPair =>
+          (stateTypesEncoder.decodeCompositeKey(iter.key),

Review Comment:
   Yea sure - lets use the iterator approach. Thx



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "jingz-db (via GitHub)" <gi...@apache.org>.
jingz-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520699470


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/MapStateSuite.scala:
##########
@@ -67,6 +67,7 @@ class MapStateSuite extends StateVariableSuiteBase {
       assert(!testState.exists())
       assert(testState.getMap().hasNext === false)
     }
+    ImplicitGroupingKeyTracker.removeImplicitKey()

Review Comment:
   Done! Thanks Jungtaek!
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

Posted by "HeartSaVioR (via GitHub)" <gi...@apache.org>.
HeartSaVioR commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1520711461


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -73,24 +74,31 @@ class MapStateImpl[K, V](
   }
 
   /** Get the map associated with grouping key */
-  override def getMap(): Map[K, V] = {
+  override def getMap(): Iterator[(K, V)] = {
     val encodedGroupingKey = stateTypesEncoder.encodeGroupingKey()
-    store.prefixScan(encodedGroupingKey, stateName)
-      .map {
-        case iter: UnsafeRowPair =>
-          (stateTypesEncoder.decodeCompositeKey(iter.key),
-            stateTypesEncoder.decodeValue(iter.value))
-      }.toMap
+    val pairsIterator = store.prefixScan(encodedGroupingKey, stateName)
+
+    new Iterator[(K, V)] {

Review Comment:
   maybe final nit: your previous code just works and it seems much simpler. We just need to remove the call `.toMap` and done. 
   
   Also maybe the method name to remove the map from the name? 
   
   * `getIterator` - consistent with getKeys/getValues
   * `iterator` - consistent with Map collection. need to change other methods as well, e.g. getKeys to keys, getValues to values
   * `get()` - consistent with other type of state. The value of origin type is retrieved with get() consistently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org