You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/14 21:53:19 UTC
[spark] branch branch-2.4 updated: [SPARK-29152][CORE][2.4]
Executor Plugin shutdown when dynamic allocation is enabled
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 90e928c [SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled
90e928c is described below
commit 90e928c05073561d8f2ee40ebe50b9f7c5208754
Author: iRakson <ra...@gmail.com>
AuthorDate: Sun Jun 14 14:51:27 2020 -0700
[SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled
### What changes were proposed in this pull request?
Added a Shutdown Hook in `executor.scala` which will ensure that executor's `stop()` method is always called.
### Why are the changes needed?
In case executors are not going down gracefully, their `stop()` is not called.
### Does this PR introduce any user-facing change?
No.
### How was this patch tested?
Manually
Closes #26901 from iRakson/SPARK-29152_2.4.
Authored-by: iRakson <ra...@gmail.com>
Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
.../scala/org/apache/spark/executor/Executor.scala | 40 +++++++++++++---------
1 file changed, 24 insertions(+), 16 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f7ff0b8..d142e43 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.net.{URI, URL}
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
@@ -63,6 +64,11 @@ private[spark] class Executor(
logInfo(s"Starting executor ID $executorId on host $executorHostname")
+ private val executorShutdown = new AtomicBoolean(false)
+ ShutdownHookManager.addShutdownHook(
+ () => stop()
+ )
+
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -244,24 +250,26 @@ private[spark] class Executor(
}
def stop(): Unit = {
- env.metricsSystem.report()
- heartbeater.shutdown()
- heartbeater.awaitTermination(10, TimeUnit.SECONDS)
- threadPool.shutdown()
-
- // Notify plugins that executor is shutting down so they can terminate cleanly
- Utils.withContextClassLoader(replClassLoader) {
- executorPlugins.foreach { plugin =>
- try {
- plugin.shutdown()
- } catch {
- case e: Exception =>
- logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
+ if (!executorShutdown.getAndSet(true)) {
+ env.metricsSystem.report()
+ heartbeater.shutdown()
+ heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+ threadPool.shutdown()
+
+ // Notify plugins that executor is shutting down so they can terminate cleanly
+ Utils.withContextClassLoader(replClassLoader) {
+ executorPlugins.foreach { plugin =>
+ try {
+ plugin.shutdown()
+ } catch {
+ case e: Exception =>
+ logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
+ }
}
}
- }
- if (!isLocal) {
- env.stop()
+ if (!isLocal) {
+ env.stop()
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org