You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by va...@apache.org on 2019/03/12 21:14:32 UTC
[spark] branch branch-2.4 updated: [SPARK-26927][CORE] Ensure
executor is active when processing events in dynamic allocation manager.
This is an automated email from the ASF dual-hosted git repository.
vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 432ea69 [SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.
432ea69 is described below
commit 432ea6924142c9688d8b6c64b46a531810691a8c
Author: Liupengcheng <li...@xiaomi.com>
AuthorDate: Tue Mar 12 13:53:42 2019 -0700
[SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.
There is a race condition in the `ExecutorAllocationManager` in which the `SparkListenerExecutorRemoved` event is posted before the `SparkListenerTaskStart` event, which leaves `executorIds` with an incorrect result. Then, when some executor becomes idle, real executors will be removed even though the actual number of executors is equal to `minNumExecutors`, due to the incorrect computation of `newExecutorTotal` (which may be greater than `minNumExecutors`), thus possibly resulting in zero available executors bu [...]
What's more, even the `SparkListenerTaskEnd` event cannot release the fake `executorIds`, because a later idle event for the fake executors cannot cause the real removal of these executors: they have already been removed and no longer exist in the `executorDataMap` of `CoarseGrainedSchedulerBackend`, so the `onExecutorRemoved` method will never be called again.
For details see https://issues.apache.org/jira/browse/SPARK-26927
This PR is to fix this problem.
Tested with the existing UTs and an added UT.
Closes #23842 from liupc/Fix-race-condition-that-casues-dyanmic-allocation-not-working.
Lead-authored-by: Liupengcheng <li...@xiaomi.com>
Co-authored-by: liupengcheng <li...@xiaomi.com>
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
(cherry picked from commit d5cfe08fdc7ad07e948f329c0bdeeca5c2574a18)
Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
.../apache/spark/ExecutorAllocationManager.scala | 13 +++++++----
.../spark/ExecutorAllocationManagerSuite.scala | 26 +++++++++++++++++++++-
2 files changed, 34 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 49fa80c..36819aa 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -719,10 +719,15 @@ private[spark] class ExecutorAllocationManager(
if (stageIdToNumRunningTask.contains(stageId)) {
stageIdToNumRunningTask(stageId) += 1
}
- // This guards against the race condition in which the `SparkListenerTaskStart`
- // event is posted before the `SparkListenerBlockManagerAdded` event, which is
- // possible because these events are posted in different threads. (see SPARK-4951)
- if (!allocationManager.executorIds.contains(executorId)) {
+ // This guards against the following race condition:
+ // 1. The `SparkListenerTaskStart` event is posted before the
+ // `SparkListenerExecutorAdded` event
+ // 2. The `SparkListenerExecutorRemoved` event is posted before the
+ // `SparkListenerTaskStart` event
+ // Above cases are possible because these events are posted in different threads.
+ // (see SPARK-4951 SPARK-26927)
+ if (!allocationManager.executorIds.contains(executorId) &&
+ client.getExecutorIds().contains(executorId)) {
allocationManager.onExecutorAdded(executorId)
}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f50ad78..a69045f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -420,6 +420,7 @@ class ExecutorAllocationManagerSuite
// Remove when numExecutorsTarget is the same as the current number of executors
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
+ (1 to 8).foreach(execId => onExecutorAdded(manager, execId.toString))
(1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
assert(executorIds(manager).size === 8)
@@ -833,7 +834,7 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager).size === 1)
}
- test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
+ test("SPARK-4951: call onTaskStart before onExecutorAdded") {
sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
@@ -1161,6 +1162,29 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 1)
}
+ test("SPARK-26927 call onExecutorRemoved before onTaskStart") {
+ sc = createSparkContext(2, 5)
+ val manager = sc.executorAllocationManager.get
+ assert(executorIds(manager).isEmpty)
+ post(sc.listenerBus, SparkListenerExecutorAdded(
+ 0L, "1", new ExecutorInfo("host1", 1, Map.empty)))
+ post(sc.listenerBus, SparkListenerExecutorAdded(
+ 0L, "2", new ExecutorInfo("host2", 1, Map.empty)))
+ post(sc.listenerBus, SparkListenerExecutorAdded(
+ 0L, "3", new ExecutorInfo("host3", 1, Map.empty)))
+ assert(executorIds(manager).size === 3)
+
+ post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "3", "disconnected"))
+ assert(executorIds(manager).size === 2)
+ assert(executorIds(manager) === Set("1", "2"))
+
+ val taskInfo1 = createTaskInfo(0, 0, "3")
+ post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
+ // Verify taskStart not adding already removed executors.
+ assert(executorIds(manager).size === 2)
+ assert(executorIds(manager) === Set("1", "2"))
+ }
+
private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org