You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/05 04:13:45 UTC

[spark] branch master updated: [SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamicAllocation.executorIdleTimeout value

This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1dd7419  [SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamicAllocation.executorIdleTimeout value
1dd7419 is described below

commit 1dd7419702c5bc7e36fee9fa1eec06b66f25806e
Author: sandeep-katta <sa...@gmail.com>
AuthorDate: Mon Feb 4 20:13:22 2019 -0800

    [SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamicAllocation.executorIdleTimeout value
    
    ## What changes were proposed in this pull request?
    
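    The **updateAndSyncNumExecutorsTarget** API should be called only after the **initializing** flag is unset. In `schedule()`, the call is therefore moved below the scan of `removeTimes` for expired executors.
    
    ## How was this patch tested?
    Added a unit test and also tested manually.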
    
    After Fix
    ![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png)
    
    Closes #23697 from sandeep-katta/executorIssue.
    
    Authored-by: sandeep-katta <sa...@gmail.com>
    Signed-off-by: Sean Owen <se...@databricks.com>
---
 .../apache/spark/ExecutorAllocationManager.scala   |  4 ++--
 .../spark/ExecutorAllocationManagerSuite.scala     | 26 +++++++++++++++++-----
 2 files changed, 22 insertions(+), 8 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c9da30e..99f4d11 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -312,8 +312,6 @@ private[spark] class ExecutorAllocationManager(
   private def schedule(): Unit = synchronized {
     val now = clock.getTimeMillis
 
-    updateAndSyncNumExecutorsTarget(now)
-
     val executorIdsToBeRemoved = ArrayBuffer[String]()
     removeTimes.retain { case (executorId, expireTime) =>
       val expired = now >= expireTime
@@ -323,6 +321,8 @@ private[spark] class ExecutorAllocationManager(
       }
       !expired
     }
+    // Update executor target number only after initializing flag is unset
+    updateAndSyncNumExecutorsTarget(now)
     if (executorIdsToBeRemoved.nonEmpty) {
       removeExecutors(executorIdsToBeRemoved)
     }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index ce0ef2e..5500329 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -936,12 +936,7 @@ class ExecutorAllocationManagerSuite
 
     assert(maxNumExecutorsNeeded(manager) === 0)
     schedule(manager)
-    // Verify executor is timeout but numExecutorsTarget is not recalculated
-    assert(numExecutorsTarget(manager) === 3)
-
-    // Schedule again to recalculate the numExecutorsTarget after executor is timeout
-    schedule(manager)
-    // Verify that current number of executors should be ramp down when executor is timeout
+    // Verify executor is timeout,numExecutorsTarget is recalculated
     assert(numExecutorsTarget(manager) === 2)
   }
 
@@ -1148,6 +1143,25 @@ class ExecutorAllocationManagerSuite
     verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
   }
 
+  test("SPARK-26758 check executor target number after idle time out ") {
+    sc = createSparkContext(1, 5, 3)
+    val manager = sc.executorAllocationManager.get
+    val clock = new ManualClock(10000L)
+    manager.setClock(clock)
+    assert(numExecutorsTarget(manager) === 3)
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty)))
+    manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+      clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty)))
+    // make all the executors as idle, so that it will be killed
+    clock.advance(executorIdleTimeout * 1000)
+    schedule(manager)
+    // once the schedule is run target executor number should be 1
+    assert(numExecutorsTarget(manager) === 1)
+  }
+
   private def createSparkContext(
       minExecutors: Int = 1,
       maxExecutors: Int = 5,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org