You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ji...@apache.org on 2019/05/03 22:49:10 UTC
[spark] branch master updated: [SPARK-27510][CORE] Avoid Master
falls into dead loop while launching executor failed in Worker
This is an automated email from the ASF dual-hosted git repository.
jiangxb1987 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 51de86b [SPARK-27510][CORE] Avoid Master falls into dead loop while launching executor failed in Worker
51de86b is described below
commit 51de86baed0776304c6184f2c04b6303ef48df90
Author: wuyi <ng...@163.com>
AuthorDate: Fri May 3 15:48:37 2019 -0700
[SPARK-27510][CORE] Avoid Master falls into dead loop while launching executor failed in Worker
## What changes were proposed in this pull request?
This is a long-standing issue that I have run into before, and I've seen other people run into trouble with it as well:
[test cases stuck on "local-cluster mode" of ReplSuite?](http://apache-spark-developers-list.1001551.n3.nabble.com/test-cases-stuck-on-quot-local-cluster-mode-quot-of-ReplSuite-td3086.html)
[Spark tests hang on local machine due to "testGuavaOptional" in JavaAPISuite](http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-tests-hang-on-local-machine-due-to-quot-testGuavaOptional-quot-in-JavaAPISuite-tc10999.html)
When running tests in local-cluster mode with a wrong SPARK_HOME (spark.test.home), the test just gets stuck and never responds. After looking into SPARK_WORKER_DIR, I found an endless number of executor directories under it, which explains what happens while the test is stuck.
The whole process looks like:
1. Driver submits an app to Master and asks for N executors
2. Master inits executor state with LAUNCHING and sends `LaunchExecutor` to Worker
3. Worker receives `LaunchExecutor`, launches ExecutorRunner asynchronously and immediately sends `ExecutorStateChanged(state=RUNNING)` to Master
4. Master receives `ExecutorStateChanged(state=RUNNING)` and resets `_retryCount` to 0.
5. ExecutorRunner throws an exception while launching the executor and sends `ExecutorStateChanged(state=FAILED)` to Worker, which forwards the msg to Master
6. Master receives `ExecutorStateChanged(state=FAILED)`. Since Master always resets `_retryCount` when it receives a RUNNING msg, `_retryCount` would never exceed `maxExecutorRetries`, even if a Worker fails to launch executors many times in a row. So Master continues to launch executors and falls into the dead loop.
The problem is in step 3: Worker sends `ExecutorStateChanged(state=RUNNING)` to Master immediately, while the executor is still launching. When Master receives that msg, it believes the executor has launched successfully and subsequently resets `_retryCount`. However, that's not true.
This PR removes step 3 and requires Worker to send `ExecutorStateChanged(state=RUNNING)` only after the executor has really launched successfully.
## How was this patch tested?
Tested Manually.
Closes #24408 from Ngone51/fix-dead-loop.
Authored-by: wuyi <ng...@163.com>
Signed-off-by: Xingbo Jiang <xi...@databricks.com>
---
.../spark/deploy/worker/ExecutorRunner.scala | 6 ++-
.../org/apache/spark/deploy/worker/Worker.scala | 4 +-
.../apache/spark/deploy/master/MasterSuite.scala | 60 ++++++++++++++++++++++
3 files changed, 66 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 9d2301c..6f1484c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -78,8 +78,8 @@ private[deploy] class ExecutorRunner(
// Shutdown hook that kills actors on shutdown.
shutdownHook = ShutdownHookManager.addShutdownHook { () =>
// It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
- // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
- if (state == ExecutorState.RUNNING) {
+ // be `ExecutorState.LAUNCHING`. In this case, we should set `state` to `FAILED`.
+ if (state == ExecutorState.LAUNCHING) {
state = ExecutorState.FAILED
}
killProcess(Some("Worker shutting down")) }
@@ -183,6 +183,8 @@ private[deploy] class ExecutorRunner(
Files.write(header, stderr, StandardCharsets.UTF_8)
stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
+ state = ExecutorState.RUNNING
+ worker.send(ExecutorStateChanged(appId, execId, state, None, None))
// Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
// or with nonzero exit code
val exitCode = process.waitFor()
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index eb2add3..f8ec5b6 100755
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -540,12 +540,12 @@ private[deploy] class Worker(
executorDir,
workerUri,
conf,
- appLocalDirs, ExecutorState.RUNNING)
+ appLocalDirs,
+ ExecutorState.LAUNCHING)
executors(appId + "/" + execId) = manager
manager.start()
coresUsed += cores_
memoryUsed += memory_
- sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
} catch {
case e: Exception =>
logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 61aaaa5..0c4b105 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -89,6 +89,17 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
}
}
+class MockExecutorLaunchFailWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf)
+ extends MockWorker(master, conf) {
+ var failedCnt = 0
+ override def receive: PartialFunction[Any, Unit] = {
+ case LaunchExecutor(_, appId, execId, _, _, _) =>
+ failedCnt += 1
+ master.send(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None))
+ case otherMsg => super.receive(otherMsg)
+ }
+}
+
class MasterSuite extends SparkFunSuite
with Matchers with Eventually with PrivateMethodTester with BeforeAndAfter {
@@ -635,6 +646,55 @@ class MasterSuite extends SparkFunSuite
}
}
+ test("SPARK-27510: Master should avoid dead loop while launching executor failed in Worker") {
+ val master = makeMaster()
+ master.rpcEnv.setupEndpoint(Master.ENDPOINT_NAME, master)
+ eventually(timeout(10.seconds)) {
+ val masterState = master.self.askSync[MasterStateResponse](RequestMasterState)
+ assert(masterState.status === RecoveryState.ALIVE, "Master is not alive")
+ }
+
+ var worker: MockExecutorLaunchFailWorker = null
+ try {
+ worker = new MockExecutorLaunchFailWorker(master.self)
+ worker.rpcEnv.setupEndpoint("worker", worker)
+ val workerRegMsg = RegisterWorker(
+ worker.id,
+ "localhost",
+ 9999,
+ worker.self,
+ 10,
+ 1234 * 3,
+ "http://localhost:8080",
+ master.rpcEnv.address)
+ master.self.send(workerRegMsg)
+ val driver = DeployTestUtils.createDriverDesc()
+ // mimic DriverClient to send RequestSubmitDriver to master
+ master.self.askSync[SubmitDriverResponse](RequestSubmitDriver(driver))
+ var appId: String = null
+ eventually(timeout(10.seconds)) {
+ // an app would be registered with Master once Driver set up
+ assert(worker.apps.nonEmpty)
+ appId = worker.apps.head._1
+ assert(master.idToApp.contains(appId))
+ }
+
+ eventually(timeout(10.seconds)) {
+ // Master would continually launch executors until reach MAX_EXECUTOR_RETRIES
+ assert(worker.failedCnt == master.conf.get(MAX_EXECUTOR_RETRIES))
+ // Master would remove the app if no executor could be launched for it
+ assert(!master.idToApp.contains(appId))
+ }
+ } finally {
+ if (worker != null) {
+ worker.rpcEnv.shutdown()
+ }
+ if (master != null) {
+ master.rpcEnv.shutdown()
+ }
+ }
+ }
+
test("SPARK-19900: there should be a corresponding driver for the app after relaunching driver") {
val conf = new SparkConf().set(WORKER_TIMEOUT, 1L)
val master = makeMaster(conf)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org