You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/23 03:58:15 UTC

[3/4] git commit: refactor sparkHome to val

refactor sparkHome to val

clean code


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2b3c4614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2b3c4614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2b3c4614

Branch: refs/heads/master
Commit: 2b3c461451ac2c163956af258dfbf3f208596dbf
Parents: 29f4b6a
Author: CodingCat <zh...@gmail.com>
Authored: Wed Jan 22 19:32:51 2014 -0500
Committer: CodingCat <zh...@gmail.com>
Committed: Wed Jan 22 20:20:46 2014 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2b3c4614/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index de45da2..fbf2e0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -76,7 +76,7 @@ private[spark] class Worker(
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
-  var sparkHome: File = null
+  val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -120,7 +120,6 @@ private[spark] class Worker(
     assert(!registered)
     logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
       host, port, cores, Utils.megabytesToString(memory)))
-    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
@@ -211,7 +210,8 @@ private[spark] class Worker(
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)),
+            self, workerId, host,
+            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
             workDir, akkaUrl, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
@@ -225,6 +225,7 @@ private[spark] class Worker(
             logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
+              executors -= appId + "/" + execId
             }
             masterLock.synchronized {
               master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)