You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/04/16 11:48:36 UTC

spark git commit: [SPARK-4194] [core] Make SparkContext initialization exception-safe.

Repository: spark
Updated Branches:
  refs/heads/master 6179a9483 -> de4fa6b6d


[SPARK-4194] [core] Make SparkContext initialization exception-safe.

SparkContext has a very long constructor, where multiple things are
initialized, multiple threads are spawned, and multiple opportunities
for exceptions to be thrown exist. If one of these happens at an
innoportune time, lots of garbage tends to stick around.

This patch re-organizes SparkContext so that its internal state is
initialized in a big "try" block. The fields keeping state are now
completely private to SparkContext, and are "vars", because Scala
doesn't allow you to initialize a val later. The existing API interface
is kept by turning vals into defs (which works because Scala guarantees
the same binary interface for those).

On top of that, a few things in other areas were changed to avoid more
things leaking:

- Executor was changed to explicitly wait for the heartbeat thread to
  stop. LocalBackend was changed to wait for the "StopExecutor"
  message to be received, since otherwise there could be a race
  between that message arriving and the actor system being shut down.
- ConnectionManager could possibly hang during shutdown, because an
  interrupt at the wrong moment could cause the selector thread to
  still call select and then wait forever. So also wake up the
  selector so that this situation is avoided.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #5335 from vanzin/SPARK-4194 and squashes the following commits:

746b661 [Marcelo Vanzin] Fix borked merge.
80fc00e [Marcelo Vanzin] Merge branch 'master' into SPARK-4194
408dada [Marcelo Vanzin] Merge branch 'master' into SPARK-4194
2621609 [Marcelo Vanzin] Merge branch 'master' into SPARK-4194
6b73fcb [Marcelo Vanzin] Scalastyle.
c671c46 [Marcelo Vanzin] Fix merge.
3979aad [Marcelo Vanzin] Merge branch 'master' into SPARK-4194
8caa8b3 [Marcelo Vanzin] [SPARK-4194] [core] Make SparkContext initialization exception-safe.
071f16e [Marcelo Vanzin] Nits.
27456b9 [Marcelo Vanzin] More exception safety.
a0b0881 [Marcelo Vanzin] Stop alloc manager before scheduler.
5545d83 [Marcelo Vanzin] [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de4fa6b6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de4fa6b6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de4fa6b6

Branch: refs/heads/master
Commit: de4fa6b6d12e2bee0307ffba2abfca0c33f15e45
Parents: 6179a94
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Apr 16 10:48:31 2015 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Thu Apr 16 10:48:31 2015 +0100

----------------------------------------------------------------------
 .../scala/org/apache/spark/SparkContext.scala   | 505 +++++++++++--------
 .../org/apache/spark/executor/Executor.scala    |  33 +-
 .../spark/network/nio/ConnectionManager.scala   |   7 +-
 .../spark/scheduler/TaskSchedulerImpl.scala     |   3 +-
 .../spark/scheduler/local/LocalBackend.scala    |  19 +-
 .../spark/ExecutorAllocationManagerSuite.scala  |   6 -
 6 files changed, 329 insertions(+), 244 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/de4fa6b6/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3f1a7dd..e106c5c 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -31,6 +31,7 @@ import scala.collection.JavaConversions._
 import scala.collection.generic.Growable
 import scala.collection.mutable.HashMap
 import scala.reflect.{ClassTag, classTag}
+import scala.util.control.NonFatal
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -50,9 +51,10 @@ import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
 import org.apache.spark.input.{StreamInputFormat, PortableDataStream, WholeTextFileInputFormat,
   FixedLengthBinaryInputFormat}
 import org.apache.spark.io.CompressionCodec
+import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
 import org.apache.spark.rdd._
-import org.apache.spark.rpc.RpcAddress
+import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef}
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.{CoarseGrainedSchedulerBackend,
   SparkDeploySchedulerBackend, SimrSchedulerBackend}
@@ -192,8 +194,42 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   // log out Spark Version in Spark driver log
   logInfo(s"Running Spark version $SPARK_VERSION")
 
-  private[spark] val conf = config.clone()
-  conf.validateSettings()
+  /* ------------------------------------------------------------------------------------- *
+   | Private variables. These variables keep the internal state of the context, and are    |
+   | not accessible by the outside world. They're mutable since we want to initialize all  |
+   | of them to some neutral value ahead of time, so that calling "stop()" while the       |
+   | constructor is still running is safe.                                                 |
+   * ------------------------------------------------------------------------------------- */
+
+  private var _conf: SparkConf = _
+  private var _eventLogDir: Option[URI] = None
+  private var _eventLogCodec: Option[String] = None
+  private var _env: SparkEnv = _
+  private var _metadataCleaner: MetadataCleaner = _
+  private var _jobProgressListener: JobProgressListener = _
+  private var _statusTracker: SparkStatusTracker = _
+  private var _progressBar: Option[ConsoleProgressBar] = None
+  private var _ui: Option[SparkUI] = None
+  private var _hadoopConfiguration: Configuration = _
+  private var _executorMemory: Int = _
+  private var _schedulerBackend: SchedulerBackend = _
+  private var _taskScheduler: TaskScheduler = _
+  private var _heartbeatReceiver: RpcEndpointRef = _
+  @volatile private var _dagScheduler: DAGScheduler = _
+  private var _applicationId: String = _
+  private var _eventLogger: Option[EventLoggingListener] = None
+  private var _executorAllocationManager: Option[ExecutorAllocationManager] = None
+  private var _cleaner: Option[ContextCleaner] = None
+  private var _listenerBusStarted: Boolean = false
+  private var _jars: Seq[String] = _
+  private var _files: Seq[String] = _
+
+  /* ------------------------------------------------------------------------------------- *
+   | Accessors and public fields. These provide access to the internal state of the        |
+   | context.                                                                              |
+   * ------------------------------------------------------------------------------------- */
+
+  private[spark] def conf: SparkConf = _conf
 
   /**
    * Return a copy of this SparkContext's configuration. The configuration ''cannot'' be
@@ -201,65 +237,24 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    */
   def getConf: SparkConf = conf.clone()
 
-  if (!conf.contains("spark.master")) {
-    throw new SparkException("A master URL must be set in your configuration")
-  }
-  if (!conf.contains("spark.app.name")) {
-    throw new SparkException("An application name must be set in your configuration")
-  }
-
-  if (conf.getBoolean("spark.logConf", false)) {
-    logInfo("Spark configuration:\n" + conf.toDebugString)
-  }
-
-  // Set Spark driver host and port system properties
-  conf.setIfMissing("spark.driver.host", Utils.localHostName())
-  conf.setIfMissing("spark.driver.port", "0")
-
-  val jars: Seq[String] =
-    conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
-
-  val files: Seq[String] =
-    conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
-
-  val master = conf.get("spark.master")
-  val appName = conf.get("spark.app.name")
+  def jars: Seq[String] = _jars
+  def files: Seq[String] = _files
+  def master: String = _conf.get("spark.master")
+  def appName: String = _conf.get("spark.app.name")
 
-  private[spark] val isEventLogEnabled = conf.getBoolean("spark.eventLog.enabled", false)
-  private[spark] val eventLogDir: Option[URI] = {
-    if (isEventLogEnabled) {
-      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
-        .stripSuffix("/")
-      Some(Utils.resolveURI(unresolvedDir))
-    } else {
-      None
-    }
-  }
-  private[spark] val eventLogCodec: Option[String] = {
-    val compress = conf.getBoolean("spark.eventLog.compress", false)
-    if (compress && isEventLogEnabled) {
-      Some(CompressionCodec.getCodecName(conf)).map(CompressionCodec.getShortName)
-    } else {
-      None
-    }
-  }
+  private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
+  private[spark] def eventLogDir: Option[URI] = _eventLogDir
+  private[spark] def eventLogCodec: Option[String] = _eventLogCodec
 
   // Generate the random name for a temp folder in Tachyon
   // Add a timestamp as the suffix here to make it more safe
   val tachyonFolderName = "spark-" + randomUUID.toString()
-  conf.set("spark.tachyonStore.folderName", tachyonFolderName)
 
-  val isLocal = (master == "local" || master.startsWith("local["))
-
-  if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
+  def isLocal: Boolean = (master == "local" || master.startsWith("local["))
 
   // An asynchronous listener bus for Spark events
   private[spark] val listenerBus = new LiveListenerBus
 
-  conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
-
-  // Create the Spark execution environment (cache, map output tracker, etc)
-
   // This function allows components created by SparkEnv to be mocked in unit tests:
   private[spark] def createSparkEnv(
       conf: SparkConf,
@@ -268,8 +263,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
   }
 
-  private[spark] val env = createSparkEnv(conf, isLocal, listenerBus)
-  SparkEnv.set(env)
+  private[spark] def env: SparkEnv = _env
 
   // Used to store a URL for each static file/jar together with the file's local timestamp
   private[spark] val addedFiles = HashMap[String, Long]()
@@ -277,35 +271,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
   // Keeps track of all persisted RDDs
   private[spark] val persistentRdds = new TimeStampedWeakValueHashMap[Int, RDD[_]]
-  private[spark] val metadataCleaner =
-    new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
-
+  private[spark] def metadataCleaner: MetadataCleaner = _metadataCleaner
+  private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
 
-  private[spark] val jobProgressListener = new JobProgressListener(conf)
-  listenerBus.addListener(jobProgressListener)
+  def statusTracker: SparkStatusTracker = _statusTracker
 
-  val statusTracker = new SparkStatusTracker(this)
+  private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
 
-  private[spark] val progressBar: Option[ConsoleProgressBar] =
-    if (conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
-      Some(new ConsoleProgressBar(this))
-    } else {
-      None
-    }
-
-  // Initialize the Spark UI
-  private[spark] val ui: Option[SparkUI] =
-    if (conf.getBoolean("spark.ui.enabled", true)) {
-      Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
-        env.securityManager,appName))
-    } else {
-      // For tests, do not enable the UI
-      None
-    }
-
-  // Bind the UI before starting the task scheduler to communicate
-  // the bound port to the cluster manager properly
-  ui.foreach(_.bind())
+  private[spark] def ui: Option[SparkUI] = _ui
 
   /**
    * A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse.
@@ -313,134 +286,248 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * '''Note:''' As it will be reused in all Hadoop RDDs, it's better not to modify it unless you
    * plan to set some global configurations for all Hadoop RDDs.
    */
-  val hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(conf)
+  def hadoopConfiguration: Configuration = _hadoopConfiguration
+
+  private[spark] def executorMemory: Int = _executorMemory
+
+  // Environment variables to pass to our executors.
+  private[spark] val executorEnvs = HashMap[String, String]()
+
+  // Set SPARK_USER for user who is running SparkContext.
+  val sparkUser = Utils.getCurrentUserName()
 
-  // Add each JAR given through the constructor
-  if (jars != null) {
-    jars.foreach(addJar)
+  private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend
+  private[spark] def schedulerBackend_=(sb: SchedulerBackend): Unit = {
+    _schedulerBackend = sb
   }
 
-  if (files != null) {
-    files.foreach(addFile)
+  private[spark] def taskScheduler: TaskScheduler = _taskScheduler
+  private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
+    _taskScheduler = ts
   }
 
+  private[spark] def dagScheduler: DAGScheduler = _dagScheduler
+  private[spark] def dagScheduler_=(ds: DAGScheduler): Unit = {
+    _dagScheduler = ds
+  }
+
+  def applicationId: String = _applicationId
+
+  def metricsSystem: MetricsSystem = if (_env != null) _env.metricsSystem else null
+
+  private[spark] def eventLogger: Option[EventLoggingListener] = _eventLogger
+
+  private[spark] def executorAllocationManager: Option[ExecutorAllocationManager] =
+    _executorAllocationManager
+
+  private[spark] def cleaner: Option[ContextCleaner] = _cleaner
+
+  private[spark] var checkpointDir: Option[String] = None
+
+  // Thread Local variable that can be used by users to pass information down the stack
+  private val localProperties = new InheritableThreadLocal[Properties] {
+    override protected def childValue(parent: Properties): Properties = new Properties(parent)
+    override protected def initialValue(): Properties = new Properties()
+  }
+
+  /* ------------------------------------------------------------------------------------- *
+   | Initialization. This code initializes the context in a manner that is exception-safe. |
+   | All internal fields holding state are initialized here, and any error prompts the     |
+   | stop() method to be called.                                                           |
+   * ------------------------------------------------------------------------------------- */
+
   private def warnSparkMem(value: String): String = {
     logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
       "deprecated, please use spark.executor.memory instead.")
     value
   }
 
-  private[spark] val executorMemory = conf.getOption("spark.executor.memory")
-    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
-    .orElse(Option(System.getenv("SPARK_MEM")).map(warnSparkMem))
-    .map(Utils.memoryStringToMb)
-    .getOrElse(512)
+  try {
+    _conf = config.clone()
+    _conf.validateSettings()
 
-  // Environment variables to pass to our executors.
-  private[spark] val executorEnvs = HashMap[String, String]()
+    if (!_conf.contains("spark.master")) {
+      throw new SparkException("A master URL must be set in your configuration")
+    }
+    if (!_conf.contains("spark.app.name")) {
+      throw new SparkException("An application name must be set in your configuration")
+    }
 
-  // Convert java options to env vars as a work around
-  // since we can't set env vars directly in sbt.
-  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
-    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
-    executorEnvs(envKey) = value
-  }
-  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
-    executorEnvs("SPARK_PREPEND_CLASSES") = v
-  }
-  // The Mesos scheduler backend relies on this environment variable to set executor memory.
-  // TODO: Set this only in the Mesos scheduler.
-  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
-  executorEnvs ++= conf.getExecutorEnv
+    if (_conf.getBoolean("spark.logConf", false)) {
+      logInfo("Spark configuration:\n" + _conf.toDebugString)
+    }
 
-  // Set SPARK_USER for user who is running SparkContext.
-  val sparkUser = Utils.getCurrentUserName()
-  executorEnvs("SPARK_USER") = sparkUser
+    // Set Spark driver host and port system properties
+    _conf.setIfMissing("spark.driver.host", Utils.localHostName())
+    _conf.setIfMissing("spark.driver.port", "0")
 
-  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
-  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
-  private val heartbeatReceiver = env.rpcEnv.setupEndpoint(
-    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+    _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 
-  // Create and start the scheduler
-  private[spark] var (schedulerBackend, taskScheduler) =
-    SparkContext.createTaskScheduler(this, master)
+    _jars =_conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten
+    _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0))
+      .toSeq.flatten
 
-  heartbeatReceiver.send(TaskSchedulerIsSet)
+    _eventLogDir =
+      if (isEventLogEnabled) {
+        val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
+          .stripSuffix("/")
+        Some(Utils.resolveURI(unresolvedDir))
+      } else {
+        None
+      }
 
-  @volatile private[spark] var dagScheduler: DAGScheduler = _
-  try {
-    dagScheduler = new DAGScheduler(this)
-  } catch {
-    case e: Exception => {
-      try {
-        stop()
-      } finally {
-        throw new SparkException("Error while constructing DAGScheduler", e)
+    _eventLogCodec = {
+      val compress = _conf.getBoolean("spark.eventLog.compress", false)
+      if (compress && isEventLogEnabled) {
+        Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
+      } else {
+        None
       }
     }
-  }
 
-  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
-  // constructor
-  taskScheduler.start()
+    _conf.set("spark.tachyonStore.folderName", tachyonFolderName)
 
-  val applicationId: String = taskScheduler.applicationId()
-  conf.set("spark.app.id", applicationId)
+    if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
 
-  env.blockManager.initialize(applicationId)
+    // Create the Spark execution environment (cache, map output tracker, etc)
+    _env = createSparkEnv(_conf, isLocal, listenerBus)
+    SparkEnv.set(_env)
 
-  val metricsSystem = env.metricsSystem
+    _metadataCleaner = new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, _conf)
 
-  // The metrics system for Driver need to be set spark.app.id to app ID.
-  // So it should start after we get app ID from the task scheduler and set spark.app.id.
-  metricsSystem.start()
-  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
-  metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
+    _jobProgressListener = new JobProgressListener(_conf)
+    listenerBus.addListener(jobProgressListener)
 
-  // Optionally log Spark events
-  private[spark] val eventLogger: Option[EventLoggingListener] = {
-    if (isEventLogEnabled) {
-      val logger =
-        new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
-      logger.start()
-      listenerBus.addListener(logger)
-      Some(logger)
-    } else None
-  }
+    _statusTracker = new SparkStatusTracker(this)
 
-  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
-  private val dynamicAllocationEnabled = conf.getBoolean("spark.dynamicAllocation.enabled", false)
-  private val dynamicAllocationTesting = conf.getBoolean("spark.dynamicAllocation.testing", false)
-  private[spark] val executorAllocationManager: Option[ExecutorAllocationManager] =
-    if (dynamicAllocationEnabled) {
-      assert(supportDynamicAllocation,
-        "Dynamic allocation of executors is currently only supported in YARN mode")
-      Some(new ExecutorAllocationManager(this, listenerBus, conf))
-    } else {
-      None
+    _progressBar =
+      if (_conf.getBoolean("spark.ui.showConsoleProgress", true) && !log.isInfoEnabled) {
+        Some(new ConsoleProgressBar(this))
+      } else {
+        None
+      }
+
+    _ui =
+      if (conf.getBoolean("spark.ui.enabled", true)) {
+        Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener,
+          _env.securityManager,appName))
+      } else {
+        // For tests, do not enable the UI
+        None
+      }
+    // Bind the UI before starting the task scheduler to communicate
+    // the bound port to the cluster manager properly
+    _ui.foreach(_.bind())
+
+    _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
+
+    // Add each JAR given through the constructor
+    if (jars != null) {
+      jars.foreach(addJar)
     }
-  executorAllocationManager.foreach(_.start())
 
-  private[spark] val cleaner: Option[ContextCleaner] = {
-    if (conf.getBoolean("spark.cleaner.referenceTracking", true)) {
-      Some(new ContextCleaner(this))
-    } else {
-      None
+    if (files != null) {
+      files.foreach(addFile)
     }
-  }
-  cleaner.foreach(_.start())
 
-  setupAndStartListenerBus()
-  postEnvironmentUpdate()
-  postApplicationStart()
+    _executorMemory = _conf.getOption("spark.executor.memory")
+      .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
+      .orElse(Option(System.getenv("SPARK_MEM"))
+      .map(warnSparkMem))
+      .map(Utils.memoryStringToMb)
+      .getOrElse(512)
+
+    // Convert java options to env vars as a work around
+    // since we can't set env vars directly in sbt.
+    for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
+      value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
+      executorEnvs(envKey) = value
+    }
+    Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
+      executorEnvs("SPARK_PREPEND_CLASSES") = v
+    }
+    // The Mesos scheduler backend relies on this environment variable to set executor memory.
+    // TODO: Set this only in the Mesos scheduler.
+    executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
+    executorEnvs ++= _conf.getExecutorEnv
+    executorEnvs("SPARK_USER") = sparkUser
+
+    // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
+    // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
+    _heartbeatReceiver = env.rpcEnv.setupEndpoint(
+      HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+
+    // Create and start the scheduler
+    val (sched, ts) = SparkContext.createTaskScheduler(this, master)
+    _schedulerBackend = sched
+    _taskScheduler = ts
+    _dagScheduler = new DAGScheduler(this)
+    _heartbeatReceiver.send(TaskSchedulerIsSet)
+
+    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
+    // constructor
+    _taskScheduler.start()
+
+    _applicationId = _taskScheduler.applicationId()
+    _conf.set("spark.app.id", _applicationId)
+    _env.blockManager.initialize(_applicationId)
+
+    // The metrics system for Driver need to be set spark.app.id to app ID.
+    // So it should start after we get app ID from the task scheduler and set spark.app.id.
+    metricsSystem.start()
+    // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
+    metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
+
+    _eventLogger =
+      if (isEventLogEnabled) {
+        val logger =
+          new EventLoggingListener(_applicationId, _eventLogDir.get, _conf, _hadoopConfiguration)
+        logger.start()
+        listenerBus.addListener(logger)
+        Some(logger)
+      } else {
+        None
+      }
 
-  private[spark] var checkpointDir: Option[String] = None
+    // Optionally scale number of executors dynamically based on workload. Exposed for testing.
+    val dynamicAllocationEnabled = _conf.getBoolean("spark.dynamicAllocation.enabled", false)
+    _executorAllocationManager =
+      if (dynamicAllocationEnabled) {
+        assert(supportDynamicAllocation,
+          "Dynamic allocation of executors is currently only supported in YARN mode")
+        Some(new ExecutorAllocationManager(this, listenerBus, _conf))
+      } else {
+        None
+      }
+    _executorAllocationManager.foreach(_.start())
 
-  // Thread Local variable that can be used by users to pass information down the stack
-  private val localProperties = new InheritableThreadLocal[Properties] {
-    override protected def childValue(parent: Properties): Properties = new Properties(parent)
-    override protected def initialValue(): Properties = new Properties()
+    _cleaner =
+      if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
+        Some(new ContextCleaner(this))
+      } else {
+        None
+      }
+    _cleaner.foreach(_.start())
+
+    setupAndStartListenerBus()
+    postEnvironmentUpdate()
+    postApplicationStart()
+
+    // Post init
+    _taskScheduler.postStartHook()
+    _env.metricsSystem.registerSource(new DAGSchedulerSource(dagScheduler))
+    _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
+  } catch {
+    case NonFatal(e) =>
+      logError("Error initializing SparkContext.", e)
+      try {
+        stop()
+      } catch {
+        case NonFatal(inner) =>
+          logError("Error stopping SparkContext after init error.", inner)
+      } finally {
+        throw e
+      }
   }
 
   /**
@@ -544,19 +631,6 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
   }
 
-  // Post init
-  taskScheduler.postStartHook()
-
-  private val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
-  private val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
-
-  private def initDriverMetrics() {
-    SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
-    SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
-  }
-
-  initDriverMetrics()
-
   // Methods for creating RDDs
 
   /** Distribute a local Scala collection to form an RDD.
@@ -1146,7 +1220,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * this application is supported. This is currently only available for YARN.
    */
   private[spark] def supportDynamicAllocation =
-    master.contains("yarn") || dynamicAllocationTesting
+    master.contains("yarn") || _conf.getBoolean("spark.dynamicAllocation.testing", false)
 
   /**
    * :: DeveloperApi ::
@@ -1163,7 +1237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * This is currently only supported in YARN mode. Return whether the request is received.
    */
   private[spark] override def requestTotalExecutors(numExecutors: Int): Boolean = {
-    assert(master.contains("yarn") || dynamicAllocationTesting,
+    assert(supportDynamicAllocation,
       "Requesting executors is currently only supported in YARN mode")
     schedulerBackend match {
       case b: CoarseGrainedSchedulerBackend =>
@@ -1403,28 +1477,40 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
   def stop() {
     // Use the stopping variable to ensure no contention for the stop scenario.
     // Still track the stopped variable for use elsewhere in the code.
-    
     if (!stopped.compareAndSet(false, true)) {
       logInfo("SparkContext already stopped.")
       return
     }
-    
+
     postApplicationEnd()
-    ui.foreach(_.stop())
-    env.metricsSystem.report()
-    metadataCleaner.cancel()
-    cleaner.foreach(_.stop()) 
-    executorAllocationManager.foreach(_.stop())
-    dagScheduler.stop()
-    dagScheduler = null
-    listenerBus.stop()
-    eventLogger.foreach(_.stop())
-    env.rpcEnv.stop(heartbeatReceiver)
-    progressBar.foreach(_.stop())
-    taskScheduler = null
+    _ui.foreach(_.stop())
+    if (env != null) {
+      env.metricsSystem.report()
+    }
+    if (metadataCleaner != null) {
+      metadataCleaner.cancel()
+    }
+    _cleaner.foreach(_.stop())
+    _executorAllocationManager.foreach(_.stop())
+    if (_dagScheduler != null) {
+      _dagScheduler.stop()
+      _dagScheduler = null
+    }
+    if (_listenerBusStarted) {
+      listenerBus.stop()
+      _listenerBusStarted = false
+    }
+    _eventLogger.foreach(_.stop())
+    if (env != null && _heartbeatReceiver != null) {
+      env.rpcEnv.stop(_heartbeatReceiver)
+    }
+    _progressBar.foreach(_.stop())
+    _taskScheduler = null
     // TODO: Cache.stop()?
-    env.stop()
-    SparkEnv.set(null)
+    if (_env != null) {
+      _env.stop()
+      SparkEnv.set(null)
+    }
     SparkContext.clearActiveContext()
     logInfo("Successfully stopped SparkContext")
   }
@@ -1749,6 +1835,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     }
 
     listenerBus.start(this)
+    _listenerBusStarted = true
   }
 
   /** Post the application start event */
@@ -2152,7 +2239,7 @@ object SparkContext extends Logging {
     master match {
       case "local" =>
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, 1)
+        val backend = new LocalBackend(sc.getConf, scheduler, 1)
         scheduler.initialize(backend)
         (backend, scheduler)
 
@@ -2164,7 +2251,7 @@ object SparkContext extends Logging {
           throw new SparkException(s"Asked to run locally with $threadCount threads")
         }
         val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
-        val backend = new LocalBackend(scheduler, threadCount)
+        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
 
@@ -2174,7 +2261,7 @@ object SparkContext extends Logging {
         // local[N, M] means exactly N threads with M failures
         val threadCount = if (threads == "*") localCpuCount else threads.toInt
         val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
-        val backend = new LocalBackend(scheduler, threadCount)
+        val backend = new LocalBackend(sc.getConf, scheduler, threadCount)
         scheduler.initialize(backend)
         (backend, scheduler)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de4fa6b6/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 516f619..1b5fdeb 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -21,7 +21,7 @@ import java.io.File
 import java.lang.management.ManagementFactory
 import java.net.URL
 import java.nio.ByteBuffer
-import java.util.concurrent.ConcurrentHashMap
+import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -60,8 +60,6 @@ private[spark] class Executor(
 
   private val conf = env.conf
 
-  @volatile private var isStopped = false
-
   // No ip or host:port - just hostname
   Utils.checkHost(executorHostname, "Expected executed slave to be a hostname")
   // must not have port specified.
@@ -114,6 +112,10 @@ private[spark] class Executor(
   // Maintains the list of running tasks.
   private val runningTasks = new ConcurrentHashMap[Long, TaskRunner]
 
+  // Executor for the heartbeat task.
+  private val heartbeater = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("driver-heartbeater"))
+
   startDriverHeartbeater()
 
   def launchTask(
@@ -138,7 +140,8 @@ private[spark] class Executor(
   def stop(): Unit = {
     env.metricsSystem.report()
     env.rpcEnv.stop(executorEndpoint)
-    isStopped = true
+    heartbeater.shutdown()
+    heartbeater.awaitTermination(10, TimeUnit.SECONDS)
     threadPool.shutdown()
     if (!isLocal) {
       env.stop()
@@ -432,23 +435,17 @@ private[spark] class Executor(
   }
 
   /**
-   * Starts a thread to report heartbeat and partial metrics for active tasks to driver.
-   * This thread stops running when the executor is stopped.
+   * Schedules a task to report heartbeat and partial metrics for active tasks to driver.
    */
   private def startDriverHeartbeater(): Unit = {
     val intervalMs = conf.getTimeAsMs("spark.executor.heartbeatInterval", "10s")
-    val thread = new Thread() {
-      override def run() {
-        // Sleep a random interval so the heartbeats don't end up in sync
-        Thread.sleep(intervalMs + (math.random * intervalMs).asInstanceOf[Int])
-        while (!isStopped) {
-          reportHeartBeat()
-          Thread.sleep(intervalMs)
-        }
-      }
+
+    // Wait a random interval so the heartbeats don't end up in sync
+    val initialDelay = intervalMs + (math.random * intervalMs).asInstanceOf[Int]
+
+    val heartbeatTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(reportHeartBeat())
     }
-    thread.setDaemon(true)
-    thread.setName("driver-heartbeater")
-    thread.start()
+    heartbeater.scheduleAtFixedRate(heartbeatTask, initialDelay, intervalMs, TimeUnit.MILLISECONDS)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/de4fa6b6/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
index 8e3c30f..5a74c13 100644
--- a/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
+++ b/core/src/main/scala/org/apache/spark/network/nio/ConnectionManager.scala
@@ -86,11 +86,11 @@ private[nio] class ConnectionManager(
       conf.get("spark.network.timeout", "120s"))
 
   // Get the thread counts from the Spark Configuration.
-  // 
+  //
   // Even though the ThreadPoolExecutor constructor takes both a minimum and maximum value,
   // we only query for the minimum value because we are using LinkedBlockingDeque.
-  // 
-  // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is 
+  //
+  // The JavaDoc for ThreadPoolExecutor points out that when using a LinkedBlockingDeque (which is
   // an unbounded queue) no more than corePoolSize threads will ever be created, so only the "min"
   // parameter is necessary.
   private val handlerThreadCount = conf.getInt("spark.core.connection.handler.threads.min", 20)
@@ -989,6 +989,7 @@ private[nio] class ConnectionManager(
 
   def stop() {
     ackTimeoutMonitor.stop()
+    selector.wakeup()
     selectorThread.interrupt()
     selectorThread.join()
     selector.close()

http://git-wip-us.apache.org/repos/asf/spark/blob/de4fa6b6/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index ecc8bf1..13a52d8 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -142,11 +142,10 @@ private[spark] class TaskSchedulerImpl(
 
     if (!isLocal && conf.getBoolean("spark.speculation", false)) {
       logInfo("Starting speculative execution thread")
-      import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL_MS milliseconds,
             SPECULATION_INTERVAL_MS milliseconds) {
         Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
-      }
+      }(sc.env.actorSystem.dispatcher)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/de4fa6b6/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
index 70a477a..50ba0b9 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/local/LocalBackend.scala
@@ -20,12 +20,12 @@ package org.apache.spark.scheduler.local
 import java.nio.ByteBuffer
 import java.util.concurrent.{Executors, TimeUnit}
 
-import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcEndpointRef, RpcEnv}
-import org.apache.spark.util.Utils
-import org.apache.spark.{Logging, SparkContext, SparkEnv, TaskState}
+import org.apache.spark.{Logging, SparkConf, SparkContext, SparkEnv, TaskState}
 import org.apache.spark.TaskState.TaskState
 import org.apache.spark.executor.{Executor, ExecutorBackend}
+import org.apache.spark.rpc.{ThreadSafeRpcEndpoint, RpcCallContext, RpcEndpointRef, RpcEnv}
 import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
+import org.apache.spark.util.Utils
 
 private case class ReviveOffers()
 
@@ -71,11 +71,15 @@ private[spark] class LocalEndpoint(
 
     case KillTask(taskId, interruptThread) =>
       executor.killTask(taskId, interruptThread)
+  }
 
+  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
     case StopExecutor =>
       executor.stop()
+      context.reply(true)
   }
 
+
   def reviveOffers() {
     val offers = Seq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores))
     val tasks = scheduler.resourceOffers(offers).flatten
@@ -104,8 +108,11 @@ private[spark] class LocalEndpoint(
  * master all run in the same JVM. It sits behind a TaskSchedulerImpl and handles launching tasks
  * on a single Executor (created by the LocalBackend) running locally.
  */
-private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores: Int)
-  extends SchedulerBackend with ExecutorBackend {
+private[spark] class LocalBackend(
+    conf: SparkConf,
+    scheduler: TaskSchedulerImpl,
+    val totalCores: Int)
+  extends SchedulerBackend with ExecutorBackend with Logging {
 
   private val appId = "local-" + System.currentTimeMillis
   var localEndpoint: RpcEndpointRef = null
@@ -116,7 +123,7 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
   }
 
   override def stop() {
-    localEndpoint.send(StopExecutor)
+    localEndpoint.sendWithReply(StopExecutor)
   }
 
   override def reviveOffers() {

http://git-wip-us.apache.org/repos/asf/spark/blob/de4fa6b6/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 6b3049b..22acc27 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -56,19 +56,13 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext wit
     // Min < 0
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
     intercept[SparkException] { contexts += new SparkContext(conf1) }
-    SparkEnv.get.stop()
-    SparkContext.clearActiveContext()
 
     // Max < 0
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
     intercept[SparkException] { contexts += new SparkContext(conf2) }
-    SparkEnv.get.stop()
-    SparkContext.clearActiveContext()
 
     // Both min and max, but min > max
     intercept[SparkException] { createSparkContext(2, 1) }
-    SparkEnv.get.stop()
-    SparkContext.clearActiveContext()
 
     // Both min and max, and min == max
     val sc1 = createSparkContext(1, 1)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org