You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ji...@apache.org on 2019/05/03 22:49:10 UTC

[spark] branch master updated: [SPARK-27510][CORE] Avoid Master falls into dead loop while launching executor failed in Worker

This is an automated email from the ASF dual-hosted git repository.

jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 51de86b  [SPARK-27510][CORE] Avoid Master falls into dead loop while launching executor failed in Worker
51de86b is described below

commit 51de86baed0776304c6184f2c04b6303ef48df90
Author: wuyi <ng...@163.com>
AuthorDate: Fri May 3 15:48:37 2019 -0700

    [SPARK-27510][CORE] Avoid Master falls into dead loop while launching executor failed in Worker
    
    ## What changes were proposed in this pull request?
    
    This is a long-standing issue which I have run into before, and I've seen other people have trouble with it:
    [test cases stuck on "local-cluster mode" of ReplSuite?](http://apache-spark-developers-list.1001551.n3.nabble.com/test-cases-stuck-on-quot-local-cluster-mode-quot-of-ReplSuite-td3086.html)
    [Spark tests hang on local machine due to "testGuavaOptional" in JavaAPISuite](http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-tests-hang-on-local-machine-due-to-quot-testGuavaOptional-quot-in-JavaAPISuite-tc10999.html)
    
    When running tests under local-cluster mode with a wrong SPARK_HOME (spark.test.home), the tests just get stuck and never respond. After looking into SPARK_WORKER_DIR, I found that there were endless executor directories under it. This explains what happens while the tests are stuck.
    
    The whole process looks like:
    
    1. Driver submits an app to Master and asks for N executors
    2. Master inits executor state with LAUNCHING and sends `LaunchExecutor` to Worker
    3. Worker receives `LaunchExecutor`, launches ExecutorRunner asynchronously and sends `ExecutorStateChanged(state=RUNNING)` to Master immediately
    4. Master receives `ExecutorStateChanged(state=RUNNING)` and resets `_retryCount` to 0.
    5. ExecutorRunner throws an exception while launching the executor and sends `ExecutorStateChanged(state=FAILED)` to Worker, which forwards the message to Master
    6. Master receives `ExecutorStateChanged(state=FAILED)`. Since Master always resets `_retryCount` when it receives a RUNNING message, even if a Worker fails to launch an executor many times in a row, `_retryCount` never exceeds `maxExecutorRetries`. So Master continues to launch executors and falls into the dead loop.
    
    The problem lies in step 3. Worker sends `ExecutorStateChanged(state=RUNNING)` to Master immediately, while the executor is still launching. When Master receives that message, it believes the executor has launched successfully and subsequently resets `_retryCount`. However, that's not true.
    
    This PR proposes removing step 3 and requiring Worker to send `ExecutorStateChanged(state=RUNNING)` only after the executor has actually launched successfully.
    
    ## How was this patch tested?
    
    Tested Manually.
    
    Closes #24408 from Ngone51/fix-dead-loop.
    
    Authored-by: wuyi <ng...@163.com>
    Signed-off-by: Xingbo Jiang <xi...@databricks.com>
---
 .../spark/deploy/worker/ExecutorRunner.scala       |  6 ++-
 .../org/apache/spark/deploy/worker/Worker.scala    |  4 +-
 .../apache/spark/deploy/master/MasterSuite.scala   | 60 ++++++++++++++++++++++
 3 files changed, 66 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 9d2301c..6f1484c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -78,8 +78,8 @@ private[deploy] class ExecutorRunner(
     // Shutdown hook that kills actors on shutdown.
     shutdownHook = ShutdownHookManager.addShutdownHook { () =>
       // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
-      // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
-      if (state == ExecutorState.RUNNING) {
+      // be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
+      if (state == ExecutorState.LAUNCHING) {
         state = ExecutorState.FAILED
       }
       killProcess(Some("Worker shutting down")) }
@@ -183,6 +183,8 @@ private[deploy] class ExecutorRunner(
       Files.write(header, stderr, StandardCharsets.UTF_8)
       stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
 
+      state = ExecutorState.RUNNING
+      worker.send(ExecutorStateChanged(appId, execId, state, None, None))
       // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
       // or with nonzero exit code
       val exitCode = process.waitFor()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index eb2add3..f8ec5b6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -540,12 +540,12 @@ private[deploy] class Worker(
             executorDir,
             workerUri,
             conf,
-            appLocalDirs, ExecutorState.RUNNING)
+            appLocalDirs,
+            ExecutorState.LAUNCHING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
           memoryUsed += memory_
-          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
         } catch {
           case e: Exception =>
             logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 61aaaa5..0c4b105 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -89,6 +89,17 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   }
 }
 
+class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf)
+  extends MockWorker(master, conf) {
+  var failedCnt = 0
+  override def receive: PartialFunction[Any, Unit] = {
+    case LaunchExecutor(_, appId, execId, _, _, _) =>
+      failedCnt += 1
+      master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+    case otherMsg => super.receive(otherMsg)
+  }
+}
+
 class MasterSuite extends SparkFunSuite
   with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
 
@@ -635,6 +646,55 @@ class MasterSuite extends SparkFunSuite
     }
   }
 
+  test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") {
+    val master = makeMaster()
+    master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+    eventually(timeout(10.seconds)) {
+      val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+      assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+    }
+
+    var worker: MockExecutorLaunchFailWorker = null
+    try {
+      worker = new MockExecutorLaunchFailWorker(master.self)
+      worker.rpcEnv.setupEndpoint("worker", worker)
+      val workerRegMsg = RegisterWorker(
+        worker.id,
+        "localhost",
+        9999,
+        worker.self,
+        10,
+        1234 * 3,
+        "http://localhost:8080",
+        master.rpcEnv.address)
+      master.self.send(workerRegMsg)
+      val driver = DeployTestUtils.createDriverDesc()
+      // mimic DriverClient to send RequestSubmitDriver to master
+      master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+      var appId: String = null
+      eventually(timeout(10.seconds)) {
+        // an app would be registered with Master once Driver set up
+        assert(worker.apps.nonEmpty)
+        appId = worker.apps.head._1
+        assert(master.idToApp.contains(appId))
+      }
+
+      eventually(timeout(10.seconds)) {
+        // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
+        assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
+        // Master would remove the app if no executor could be launched for it
+        assert(!master.idToApp.contains(appId))
+      }
+    } finally {
+      if (worker != null) {
+        worker.rpcEnv.shutdown()
+      }
+      if (master != null) {
+        master.rpcEnv.shutdown()
+      }
+    }
+  }
+
   test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
     val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
     val master = makeMaster(conf)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org