Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/26 14:10:07 UTC

[GitHub] [spark] gaborgsomogyi commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

gaborgsomogyi commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r511816408



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -81,6 +74,40 @@ trait StateStore {
     iterator()
   }
 
+  /** Return an iterator containing all the key-value pairs in the StateStore. */
+  def iterator(): Iterator[UnsafeRowPair]
+
+  /**
+   * Clean up the resource.
+   *
+   * The method name is to respect backward compatibility on [[StateStore]].

Review comment:
       I think I understand the point of this comment, but isn't this true for all API methods in `ReadStateStore`? Or have I missed something and this one is different somehow?
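   For context, this is roughly how I picture the split (a heavily simplified sketch, not the PR code) -- every member of `ReadStateStore` keeps the name it already had on `StateStore`, not just this one:
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.UnsafeRow
   
   // Heavily simplified sketch of the read/write split; assumes we are in the
   // streaming state package so UnsafeRowPair is in scope.
   trait ReadStateStore {
     def get(key: UnsafeRow): UnsafeRow       // name unchanged from the old StateStore
     def iterator(): Iterator[UnsafeRowPair]  // name unchanged from the old StateStore
     def abort(): Unit                        // name unchanged from the old StateStore
   }
   
   trait StateStore extends ReadStateStore {
     def put(key: UnsafeRow, value: UnsafeRow): Unit
     def remove(key: UnsafeRow): Unit
     def commit(): Long
   }
   ```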

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
##########
@@ -197,15 +212,26 @@ private[state] class HDFSBackedStateStoreProvider extends StateStoreProvider wit
   }
 
   /** Get the state store for making updates to create a new `version` of the store. */
-  override def getStore(version: Long): StateStore = synchronized {
+  override def getStore(version: Long): StateStore = {
+    val newMap = getLoadedMapForStore(version)
+    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
+    new HDFSBackedStateStore(version, newMap)
+  }
+
+  /** Get the state store for reading to specific `version` of the store. */
+  override def getReadOnlyStore(version: Long): ReadStateStore = {

Review comment:
       Nit: since the method is called `...ReadOnly...`, maybe we can name the returned state store that way too.
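   Something along these lines (the class name below is purely illustrative, not an existing one):
   
   ```scala
   override def getReadOnlyStore(version: Long): ReadStateStore = {
     val newMap = getLoadedMapForStore(version)
     logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for read-only access")
     // Hypothetical class name, just to mirror the ...ReadOnly... method name.
     new HDFSBackedReadOnlyStateStore(version, newMap)
   }
   ```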

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -89,16 +116,16 @@ trait StateStore {
   def commit(): Long
 
   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
    */
-  def abort(): Unit
+  override def iterator(): Iterator[UnsafeRowPair]

Review comment:
       Just for the record, do I understand correctly that this override is needed only because of the additional responsibility (which is described in the comment)?
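   A toy version of what I think is happening (trait names made up) -- the override adds no new signature, it only re-declares the member so the stronger contract lives on the writable side:
   
   ```scala
   // Trait names are made up; only the override-for-documentation pattern matters.
   trait ReadOnlyLike {
     /** All key-value pairs; no promise about concurrent updates. */
     def iterator(): Iterator[Int]
   }
   
   trait WritableLike extends ReadOnlyLike {
     /** Must stay valid while puts/removes happen during iteration. */
     override def iterator(): Iterator[Int]
   }
   ```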

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##########
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions

Review comment:
       I don't yet see why we can't just provide a default implementation. Could you elaborate?
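   To make the question concrete, this is the kind of default I had in mind (a sketch with the other constructor parameters dropped -- possibly this is exactly what MiMa rejects):
   
   ```scala
   import scala.reflect.ClassTag
   
   import org.apache.spark.Partition
   import org.apache.spark.rdd.RDD
   
   // Sketch: the base class overrides getPartitions itself, so subclasses
   // inherit it instead of delegating to a protected _getPartitions helper.
   abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](dataRDD: RDD[T])
     extends RDD[U](dataRDD) {
   
     override protected def getPartitions: Array[Partition] = dataRDD.partitions
   }
   ```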

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##########
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions
+
+  /**
+   * Set the preferred location of each partition using the executor that has the related
+   * [[StateStoreProvider]] already loaded.
+   *
+   * Implementations can simply call this method in getPreferredLocations.
+   */
+  protected def _getPreferredLocations(partition: Partition): Seq[String] = {

Review comment:
       Ditto.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -89,16 +116,16 @@ trait StateStore {
   def commit(): Long
 
   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
+   * ensure that updates (puts, removes) can be made while iterating over this iterator.
    */
-  def abort(): Unit
+  override def iterator(): Iterator[UnsafeRowPair]
 
   /**
-   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must
-   * ensure that updates (puts, removes) can be made while iterating over this iterator.
+   * Abort all the updates that have been made to the store. Implementations should ensure that
+   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
    */
-  def iterator(): Iterator[UnsafeRowPair]
+  override def abort(): Unit

Review comment:
       Having an API which may or may not need to be called looks super odd.
   I'm not yet sure, so I'm asking: doesn't this break the Liskov substitution principle? All base class instances must be replaceable with a subclass instance. If I understand correctly, `ReadStateStore.abort` is always called to free up resources. What happens when `StateStore.abort` is called? According to the doc it may or may not be called (which is good), but does the subclass handle such a call correctly? If a subclass fails, or behaves differently just because `abort` was called, then something is not 100% right.
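   A small example of the kind of caller I have in mind (the helper is hypothetical):
   
   ```scala
   // Hypothetical helper written against the base type: abort() is called
   // unconditionally to release resources, so a StateStore passed in here must
   // also behave correctly when abort() is the only terminal call it receives.
   def countEntries(store: ReadStateStore): Long = {
     try {
       store.iterator().size.toLong
     } finally {
       store.abort()
     }
   }
   ```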
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -379,6 +428,21 @@ object StateStore extends Logging {
   @GuardedBy("loadedProviders")
   private var _coordRef: StateStoreCoordinatorRef = null
 
+  /** Get or create a read-only store associated with the id. */
+  def getReadOnly(
+      storeProviderId: StateStoreProviderId,
+      keySchema: StructType,
+      valueSchema: StructType,
+      indexOrdinal: Option[Int],
+      version: Long,
+      storeConf: StateStoreConf,
+      hadoopConf: Configuration): ReadStateStore = {
+    require(version >= 0)

Review comment:
       Nit: can we move the two `require` calls into `getStateStoreProvider`?
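   Roughly what I mean -- both entry points delegate and the shared helper validates once (signatures abbreviated, provider lookup elided with `???`):
   
   ```scala
   def get(id: StateStoreProviderId, version: Long): StateStore =
     getStateStoreProvider(id, version).getStore(version)
   
   def getReadOnly(id: StateStoreProviderId, version: Long): ReadStateStore =
     getStateStoreProvider(id, version).getReadOnlyStore(version)
   
   private def getStateStoreProvider(id: StateStoreProviderId, version: Long): StateStoreProvider = {
     require(version >= 0)  // validated once for both callers
     ???                    // look up / create the provider as before
   }
   ```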



