You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2015/04/03 04:48:59 UTC
spark git commit: [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.
Repository: spark
Updated Branches:
refs/heads/master 052dee070 -> 45134ec92
[SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.
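This fixes a thread leak: the allocation manager's polling thread was never
shut down when the SparkContext stopped. I also changed the unit test to keep track
of allocated contexts and make sure they're closed after tests are
run; this is needed since some tests use the following pattern, which skips
sc.stop() (and so leaks the context) if the middle call throws: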
val sc = createContext()
doSomethingThatMayThrow()
sc.stop()
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #5311 from vanzin/SPARK-6650 and squashes the following commits:
652c73b [Marcelo Vanzin] Nits.
5711512 [Marcelo Vanzin] More exception safety.
cc5a744 [Marcelo Vanzin] Stop alloc manager before scheduler.
9886f69 [Marcelo Vanzin] [SPARK-6650] [core] Stop ExecutorAllocationManager when context stops.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45134ec9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45134ec9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45134ec9
Branch: refs/heads/master
Commit: 45134ec920c3766c22aefd4366b4b60ec99bd810
Parents: 052dee0
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Thu Apr 2 19:48:55 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Thu Apr 2 19:48:55 2015 -0700
----------------------------------------------------------------------
.../spark/ExecutorAllocationManager.scala | 38 ++++++++---------
.../scala/org/apache/spark/SparkContext.scala | 3 +-
.../spark/ExecutorAllocationManagerSuite.scala | 44 +++++++++++++-------
3 files changed, 49 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/45134ec9/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 21c6e6f..9385f55 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -17,10 +17,12 @@
package org.apache.spark
+import java.util.concurrent.{Executors, TimeUnit}
+
import scala.collection.mutable
import org.apache.spark.scheduler._
-import org.apache.spark.util.{SystemClock, Clock}
+import org.apache.spark.util.{Clock, SystemClock, Utils}
/**
* An agent that dynamically allocates and removes executors based on the workload.
@@ -129,6 +131,10 @@ private[spark] class ExecutorAllocationManager(
// Listener for Spark events that impact the allocation policy
private val listener = new ExecutorAllocationListener
+ // Executor that handles the scheduling task.
+ private val executor = Executors.newSingleThreadScheduledExecutor(
+ Utils.namedThreadFactory("spark-dynamic-executor-allocation"))
+
/**
* Verify that the settings specified through the config are valid.
* If not, throw an appropriate exception.
@@ -173,32 +179,24 @@ private[spark] class ExecutorAllocationManager(
}
/**
- * Register for scheduler callbacks to decide when to add and remove executors.
+ * Register for scheduler callbacks to decide when to add and remove executors, and start
+ * the scheduling task.
*/
def start(): Unit = {
listenerBus.addListener(listener)
- startPolling()
+
+ val scheduleTask = new Runnable() {
+ override def run(): Unit = Utils.logUncaughtExceptions(schedule())
+ }
+ executor.scheduleAtFixedRate(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
}
/**
- * Start the main polling thread that keeps track of when to add and remove executors.
+ * Stop the allocation manager.
*/
- private def startPolling(): Unit = {
- val t = new Thread {
- override def run(): Unit = {
- while (true) {
- try {
- schedule()
- } catch {
- case e: Exception => logError("Exception in dynamic executor allocation thread!", e)
- }
- Thread.sleep(intervalMillis)
- }
- }
- }
- t.setName("spark-dynamic-executor-allocation")
- t.setDaemon(true)
- t.start()
+ def stop(): Unit = {
+ executor.shutdown()
+ executor.awaitTermination(10, TimeUnit.SECONDS)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/45134ec9/core/src/main/scala/org/apache/spark/SparkContext.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 3904f7d..5b3778e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -1136,7 +1136,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Return whether dynamically adjusting the amount of resources allocated to
* this application is supported. This is currently only available for YARN.
*/
- private[spark] def supportDynamicAllocation =
+ private[spark] def supportDynamicAllocation =
master.contains("yarn") || dynamicAllocationTesting
/**
@@ -1400,6 +1400,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
env.metricsSystem.report()
metadataCleaner.cancel()
cleaner.foreach(_.stop())
+ executorAllocationManager.foreach(_.stop())
dagScheduler.stop()
dagScheduler = null
listenerBus.stop()
http://git-wip-us.apache.org/repos/asf/spark/blob/45134ec9/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index abfcee7..3ded1e4 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -19,7 +19,7 @@ package org.apache.spark
import scala.collection.mutable
-import org.scalatest.{FunSuite, PrivateMethodTester}
+import org.scalatest.{BeforeAndAfter, FunSuite, PrivateMethodTester}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
@@ -28,10 +28,20 @@ import org.apache.spark.util.ManualClock
/**
* Test add and remove behavior of ExecutorAllocationManager.
*/
-class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
+class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter {
import ExecutorAllocationManager._
import ExecutorAllocationManagerSuite._
+ private val contexts = new mutable.ListBuffer[SparkContext]()
+
+ before {
+ contexts.clear()
+ }
+
+ after {
+ contexts.foreach(_.stop())
+ }
+
test("verify min/max executors") {
val conf = new SparkConf()
.setMaster("local")
@@ -39,18 +49,19 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
.set("spark.dynamicAllocation.enabled", "true")
.set("spark.dynamicAllocation.testing", "true")
val sc0 = new SparkContext(conf)
+ contexts += sc0
assert(sc0.executorAllocationManager.isDefined)
sc0.stop()
// Min < 0
val conf1 = conf.clone().set("spark.dynamicAllocation.minExecutors", "-1")
- intercept[SparkException] { new SparkContext(conf1) }
+ intercept[SparkException] { contexts += new SparkContext(conf1) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
// Max < 0
val conf2 = conf.clone().set("spark.dynamicAllocation.maxExecutors", "-1")
- intercept[SparkException] { new SparkContext(conf2) }
+ intercept[SparkException] { contexts += new SparkContext(conf2) }
SparkEnv.get.stop()
SparkContext.clearActiveContext()
@@ -665,16 +676,6 @@ class ExecutorAllocationManagerSuite extends FunSuite with LocalSparkContext {
assert(removeTimes(manager).contains("executor-2"))
assert(!removeTimes(manager).contains("executor-1"))
}
-}
-
-/**
- * Helper methods for testing ExecutorAllocationManager.
- * This includes methods to access private methods and fields in ExecutorAllocationManager.
- */
-private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
- private val schedulerBacklogTimeout = 1L
- private val sustainedSchedulerBacklogTimeout = 2L
- private val executorIdleTimeout = 3L
private def createSparkContext(minExecutors: Int = 1, maxExecutors: Int = 5): SparkContext = {
val conf = new SparkConf()
@@ -688,9 +689,22 @@ private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
sustainedSchedulerBacklogTimeout.toString)
.set("spark.dynamicAllocation.executorIdleTimeout", executorIdleTimeout.toString)
.set("spark.dynamicAllocation.testing", "true")
- new SparkContext(conf)
+ val sc = new SparkContext(conf)
+ contexts += sc
+ sc
}
+}
+
+/**
+ * Helper methods for testing ExecutorAllocationManager.
+ * This includes methods to access private methods and fields in ExecutorAllocationManager.
+ */
+private object ExecutorAllocationManagerSuite extends PrivateMethodTester {
+ private val schedulerBacklogTimeout = 1L
+ private val sustainedSchedulerBacklogTimeout = 2L
+ private val executorIdleTimeout = 3L
+
private def createStageInfo(stageId: Int, numTasks: Int): StageInfo = {
new StageInfo(stageId, 0, "name", numTasks, Seq.empty, "no details")
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org