Posted to commits@spark.apache.org by ka...@apache.org on 2020/12/28 07:53:27 UTC

[spark] branch master updated: [SPARK-33827][SS] Unload inactive state store as soon as possible

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c75f779  [SPARK-33827][SS] Unload inactive state store as soon as possible
c75f779 is described below

commit c75f779fd7f5361c661c73883c3c8e78cd4808c3
Author: Liang-Chi Hsieh <vi...@gmail.com>
AuthorDate: Mon Dec 28 16:52:56 2020 +0900

    [SPARK-33827][SS] Unload inactive state store as soon as possible
    
    ### What changes were proposed in this pull request?
    
    This patch proposes to unload inactive state stores as soon as possible. The unloading happens when an executor loads an active state store provider: at that point the state store coordinator returns the list of providers loaded on this executor that are already loaded by other executors in the new batch, and each provider in that list is unloaded immediately.
    
    ### Why are the changes needed?
    
    Per the discussion at #30770, it makes sense to unload inactive state stores as soon as possible. Currently we only unload them in a periodically running maintenance task, so there can be a delay between a state store becoming inactive and it being unloaded.
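    
    For context, that delay is bounded by the maintenance interval, which is configurable. A minimal Scala illustration (the key is the existing SQLConf setting referenced in the diff below; the value shown is only the usual default, not something this patch changes):
    
        // Inactive stores are only unloaded when the maintenance task fires,
        // so before this patch a store could stay loaded for roughly this long
        // after becoming inactive.
        spark.conf.set("spark.sql.streaming.stateStore.maintenanceInterval", "60s")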
    
    However, we can force Spark to always allocate a state store to the same executor by using the task locality configuration. This reduces the chance of having inactive state stores in the first place.
    
    With the locality configuration in place, we should normally not see inactive state stores at all. There is still a chance that an executor fails and is reallocated, but in that case the inactive state store is lost along with it, so it is not an issue.
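    
    As a sketch of that locality configuration (these are the standard Spark scheduling settings, not options added by this patch, and the values are illustrative):
    
        import org.apache.spark.sql.SparkSession
    
        val spark = SparkSession.builder()
          .appName("stateful-query")
          // Wait longer for PROCESS_LOCAL scheduling so state store tasks
          // tend to land on the executor that already loaded the provider.
          .config("spark.locality.wait", "10s")
          .getOrCreate()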
    
    Making the driver-executor communication bi-directional so the driver could tell executors to unload inactive state stores looks non-trivial, and it does not seem worth it given what we can already do with locality.
    
    This PR proposes a simpler but effective approach: when reporting an active state store to the coordinator, we check whether any state store loaded on this executor is already loaded on another executor. If so, that store is now inactive and would otherwise only be unloaded by the next maintenance task, so we unload it immediately.
    
    How do we make sure that a state store loaded in the previous batch has already been loaded on another executor in this batch before this executor reports? With task locality and preferred locations, once an executor is ready to be scheduled, Spark should assign it the state store provider it previously loaded. So when this executor receives an assignment other than its previously loaded state store, the previously loaded one must already have been assigned to another executor.
    
    There is still a delay between the state store being loaded on the other executor and this executor unloading it when it reports its own active state store, but that delay is now minimized. Also, multiple state stores belonging to the same operator will not stay loaded on a single executor at the same time, because as soon as the executor reports any active store, it unloads all its inactive stores. This should not be an issue IMHO.
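    
    Roughly, the executor-side flow after this change looks like the sketch below (a simplified paraphrase of the StateStore.get change in the diff, not the exact code):
    
        // Report this provider as active, and also send the other providers
        // currently loaded on this executor so the coordinator can tell us
        // which of them are now owned by a different executor.
        val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
        val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
        // Unload the inactive ones immediately instead of waiting for the
        // periodic maintenance task.
        providerIdsToUnload.foreach(unload(_))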
    
    This is a minimal change that unloads inactive state stores as soon as possible without significant refactoring.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Unit test.
    
    Closes #30827 from viirya/SPARK-33827.
    
    Authored-by: Liang-Chi Hsieh <vi...@gmail.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../sql/execution/streaming/state/StateStore.scala | 16 +++++++--
 .../streaming/state/StateStoreCoordinator.scala    | 34 +++++++++++++-----
 .../state/StateStoreCoordinatorSuite.scala         | 12 +++----
 .../streaming/state/StateStoreRDDSuite.scala       |  4 +--
 .../streaming/state/StateStoreSuite.scala          | 41 ++++++++++++++--------
 .../spark/sql/streaming/StreamingJoinSuite.scala   |  2 +-
 6 files changed, 75 insertions(+), 34 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
index f87a2fb..d9d6768 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala
@@ -494,7 +494,9 @@ object StateStore extends Logging {
         StateStoreProvider.createAndInit(
           storeProviderId, keySchema, valueSchema, indexOrdinal, storeConf, hadoopConf)
       )
-      reportActiveStoreInstance(storeProviderId)
+      val otherProviderIds = loadedProviders.keys.filter(_ != storeProviderId).toSeq
+      val providerIdsToUnload = reportActiveStoreInstance(storeProviderId, otherProviderIds)
+      providerIdsToUnload.foreach(unload(_))
       provider
     }
   }
@@ -569,12 +571,20 @@ object StateStore extends Logging {
     }
   }
 
-  private def reportActiveStoreInstance(storeProviderId: StateStoreProviderId): Unit = {
+  private def reportActiveStoreInstance(
+      storeProviderId: StateStoreProviderId,
+      otherProviderIds: Seq[StateStoreProviderId]): Seq[StateStoreProviderId] = {
     if (SparkEnv.get != null) {
       val host = SparkEnv.get.blockManager.blockManagerId.host
       val executorId = SparkEnv.get.blockManager.blockManagerId.executorId
-      coordinatorRef.foreach(_.reportActiveInstance(storeProviderId, host, executorId))
+      val providerIdsToUnload = coordinatorRef
+        .map(_.reportActiveInstance(storeProviderId, host, executorId, otherProviderIds))
+        .getOrElse(Seq.empty[StateStoreProviderId])
       logInfo(s"Reported that the loaded instance $storeProviderId is active")
+      logDebug(s"The loaded instances are going to unload: ${providerIdsToUnload.mkString(", ")}")
+      providerIdsToUnload
+    } else {
+      Seq.empty[StateStoreProviderId]
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
index 2b14d37..84b77ef 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinator.scala
@@ -31,10 +31,19 @@ import org.apache.spark.util.RpcUtils
 private sealed trait StateStoreCoordinatorMessage extends Serializable
 
 /** Classes representing messages */
+
+/**
+ * This message is used to report active instance of a state store provider
+ * to [[StateStoreCoordinator]]. This message also carries other loaded state
+ * store providers on the same executor. [[StateStoreCoordinator]] will check
+ * if these providers are inactive now. Inactive providers will be returned
+ * back to the sender of the message for unloading.
+ */
 private case class ReportActiveInstance(
     storeId: StateStoreProviderId,
     host: String,
-    executorId: String)
+    executorId: String,
+    providerIdsToCheck: Seq[StateStoreProviderId])
   extends StateStoreCoordinatorMessage
 
 private case class VerifyIfInstanceActive(storeId: StateStoreProviderId, executorId: String)
@@ -87,8 +96,10 @@ class StateStoreCoordinatorRef private(rpcEndpointRef: RpcEndpointRef) {
   private[sql] def reportActiveInstance(
       stateStoreProviderId: StateStoreProviderId,
       host: String,
-      executorId: String): Unit = {
-    rpcEndpointRef.send(ReportActiveInstance(stateStoreProviderId, host, executorId))
+      executorId: String,
+      otherProviderIds: Seq[StateStoreProviderId]): Seq[StateStoreProviderId] = {
+    rpcEndpointRef.askSync[Seq[StateStoreProviderId]](
+      ReportActiveInstance(stateStoreProviderId, host, executorId, otherProviderIds))
   }
 
   /** Verify whether the given executor has the active instance of a state store */
@@ -122,13 +133,20 @@ private class StateStoreCoordinator(override val rpcEnv: RpcEnv)
     extends ThreadSafeRpcEndpoint with Logging {
   private val instances = new mutable.HashMap[StateStoreProviderId, ExecutorCacheTaskLocation]
 
-  override def receive: PartialFunction[Any, Unit] = {
-    case ReportActiveInstance(id, host, executorId) =>
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+    case ReportActiveInstance(id, host, executorId, providerIdsToCheck) =>
       logDebug(s"Reported state store $id is active at $executorId")
-      instances.put(id, ExecutorCacheTaskLocation(host, executorId))
-  }
+      val taskLocation = ExecutorCacheTaskLocation(host, executorId)
+      instances.put(id, taskLocation)
+
+      // Check if any loaded provider id is already loaded in other executor.
+      val providerIdsToUnload = providerIdsToCheck.filter { providerId =>
+        val providerLoc = instances.get(providerId)
+        // This provider is already loaded in another executor. Mark it to unload.
+        providerLoc.map(_ != taskLocation).getOrElse(false)
+      }
+      context.reply(providerIdsToUnload)
 
-  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case VerifyIfInstanceActive(id, execId) =>
       val response = instances.get(id) match {
         case Some(location) => location.executorId == execId
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
index 7bca225..d039c72 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreCoordinatorSuite.scala
@@ -41,7 +41,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       assert(coordinatorRef.verifyIfInstanceActive(id, "exec1") === false)
       assert(coordinatorRef.getLocation(id) === None)
 
-      coordinatorRef.reportActiveInstance(id, "hostX", "exec1")
+      coordinatorRef.reportActiveInstance(id, "hostX", "exec1", Seq.empty)
       eventually(timeout(5.seconds)) {
         assert(coordinatorRef.verifyIfInstanceActive(id, "exec1"))
         assert(
@@ -49,7 +49,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
             Some(ExecutorCacheTaskLocation("hostX", "exec1").toString))
       }
 
-      coordinatorRef.reportActiveInstance(id, "hostX", "exec2")
+      coordinatorRef.reportActiveInstance(id, "hostX", "exec2", Seq.empty)
 
       eventually(timeout(5.seconds)) {
         assert(coordinatorRef.verifyIfInstanceActive(id, "exec1") === false)
@@ -72,9 +72,9 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
       val host = "hostX"
       val exec = "exec1"
 
-      coordinatorRef.reportActiveInstance(id1, host, exec)
-      coordinatorRef.reportActiveInstance(id2, host, exec)
-      coordinatorRef.reportActiveInstance(id3, host, exec)
+      coordinatorRef.reportActiveInstance(id1, host, exec, Seq.empty)
+      coordinatorRef.reportActiveInstance(id2, host, exec, Seq.empty)
+      coordinatorRef.reportActiveInstance(id3, host, exec, Seq.empty)
 
       eventually(timeout(5.seconds)) {
         assert(coordinatorRef.verifyIfInstanceActive(id1, exec))
@@ -106,7 +106,7 @@ class StateStoreCoordinatorSuite extends SparkFunSuite with SharedSparkContext {
 
       val id = StateStoreProviderId(StateStoreId("x", 0, 0), UUID.randomUUID)
 
-      coordRef1.reportActiveInstance(id, "hostX", "exec1")
+      coordRef1.reportActiveInstance(id, "hostX", "exec1", Seq.empty)
 
       eventually(timeout(5.seconds)) {
         assert(coordRef2.verifyIfInstanceActive(id, "exec1"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
index 015415a..378aa1d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreRDDSuite.scala
@@ -159,8 +159,8 @@ class StateStoreRDDSuite extends SparkFunSuite with BeforeAndAfter with BeforeAn
         val coordinatorRef = sqlContext.streams.stateStoreCoordinator
         val storeProviderId1 = StateStoreProviderId(StateStoreId(path, opId, 0), queryRunId)
         val storeProviderId2 = StateStoreProviderId(StateStoreId(path, opId, 1), queryRunId)
-        coordinatorRef.reportActiveInstance(storeProviderId1, "host1", "exec1")
-        coordinatorRef.reportActiveInstance(storeProviderId2, "host2", "exec2")
+        coordinatorRef.reportActiveInstance(storeProviderId1, "host1", "exec1", Seq.empty)
+        coordinatorRef.reportActiveInstance(storeProviderId2, "host2", "exec2", Seq.empty)
 
         require(
           coordinatorRef.getLocation(storeProviderId1) ===
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
index d4cd3cd..291c05f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/StateStoreSuite.scala
@@ -394,21 +394,23 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
       // fails to talk to the StateStoreCoordinator and unloads all the StateStores
       .set(RPC_NUM_RETRIES, 1)
     val opId = 0
-    val dir = newDir()
-    val storeProviderId = StateStoreProviderId(StateStoreId(dir, opId, 0), UUID.randomUUID)
+    val dir1 = newDir()
+    val storeProviderId1 = StateStoreProviderId(StateStoreId(dir1, opId, 0), UUID.randomUUID)
+    val dir2 = newDir()
+    val storeProviderId2 = StateStoreProviderId(StateStoreId(dir2, opId, 1), UUID.randomUUID)
     val sqlConf = new SQLConf()
     sqlConf.setConf(SQLConf.MIN_BATCHES_TO_RETAIN, 2)
     // Make maintenance thread do snapshots and cleanups very fast
     sqlConf.setConf(SQLConf.STREAMING_MAINTENANCE_INTERVAL, 10L)
     val storeConf = StateStoreConf(sqlConf)
     val hadoopConf = new Configuration()
-    val provider = newStoreProvider(storeProviderId.storeId)
+    val provider = newStoreProvider(storeProviderId1.storeId)
 
     var latestStoreVersion = 0
 
     def generateStoreVersions(): Unit = {
       for (i <- 1 to 20) {
-        val store = StateStore.get(storeProviderId, keySchema, valueSchema, None,
+        val store = StateStore.get(storeProviderId1, keySchema, valueSchema, None,
           latestStoreVersion, storeConf, hadoopConf)
         put(store, "a", i)
         store.commit()
@@ -428,7 +430,7 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
           eventually(timeout(timeoutDuration)) {
             // Store should have been reported to the coordinator
-            assert(coordinatorRef.getLocation(storeProviderId).nonEmpty,
+            assert(coordinatorRef.getLocation(storeProviderId1).nonEmpty,
               "active instance was not reported")
 
             // Background maintenance should clean up and generate snapshots
@@ -452,33 +454,44 @@ class StateStoreSuite extends StateStoreSuiteBase[HDFSBackedStateStoreProvider]
 
           // If driver decides to deactivate all stores related to a query run,
           // then this instance should be unloaded
-          coordinatorRef.deactivateInstances(storeProviderId.queryRunId)
+          coordinatorRef.deactivateInstances(storeProviderId1.queryRunId)
           eventually(timeout(timeoutDuration)) {
-            assert(!StateStore.isLoaded(storeProviderId))
+            assert(!StateStore.isLoaded(storeProviderId1))
           }
 
           // Reload the store and verify
-          StateStore.get(storeProviderId, keySchema, valueSchema, indexOrdinal = None,
+          StateStore.get(storeProviderId1, keySchema, valueSchema, indexOrdinal = None,
             latestStoreVersion, storeConf, hadoopConf)
-          assert(StateStore.isLoaded(storeProviderId))
+          assert(StateStore.isLoaded(storeProviderId1))
 
           // If some other executor loads the store, then this instance should be unloaded
-          coordinatorRef.reportActiveInstance(storeProviderId, "other-host", "other-exec")
+          coordinatorRef
+            .reportActiveInstance(storeProviderId1, "other-host", "other-exec", Seq.empty)
           eventually(timeout(timeoutDuration)) {
-            assert(!StateStore.isLoaded(storeProviderId))
+            assert(!StateStore.isLoaded(storeProviderId1))
           }
 
           // Reload the store and verify
-          StateStore.get(storeProviderId, keySchema, valueSchema, indexOrdinal = None,
+          StateStore.get(storeProviderId1, keySchema, valueSchema, indexOrdinal = None,
             latestStoreVersion, storeConf, hadoopConf)
-          assert(StateStore.isLoaded(storeProviderId))
+          assert(StateStore.isLoaded(storeProviderId1))
+
+          // If some other executor loads the store, and when this executor loads other store,
+          // then this executor should unload inactive instances immediately.
+          coordinatorRef
+            .reportActiveInstance(storeProviderId1, "other-host", "other-exec", Seq.empty)
+          StateStore.get(storeProviderId2, keySchema, valueSchema, indexOrdinal = None,
+            0, storeConf, hadoopConf)
+          assert(!StateStore.isLoaded(storeProviderId1))
+          assert(StateStore.isLoaded(storeProviderId2))
         }
       }
 
       // Verify if instance is unloaded if SparkContext is stopped
       eventually(timeout(timeoutDuration)) {
         require(SparkEnv.get === null)
-        assert(!StateStore.isLoaded(storeProviderId))
+        assert(!StateStore.isLoaded(storeProviderId1))
+        assert(!StateStore.isLoaded(storeProviderId2))
         assert(!StateStore.isMaintenanceRunning)
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
index d264886..40131e8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
@@ -523,7 +523,7 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite {
       }.toMap
       partitionAndStoreNameToLocation.foreach { case ((partIndex, storeName), hostName) =>
         val providerId = StateStoreProviderId(stateInfo, partIndex, storeName)
-        coordinatorRef.reportActiveInstance(providerId, hostName, s"exec-$hostName")
+        coordinatorRef.reportActiveInstance(providerId, hostName, s"exec-$hostName", Seq.empty)
         require(
           coordinatorRef.getLocation(providerId) ===
             Some(ExecutorCacheTaskLocation(hostName, s"exec-$hostName").toString))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org