Posted to commits@spark.apache.org by sa...@apache.org on 2021/08/28 09:04:29 UTC
[spark] branch branch-3.1 updated: [SPARK-36509][CORE] Fix the
issue that executors are never re-scheduled if the worker stops with
standalone cluster
This is an automated email from the ASF dual-hosted git repository.
sarutak pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new 0af666a [SPARK-36509][CORE] Fix the issue that executors are never re-scheduled if the worker stops with standalone cluster
0af666a is described below
commit 0af666a310590367a80439000d74975526064c87
Author: Kousuke Saruta <sa...@oss.nttdata.com>
AuthorDate: Sat Aug 28 18:01:55 2021 +0900
[SPARK-36509][CORE] Fix the issue that executors are never re-scheduled if the worker stops with standalone cluster
### What changes were proposed in this pull request?
This PR fixes an issue where executors are never re-scheduled if the worker they run on stops.
As a result, the application gets stuck.
You can easily reproduce this issue with the following procedure.
```
# Run master
$ sbin/start-master.sh
# Run worker 1
$ SPARK_LOG_DIR=/tmp/worker1 SPARK_PID_DIR=/tmp/worker1/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker1 --webui-port 8081 spark://<hostname>:7077
# Run worker 2
$ SPARK_LOG_DIR=/tmp/worker2 SPARK_PID_DIR=/tmp/worker2/ sbin/start-worker.sh -c 1 -h localhost -d /tmp/worker2 --webui-port 8082 spark://<hostname>:7077
# Run Spark Shell
$ bin/spark-shell --master spark://<hostname>:7077 --executor-cores 1 --total-executor-cores 1
# Check which worker the executor runs on and then kill the worker.
$ kill <worker pid>
```
With the procedure above, we would expect the executor to be re-scheduled on the other worker, but it is not.
The reason seems to be that `Master.schedule` is not called after the worker is marked as `WorkerState.DEAD`.
So, the solution this PR proposes is to call `Master.schedule` whenever `Master.removeWorker` is called.
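The control flow described above can be illustrated with a deliberately tiny, hypothetical model of the Master. `ToyMaster`, `registerApp`, `executorWorkers`, `reproduce` and the one-core-per-executor rule are all assumptions made for illustration, not Spark's real classes or APIs. The only point the sketch makes is that dropping a dead worker's executors does nothing for the application until something calls `schedule()` again.

```scala
import scala.collection.mutable

// Hypothetical, much-simplified model of the standalone Master (NOT Spark's code).
// Each executor uses one core; the app asks for `coresWanted` cores in total.
final class ToyMaster(scheduleOnRemove: Boolean) {
  private val aliveWorkers = mutable.LinkedHashMap.empty[String, Int] // workerId -> cores
  private val executors = mutable.Map.empty[Int, String]              // execId -> workerId
  private var nextExecId = 0
  private var coresWanted = 0

  // Workers that currently host an executor, sorted for stable output.
  def executorWorkers: List[String] = executors.values.toList.sorted

  def registerWorker(id: String, cores: Int): Unit = {
    aliveWorkers(id) = cores
    schedule()
  }

  def registerApp(totalCores: Int): Unit = {
    coresWanted = totalCores
    schedule()
  }

  // Launch one-core executors on alive workers with free cores until the app's demand is met.
  def schedule(): Unit =
    for ((workerId, cores) <- aliveWorkers) {
      var free = cores - executors.values.count(_ == workerId)
      while (free > 0 && executors.size < coresWanted) {
        executors(nextExecId) = workerId
        nextExecId += 1
        free -= 1
      }
    }

  // Forget the worker and the executors that ran on it.
  def removeWorker(id: String): Unit = {
    aliveWorkers.remove(id)
    val lost = executors.filter { case (_, workerId) => workerId == id }.keys.toList
    executors --= lost
    if (scheduleOnRemove) schedule() // plays the role of the `+ schedule()` line in the patch
  }
}

// Mirrors the reproduction steps: two one-core workers, an app asking for one core,
// then the worker hosting the executor goes away.
def reproduce(scheduleOnRemove: Boolean): List[String] = {
  val master = new ToyMaster(scheduleOnRemove)
  master.registerWorker("worker1", cores = 1)
  master.registerWorker("worker2", cores = 1)
  master.registerApp(totalCores = 1) // the executor lands on worker1
  master.removeWorker("worker1")
  master.executorWorkers
}

println(reproduce(scheduleOnRemove = false)) // List()
println(reproduce(scheduleOnRemove = true))  // List(worker2)
```

In this model, calling `schedule()` from `removeWorker` (rather than waiting for some later event, such as a new worker registering) is what lets the freed core be offered to the remaining worker immediately.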
This PR also fixes an issue where `ExecutorRunner` can send an `ExecutorStateChanged` message without changing its state.
This causes an assertion error:
```
2021-08-13 14:05:37,991 [dispatcher-event-loop-9] ERROR: Ignoring error
java.lang.AssertionError: assertion failed: executor 0 state transfer from RUNNING to RUNNING is illegal
```
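A similarly hypothetical sketch of the second fix follows. `ToyRunner` stands in for `ExecutorRunner`, and `ToyReceiver` stands in for whatever ends up receiving `ExecutorStateChanged`. The receiver's rule (a report of RUNNING is only legal when the previously known state was LAUNCHING) is an assumption inferred from the log message above, and having the shutdown hook report its current `state` unchanged is a simplification of the behavior the PR describes. All names here are illustrative.

```scala
// Hypothetical sketch of the state-report problem (NOT Spark's ExecutorRunner or Master).
sealed trait ToyExecutorState
object ToyExecutorState {
  case object LAUNCHING extends ToyExecutorState
  case object RUNNING extends ToyExecutorState
  case object FAILED extends ToyExecutorState
}
import ToyExecutorState._

// Receiver side. Assumption: a RUNNING report is only legal right after LAUNCHING,
// which is what the assertion message in the log suggests.
final class ToyReceiver {
  var known: ToyExecutorState = LAUNCHING

  def onStateChanged(execId: Int, newState: ToyExecutorState): Unit = {
    val oldState = known
    known = newState
    if (newState == RUNNING)
      assert(oldState == LAUNCHING, s"executor $execId state transfer from $oldState to RUNNING is illegal")
  }
}

// Sender side. `patched` toggles the one-line change to the shutdown-hook condition.
final class ToyRunner(execId: Int, receiver: ToyReceiver, patched: Boolean) {
  var state: ToyExecutorState = LAUNCHING

  def start(): Unit = {
    state = RUNNING
    receiver.onStateChanged(execId, state)
  }

  def shutdownHook(): Unit = {
    if (state == LAUNCHING || (patched && state == RUNNING)) state = FAILED
    // Simplification: report whatever `state` is now, as the PR says can happen.
    receiver.onStateChanged(execId, state)
  }
}

// Start an executor, then trigger the shutdown hook; return the error or the final state.
def runShutdown(patched: Boolean): Either[String, ToyExecutorState] = {
  val receiver = new ToyReceiver
  val runner = new ToyRunner(0, receiver, patched)
  runner.start()
  try {
    runner.shutdownHook()
    Right(receiver.known)
  } catch {
    case e: AssertionError => Left(e.getMessage)
  }
}

println(runShutdown(patched = false))
// Left(assertion failed: executor 0 state transfer from RUNNING to RUNNING is illegal)
println(runShutdown(patched = true)) // Right(FAILED)
```

In this model the patched hook moves a RUNNING executor to FAILED before reporting, so the receiver never sees a RUNNING-to-RUNNING transition.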
### Why are the changes needed?
It's a critical bug: the application gets stuck because its executors are never re-scheduled.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Manually tested with the procedure shown above and confirmed the executor is re-scheduled.
Closes #33818 from sarutak/fix-scheduling-stuck.
Authored-by: Kousuke Saruta <sa...@oss.nttdata.com>
Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
(cherry picked from commit ea8c31e5ea233da4407f6821b2d6dd7f3c88f8d9)
Signed-off-by: Kousuke Saruta <sa...@oss.nttdata.com>
---
core/src/main/scala/org/apache/spark/deploy/master/Master.scala | 1 +
core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala | 2 +-
2 files changed, 2 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 9f1b36a..1cbeacf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -964,6 +964,7 @@ private[deploy] class Master(
app.driver.send(WorkerRemoved(worker.id, worker.host, msg))
}
persistenceEngine.removeWorker(worker)
+ schedule()
}
private def relaunchDriver(driver: DriverInfo): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 974c2d6..40d9407 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -83,7 +83,7 @@ private[deploy] class ExecutorRunner(
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
// be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
- if (state == ExecutorState.LAUNCHING) {
+ if (state == ExecutorState.LAUNCHING || state == ExecutorState.RUNNING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }