You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/22 02:34:02 UTC

spark git commit: [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.

Repository: spark
Updated Branches:
  refs/heads/master b063a61b9 -> e72c16e30


[SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.

This change adds some new utility code to handle shutdown hooks in
Spark. The main goal is to take advantage of Hadoop 2.x's API for
shutdown hooks, which allows Spark to register a hook that will
run before the one that cleans up HDFS clients, and thus avoids
some races that would cause exceptions to show up and other issues
such as failure to properly close event logs.

Unfortunately, Hadoop 1.x does not have such APIs, so in that case
correctness is still left to chance.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #5560 from vanzin/SPARK-6014 and squashes the following commits:

edfafb1 [Marcelo Vanzin] Better scaladoc.
fcaeedd [Marcelo Vanzin] Merge branch 'master' into SPARK-6014
e7039dc [Marcelo Vanzin] [SPARK-6014] [core] Revamp Spark shutdown hooks, fix shutdown races.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e72c16e3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e72c16e3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e72c16e3

Branch: refs/heads/master
Commit: e72c16e30d85cdc394d318b5551698885cfda9b8
Parents: b063a61
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Tue Apr 21 20:33:57 2015 -0400
Committer: Sean Owen <so...@cloudera.com>
Committed: Tue Apr 21 20:33:57 2015 -0400

----------------------------------------------------------------------
 .../spark/deploy/history/HistoryServer.scala    |   6 +-
 .../spark/deploy/worker/ExecutorRunner.scala    |  12 +-
 .../apache/spark/storage/DiskBlockManager.scala |  18 +--
 .../spark/storage/TachyonBlockManager.scala     |  24 ++--
 .../scala/org/apache/spark/util/Utils.scala     | 136 ++++++++++++++++---
 .../org/apache/spark/util/UtilsSuite.scala      |  32 +++--
 .../hive/thriftserver/HiveThriftServer2.scala   |   9 +-
 .../hive/thriftserver/SparkSQLCLIDriver.scala   |   9 +-
 .../spark/deploy/yarn/ApplicationMaster.scala   |  63 ++++-----
 9 files changed, 195 insertions(+), 114 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 72f6048..56bef57 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -27,7 +27,7 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf}
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.ui.{SparkUI, UIUtils, WebUI}
 import org.apache.spark.ui.JettyUtils._
-import org.apache.spark.util.SignalLogger
+import org.apache.spark.util.{SignalLogger, Utils}
 
 /**
  * A web server that renders SparkUIs of completed applications.
@@ -194,9 +194,7 @@ object HistoryServer extends Logging {
     val server = new HistoryServer(conf, provider, securityManager, port)
     server.bind()
 
-    Runtime.getRuntime().addShutdownHook(new Thread("HistoryServerStopper") {
-      override def run(): Unit = server.stop()
-    })
+    Utils.addShutdownHook { () => server.stop() }
 
     // Wait until the end of the world... or if the HistoryServer process is manually stopped
     while(true) { Thread.sleep(Int.MaxValue) }

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
index 7d5acab..7aa85b7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala
@@ -28,6 +28,7 @@ import com.google.common.io.Files
 import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.{ApplicationDescription, ExecutorState}
 import org.apache.spark.deploy.DeployMessages.ExecutorStateChanged
+import org.apache.spark.util.Utils
 import org.apache.spark.util.logging.FileAppender
 
 /**
@@ -61,7 +62,7 @@ private[deploy] class ExecutorRunner(
 
   // NOTE: This is now redundant with the automated shut-down enforced by the Executor. It might
   // make sense to remove this in the future.
-  private var shutdownHook: Thread = null
+  private var shutdownHook: AnyRef = null
 
   private[worker] def start() {
     workerThread = new Thread("ExecutorRunner for " + fullId) {
@@ -69,12 +70,7 @@ private[deploy] class ExecutorRunner(
     }
     workerThread.start()
     // Shutdown hook that kills actors on shutdown.
-    shutdownHook = new Thread() {
-      override def run() {
-        killProcess(Some("Worker shutting down"))
-      }
-    }
-    Runtime.getRuntime.addShutdownHook(shutdownHook)
+    shutdownHook = Utils.addShutdownHook { () => killProcess(Some("Worker shutting down")) }
   }
 
   /**
@@ -106,7 +102,7 @@ private[deploy] class ExecutorRunner(
       workerThread = null
       state = ExecutorState.KILLED
       try {
-        Runtime.getRuntime.removeShutdownHook(shutdownHook)
+        Utils.removeShutdownHook(shutdownHook)
       } catch {
         case e: IllegalStateException => None
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
index 2883137..7ea5e54 100644
--- a/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala
@@ -138,25 +138,17 @@ private[spark] class DiskBlockManager(blockManager: BlockManager, conf: SparkCon
     }
   }
 
-  private def addShutdownHook(): Thread = {
-    val shutdownHook = new Thread("delete Spark local dirs") {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        logDebug("Shutdown hook called")
-        DiskBlockManager.this.doStop()
-      }
+  private def addShutdownHook(): AnyRef = {
+    Utils.addShutdownHook { () =>
+      logDebug("Shutdown hook called")
+      DiskBlockManager.this.doStop()
     }
-    Runtime.getRuntime.addShutdownHook(shutdownHook)
-    shutdownHook
   }
 
   /** Cleanup local dirs and stop shuffle sender. */
   private[spark] def stop() {
     // Remove the shutdown hook.  It causes memory leaks if we leave it around.
-    try {
-      Runtime.getRuntime.removeShutdownHook(shutdownHook)
-    } catch {
-      case e: IllegalStateException => None
-    }
+    Utils.removeShutdownHook(shutdownHook)
     doStop()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
index af87303..951897c 100644
--- a/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/TachyonBlockManager.scala
@@ -135,21 +135,19 @@ private[spark] class TachyonBlockManager(
 
   private def addShutdownHook() {
     tachyonDirs.foreach(tachyonDir => Utils.registerShutdownDeleteDir(tachyonDir))
-    Runtime.getRuntime.addShutdownHook(new Thread("delete Spark tachyon dirs") {
-      override def run(): Unit = Utils.logUncaughtExceptions {
-        logDebug("Shutdown hook called")
-        tachyonDirs.foreach { tachyonDir =>
-          try {
-            if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
-              Utils.deleteRecursively(tachyonDir, client)
-            }
-          } catch {
-            case e: Exception =>
-              logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
+    Utils.addShutdownHook { () =>
+      logDebug("Shutdown hook called")
+      tachyonDirs.foreach { tachyonDir =>
+        try {
+          if (!Utils.hasRootAsShutdownDeleteDir(tachyonDir)) {
+            Utils.deleteRecursively(tachyonDir, client)
           }
+        } catch {
+          case e: Exception =>
+            logError("Exception while deleting tachyon spark dir: " + tachyonDir, e)
         }
-        client.close()
       }
-    })
+      client.close()
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/main/scala/org/apache/spark/util/Utils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 1029b0f..7b0de1a 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.lang.management.ManagementFactory
 import java.net._
 import java.nio.ByteBuffer
-import java.util.{Properties, Locale, Random, UUID}
+import java.util.{PriorityQueue, Properties, Locale, Random, UUID}
 import java.util.concurrent._
 import javax.net.ssl.HttpsURLConnection
 
@@ -30,7 +30,7 @@ import scala.collection.Map
 import scala.collection.mutable.ArrayBuffer
 import scala.io.Source
 import scala.reflect.ClassTag
-import scala.util.Try
+import scala.util.{Failure, Success, Try}
 import scala.util.control.{ControlThrowable, NonFatal}
 
 import com.google.common.io.{ByteStreams, Files}
@@ -64,9 +64,15 @@ private[spark] object CallSite {
 private[spark] object Utils extends Logging {
   val random = new Random()
 
+  val DEFAULT_SHUTDOWN_PRIORITY = 100
+
   private val MAX_DIR_CREATION_ATTEMPTS: Int = 10
   @volatile private var localRootDirs: Array[String] = null
 
+
+  private val shutdownHooks = new SparkShutdownHookManager()
+  shutdownHooks.install()
+
   /** Serialize an object using Java serialization */
   def serialize[T](o: T): Array[Byte] = {
     val bos = new ByteArrayOutputStream()
@@ -176,18 +182,16 @@ private[spark] object Utils extends Logging {
   private val shutdownDeleteTachyonPaths = new scala.collection.mutable.HashSet[String]()
 
   // Add a shutdown hook to delete the temp dirs when the JVM exits
-  Runtime.getRuntime.addShutdownHook(new Thread("delete Spark temp dirs") {
-    override def run(): Unit = Utils.logUncaughtExceptions {
-      logDebug("Shutdown hook called")
-      shutdownDeletePaths.foreach { dirPath =>
-        try {
-          Utils.deleteRecursively(new File(dirPath))
-        } catch {
-          case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
-        }
+  addShutdownHook { () =>
+    logDebug("Shutdown hook called")
+    shutdownDeletePaths.foreach { dirPath =>
+      try {
+        Utils.deleteRecursively(new File(dirPath))
+      } catch {
+        case e: Exception => logError(s"Exception while deleting Spark temp dir: $dirPath", e)
       }
     }
-  })
+  }
 
   // Register the path to be deleted via shutdown hook
   def registerShutdownDeleteDir(file: File) {
@@ -613,7 +617,7 @@ private[spark] object Utils extends Logging {
         }
         Utils.setupSecureURLConnection(uc, securityMgr)
 
-        val timeoutMs = 
+        val timeoutMs =
           conf.getTimeAsSeconds("spark.files.fetchTimeout", "60s").toInt * 1000
         uc.setConnectTimeout(timeoutMs)
         uc.setReadTimeout(timeoutMs)
@@ -1172,7 +1176,7 @@ private[spark] object Utils extends Logging {
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
-   * 
+   *
    * NOTE: This method is to be called by the spark-started JVM process.
    */
   def tryOrExit(block: => Unit) {
@@ -1185,11 +1189,11 @@ private[spark] object Utils extends Logging {
   }
 
   /**
-   * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught 
+   * Execute a block of code that evaluates to Unit, stop SparkContext is there is any uncaught
    * exception
-   *  
-   * NOTE: This method is to be called by the driver-side components to avoid stopping the 
-   * user-started JVM process completely; in contrast, tryOrExit is to be called in the 
+   *
+   * NOTE: This method is to be called by the driver-side components to avoid stopping the
+   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
    * spark-started JVM process .
    */
   def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
@@ -2132,6 +2136,102 @@ private[spark] object Utils extends Logging {
       .getOrElse(UserGroupInformation.getCurrentUser().getShortUserName())
   }
 
+  /**
+   * Adds a shutdown hook with default priority.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(hook: () => Unit): AnyRef = {
+    addShutdownHook(DEFAULT_SHUTDOWN_PRIORITY, hook)
+  }
+
+  /**
+   * Adds a shutdown hook with the given priority. Hooks with lower priority values run
+   * first.
+   *
+   * @param hook The code to run during shutdown.
+   * @return A handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(priority: Int, hook: () => Unit): AnyRef = {
+    shutdownHooks.add(priority, hook)
+  }
+
+  /**
+   * Remove a previously installed shutdown hook.
+   *
+   * @param ref A handle returned by `addShutdownHook`.
+   * @return Whether the hook was removed.
+   */
+  def removeShutdownHook(ref: AnyRef): Boolean = {
+    shutdownHooks.remove(ref)
+  }
+
+}
+
+private [util] class SparkShutdownHookManager {
+
+  private val hooks = new PriorityQueue[SparkShutdownHook]()
+  private var shuttingDown = false
+
+  /**
+   * Install a hook to run at shutdown and run all registered hooks in order. Hadoop 1.x does not
+   * have `ShutdownHookManager`, so in that case we just use the JVM's `Runtime` object and hope for
+   * the best.
+   */
+  def install(): Unit = {
+    val hookTask = new Runnable() {
+      override def run(): Unit = runAll()
+    }
+    Try(Class.forName("org.apache.hadoop.util.ShutdownHookManager")) match {
+      case Success(shmClass) =>
+        val fsPriority = classOf[FileSystem].getField("SHUTDOWN_HOOK_PRIORITY").get()
+          .asInstanceOf[Int]
+        val shm = shmClass.getMethod("get").invoke(null)
+        shm.getClass().getMethod("addShutdownHook", classOf[Runnable], classOf[Int])
+          .invoke(shm, hookTask, Integer.valueOf(fsPriority + 30))
+
+      case Failure(_) =>
+        Runtime.getRuntime.addShutdownHook(new Thread(hookTask, "Spark Shutdown Hook"));
+    }
+  }
+
+  def runAll(): Unit = synchronized {
+    shuttingDown = true
+    while (!hooks.isEmpty()) {
+      Utils.logUncaughtExceptions(hooks.poll().run())
+    }
+  }
+
+  def add(priority: Int, hook: () => Unit): AnyRef = synchronized {
+    checkState()
+    val hookRef = new SparkShutdownHook(priority, hook)
+    hooks.add(hookRef)
+    hookRef
+  }
+
+  def remove(ref: AnyRef): Boolean = synchronized {
+    checkState()
+    hooks.remove(ref)
+  }
+
+  private def checkState(): Unit = {
+    if (shuttingDown) {
+      throw new IllegalStateException("Shutdown hooks cannot be modified during shutdown.")
+    }
+  }
+
+}
+
+private class SparkShutdownHook(private val priority: Int, hook: () => Unit)
+  extends Comparable[SparkShutdownHook] {
+
+  override def compareTo(other: SparkShutdownHook): Int = {
+    other.priority - priority
+  }
+
+  def run(): Unit = hook()
+
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
index fb97e65..1ba9980 100644
--- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala
@@ -17,14 +17,16 @@
 
 package org.apache.spark.util
 
-import scala.util.Random
-
 import java.io.{File, ByteArrayOutputStream, ByteArrayInputStream, FileOutputStream}
 import java.net.{BindException, ServerSocket, URI}
 import java.nio.{ByteBuffer, ByteOrder}
 import java.text.DecimalFormatSymbols
 import java.util.concurrent.TimeUnit
 import java.util.Locale
+import java.util.PriorityQueue
+
+import scala.collection.mutable.ListBuffer
+import scala.util.Random
 
 import com.google.common.base.Charsets.UTF_8
 import com.google.common.io.Files
@@ -36,14 +38,14 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.SparkConf
 
 class UtilsSuite extends FunSuite with ResetSystemProperties {
-  
+
   test("timeConversion") {
     // Test -1
     assert(Utils.timeStringAsSeconds("-1") === -1)
-    
+
     // Test zero
     assert(Utils.timeStringAsSeconds("0") === 0)
-    
+
     assert(Utils.timeStringAsSeconds("1") === 1)
     assert(Utils.timeStringAsSeconds("1s") === 1)
     assert(Utils.timeStringAsSeconds("1000ms") === 1)
@@ -52,7 +54,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(Utils.timeStringAsSeconds("1min") === TimeUnit.MINUTES.toSeconds(1))
     assert(Utils.timeStringAsSeconds("1h") === TimeUnit.HOURS.toSeconds(1))
     assert(Utils.timeStringAsSeconds("1d") === TimeUnit.DAYS.toSeconds(1))
-    
+
     assert(Utils.timeStringAsMs("1") === 1)
     assert(Utils.timeStringAsMs("1ms") === 1)
     assert(Utils.timeStringAsMs("1000us") === 1)
@@ -61,7 +63,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     assert(Utils.timeStringAsMs("1min") === TimeUnit.MINUTES.toMillis(1))
     assert(Utils.timeStringAsMs("1h") === TimeUnit.HOURS.toMillis(1))
     assert(Utils.timeStringAsMs("1d") === TimeUnit.DAYS.toMillis(1))
-    
+
     // Test invalid strings
     intercept[NumberFormatException] {
       Utils.timeStringAsMs("This breaks 600s")
@@ -79,7 +81,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
       Utils.timeStringAsMs("This 123s breaks")
     }
   }
-  
+
   test("bytesToString") {
     assert(Utils.bytesToString(10) === "10.0 B")
     assert(Utils.bytesToString(1500) === "1500.0 B")
@@ -466,4 +468,18 @@ class UtilsSuite extends FunSuite with ResetSystemProperties {
     val newFileName = new File(testFileDir, testFileName)
     assert(newFileName.isFile())
   }
+
+  test("shutdown hook manager") {
+    val manager = new SparkShutdownHookManager()
+    val output = new ListBuffer[Int]()
+
+    val hook1 = manager.add(1, () => output += 1)
+    manager.add(3, () => output += 3)
+    manager.add(2, () => output += 2)
+    manager.add(4, () => output += 4)
+    manager.remove(hook1)
+
+    manager.runAll()
+    assert(output.toList === List(4, 3, 2))
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
index c3a3f8c..832596f 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/HiveThriftServer2.scala
@@ -28,6 +28,7 @@ import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.sql.hive.HiveContext
 import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._
 import org.apache.spark.scheduler.{SparkListenerApplicationEnd, SparkListener}
+import org.apache.spark.util.Utils
 
 /**
  * The main entry point for the Spark SQL port of HiveServer2.  Starts up a `SparkSQLContext` and a
@@ -57,13 +58,7 @@ object HiveThriftServer2 extends Logging {
     logInfo("Starting SparkContext")
     SparkSQLEnv.init()
 
-    Runtime.getRuntime.addShutdownHook(
-      new Thread() {
-        override def run() {
-          SparkSQLEnv.stop()
-        }
-      }
-    )
+    Utils.addShutdownHook { () => SparkSQLEnv.stop() }
 
     try {
       val server = new HiveThriftServer2(SparkSQLEnv.hiveContext)

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
----------------------------------------------------------------------
diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
index 85281c6..7e307bb 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala
@@ -40,6 +40,7 @@ import org.apache.thrift.transport.TSocket
 
 import org.apache.spark.Logging
 import org.apache.spark.sql.hive.HiveShim
+import org.apache.spark.util.Utils
 
 private[hive] object SparkSQLCLIDriver {
   private var prompt = "spark-sql"
@@ -101,13 +102,7 @@ private[hive] object SparkSQLCLIDriver {
     SessionState.start(sessionState)
 
     // Clean up after we exit
-    Runtime.getRuntime.addShutdownHook(
-      new Thread() {
-        override def run() {
-          SparkSQLEnv.stop()
-        }
-      }
-    )
+    Utils.addShutdownHook { () => SparkSQLEnv.stop() }
 
     // "-h" option has been passed, so connect to Hive thrift server.
     if (sessionState.getHost != null) {

http://git-wip-us.apache.org/repos/asf/spark/blob/e72c16e3/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
----------------------------------------------------------------------
diff --git a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
index f7a8420..93ae451 100644
--- a/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
+++ b/yarn/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -25,7 +25,6 @@ import java.net.{Socket, URL}
 import java.util.concurrent.atomic.AtomicReference
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.hadoop.util.ShutdownHookManager
 import org.apache.hadoop.yarn.api._
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
@@ -95,44 +94,38 @@ private[spark] class ApplicationMaster(
       logInfo("ApplicationAttemptId: " + appAttemptId)
 
       val fs = FileSystem.get(yarnConf)
-      val cleanupHook = new Runnable {
-        override def run() {
-          // If the SparkContext is still registered, shut it down as a best case effort in case
-          // users do not call sc.stop or do System.exit().
-          val sc = sparkContextRef.get()
-          if (sc != null) {
-            logInfo("Invoking sc stop from shutdown hook")
-            sc.stop()
-          }
-          val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
-          val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
-
-          if (!finished) {
-            // This happens when the user application calls System.exit(). We have the choice
-            // of either failing or succeeding at this point. We report success to avoid
-            // retrying applications that have succeeded (System.exit(0)), which means that
-            // applications that explicitly exit with a non-zero status will also show up as
-            // succeeded in the RM UI.
-            finish(finalStatus,
-              ApplicationMaster.EXIT_SUCCESS,
-              "Shutdown hook called before final status was reported.")
-          }
 
-          if (!unregistered) {
-            // we only want to unregister if we don't want the RM to retry
-            if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
-              unregister(finalStatus, finalMsg)
-              cleanupStagingDir(fs)
-            }
+      Utils.addShutdownHook { () =>
+        // If the SparkContext is still registered, shut it down as a best case effort in case
+        // users do not call sc.stop or do System.exit().
+        val sc = sparkContextRef.get()
+        if (sc != null) {
+          logInfo("Invoking sc stop from shutdown hook")
+          sc.stop()
+        }
+        val maxAppAttempts = client.getMaxRegAttempts(sparkConf, yarnConf)
+        val isLastAttempt = client.getAttemptId().getAttemptId() >= maxAppAttempts
+
+        if (!finished) {
+          // This happens when the user application calls System.exit(). We have the choice
+          // of either failing or succeeding at this point. We report success to avoid
+          // retrying applications that have succeeded (System.exit(0)), which means that
+          // applications that explicitly exit with a non-zero status will also show up as
+          // succeeded in the RM UI.
+          finish(finalStatus,
+            ApplicationMaster.EXIT_SUCCESS,
+            "Shutdown hook called before final status was reported.")
+        }
+
+        if (!unregistered) {
+          // we only want to unregister if we don't want the RM to retry
+          if (finalStatus == FinalApplicationStatus.SUCCEEDED || isLastAttempt) {
+            unregister(finalStatus, finalMsg)
+            cleanupStagingDir(fs)
           }
         }
       }
 
-      // Use higher priority than FileSystem.
-      assert(ApplicationMaster.SHUTDOWN_HOOK_PRIORITY > FileSystem.SHUTDOWN_HOOK_PRIORITY)
-      ShutdownHookManager
-        .get().addShutdownHook(cleanupHook, ApplicationMaster.SHUTDOWN_HOOK_PRIORITY)
-
       // Call this to force generation of secret so it gets populated into the
       // Hadoop UGI. This has to happen before the startUserApplication which does a
       // doAs in order for the credentials to be passed on to the executor containers.
@@ -546,8 +539,6 @@ private[spark] class ApplicationMaster(
 
 object ApplicationMaster extends Logging {
 
-  val SHUTDOWN_HOOK_PRIORITY: Int = 30
-
   // exit codes for different causes, no reason behind the values
   private val EXIT_SUCCESS = 0
   private val EXIT_UNCAUGHT_EXCEPTION = 10


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org