You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "anishshri-db (via GitHub)" <gi...@apache.org> on 2024/03/05 01:23:03 UTC

Re: [PR] [SPARK-47272][SS] Add MapState implementation for State API v2. [spark]

anishshri-db commented on code in PR #45341:
URL: https://github.com/apache/spark/pull/45341#discussion_r1512002599


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MapStateImpl.scala:
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
+import org.apache.spark.sql.execution.streaming.state.{StateStore, UnsafeRowPair}
+import org.apache.spark.sql.streaming.MapState
+import org.apache.spark.sql.types.{BinaryType, StructType}
+
+class MapStateImpl[K, V](
+    store: StateStore,
+    stateName: String,
+    keyExprEnc: ExpressionEncoder[Any]) extends MapState[K, V] with Logging {
+
+  store.setUseCompositeKey()
+
+  private val schemaForKeyRow: StructType = new StructType().add("key", BinaryType)

Review Comment:
   Could we avoid encoding the grouping key and the user key together here?



##########
sql/api/src/main/scala/org/apache/spark/sql/streaming/MapState.scala:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.{Evolving, Experimental}
+
+@Experimental
+@Evolving
+/**
+ * Interface used for arbitrary stateful operations with the v2 API to capture
+ * map value state.
+ */
+trait MapState[K, V] extends Serializable {
+  /** Whether state exists or not. */
+  def exists(): Boolean
+
+  /** Get the state value if it exists */
+  def getValue(key: K): V
+
+  /** Check if the user key is contained in the map */
+  def containsKey(key: K): Boolean
+
+  /** Update value for given user key */
+  def updateValue(key: K, value: V) : Unit
+
+  /** Get the map associated with grouping key */
+  def getMap(): Map[K, V]
+
+  /** Get the list of keys present in map associated with grouping key */
+  def getKeys(): Iterator[K]
+
+  /** Get the list of values present in map associated with grouping key */
+  def getValues(): Iterator[V]
+
+  /** Remove user key from map state */
+  def removeKey(key: K): Unit
+
+  /** Remove this state. */
+  def remove(): Unit

Review Comment:
   Can we rename this to `clear`?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##########
@@ -66,6 +66,14 @@ class StateTypesEncoder[GK](
     keyRow
   }
 
+  def encodeUserKey[K](userKey: K): UnsafeRow = {
+    val schemaForKeyRow: StructType = new StructType().add("userKey", BinaryType)
+    val keyByteArr = SerializationUtils.serialize(userKey.asInstanceOf[Serializable])

Review Comment:
   Let's serialize using Spark SQL here instead?



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala:
##########
@@ -84,6 +84,13 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with
       map.iterator()
     }
 
+    override def getWithCompositeKey(

Review Comment:
   Not sure I understand why we need to update the store providers themselves. Can't we just use an updated schema that encodes the required information within the key?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org