You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/15 01:49:27 UTC
[spark] branch branch-2.4 updated: Revert "[SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled"
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new a89a674 Revert "[SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled"
a89a674 is described below
commit a89a674553b4e91fd7b5c95816d1be36d35e4fb5
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sun Jun 14 18:48:17 2020 -0700
Revert "[SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled"
This reverts commit 90e928c05073561d8f2ee40ebe50b9f7c5208754.
---
.../scala/org/apache/spark/executor/Executor.scala | 40 +++++++++-------------
1 file changed, 16 insertions(+), 24 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d142e43..f7ff0b8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,7 +24,6 @@ import java.net.{URI, URL}
import java.nio.ByteBuffer
import java.util.Properties
import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
import javax.annotation.concurrent.GuardedBy
import scala.collection.JavaConverters._
@@ -64,11 +63,6 @@ private[spark] class Executor(
logInfo(s"Starting executor ID $executorId on host $executorHostname")
- private val executorShutdown = new AtomicBoolean(false)
- ShutdownHookManager.addShutdownHook(
- () => stop()
- )
-
// Application dependencies (added through SparkContext) that we've fetched so far on this node.
// Each map holds the master's timestamp for the version of that file or JAR we got.
private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -250,26 +244,24 @@ private[spark] class Executor(
}
def stop(): Unit = {
- if (!executorShutdown.getAndSet(true)) {
- env.metricsSystem.report()
- heartbeater.shutdown()
- heartbeater.awaitTermination(10, TimeUnit.SECONDS)
- threadPool.shutdown()
-
- // Notify plugins that executor is shutting down so they can terminate cleanly
- Utils.withContextClassLoader(replClassLoader) {
- executorPlugins.foreach { plugin =>
- try {
- plugin.shutdown()
- } catch {
- case e: Exception =>
- logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
- }
+ env.metricsSystem.report()
+ heartbeater.shutdown()
+ heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+ threadPool.shutdown()
+
+ // Notify plugins that executor is shutting down so they can terminate cleanly
+ Utils.withContextClassLoader(replClassLoader) {
+ executorPlugins.foreach { plugin =>
+ try {
+ plugin.shutdown()
+ } catch {
+ case e: Exception =>
+ logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
}
}
- if (!isLocal) {
- env.stop()
- }
+ }
+ if (!isLocal) {
+ env.stop()
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org