Posted to commits@spark.apache.org by va...@apache.org on 2019/03/12 21:14:32 UTC

[spark] branch branch-2.4 updated: [SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.

This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 432ea69  [SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.
432ea69 is described below

commit 432ea6924142c9688d8b6c64b46a531810691a8c
Author: Liupengcheng <li...@xiaomi.com>
AuthorDate: Tue Mar 12 13:53:42 2019 -0700

    [SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.
    
    There is a race condition in the `ExecutorAllocationManager`: the
    `SparkListenerExecutorRemoved` event may be posted before the
    `SparkListenerTaskStart` event, which leaves `executorIds` holding an
    entry for an executor that no longer exists. Then, when some executor
    idles, real executors will be removed even though the actual executor
    count equals `minNumExecutors`, because `newExecutorTotal` is computed
    incorrectly (it may be greater than `minNumExecutors`). This may
    finally leave zero available executors while a wrong positive number
    of `executorIds` is kept in memory.
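
    A minimal sketch of the race, using a hypothetical simplified tracker
    rather than Spark's real `ExecutorAllocationManager` and listener bus:

        import scala.collection.mutable

        // Hypothetical stand-in for the manager's executor bookkeeping.
        class Tracker {
          val executorIds = mutable.Set.empty[String]
          def onExecutorAdded(id: String): Unit = executorIds += id
          def onExecutorRemoved(id: String): Unit = executorIds -= id
          // Pre-fix behavior: a TaskStart for an unknown executor re-adds it.
          def onTaskStart(id: String): Unit =
            if (!executorIds.contains(id)) onExecutorAdded(id)
        }

        val t = new Tracker
        Seq("1", "2", "3").foreach(t.onExecutorAdded)
        t.onExecutorRemoved("3")  // ExecutorRemoved is processed first...
        t.onTaskStart("3")        // ...then a stale TaskStart resurrects "3"
        assert(t.executorIds == Set("1", "2", "3"))  // "3" is a ghost entry

    Since the real executor "3" is already gone from the scheduler
    backend, no later idle event can remove the ghost entry again.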
    
    What's more, even the `SparkListenerTaskEnd` event cannot release the
    fake `executorIds`: a later idle event for a fake executor cannot
    trigger its actual removal, because that executor was already removed
    and no longer exists in the `executorDataMap` of
    `CoarseGrainedSchedulerBackend`, so the `onExecutorRemoved` method
    will never be called again.
    
    For details see https://issues.apache.org/jira/browse/SPARK-26927
    
    This PR fixes this problem.
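
    A sketch of the guard under the same simplified model as above; here
    `liveExecutorIds` is a hypothetical stand-in for the manager's
    `client.getExecutorIds()` call shown in the diff below:

        import scala.collection.mutable

        val executorIds = mutable.Set("1", "2")
        def onTaskStart(id: String, liveExecutorIds: Seq[String]): Unit = {
          // Ignore TaskStart events for executors the backend no longer knows.
          if (!executorIds.contains(id) && liveExecutorIds.contains(id)) {
            executorIds += id
          }
        }

        onTaskStart("3", liveExecutorIds = Seq("1", "2"))  // stale event for removed "3"
        assert(executorIds == Set("1", "2"))  // no ghost entry is created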
    
    Tested with existing unit tests and an added unit test.
    
    Closes #23842 from liupc/Fix-race-condition-that-casues-dyanmic-allocation-not-working.
    
    Lead-authored-by: Liupengcheng <li...@xiaomi.com>
    Co-authored-by: liupengcheng <li...@xiaomi.com>
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
    (cherry picked from commit d5cfe08fdc7ad07e948f329c0bdeeca5c2574a18)
    Signed-off-by: Marcelo Vanzin <va...@cloudera.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   | 13 +++++++----
 .../spark/ExecutorAllocationManagerSuite.scala     | 26 +++++++++++++++++++++-
 2 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 49fa80c..36819aa 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -719,10 +719,15 @@ private[spark] class ExecutorAllocationManager(
         if (stageIdToNumRunningTask.contains(stageId)) {
           stageIdToNumRunningTask(stageId) += 1
         }
-        // This guards against the race condition in which the `SparkListenerTaskStart`
-        // event is posted before the `SparkListenerBlockManagerAdded` event, which is
-        // possible because these events are posted in different threads. (see SPARK-4951)
-        if (!allocationManager.executorIds.contains(executorId)) {
+        // This guards against the following race condition:
+        // 1. The `SparkListenerTaskStart` event is posted before the
+        // `SparkListenerExecutorAdded` event
+        // 2. The `SparkListenerExecutorRemoved` event is posted before the
+        // `SparkListenerTaskStart` event
+        // Above cases are possible because these events are posted in different threads.
+        // (see SPARK-4951 SPARK-26927)
+        if (!allocationManager.executorIds.contains(executorId) &&
+            client.getExecutorIds().contains(executorId)) {
           allocationManager.onExecutorAdded(executorId)
         }
 
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index f50ad78..a69045f 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -420,6 +420,7 @@ class ExecutorAllocationManagerSuite
     // Remove when numExecutorsTarget is the same as the current number of executors
     assert(addExecutors(manager) === 1)
     assert(addExecutors(manager) === 2)
+    (1 to 8).foreach(execId => onExecutorAdded(manager, execId.toString))
     (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
       info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
     assert(executorIds(manager).size === 8)
@@ -833,7 +834,7 @@ class ExecutorAllocationManagerSuite
     assert(removeTimes(manager).size === 1)
   }
 
-  test("SPARK-4951: call onTaskStart before onBlockManagerAdded") {
+  test("SPARK-4951: call onTaskStart before onExecutorAdded") {
     sc = createSparkContext(2, 10, 2)
     val manager = sc.executorAllocationManager.get
     assert(executorIds(manager).isEmpty)
@@ -1161,6 +1162,29 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsTarget(manager) === 1)
   }
 
+  test("SPARK-26927 call onExecutorRemoved before onTaskStart") {
+    sc = createSparkContext(2, 5)
+    val manager = sc.executorAllocationManager.get
+    assert(executorIds(manager).isEmpty)
+    post(sc.listenerBus, SparkListenerExecutorAdded(
+      0L, "1", new ExecutorInfo("host1", 1, Map.empty)))
+    post(sc.listenerBus, SparkListenerExecutorAdded(
+      0L, "2", new ExecutorInfo("host2", 1, Map.empty)))
+    post(sc.listenerBus, SparkListenerExecutorAdded(
+      0L, "3", new ExecutorInfo("host3", 1, Map.empty)))
+    assert(executorIds(manager).size === 3)
+
+    post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "3", "disconnected"))
+    assert(executorIds(manager).size === 2)
+    assert(executorIds(manager) === Set("1", "2"))
+
+    val taskInfo1 = createTaskInfo(0, 0, "3")
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
+    // Verify taskStart not adding already removed executors.
+    assert(executorIds(manager).size === 2)
+    assert(executorIds(manager) === Set("1", "2"))
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,

