You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/15 01:49:27 UTC

[spark] branch branch-2.4 updated: Revert "[SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled"

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new a89a674  Revert "[SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled"
a89a674 is described below

commit a89a674553b4e91fd7b5c95816d1be36d35e4fb5
Author: Dongjoon Hyun <do...@apache.org>
AuthorDate: Sun Jun 14 18:48:17 2020 -0700

    Revert "[SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled"
    
    This reverts commit 90e928c05073561d8f2ee40ebe50b9f7c5208754.
---
 .../scala/org/apache/spark/executor/Executor.scala | 40 +++++++++-------------
 1 file changed, 16 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d142e43..f7ff0b8 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,7 +24,6 @@ import java.net.{URI, URL}
 import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent._
-import java.util.concurrent.atomic.AtomicBoolean
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
@@ -64,11 +63,6 @@ private[spark] class Executor(
 
   logInfo(s"Starting executor ID $executorId on host $executorHostname")
 
-  private val executorShutdown = new AtomicBoolean(false)
-  ShutdownHookManager.addShutdownHook(
-    () => stop()
-  )
-
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
   private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -250,26 +244,24 @@ private[spark] class Executor(
   }
 
   def stop(): Unit = {
-    if (!executorShutdown.getAndSet(true)) {
-      env.metricsSystem.report()
-      heartbeater.shutdown()
-      heartbeater.awaitTermination(10, TimeUnit.SECONDS)
-      threadPool.shutdown()
-
-      // Notify plugins that executor is shutting down so they can terminate cleanly
-      Utils.withContextClassLoader(replClassLoader) {
-        executorPlugins.foreach { plugin =>
-          try {
-            plugin.shutdown()
-          } catch {
-            case e: Exception =>
-              logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
-          }
+    env.metricsSystem.report()
+    heartbeater.shutdown()
+    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+    threadPool.shutdown()
+
+    // Notify plugins that executor is shutting down so they can terminate cleanly
+    Utils.withContextClassLoader(replClassLoader) {
+      executorPlugins.foreach { plugin =>
+        try {
+          plugin.shutdown()
+        } catch {
+          case e: Exception =>
+            logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
         }
       }
-      if (!isLocal) {
-        env.stop()
-      }
+    }
+    if (!isLocal) {
+      env.stop()
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org