Posted to commits@spark.apache.org by sa...@apache.org on 2021/08/28 09:04:29 UTC

[spark] branch branch-3.1 updated: [SPARK-36509][CORE] Fix the issue that executors are never re-scheduled if the worker stops with standalone cluster

This is an automated email from the ASF dual-hosted git repository.

sarutak pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new 0af666a  [SPARK-36509][CORE] Fix the issue that executors are never re-scheduled if the worker stops with standalone cluster
0af666a is described below

commit 0af666a310590367a80439000d74975526064c87
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Sat Aug 28 18:01:55 2021 +0900

    [SPARK-36509][CORE] Fix the issue that executors are never re-scheduled if the worker stops with standalone cluster
    
    ### What changes were proposed in this pull request?
    
    This PR fixes an issue where executors are never re-scheduled if the worker they run on stops.
    As a result, the application gets stuck.
    You can easily reproduce this issue with the following procedure.
    
    ```
    # Run master
    $ sbin/start-master.sh
    
    # Run worker 1
    $ SPARK_LOG_DIR=/tmp/worker1 SPARK_PID_DIR=/tmp/worker1/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker1 --webui-port 8081 spark://<hostname>:7077
    
    # Run worker 2
    $ SPARK_LOG_DIR=/tmp/worker2 SPARK_PID_DIR=/tmp/worker2/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker2 --webui-port 8082 spark://<hostname>:7077
    
    # Run Spark Shell
    $ bin/spark-shell --master spark://<hostname>:7077 --executor-cores 1 --total-executor-cores 1
    
    # Check which worker the executor runs on and then kill the worker.
    $ kill <worker pid>
    ```
    
    With the procedure above, we would expect the executor to be re-scheduled on the other worker, but it isn't.
    
    The reason seems to be that `Master.schedule` cannot be called after the worker is marked as `WorkerState.DEAD`.
    So, the solution this PR proposes is to call `Master.schedule` whenever `Master.removeWorker` is called.
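
    As a rough illustration of the idea (a toy model, not the actual `Master` implementation; names such as `ToyMaster` and `pendingExecutors` are hypothetical), the sketch below shows why a scheduling pass has to be re-triggered when a worker is removed. The real one-line change is in the diff further down.

    ```scala
    // Toy model, not Spark code: illustrates why scheduling must be re-triggered
    // after a worker is removed.
    object ReschedulingSketch {

      final case class Worker(id: String, var alive: Boolean = true)

      final class ToyMaster(val workers: List[Worker]) {
        // Number of executors that lost their worker and still need a new one.
        private var pendingExecutors: Int = 0

        // Simplified scheduling pass: place pending executors on any alive worker.
        def schedule(): Unit = {
          val alive = workers.filter(_.alive)
          if (alive.nonEmpty && pendingExecutors > 0) {
            println(s"Re-launching $pendingExecutors executor(s) on ${alive.head.id}")
            pendingExecutors = 0
          }
        }

        def removeWorker(worker: Worker): Unit = {
          worker.alive = false
          pendingExecutors += 1 // executors on the dead worker must run elsewhere
          // Without this call nothing ever triggers another scheduling pass, so the
          // lost executors are never re-launched and the application hangs.
          // This mirrors the one-line `schedule()` call added in the diff below.
          schedule()
        }
      }

      def main(args: Array[String]): Unit = {
        val master = new ToyMaster(List(Worker("worker1"), Worker("worker2")))
        master.removeWorker(master.workers.head) // prints the re-launch message
      }
    }
    ```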
    
    This PR also fixes an issue where `ExecutorRunner` can send an `ExecutorStateChanged` message without changing its state.
    This issue causes an assertion error:
    ```
    2021-08-13 14:05:37,991 [dispatcher-event-loop-9] ERROR: Ignoring errorjava.lang.AssertionError: assertion failed: executor 0 state transfer from RUNNING to RUNNING is illegal
    ```
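
    The assertion above fires when an executor state change is reported without the state actually changing (`RUNNING` to `RUNNING`). The hedged, self-contained sketch below (not Spark code; `StateTransitionSketch` and its names are illustrative stand-ins inferred from the error message) shows that kind of check and how widening the shutdown-hook guard to also cover `RUNNING` (see the `ExecutorRunner` diff further down) avoids tripping it.

    ```scala
    // Toy sketch, not Spark code: a simplified stand-in for the check implied by
    // the assertion error quoted above.
    object StateTransitionSketch {

      object ExecutorState extends Enumeration {
        val LAUNCHING, RUNNING, FAILED = Value
      }
      import ExecutorState._

      // Simplified receiver-side check: reporting a "transfer" to the same state
      // is treated as illegal.
      def onExecutorStateChanged(oldState: ExecutorState.Value,
                                 newState: ExecutorState.Value): Unit = {
        assert(oldState != newState,
          s"executor state transfer from $oldState to $newState is illegal")
      }

      def main(args: Array[String]): Unit = {
        var state: ExecutorState.Value = RUNNING
        val previous = state

        // Before the fix the shutdown hook only handled LAUNCHING, so a RUNNING
        // executor was reported again as RUNNING and the assertion fired.
        // With the fix, RUNNING is also mapped to FAILED before reporting.
        if (state == LAUNCHING || state == RUNNING) {
          state = FAILED
        }
        onExecutorStateChanged(previous, state) // passes: RUNNING -> FAILED
      }
    }
    ```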
    
    ### Why are the changes needed?
    
    It's a critical bug.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    Manually tested with the procedure shown above and confirmed the executor is re-scheduled.
    
    Closes #33818 from sarutak/fix-scheduling-stuck.
    
    Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
    Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
    (cherry picked from commit ea8c31e5ea233da4407f6821b2d6dd7f3c88f8d9)
    Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
---
 core/src/main/scala/org/apache/spark/deploy/master/Master.scala         | 1 +
 core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9f1b36a..1cbeacf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -964,6 +964,7 @@ private[deploy] class Master(
       app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
     }
     persistenceEngine.removeWorker(worker)
+    schedule()
   }
 
   private def relaunchDriver(driver: DriverInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 974c2d6..40d9407 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -83,7 +83,7 @@ private[deploy] class ExecutorRunner(
     shutdownHook = ShutdownHookManager.addShutdownHook { () =>
       // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
       // be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
-      if (state == ExecutorState.LAUNCHING) {
+      if (state == ExecutorState.LAUNCHING || state == ExecutorState.RUNNING) {
         state = ExecutorState.FAILED
       }
       killProcess(Some("Worker shutting down")) }
