Posted to commits@spark.apache.org by an...@apache.org on 2015/04/03 04:48:59 UTC

spark git commit: [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.

Repository: spark
Updated Branches:
  refs/heads/master 052dee070 -> 45134ec92


[SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.

This fixes the thread leak: the allocation manager's polling thread ran in
an unstoppable while (true) loop, so it outlived the SparkContext that
created it. I also changed the unit test to keep track of allocated
contexts and make sure they are closed after each test runs; this is
needed since some tests use the following pattern, which leaks the context
whenever the middle call throws before sc.stop() is reached:

    val sc = createContext()
    doSomethingThatMayThrow()
    sc.stop()
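
For illustration, a minimal, self-contained sketch of that tracking
approach, assuming ScalaTest's BeforeAndAfter as in the suite changes
below; the suite and helper names here are hypothetical, and the real
changes are in the diff further down:

    import scala.collection.mutable

    import org.scalatest.{BeforeAndAfter, FunSuite}

    import org.apache.spark.{SparkConf, SparkContext}

    class TrackingSuite extends FunSuite with BeforeAndAfter {
      // Every context created through the helper below is recorded here.
      private val contexts = new mutable.ListBuffer[SparkContext]()

      // Runs after each test, whether it passed or threw: stop any survivors.
      after {
        contexts.foreach(_.stop())
        contexts.clear()
      }

      private def createContext(): SparkContext = {
        val sc = new SparkContext(
          new SparkConf().setMaster("local").setAppName("tracking"))
        contexts += sc // track before returning, so a later throw cannot leak it
        sc
      }

      test("context is cleaned up even if the body throws") {
        val sc = createContext()
        // If this line throws, sc.stop() below never runs; the after hook
        // still stops the context.
        sc.parallelize(1 to 10).count()
        sc.stop()
      }
    }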

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #5311 from vanzin/SPARK-6650 and squashes the following commits:

652c73b [Marcelo Vanzin] Nits.
5711512 [Marcelo Vanzin] More exception safety.
cc5a744 [Marcelo Vanzin] Stop alloc manager before scheduler.
9886f69 [Marcelo Vanzin] [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45134ec9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45134ec9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45134ec9

Branch: refs/heads/master
Commit: 45134ec920c3766c22aefd4366b4b60ec99bd810
Parents: 052dee0
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Apr 2 19:48:55 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Apr 2 19:48:55 2015 -0700

----------------------------------------------------------------------
 .../spark/ExecutorAllocationManager.scala       | 38 ++++++++---------
 .../scala/org/apache/spark/SparkContext.scala   |  3 +-
 .../spark/ExecutorAllocationManagerSuite.scala  | 44 +++++++++++++-------
 3 files changed, 49 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/45134ec9/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 21c6e6f..9385f55 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,10 +17,12 @@
 
 package org.apache.spark
 
+import java.util.concurrent.{Executors, TimeUnit}
+
 import scala.collection.mutable
 
 import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
 
 /**
  * An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
   // Listener for Spark events that impact the allocation policy
   private val listener = new ExecutorAllocationListener
 
+  // Executor that handles the scheduling task.
+  private val executor = Executors.newSingleThreadScheduledExecutor(
+    Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
   /**
    * Verify that the settings specified through the config are valid.
    * If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
   }
 
   /**
-   * Register for scheduler callbacks to decide when to add and remove executors.
+   * Register for scheduler callbacks to decide when to add and remove executors, and start
+   * the scheduling task.
    */
   def start(): Unit = {
     listenerBus.addListener(listener)
-    startPolling()
+
+    val scheduleTask = new Runnable() {
+      override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+    }
+    executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
   }
 
   /**
-   * Start the main polling thread that keeps track of when to add and remove executors.
+   * Stop the allocation manager.
    */
-  private def startPolling(): Unit = {
-    val t = new Thread {
-      override def run(): Unit = {
-        while (true) {
-          try {
-            schedule()
-          } catch {
-            case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
-          }
-          Thread.sleep(intervalMillis)
-        }
-      }
-    }
-    t.setName("spark-dynamic-executor-allocation")
-    t.setDaemon(true)
-    t.start()
+  def stop(): Unit = {
+    executor.shutdown()
+    executor.awaitTermination(10, TimeUnit.SECONDS)
   }
 
   /**
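
For reference, the pattern this file now uses, reduced to a self-contained
sketch in plain java.util.concurrent; the inline ThreadFactory stands in
for Spark's Utils.namedThreadFactory, and the catch block stands in for
Utils.logUncaughtExceptions:

    import java.util.concurrent.{Executors, ThreadFactory, TimeUnit}

    class Poller(name: String, intervalMillis: Long)(work: () => Unit) {
      private val executor = Executors.newSingleThreadScheduledExecutor(
        new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, name)
            t.setDaemon(true)
            t
          }
        })

      def start(): Unit = {
        val task = new Runnable {
          // Catch what the work throws: an uncaught exception would
          // silently cancel all future runs of a scheduleAtFixedRate task.
          override def run(): Unit =
            try work() catch { case e: Exception => e.printStackTrace() }
        }
        executor.scheduleAtFixedRate(task, 0, intervalMillis, TimeUnit.MILLISECONDS)
      }

      // The point of the change: unlike a raw thread spinning in while (true),
      // a scheduled executor can be shut down, and stop() can wait for it.
      def stop(): Unit = {
        executor.shutdown()
        executor.awaitTermination(10, TimeUnit.SECONDS)
      }
    }

With this shape, shutting the manager down is a plain method call, which is
why the SparkContext.scala change below is a single added line in stop().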

http://git-wip-us.apache.org/repos/asf/spark/blob/45134ec9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3904f7d..5b3778e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1136,7 +1136,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Return whether dynamically adjusting the amount of resources allocated to
    * this application is supported. This is currently only available for YARN.
    */
-  private[spark] def supportDynamicAllocation = 
+  private[spark] def supportDynamicAllocation =
     master.contains("yarn") || dynamicAllocationTesting
 
   /**
@@ -1400,6 +1400,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         env.metricsSystem.report()
         metadataCleaner.cancel()
         cleaner.foreach(_.stop())
+        executorAllocationManager.foreach(_.stop())
         dagScheduler.stop()
         dagScheduler = null
         listenerBus.stop()

http://git-wip-us.apache.org/repos/asf/spark/blob/45134ec9/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index abfcee7..3ded1e4 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
 
 import scala.collection.mutable
 
-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
 import org.apache.spark.executor.TaskMetrics
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock
 /**
  * Test add and remove behavior of ExecutorAllocationManager.
  */
-class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
   import ExecutorAllocationManager._
   import ExecutorAllocationManagerSuite._
 
+  private val contexts = new mutable.ListBuffer[SparkContext]()
+
+  before {
+    contexts.clear()
+  }
+
+  after {
+    contexts.foreach(_.stop())
+  }
+
   test("verify min/max executors") {
     val conf = new SparkConf()
       .setMaster("local")
@@ -39,18 +49,19 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
       .set("spark.dynamicAllocation.enabled", "true")
       .set("spark.dynamicAllocation.testing", "true")
     val sc0 = new SparkContext(conf)
+    contexts += sc0
     assert(sc0.executorAllocationManager.isDefined)
     sc0.stop()
 
     // Min < 0
     val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf1) }
+    intercept[SparkException] { contexts += new SparkContext(conf1) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
     // Max < 0
     val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
-    intercept[SparkException] { new SparkContext(conf2) }
+    intercept[SparkException] { contexts += new SparkContext(conf2) }
     SparkEnv.get.stop()
     SparkContext.clearActiveContext()
 
@@ -665,16 +676,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
     assert(removeTimes(manager).contains("executor-2"))
     assert(!removeTimes(manager).contains("executor-1"))
   }
-}
-
-/**
- * Helper methods for testing ExecutorAllocationManager.
- * This includes methods to access private methods and fields in ExecutorAllocationManager.
- */
-private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
-  private val schedulerBacklogTimeout = 1L
-  private val sustainedSchedulerBacklogTimeout = 2L
-  private val executorIdleTimeout = 3L
 
   private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
     val conf = new SparkConf()
@@ -688,9 +689,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
         sustainedSchedulerBacklogTimeout.toString)
       .set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
       .set("spark.dynamicAllocation.testing", "true")
-    new SparkContext(conf)
+    val sc = new SparkContext(conf)
+    contexts += sc
+    sc
   }
 
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+  private val schedulerBacklogTimeout = 1L
+  private val sustainedSchedulerBacklogTimeout = 2L
+  private val executorIdleTimeout = 3L
+
   private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
     new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
   }

