You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2019/02/05 04:14:13 UTC
[spark] branch branch-2.4 updated: [SPARK-26758][CORE] Idle
Executors are not getting killed after
spark.dynamicAllocation.executorIdleTimeout value
This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 7187c01 [SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamicAllocation.executorIdleTimeout value
7187c01 is described below
commit 7187c012213eb4f13b5546f60514be5f08c7392a
Author: sandeep-katta <sa...@gmail.com>
AuthorDate: Mon Feb 4 20:13:22 2019 -0800
[SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamicAllocation.executorIdleTimeout value
## What changes were proposed in this pull request?
The **updateAndSyncNumExecutorsTarget** API should be called only after the **initializing** flag is unset
## How was this patch tested?
Added a unit test and also tested manually
After Fix
![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png)
Closes #23697 from sandeep-katta/executorIssue.
Authored-by: sandeep-katta <sa...@gmail.com>
Signed-off-by: Sean Owen <se...@databricks.com>
(cherry picked from commit 1dd7419702c5bc7e36fee9fa1eec06b66f25806e)
Signed-off-by: Sean Owen <se...@databricks.com>
---
.../apache/spark/ExecutorAllocationManager.scala | 4 ++--
.../spark/ExecutorAllocationManagerSuite.scala | 26 +++++++++++++++++-----
2 files changed, 22 insertions(+), 8 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index c3e5b96..49fa80c 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -306,8 +306,6 @@ private[spark] class ExecutorAllocationManager(
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
- updateAndSyncNumExecutorsTarget(now)
-
val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@@ -317,6 +315,8 @@ private[spark] class ExecutorAllocationManager(
}
!expired
}
+ // Update executor target number only after initializing flag is unset
+ updateAndSyncNumExecutorsTarget(now)
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 5c718cb..f50ad78 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -935,12 +935,7 @@ class ExecutorAllocationManagerSuite
assert(maxNumExecutorsNeeded(manager) === 0)
schedule(manager)
- // Verify executor is timeout but numExecutorsTarget is not recalculated
- assert(numExecutorsTarget(manager) === 3)
-
- // Schedule again to recalculate the numExecutorsTarget after executor is timeout
- schedule(manager)
- // Verify that current number of executors should be ramp down when executor is timeout
+ // Verify executor is timeout,numExecutorsTarget is recalculated
assert(numExecutorsTarget(manager) === 2)
}
@@ -1147,6 +1142,25 @@ class ExecutorAllocationManagerSuite
verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
}
+ test("SPARK-26758 check executor target number after idle time out ") {
+ sc = createSparkContext(1, 5, 3)
+ val manager = sc.executorAllocationManager.get
+ val clock = new ManualClock(10000L)
+ manager.setClock(clock)
+ assert(numExecutorsTarget(manager) === 3)
+ manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+ clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
+ manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+ clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty)))
+ manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
+ clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty)))
+ // make all the executors as idle, so that it will be killed
+ clock.advance(executorIdleTimeout * 1000)
+ schedule(manager)
+ // once the schedule is run target executor number should be 1
+ assert(numExecutorsTarget(manager) === 1)
+ }
+
private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org