You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/12/19 01:04:45 UTC

spark git commit: [SPARK-9552] Return "false" while nothing to kill in killExecutors

Repository: spark
Updated Branches:
  refs/heads/master 2377b707f -> 60da0e11f


[SPARK-9552] Return "false" while nothing to kill in killExecutors

In the discussion on SPARK-9552, we proposed adding a force-kill option to `killExecutors`. However, if there is nothing to kill, the method still returns true (acknowledged). As a result, executors that are not eligible to be killed are added to the pendingToRemove list for further action.

This patch changes the return semantics: if there is nothing to kill, `killExecutors` returns "false", and therefore those non-eligible executors are not added to the pendingToRemove list.

vanzin andrewor14 This is a follow-up to PR #7888; please let me know your comments.

Author: Grace <ji...@intel.com>
Author: Jie Huang <hj...@fosun.com>
Author: Andrew Or <an...@databricks.com>

Closes #9796 from GraceH/emptyPendingToRemove.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/60da0e11
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/60da0e11
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/60da0e11

Branch: refs/heads/master
Commit: 60da0e11f6724d86df16795a7a1166879215d547
Parents: 2377b70
Author: Grace <ji...@intel.com>
Authored: Fri Dec 18 16:04:42 2015 -0800
Committer: Andrew Or <an...@databricks.com>
Committed: Fri Dec 18 16:04:42 2015 -0800

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       |  4 +--
 .../cluster/CoarseGrainedSchedulerBackend.scala |  8 ++++--
 .../StandaloneDynamicAllocationSuite.scala      | 29 ++++++++++++--------
 3 files changed, 24 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/60da0e11/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 6176e25..4926caf 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -423,7 +423,8 @@ private[spark] class ExecutorAllocationManager(
       executorsPendingToRemove.add(executorId)
       true
     } else {
-      logWarning(s"Unable to reach the cluster manager to kill executor $executorId!")
+      logWarning(s"Unable to reach the cluster manager to kill executor $executorId," +
+        s"or no executor eligible to kill!")
       false
     }
   }
@@ -524,7 +525,6 @@ private[spark] class ExecutorAllocationManager(
   private def onExecutorBusy(executorId: String): Unit = synchronized {
     logDebug(s"Clearing idle timer for $executorId because it is now running a task")
     removeTimes.remove(executorId)
-    executorsPendingToRemove.remove(executorId)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/60da0e11/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 7efe167..2279e8c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -471,7 +471,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
   /**
    * Request that the cluster manager kill the specified executors.
-   * @return whether the kill request is acknowledged.
+   * @return whether the kill request is acknowledged. If list to kill is empty, it will return
+   *         false.
    */
   final override def killExecutors(executorIds: Seq[String]): Boolean = synchronized {
     killExecutors(executorIds, replace = false, force = false)
@@ -487,7 +488,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
    * @param executorIds identifiers of executors to kill
    * @param replace whether to replace the killed executors with new ones
    * @param force whether to force kill busy executors
-   * @return whether the kill request is acknowledged.
+   * @return whether the kill request is acknowledged. If list to kill is empty, it will return
+   *         false.
    */
   final def killExecutors(
       executorIds: Seq[String],
@@ -516,7 +518,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       numPendingExecutors += knownExecutors.size
     }
 
-    doKillExecutors(executorsToKill)
+    !executorsToKill.isEmpty && doKillExecutors(executorsToKill)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/60da0e11/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
index 2fa795f..314517d 100644
--- a/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/StandaloneDynamicAllocationSuite.scala
@@ -365,7 +365,7 @@ class StandaloneDynamicAllocationSuite
     val executors = getExecutorIds(sc)
     assert(executors.size === 2)
     assert(sc.killExecutor(executors.head))
-    assert(sc.killExecutor(executors.head))
+    assert(!sc.killExecutor(executors.head))
     val apps = getApplications()
     assert(apps.head.executors.size === 1)
     // The limit should not be lowered twice
@@ -386,23 +386,28 @@ class StandaloneDynamicAllocationSuite
     // the driver refuses to kill executors it does not know about
     syncExecutors(sc)
     val executors = getExecutorIds(sc)
+    val executorIdsBefore = executors.toSet
     assert(executors.size === 2)
-    // kill executor 1, and replace it
+    // kill and replace an executor
     assert(sc.killAndReplaceExecutor(executors.head))
     eventually(timeout(10.seconds), interval(10.millis)) {
       val apps = getApplications()
       assert(apps.head.executors.size === 2)
+      val executorIdsAfter = getExecutorIds(sc).toSet
+      // make sure the executor was killed and replaced
+      assert(executorIdsBefore != executorIdsAfter)
     }
 
-    var apps = getApplications()
-    // kill executor 1
-    assert(sc.killExecutor(executors.head))
-    apps = getApplications()
-    assert(apps.head.executors.size === 2)
-    assert(apps.head.getExecutorLimit === 2)
-    // kill executor 2
-    assert(sc.killExecutor(executors(1)))
-    apps = getApplications()
+    // kill old executor (which is killedAndReplaced) should fail
+    assert(!sc.killExecutor(executors.head))
+
+    // refresh executors list
+    val newExecutors = getExecutorIds(sc)
+    syncExecutors(sc)
+
+    // kill newly created executor and do not replace it
+    assert(sc.killExecutor(newExecutors(1)))
+    val apps = getApplications()
     assert(apps.head.executors.size === 1)
     assert(apps.head.getExecutorLimit === 1)
   }
@@ -430,7 +435,7 @@ class StandaloneDynamicAllocationSuite
     val executorIdToTaskCount = taskScheduler invokePrivate getMap()
     executorIdToTaskCount(executors.head) = 1
     // kill the busy executor without force; this should fail
-    assert(killExecutor(sc, executors.head, force = false))
+    assert(!killExecutor(sc, executors.head, force = false))
     apps = getApplications()
     assert(apps.head.executors.size === 2)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org