Posted to commits@spark.apache.org by va...@apache.org on 2017/10/31 16:50:03 UTC

spark git commit: [SPARK-11334][CORE] Fix bug in Executor allocation manager in running tasks calculation

Repository: spark
Updated Branches:
  refs/heads/master 4d9ebf383 -> 7986cc09b


[SPARK-11334][CORE] Fix bug in Executor allocation manager in running tasks calculation

## What changes were proposed in this pull request?

We often see Spark jobs get stuck, when dynamic allocation is turned on, because the Executor Allocation Manager does not request any executors even though there are pending tasks. Looking at the logic in the Executor Allocation Manager that calculates the number of running tasks, the calculation can go wrong and the running task count can become negative.
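
To make the failure mode concrete, here is a minimal, self-contained sketch (not the actual ExecutorAllocationManager code; the names and event ordering below are illustrative) of how a single shared counter can go negative when task end events arrive after their stage has already completed, and how counting per stage avoids it:

```scala
// Sketch only: contrasts a single shared counter (buggy) with a per-stage
// map (fixed) when a task end event for an already completed stage arrives late.
object RunningTaskCounterSketch {
  // Buggy approach: one global counter, reset to 0 when the last stage completes.
  var numRunningTasks: Int = 0

  // Fixed approach: count per stage and sum; events for unknown stages are ignored.
  val stageIdToNumRunningTask = scala.collection.mutable.HashMap[Int, Int]()

  def main(args: Array[String]): Unit = {
    // Stage 0 starts two tasks.
    numRunningTasks += 2
    stageIdToNumRunningTask(0) = 2

    // Stage 0 completes before its task end events are processed
    // (listener events are posted from different threads).
    numRunningTasks = 0              // buggy: counter is force-reset to 0
    stageIdToNumRunningTask -= 0     // fixed: drop the stage entry

    // Late task end events now arrive for the completed stage.
    numRunningTasks -= 2             // buggy: counter becomes -2
    if (stageIdToNumRunningTask.contains(0)) {
      stageIdToNumRunningTask(0) -= 2  // fixed: ignored, the stage entry is gone
    }

    println(s"buggy counter: $numRunningTasks")                      // -2
    println(s"fixed counter: ${stageIdToNumRunningTask.values.sum}") // 0
  }
}
```

With the per-stage map, a task end event whose stage has already been removed is simply ignored, so the total (the sum of the map's values) cannot go negative and the executor target is not underestimated.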

## How was this patch tested?

Added a unit test that posts task end events after the stage has completed and verifies that the running task count stays at zero.

Author: Sital Kedia <sk...@fb.com>

Closes #19580 from sitalkedia/skedia/fix_stuck_job.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7986cc09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7986cc09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7986cc09

Branch: refs/heads/master
Commit: 7986cc09b1b2100fc061d0aea8aa2e1e1b162c75
Parents: 4d9ebf3
Author: Sital Kedia <sk...@fb.com>
Authored: Tue Oct 31 09:49:58 2017 -0700
Committer: Marcelo Vanzin <va...@cloudera.com>
Committed: Tue Oct 31 09:49:58 2017 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 29 ++++++++++++--------
 .../spark/ExecutorAllocationManagerSuite.scala  | 22 +++++++++++++++
 2 files changed, 40 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7986cc09/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 119b426..5bc2d9e 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -267,6 +267,10 @@ private[spark] class ExecutorAllocationManager(
     (numRunningOrPendingTasks + tasksPerExecutor - 1) / tasksPerExecutor
   }
 
+  private def totalRunningTasks(): Int = synchronized {
+    listener.totalRunningTasks
+  }
+
   /**
    * This is called at a fixed interval to regulate the number of pending executor requests
    * and number of executors running.
@@ -602,12 +606,11 @@ private[spark] class ExecutorAllocationManager(
   private class ExecutorAllocationListener extends SparkListener {
 
     private val stageIdToNumTasks = new mutable.HashMap[Int, Int]
+    // Number of running tasks per stage including speculative tasks.
+    // Should be 0 when no stages are active.
+    private val stageIdToNumRunningTask = new mutable.HashMap[Int, Int]
     private val stageIdToTaskIndices = new mutable.HashMap[Int, mutable.HashSet[Int]]
     private val executorIdToTaskIds = new mutable.HashMap[String, mutable.HashSet[Long]]
-    // Number of tasks currently running on the cluster including speculative tasks.
-    // Should be 0 when no stages are active.
-    private var numRunningTasks: Int = _
-
     // Number of speculative tasks to be scheduled in each stage
     private val stageIdToNumSpeculativeTasks = new mutable.HashMap[Int, Int]
     // The speculative tasks started in each stage
@@ -625,6 +628,7 @@ private[spark] class ExecutorAllocationManager(
       val numTasks = stageSubmitted.stageInfo.numTasks
       allocationManager.synchronized {
         stageIdToNumTasks(stageId) = numTasks
+        stageIdToNumRunningTask(stageId) = 0
         allocationManager.onSchedulerBacklogged()
 
         // Compute the number of tasks requested by the stage on each host
@@ -651,6 +655,7 @@ private[spark] class ExecutorAllocationManager(
       val stageId = stageCompleted.stageInfo.stageId
       allocationManager.synchronized {
         stageIdToNumTasks -= stageId
+        stageIdToNumRunningTask -= stageId
         stageIdToNumSpeculativeTasks -= stageId
         stageIdToTaskIndices -= stageId
         stageIdToSpeculativeTaskIndices -= stageId
@@ -663,10 +668,6 @@ private[spark] class ExecutorAllocationManager(
         // This is needed in case the stage is aborted for any reason
         if (stageIdToNumTasks.isEmpty && stageIdToNumSpeculativeTasks.isEmpty) {
           allocationManager.onSchedulerQueueEmpty()
-          if (numRunningTasks != 0) {
-            logWarning("No stages are running, but numRunningTasks != 0")
-            numRunningTasks = 0
-          }
         }
       }
     }
@@ -678,7 +679,9 @@ private[spark] class ExecutorAllocationManager(
       val executorId = taskStart.taskInfo.executorId
 
       allocationManager.synchronized {
-        numRunningTasks += 1
+        if (stageIdToNumRunningTask.contains(stageId)) {
+          stageIdToNumRunningTask(stageId) += 1
+        }
         // This guards against the race condition in which the `SparkListenerTaskStart`
         // event is posted before the `SparkListenerBlockManagerAdded` event, which is
         // possible because these events are posted in different threads. (see SPARK-4951)
@@ -709,7 +712,9 @@ private[spark] class ExecutorAllocationManager(
       val taskIndex = taskEnd.taskInfo.index
       val stageId = taskEnd.stageId
       allocationManager.synchronized {
-        numRunningTasks -= 1
+        if (stageIdToNumRunningTask.contains(stageId)) {
+          stageIdToNumRunningTask(stageId) -= 1
+        }
         // If the executor is no longer running any scheduled tasks, mark it as idle
         if (executorIdToTaskIds.contains(executorId)) {
           executorIdToTaskIds(executorId) -= taskId
@@ -787,7 +792,9 @@ private[spark] class ExecutorAllocationManager(
     /**
      * The number of tasks currently running across all stages.
      */
-    def totalRunningTasks(): Int = numRunningTasks
+    def totalRunningTasks(): Int = {
+      stageIdToNumRunningTask.values.sum
+    }
 
     /**
      * Return true if an executor is not currently running a task, and false otherwise.

http://git-wip-us.apache.org/repos/asf/spark/blob/7986cc09/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index a91e09b..90b7ec4 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -227,6 +227,23 @@ class ExecutorAllocationManagerSuite
     assert(numExecutorsToAdd(manager) === 1)
   }
 
+  test("ignore task end events from completed stages") {
+    sc = createSparkContext(0, 10, 0)
+    val manager = sc.executorAllocationManager.get
+    val stage = createStageInfo(0, 5)
+    post(sc.listenerBus, SparkListenerStageSubmitted(stage))
+    val taskInfo1 = createTaskInfo(0, 0, "executor-1")
+    val taskInfo2 = createTaskInfo(1, 1, "executor-1")
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
+    post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo2))
+
+    post(sc.listenerBus, SparkListenerStageCompleted(stage))
+
+    post(sc.listenerBus, SparkListenerTaskEnd(0, 0, null, Success, taskInfo1, null))
+    post(sc.listenerBus, SparkListenerTaskEnd(2, 0, null, Success, taskInfo2, null))
+    assert(totalRunningTasks(manager) === 0)
+  }
+
   test("cancel pending executors when no longer needed") {
     sc = createSparkContext(0, 10, 0)
     val manager = sc.executorAllocationManager.get
@@ -1107,6 +1124,7 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
   private val _localityAwareTasks = PrivateMethod[Int]('localityAwareTasks)
   private val _hostToLocalTaskCount = PrivateMethod[Map[String, Int]]('hostToLocalTaskCount)
   private val _onSpeculativeTaskSubmitted = PrivateMethod[Unit]('onSpeculativeTaskSubmitted)
+  private val _totalRunningTasks = PrivateMethod[Int]('totalRunningTasks)
 
   private def numExecutorsToAdd(manager: ExecutorAllocationManager): Int = {
     manager invokePrivate _numExecutorsToAdd()
@@ -1190,6 +1208,10 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
     manager invokePrivate _localityAwareTasks()
   }
 
+  private def totalRunningTasks(manager: ExecutorAllocationManager): Int = {
+    manager invokePrivate _totalRunningTasks()
+  }
+
   private def hostToLocalTaskCount(manager: ExecutorAllocationManager): Map[String, Int] = {
     manager invokePrivate _hostToLocalTaskCount()
   }

