You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by zs...@apache.org on 2016/10/25 00:21:22 UTC

spark git commit: [SPARK-17624][SQL][STREAMING][TEST] Fixed flaky StateStoreSuite.maintenance

Repository: spark
Updated Branches:
  refs/heads/master 81d6933e7 -> 407c3cedf


[SPARK-17624][SQL][STREAMING][TEST] Fixed flaky StateStoreSuite.maintenance

## What changes were proposed in this pull request?

The reason for the flakiness was follows. The test starts the maintenance background thread, and then writes 20 versions of the state store. The maintenance thread is expected to create snapshots in the middle, and clean up old files that are not needed any more. The earliest delta file (1.delta) is expected to be deleted as snapshots will ensure that the earliest delta would not be needed.

However, the default configuration for the maintenance thread is to retain files such that last 2 versions can be recovered, and delete the rest. Now while generating the versions, the maintenance thread can kick in and create snapshots anywhere between version 10 and 20 (at least 10 deltas needed for snapshot). Then later it will choose to retain only version 20 and 19 (last 2). There are two cases.

- Common case: One of the version between 10 and 19 gets snapshotted. Then recovering versions 19 and 20 just needs 19.snapshot and 20.delta, so 1.delta gets deleted.

- Uncommon case (reason for flakiness): Only version 20 gets snapshotted. Then recovering versoin 20 requires 20.snapshot, and recovering version 19 all the previous 19...1.delta. So 1.delta does not get deleted.

This PR rearranges the checks such that it create 20 versions, and then waits that there is at least one snapshot, then creates another 20. This will ensure that the latest 2 versions cannot require anything older than the first snapshot generated, and therefore will 1.delta will be deleted.

In addition, I have added more logs, and comments that I felt would help future debugging and understanding what is going on.

## How was this patch tested?

Ran the StateStoreSuite > 6K times in a heavily loaded machine (10 instances of tests running in parallel). No failures.

Author: Tathagata Das <ta...@gmail.com>

Closes #15592 from tdas/SPARK-17624.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/407c3ced
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/407c3ced
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/407c3ced

Branch: refs/heads/master
Commit: 407c3cedf29a4413339dcde758295dc3225a0054
Parents: 81d6933
Author: Tathagata Das <ta...@gmail.com>
Authored: Mon Oct 24 17:21:16 2016 -0700
Committer: Shixiong Zhu <sh...@databricks.com>
Committed: Mon Oct 24 17:21:16 2016 -0700

----------------------------------------------------------------------
 .../state/HDFSBackedStateStoreProvider.scala    | 18 ++++---
 .../streaming/state/StateStoreCoordinator.scala | 18 +++++--
 .../streaming/state/StateStoreSuite.scala       | 49 +++++++++++++-------
 3 files changed, 57 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/407c3ced/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
index 7d71f52..f1e7f1d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala
@@ -159,7 +159,7 @@ private[state] class HDFSBackedStateStoreProvider(
       } catch {
         case NonFatal(e) =>
           throw new IllegalStateException(
-            s"Error committing version $newVersion into ${HDFSBackedStateStoreProvider.this}", e)
+            s"Error committing version $newVersion into $this", e)
       }
     }
 
@@ -205,6 +205,10 @@ private[state] class HDFSBackedStateStoreProvider(
     override private[state] def hasCommitted: Boolean = {
       state == COMMITTED
     }
+
+    override def toString(): String = {
+      s"HDFSStateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    }
   }
 
   /** Get the state store for making updates to create a new `version` of the store. */
@@ -215,7 +219,7 @@ private[state] class HDFSBackedStateStoreProvider(
       newMap.putAll(loadMap(version))
     }
     val store = new HDFSBackedStateStore(version, newMap)
-    logInfo(s"Retrieved version $version of $this for update")
+    logInfo(s"Retrieved version $version of ${HDFSBackedStateStoreProvider.this} for update")
     store
   }
 
@@ -231,7 +235,7 @@ private[state] class HDFSBackedStateStoreProvider(
   }
 
   override def toString(): String = {
-    s"StateStore[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
+    s"HDFSStateStoreProvider[id = (op=${id.operatorId}, part=${id.partitionId}), dir = $baseDir]"
   }
 
   /* Internal classes and methods */
@@ -493,10 +497,12 @@ private[state] class HDFSBackedStateStoreProvider(
             val mapsToRemove = loadedMaps.keys.filter(_ < earliestVersionToRetain).toSeq
             mapsToRemove.foreach(loadedMaps.remove)
           }
-          files.filter(_.version < earliestFileToRetain.version).foreach { f =>
+          val filesToDelete = files.filter(_.version < earliestFileToRetain.version)
+          filesToDelete.foreach { f =>
             fs.delete(f.path, true)
           }
-          logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this")
+          logInfo(s"Deleted files older than ${earliestFileToRetain.version} for $this: " +
+            filesToDelete.mkString(", "))
         }
       }
     } catch {
@@ -560,7 +566,7 @@ private[state] class HDFSBackedStateStoreProvider(
       }
     }
     val storeFiles = versionToFiles.values.toSeq.sortBy(_.version)
-    logDebug(s"Current set of files for $this: $storeFiles")
+    logDebug(s"Current set of files for $this: ${storeFiles.mkString(", ")}")
     storeFiles
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/407c3ced/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index d945d7a..267d176 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -38,7 +38,7 @@ private case class VerifyIfInstanceActive(storeId: StateStoreId, executorId: Str
 private case class GetLocation(storeId: StateStoreId)
   extends StateStoreCoordinatorMessage
 
-private case class DeactivateInstances(storeRootLocation: String)
+private case class DeactivateInstances(checkpointLocation: String)
   extends StateStoreCoordinatorMessage
 
 private object StopCoordinator
@@ -111,11 +111,13 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
  * Class for coordinating instances of [[StateStore]]s loaded in executors across the cluster,
  * and get their locations for job scheduling.
  */
-private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint {
+private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
+    extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreId, ExecutorCacheTaskLocation]
 
   override def receive: PartialFunction[Any, Unit] = {
     case ReportActiveInstance(id, host, executorId) =>
+      logDebug(s"Reported state store $id is active at $executorId")
       instances.put(id, ExecutorCacheTaskLocation(host, executorId))
   }
 
@@ -125,19 +127,25 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv) extends ThreadS
         case Some(location) => location.executorId == execId
         case None => false
       }
+      logDebug(s"Verified that state store $id is active: $response")
       context.reply(response)
 
     case GetLocation(id) =>
-      context.reply(instances.get(id).map(_.toString))
+      val executorId = instances.get(id).map(_.toString)
+      logDebug(s"Got location of the state store $id: $executorId")
+      context.reply(executorId)
 
-    case DeactivateInstances(loc) =>
+    case DeactivateInstances(checkpointLocation) =>
       val storeIdsToRemove =
-        instances.keys.filter(_.checkpointLocation == loc).toSeq
+        instances.keys.filter(_.checkpointLocation == checkpointLocation).toSeq
       instances --= storeIdsToRemove
+      logDebug(s"Deactivating instances related to checkpoint location $checkpointLocation: " +
+        storeIdsToRemove.mkString(", "))
       context.reply(true)
 
     case StopCoordinator =>
       stop() // Stop before replying to ensure that endpoint name has been deregistered
+      logInfo("StateStoreCoordinator stopped")
       context.reply(true)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/407c3ced/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index 06f1bd6..fcf300b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -367,7 +367,10 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val conf = new SparkConf()
       .setMaster("local")
       .setAppName("test")
+      // Make maintenance thread do snapshots and cleanups very fast
       .set(StateStore.MAINTENANCE_INTERVAL_CONFIG, "10ms")
+      // Make sure that when SparkContext stops, the StateStore maintenance thread 'quickly'
+      // fails to talk to the StateStoreCoordinator and unloads all the StateStores
       .set("spark.rpc.numRetries", "1")
     val opId = 0
     val dir = Utils.createDirectory(tempDir, Random.nextString(5)).toString
@@ -377,37 +380,49 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
     val provider = new HDFSBackedStateStoreProvider(
       storeId, keySchema, valueSchema, storeConf, hadoopConf)
 
+    var latestStoreVersion = 0
+
+    def generateStoreVersions() {
+      for (i <- 1 to 20) {
+        val store = StateStore.get(
+          storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
+        put(store, "a", i)
+        store.commit()
+        latestStoreVersion += 1
+      }
+    }
 
     quietly {
       withSpark(new SparkContext(conf)) { sc =>
         withCoordinatorRef(sc) { coordinatorRef =>
           require(!StateStore.isMaintenanceRunning, "StateStore is unexpectedly running")
 
-          for (i <- 1 to 20) {
-            val store = StateStore.get(
-              storeId, keySchema, valueSchema, i - 1, storeConf, hadoopConf)
-            put(store, "a", i)
-            store.commit()
-          }
+          // Generate sufficient versions of store for snapshots
+          generateStoreVersions()
 
           eventually(timeout(10 seconds)) {
+            // Store should have been reported to the coordinator
             assert(coordinatorRef.getLocation(storeId).nonEmpty, "active instance was not reported")
-          }
 
-          // Background maintenance should clean up and generate snapshots
-          assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
-
-          eventually(timeout(10 seconds)) {
-            // Earliest delta file should get cleaned up
-            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+            // Background maintenance should clean up and generate snapshots
+            assert(StateStore.isMaintenanceRunning, "Maintenance task is not running")
 
             // Some snapshots should have been generated
-            val snapshotVersions = (0 to 20).filter { version =>
+            val snapshotVersions = (1 to latestStoreVersion).filter { version =>
               fileExists(provider, version, isSnapshot = true)
             }
             assert(snapshotVersions.nonEmpty, "no snapshot file found")
           }
 
+          // Generate more versions such that there is another snapshot and
+          // the earliest delta file will be cleaned up
+          generateStoreVersions()
+
+          // Earliest delta file should get cleaned up
+          eventually(timeout(10 seconds)) {
+            assert(!fileExists(provider, 1, isSnapshot = false), "earliest file not deleted")
+          }
+
           // If driver decides to deactivate all instances of the store, then this instance
           // should be unloaded
           coordinatorRef.deactivateInstances(dir)
@@ -416,7 +431,7 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }
 
           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
 
           // If some other executor loads the store, then this instance should be unloaded
@@ -426,14 +441,14 @@ class StateStoreSuite extends SparkFunSuite with BeforeAndAfter with PrivateMeth
           }
 
           // Reload the store and verify
-          StateStore.get(storeId, keySchema, valueSchema, 20, storeConf, hadoopConf)
+          StateStore.get(storeId, keySchema, valueSchema, latestStoreVersion, storeConf, hadoopConf)
           assert(StateStore.isLoaded(storeId))
         }
       }
 
       // Verify if instance is unloaded if SparkContext is stopped
-      require(SparkEnv.get === null)
       eventually(timeout(10 seconds)) {
+        require(SparkEnv.get === null)
         assert(!StateStore.isLoaded(storeId))
         assert(!StateStore.isMaintenanceRunning)
       }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org