You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2020/06/14 21:53:19 UTC

[spark] branch branch-2.4 updated: [SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 90e928c  [SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled
90e928c is described below

commit 90e928c05073561d8f2ee40ebe50b9f7c5208754
Author: iRakson <ra...@gmail.com>
AuthorDate: Sun Jun 14 14:51:27 2020 -0700

    [SPARK-29152][CORE][2.4] Executor Plugin shutdown when dynamic allocation is enabled
    
    ### What changes were proposed in this pull request?
    Added a Shutdown Hook in `executor.scala` which will ensure that executor's `stop()` method is always called.
    
    ### Why are the changes needed?
    In case executors are not going down gracefully, their `stop()` is not called.
    
    ### Does this PR introduce any user-facing change?
    No.
    
    ### How was this patch tested?
    Manually
    
    Closes #26901 from iRakson/SPARK-29152_2.4.
    
    Authored-by: iRakson <ra...@gmail.com>
    Signed-off-by: Dongjoon Hyun <do...@apache.org>
---
 .../scala/org/apache/spark/executor/Executor.scala | 40 +++++++++++++---------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index f7ff0b8..d142e43 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -24,6 +24,7 @@ import java.net.{URI, URL}
 import java.nio.ByteBuffer
 import java.util.Properties
 import java.util.concurrent._
+import java.util.concurrent.atomic.AtomicBoolean
 import javax.annotation.concurrent.GuardedBy
 
 import scala.collection.JavaConverters._
@@ -63,6 +64,11 @@ private[spark] class Executor(
 
   logInfo(s"Starting executor ID $executorId on host $executorHostname")
 
+  private val executorShutdown = new AtomicBoolean(false)
+  ShutdownHookManager.addShutdownHook(
+    () => stop()
+  )
+
   // Application dependencies (added through SparkContext) that we've fetched so far on this node.
   // Each map holds the master's timestamp for the version of that file or JAR we got.
   private val currentFiles: HashMap[String, Long] = new HashMap[String, Long]()
@@ -244,24 +250,26 @@ private[spark] class Executor(
   }
 
   def stop(): Unit = {
-    env.metricsSystem.report()
-    heartbeater.shutdown()
-    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
-    threadPool.shutdown()
-
-    // Notify plugins that executor is shutting down so they can terminate cleanly
-    Utils.withContextClassLoader(replClassLoader) {
-      executorPlugins.foreach { plugin =>
-        try {
-          plugin.shutdown()
-        } catch {
-          case e: Exception =>
-            logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
+    if (!executorShutdown.getAndSet(true)) {
+      env.metricsSystem.report()
+      heartbeater.shutdown()
+      heartbeater.awaitTermination(10, TimeUnit.SECONDS)
+      threadPool.shutdown()
+
+      // Notify plugins that executor is shutting down so they can terminate cleanly
+      Utils.withContextClassLoader(replClassLoader) {
+        executorPlugins.foreach { plugin =>
+          try {
+            plugin.shutdown()
+          } catch {
+            case e: Exception =>
+              logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
+          }
         }
       }
-    }
-    if (!isLocal) {
-      env.stop()
+      if (!isLocal) {
+        env.stop()
+      }
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org