You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/09/04 03:42:07 UTC

git commit: [SPARK-3233] Executor never stop its SparnEnv, BlockManager, ConnectionManager etc.

Repository: spark
Updated Branches:
  refs/heads/master e08ea7393 -> 4bba10c41


[SPARK-3233] Executor never stop its SparnEnv, BlockManager, ConnectionManager etc.

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #2138 from sarutak/SPARK-3233 and squashes the following commits:

c0205b7 [Kousuke Saruta] Merge branch 'SPARK-3233' of github.com:sarutak/spark into SPARK-3233
064679d [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
d3005fd [Kousuke Saruta] Modified Class definition format of BlockManagerMaster
039b747 [Kousuke Saruta] Modified style
889e2d1 [Kousuke Saruta] Modified BlockManagerMaster to be able to be past isDriver flag
4da8535 [Kousuke Saruta] Modified BlockManagerMaster#stop to send StopBlockManagerMaster message when sender is Driver
6518c3a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
d5ab19a [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
6bce25c [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-3233
6058a58 [Kousuke Saruta] Modified Executor not to invoke SparkEnv#stop in local mode
e5ad9d3 [Kousuke Saruta] Modified Executor to stop SparnEnv at the end of itself


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4bba10c4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4bba10c4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4bba10c4

Branch: refs/heads/master
Commit: 4bba10c41acaf84a1c4a8e2db467c22f5ab7cbb9
Parents: e08ea73
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Wed Sep 3 18:42:01 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Wed Sep 3 18:42:01 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/SparkEnv.scala          | 2 +-
 core/src/main/scala/org/apache/spark/executor/Executor.scala | 3 +++
 .../scala/org/apache/spark/storage/BlockManagerMaster.scala  | 8 ++++++--
 .../main/scala/org/apache/spark/storage/ThreadingTest.scala  | 2 +-
 .../scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala | 2 +-
 .../scala/org/apache/spark/storage/BlockManagerSuite.scala   | 2 +-
 6 files changed, 13 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/SparkEnv.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala
index 7271656..2973d00 100644
--- a/core/src/main/scala/org/apache/spark/SparkEnv.scala
+++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -225,7 +225,7 @@ object SparkEnv extends Logging {
 
     val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
       "BlockManagerMaster",
-      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf)
+      new BlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)
 
     val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster,
       serializer, conf, securityManager, mapOutputTracker, shuffleManager)

http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/executor/Executor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index d7d19f6..dd903dc 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -123,6 +123,9 @@ private[spark] class Executor(
     env.metricsSystem.report()
     isStopped = true
     threadPool.shutdown()
+    if (!isLocal) {
+      env.stop()
+    }
   }
 
   class TaskRunner(

http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
index e67b3dc..2e26259 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala
@@ -27,7 +27,11 @@ import org.apache.spark.storage.BlockManagerMessages._
 import org.apache.spark.util.AkkaUtils
 
 private[spark]
-class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Logging {
+class BlockManagerMaster(
+    var driverActor: ActorRef,
+    conf: SparkConf,
+    isDriver: Boolean)
+  extends Logging {
   private val AKKA_RETRY_ATTEMPTS: Int = AkkaUtils.numRetries(conf)
   private val AKKA_RETRY_INTERVAL_MS: Int = AkkaUtils.retryWaitMs(conf)
 
@@ -196,7 +200,7 @@ class BlockManagerMaster(var driverActor: ActorRef, conf: SparkConf) extends Log
 
   /** Stop the driver actor, called only on the Spark driver node */
   def stop() {
-    if (driverActor != null) {
+    if (driverActor != null && isDriver) {
       tell(StopBlockManagerMaster)
       driverActor = null
       logInfo("BlockManagerMaster stopped")

http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
index aa83ea9..7540f0d 100644
--- a/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ThreadingTest.scala
@@ -99,7 +99,7 @@ private[spark] object ThreadingTest {
     val serializer = new KryoSerializer(conf)
     val blockManagerMaster = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf)
+      conf, true)
     val blockManager = new BlockManager(
       "<driver>", actorSystem, blockManagerMaster, serializer, 1024 * 1024, conf,
       new SecurityManager(conf), new MapOutputTrackerMaster(conf), new HashShuffleManager(conf))

http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 1a42fc1..0bb91fe 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -120,7 +120,7 @@ class DAGSchedulerSuite extends TestKit(ActorSystem("DAGSchedulerSuite")) with F
    */
   val cacheLocations = new HashMap[(Int, Int), Seq[BlockManagerId]]
   // stub out BlockManagerMaster.getLocations to use our cacheLocations
-  val blockManagerMaster = new BlockManagerMaster(null, conf) {
+  val blockManagerMaster = new BlockManagerMaster(null, conf, true) {
       override def getLocations(blockIds: Array[BlockId]): Seq[Seq[BlockManagerId]] = {
         blockIds.map {
           _.asRDDId.map(id => (id.rddId -> id.splitIndex)).flatMap(key => cacheLocations.get(key)).

http://git-wip-us.apache.org/repos/asf/spark/blob/4bba10c4/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 14ffada..c200654 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -93,7 +93,7 @@ class BlockManagerSuite extends FunSuite with Matchers with BeforeAndAfter
 
     master = new BlockManagerMaster(
       actorSystem.actorOf(Props(new BlockManagerMasterActor(true, conf, new LiveListenerBus))),
-      conf)
+      conf, true)
 
     val initialize = PrivateMethod[Unit]('initialize)
     SizeEstimator invokePrivate initialize()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org