You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/04/12 20:50:55 UTC

spark git commit: [SPARK-14556][SQL] Code clean-ups for package o.a.s.sql.execution.streaming.state

Repository: spark
Updated Branches:
  refs/heads/master 111a62474 -> 852bbc6c0


[SPARK-14556][SQL] Code clean-ups for package o.a.s.sql.execution.streaming.state

## What changes were proposed in this pull request?

- `StateStoreConf.**max**DeltasForSnapshot` was renamed to `StateStoreConf.**min**DeltasForSnapshot`
- some state transition checks were added
- consistency between method names and string literals was improved
- other comment and typo fixes

## How was this patch tested?

N/A

Author: Liwei Lin <lw...@gmail.com>

Closes #12323 from lw-lin/streaming-state-clean-up.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/852bbc6c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/852bbc6c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/852bbc6c

Branch: refs/heads/master
Commit: 852bbc6c0046d194fef0b6d0b99162ea2cc10286
Parents: 111a624
Author: Liwei Lin <lw...@gmail.com>
Authored: Tue Apr 12 11:50:51 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Tue Apr 12 11:50:51 2016 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 40 +++++++++++---------
 .../execution/streaming/state/StateStore.scala  |  7 ++--
 .../streaming/state/StateStoreConf.scala        |  3 +-
 .../streaming/state/StateStoreCoordinator.scala |  7 +---
 .../streaming/state/StateStoreRDD.scala         |  6 +--
 5 files changed, 31 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/852bbc6c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 1e0a4a5..3335755 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -46,12 +46,14 @@ import org.apache.spark.util.Utils
  * Usage:
  * To update the data in the state store, the following order of operations are needed.
  *
- * - val store = StateStore.get(operatorId, partitionId, version) // to get the right store
- * - store.update(...)
+ * // get the right store
+ * - val store = StateStore.get(
+ *      StateStoreId(checkpointLocation, operatorId, partitionId), ..., version, ...)
+ * - store.put(...)
  * - store.remove(...)
- * - store.commit()    // commits all the updates to made with version number
+ * - store.commit()    // commits all the updates to made; the new version will be returned
  * - store.iterator()  // key-value data after last commit as an iterator
- * - store.updates()   // updates made in the last as an iterator
+ * - store.updates()   // updates made in the last commit as an iterator
  *
  * Fault-tolerance model:
  * - Every set of updates is written to a delta file before committing.
@@ -99,7 +101,7 @@ private[state] class HDFSBackedStateStoreProvider(
     }
 
     override def put(key: UnsafeRow, value: UnsafeRow): Unit = {
-      verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+      verify(state == UPDATING, "Cannot remove after already committed or aborted")
 
       val isNewKey = !mapToUpdate.containsKey(key)
       mapToUpdate.put(key, value)
@@ -109,7 +111,7 @@ private[state] class HDFSBackedStateStoreProvider(
           // Value did not exist in previous version and was added already, keep it marked as added
           allUpdates.put(key, ValueAdded(key, value))
         case Some(ValueUpdated(_, _)) | Some(KeyRemoved(_)) =>
-          // Value existed in prev version and updated/removed, mark it as updated
+          // Value existed in previous version and updated/removed, mark it as updated
           allUpdates.put(key, ValueUpdated(key, value))
         case None =>
           // There was no prior update, so mark this as added or updated according to its presence
@@ -122,7 +124,7 @@ private[state] class HDFSBackedStateStoreProvider(
 
     /** Remove keys that match the following condition */
     override def remove(condition: UnsafeRow => Boolean): Unit = {
-      verify(state == UPDATING, "Cannot remove after already committed or cancelled")
+      verify(state == UPDATING, "Cannot remove after already committed or aborted")
       val keyIter = mapToUpdate.keySet().iterator()
       while (keyIter.hasNext) {
         val key = keyIter.next
@@ -146,7 +148,7 @@ private[state] class HDFSBackedStateStoreProvider(
 
     /** Commit all the updates that have been made to the store, and return the new version. */
     override def commit(): Long = {
-      verify(state == UPDATING, "Cannot commit after already committed or cancelled")
+      verify(state == UPDATING, "Cannot commit after already committed or aborted")
 
       try {
         finalizeDeltaFile(tempDeltaFileStream)
@@ -161,8 +163,10 @@ private[state] class HDFSBackedStateStoreProvider(
       }
     }
 
-    /** Cancel all the updates made on this store. This store will not be usable any more. */
+    /** Abort all the updates made on this store. This store will not be usable any more. */
     override def abort(): Unit = {
+      verify(state == UPDATING || state == ABORTED, "Cannot abort after already committed")
+
       state = ABORTED
       if (tempDeltaFileStream != null) {
         tempDeltaFileStream.close()
@@ -170,7 +174,7 @@ private[state] class HDFSBackedStateStoreProvider(
       if (tempDeltaFile != null && fs.exists(tempDeltaFile)) {
         fs.delete(tempDeltaFile, true)
       }
-      logInfo("Canceled ")
+      logInfo("Aborted")
     }
 
     /**
@@ -178,7 +182,8 @@ private[state] class HDFSBackedStateStoreProvider(
      * This can be called only after committing all the updates made in the current thread.
      */
     override def iterator(): Iterator[(UnsafeRow, UnsafeRow)] = {
-      verify(state == COMMITTED, "Cannot get iterator of store data before committing")
+      verify(state == COMMITTED,
+        "Cannot get iterator of store data before committing or after aborting")
       HDFSBackedStateStoreProvider.this.iterator(newVersion)
     }
 
@@ -187,7 +192,8 @@ private[state] class HDFSBackedStateStoreProvider(
      * This can be called only after committing all the updates made in the current thread.
      */
     override def updates(): Iterator[StoreUpdate] = {
-      verify(state == COMMITTED, "Cannot get iterator of updates before committing")
+      verify(state == COMMITTED,
+        "Cannot get iterator of updates before committing or after aborting")
       allUpdates.values().asScala.toIterator
     }
 
@@ -223,7 +229,7 @@ private[state] class HDFSBackedStateStoreProvider(
   }
 
   override def toString(): String = {
-    s"StateStore[id = (op=${id.operatorId},part=${id.partitionId}), dir = $baseDir]"
+    s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
   }
 
   /* Internal classes and methods */
@@ -277,7 +283,7 @@ private[state] class HDFSBackedStateStoreProvider(
     } else {
       if (!fs.isDirectory(baseDir)) {
         throw new IllegalStateException(
-          s"Cannot use ${id.checkpointLocation} for storing state data for $this as" +
+          s"Cannot use ${id.checkpointLocation} for storing state data for $this as " +
             s"$baseDir already exists and is not a directory")
       }
     }
@@ -453,11 +459,11 @@ private[state] class HDFSBackedStateStoreProvider(
           filesForVersion(files, lastVersion).filter(_.isSnapshot == false)
         synchronized { loadedMaps.get(lastVersion) } match {
           case Some(map) =>
-            if (deltaFilesForLastVersion.size > storeConf.maxDeltasForSnapshot) {
+            if (deltaFilesForLastVersion.size > storeConf.minDeltasForSnapshot) {
               writeSnapshotFile(lastVersion, map)
             }
           case None =>
-            // The last map is not loaded, probably some other instance is incharge
+            // The last map is not loaded, probably some other instance is in charge
         }
 
       }
@@ -506,7 +512,6 @@ private[state] class HDFSBackedStateStoreProvider(
       .lastOption
     val deltaBatchFiles = latestSnapshotFileBeforeVersion match {
       case Some(snapshotFile) =>
-        val deltaBatchIds = (snapshotFile.version + 1) to version
 
         val deltaFiles = allFiles.filter { file =>
           file.version > snapshotFile.version && file.version <= version
@@ -579,4 +584,3 @@ private[state] class HDFSBackedStateStoreProvider(
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/852bbc6c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index 07f63f9..cc5327e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.streaming.state
 
-import java.util.Timer
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
 
 import scala.collection.mutable
@@ -63,7 +62,7 @@ trait StateStore {
    */
   def commit(): Long
 
-  /** Cancel all the updates that have been made to the store. */
+  /** Abort all the updates that have been made to the store. */
   def abort(): Unit
 
   /**
@@ -109,8 +108,8 @@ case class KeyRemoved(key: UnsafeRow) extends StoreUpdate
 /**
  * Companion object to [[StateStore]] that provides helper methods to create and retrieve stores
  * by their unique ids. In addition, when a SparkContext is active (i.e. SparkEnv.get is not null),
- * it also runs a periodic background tasks to do maintenance on the loaded stores. For each
- * store, tt uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
+ * it also runs a periodic background task to do maintenance on the loaded stores. For each
+ * store, it uses the [[StateStoreCoordinator]] to ensure whether the current loaded instance of
  * the store is the active instance. Accordingly, it either keeps it loaded and performs
  * maintenance, or unloads the store.
  */

http://git-wip-us.apache.org/repos/asf/spark/blob/852bbc6c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
index f0f1f3a..e55f63a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreConf.scala
@@ -26,7 +26,7 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 
   import SQLConf._
 
-  val maxDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
+  val minDeltasForSnapshot = conf.getConf(STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT)
 
   val minVersionsToRetain = conf.getConf(STATE_STORE_MIN_VERSIONS_TO_RETAIN)
 }
@@ -34,4 +34,3 @@ private[streaming] class StateStoreConf(@transient private val conf: SQLConf) ex
 private[streaming] object StateStoreConf {
   val empty = new StateStoreConf()
 }
-

http://git-wip-us.apache.org/repos/asf/spark/blob/852bbc6c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 812e1b0..e418217 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -50,8 +50,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
   private val endpointName = "StateStoreCoordinator"
 
   /**
-   * Create a reference to a [[StateStoreCoordinator]], This can be called from driver as well as
-   * executors.
+   * Create a reference to a [[StateStoreCoordinator]]
    */
   def forDriver(env: SparkEnv): StateStoreCoordinatorRef = synchronized {
     try {
@@ -75,7 +74,7 @@ private[sql] object StateStoreCoordinatorRef extends Logging {
 }
 
 /**
- * Reference to a [[StateStoreCoordinator]] that can be used to coordinator instances of
+ * Reference to a [[StateStoreCoordinator]] that can be used to coordinate instances of
  * [[StateStore]]s across all the executors, and get their locations for job scheduling.
  */
 private[sql] class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
@@ -142,5 +141,3 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
       context.reply(true)
   }
 }
-
-

http://git-wip-us.apache.org/repos/asf/spark/blob/852bbc6c/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
index df3d82c..d708486 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDD.scala
@@ -22,12 +22,12 @@ import scala.reflect.ClassTag
 import org.apache.spark.{Partition, TaskContext}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.types.StructType
-import org.apache.spark.util.{SerializableConfiguration, Utils}
+import org.apache.spark.util.SerializableConfiguration
 
 /**
  * An RDD that allows computations to be executed against [[StateStore]]s. It
- * uses the [[StateStoreCoordinator]] to use the locations of loaded state stores as
- * preferred locations.
+ * uses the [[StateStoreCoordinator]] to get the locations of loaded state stores
+ * and use that as the preferred locations.
  */
 class StateStoreRDD[T: ClassTag, U: ClassTag](
     dataRDD: RDD[T],


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org