Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2020/10/29 14:30:58 UTC

[GitHub] [spark] xuanyuanking commented on a change in pull request #26935: [SPARK-30294][SS] Explicitly defines read-only StateStore and optimize for HDFSBackedStateStore

xuanyuanking commented on a change in pull request #26935:
URL: https://github.com/apache/spark/pull/26935#discussion_r514114319



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -389,7 +453,19 @@ object StateStore extends Logging {
       storeConf: StateStoreConf,
       hadoopConf: Configuration): StateStore = {

Review comment:
       How about adding a default parameter `readOnly: Boolean = false` here? The change would be smaller, since the refactoring and `getReadOnly` would no longer be needed.
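
A minimal sketch of the idea, using simplified stand-in types rather than the real Spark signatures. `SimpleStore` and `InMemoryStore` are hypothetical names for illustration; only the `readOnly: Boolean = false` parameter comes from the suggestion itself.

```scala
// Hypothetical, simplified stand-ins; not the real StateStore API.
trait SimpleStore {
  def get(key: String): Option[String]
  def put(key: String, value: String): Unit
}

final class InMemoryStore(readOnly: Boolean) extends SimpleStore {
  private val data = scala.collection.mutable.Map.empty[String, String]

  override def get(key: String): Option[String] = data.get(key)

  override def put(key: String, value: String): Unit = {
    // A read-only store rejects writes instead of needing a separate code path.
    if (readOnly) throw new UnsupportedOperationException("store is read-only")
    data.update(key, value)
  }
}

object SimpleStore {
  // Existing callers keep compiling because `readOnly` defaults to false.
  // `version` is unused here; it only mirrors the shape of a versioned lookup.
  def get(version: Long, readOnly: Boolean = false): SimpleStore =
    new InMemoryStore(readOnly)
}
```

With this shape, `SimpleStore.get(1L)` behaves as before, while `SimpleStore.get(1L, readOnly = true)` returns a store whose `put` throws.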

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -89,16 +116,16 @@ trait StateStore {
   def commit(): Long
 
   /**
-   * Abort all the updates that have been made to the store. Implementations should ensure that
-   * no more updates (puts, removes) can be after an abort in order to avoid incorrect usage.
+   * Return an iterator containing all the key-value pairs in the StateStore. Implementations must

Review comment:
       nit: how about reducing the changes by keeping the original order and only adding the `override` keyword?
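
Roughly the shape I have in mind, sketched with abbreviated, hypothetical traits (`ReadStore` and `WriteStore` stand in for the read-only trait and `StateStore`; the member lists are shortened and the key/value types are simplified):

```scala
// Hypothetical, abbreviated traits; not the real Spark definitions.
trait ReadStore {
  def iterator(): Iterator[(String, String)]
  def abort(): Unit
}

trait WriteStore extends ReadStore {
  def commit(): Long

  // Same position as before the change; only the `override` modifier is new,
  // so the existing scaladoc does not have to move in the diff.
  override def abort(): Unit

  override def iterator(): Iterator[(String, String)]
}
```

Scala accepts `override` on a redeclared abstract member, so the members can stay in their original order and the diff shrinks to the added modifier.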

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
##########
@@ -36,10 +36,14 @@ import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
- * Base trait for a versioned key-value store. Each instance of a `StateStore` represents a specific
- * version of state data, and such instances are created through a [[StateStoreProvider]].
+ * Base trait for a versioned key-value store which provides read operations. Each instance of a
+ * `ReadStateStore` represents a specific version of state data, and such instances are created
+ * through a [[StateStoreProvider]].
+ *
+ * `abort` method will be called when the task is completed - please clean up the resources in
+ * the method.
  */
-trait StateStore {
+trait ReadStateStore {

Review comment:
       How about renaming this to `ReadOnlyStateStore`? I think that name is better because it is consistent with the other new names in this PR, e.g. `mapPartitionsWithReadOnlyStateStore`, `ReadOnlyStateStoreRDD`, etc.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
##########
@@ -29,14 +29,51 @@ import org.apache.spark.sql.internal.SessionState
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.SerializableConfiguration
 
+// This doesn't directly override RDD methods as MiMa complains it.
+abstract class BaseStateStoreRDD[T: ClassTag, U: ClassTag](
+    dataRDD: RDD[T],
+    checkpointLocation: String,
+    queryRunId: UUID,
+    operatorId: Long,
+    sessionState: SessionState,
+    @transient private val storeCoordinator: Option[StateStoreCoordinatorRef],
+    extraOptions: Map[String, String] = Map.empty) extends RDD[U](dataRDD) {
+
+  protected val storeConf = new StateStoreConf(sessionState.conf, extraOptions)
+
+  // A Hadoop Configuration can be about 10 KB, which is pretty big, so broadcast it
+  protected val hadoopConfBroadcast = dataRDD.context.broadcast(
+    new SerializableConfiguration(sessionState.newHadoopConf()))
+
+  /** Implementations can simply call this method in getPreferredLocations. */
+  protected def _getPartitions: Array[Partition] = dataRDD.partitions

Review comment:
       Does the MiMa complaint refer to this error?
   ```
   [error]  * abstract method getPartitions()Array[org.apache.spark.Partition] in class org.apache.spark.rdd.RDD does not have a correspondent in current version
   [error]    filter with: ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.rdd.RDD.getPartitions")
   ```
   Maybe we can add a filter, since this is not a public API change. I'll also take a closer look at this error message.
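
If we go with a filter, the exclusion might look something like the fragment below. This is only a sketch: it assumes the entry would be added to the exclusion list in `project/MimaExcludes.scala`, the `stateStoreRddExcludes` name is illustrative, and the filter line itself is copied from the MiMa output above.

```scala
import com.typesafe.tools.mima.core._

// Illustrative name; in practice the entry would be appended to the
// existing exclusion Seq for the target release.
val stateStoreRddExcludes = Seq(
  // [SPARK-30294] Filter suggested by the MiMa error output.
  ProblemFilters.exclude[DirectAbstractMethodProblem](
    "org.apache.spark.rdd.RDD.getPartitions")
)
```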



