Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/23 03:58:13 UTC

[1/4] git commit: fix for SPARK-1027

Updated Branches:
  refs/heads/master 3184facdc -> 034dce2a7


fix for SPARK-1027

change TestClient & Worker to pass sparkHome as Some("xxx")

kill the manager if it has already been started

remove the unnecessary .get when fetching "SPARK_HOME" values

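For context, a minimal sketch of the Option-based lookup this change moves toward; the property-fallback chain and the "." default here are illustrative assumptions, not code from the patch:

  object SparkHomeLookup {
    // Keep the result as Option[String] instead of calling .get eagerly,
    // so a missing SPARK_HOME cannot throw NoSuchElementException.
    val sparkHome: Option[String] =
      sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))

    def main(args: Array[String]): Unit = {
      // Resolve only at the point of use, with an explicit fallback.
      println(sparkHome.getOrElse("."))
    }
  }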

Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/29f4b6a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/29f4b6a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/29f4b6a2

Branch: refs/heads/master
Commit: 29f4b6a2d9f42a727691444312964e59ef9b95ee
Parents: f9a95d6
Author: CodingCat <zh...@gmail.com>
Authored: Wed Jan 15 20:46:14 2014 -0500
Committer: CodingCat <zh...@gmail.com>
Committed: Mon Jan 20 02:50:30 2014 -0500

----------------------------------------------------------------------
 .../apache/spark/deploy/ApplicationDescription.scala    |  2 +-
 .../scala/org/apache/spark/deploy/DeployMessage.scala   |  3 +--
 .../org/apache/spark/deploy/client/TestClient.scala     |  2 +-
 .../scala/org/apache/spark/deploy/master/Master.scala   |  8 ++++----
 .../scala/org/apache/spark/deploy/worker/Worker.scala   | 12 +++++++-----
 .../scheduler/cluster/SparkDeploySchedulerBackend.scala |  2 +-
 .../org/apache/spark/deploy/JsonProtocolSuite.scala     |  2 +-
 .../apache/spark/deploy/worker/ExecutorRunnerTest.scala |  4 ++--
 8 files changed, 18 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
index e38459b..449b953 100644
--- a/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/ApplicationDescription.scala
@@ -22,7 +22,7 @@ private[spark] class ApplicationDescription(
     val maxCores: Option[Int],
     val memoryPerSlave: Int,
     val command: Command,
-    val sparkHome: String,
+    val sparkHome: Option[String],
     val appUiUrl: String)
   extends Serializable {
 

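The shape of the change in isolation, as a hedged stand-in (the real ApplicationDescription is a plain class with more members, not a case class):

  object SparkHomeField {
    case class Desc(name: String, sparkHome: Option[String])

    def main(args: Array[String]): Unit = {
      println(Desc("app", Some("/opt/spark")))  // caller knows the path
      println(Desc("app", None))                // worker falls back locally
    }
  }
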
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
index 5e824e1..83ce14a 100644
--- a/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala
@@ -79,8 +79,7 @@ private[deploy] object DeployMessages {
       execId: Int,
       appDesc: ApplicationDescription,
       cores: Int,
-      memory: Int,
-      sparkHome: String)
+      memory: Int)
     extends DeployMessage
 
   case class LaunchDriver(driverId: String, driverDesc: DriverDescription) extends DeployMessage

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
index ffa909c..8017932 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/TestClient.scala
@@ -49,7 +49,7 @@ private[spark] object TestClient {
       conf = new SparkConf)
     val desc = new ApplicationDescription(
       "TestClient", Some(1), 512, Command("spark.deploy.client.TestExecutor", Seq(), Map()),
-      "dummy-spark-home", "ignored")
+      Some("dummy-spark-home"), "ignored")
     val listener = new TestListener
     val client = new AppClient(actorSystem, Array(url), desc, listener, new SparkConf)
     client.start()

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index d9ea96a..fe9770c 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -480,7 +480,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
         for (pos <- 0 until numUsable) {
           if (assigned(pos) > 0) {
             val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
-            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)
+            launchExecutor(usableWorkers(pos), exec)
             app.state = ApplicationState.RUNNING
           }
         }
@@ -493,7 +493,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
             val coresToUse = math.min(worker.coresFree, app.coresLeft)
             if (coresToUse > 0) {
               val exec = app.addExecutor(worker, coresToUse)
-              launchExecutor(worker, exec, app.desc.sparkHome)
+              launchExecutor(worker, exec)
               app.state = ApplicationState.RUNNING
             }
           }
@@ -502,11 +502,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
   }
 
-  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
+  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
     logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
     worker.addExecutor(exec)
     worker.actor ! LaunchExecutor(masterUrl,
-      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
+      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
     exec.application.driver ! ExecutorAdded(
       exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index c9e4fc2..de45da2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -204,17 +204,15 @@ private[spark] class Worker(
         System.exit(1)
       }
 
-    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
+    case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
       } else {
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-          // TODO (pwendell): We shuld make sparkHome an Option[String] in
-          // ApplicationDescription to be more explicit about this.
-          val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
+            self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)),
+            workDir, akkaUrl, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
           coresUsed += cores_
@@ -224,6 +222,10 @@ private[spark] class Worker(
           }
         } catch {
           case e: Exception => {
+            logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
+            if (executors.contains(appId + "/" + execId)) {
+              executors(appId + "/" + execId).kill()
+            }
             masterLock.synchronized {
               master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
             }

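A standalone sketch of the fallback rule the Worker applies above: the application's sparkHome wins when present, otherwise the Worker's own home is used (paths are illustrative):

  import java.io.File

  object ResolveHome {
    def executorHome(appHome: Option[String], workerHome: File): File =
      appHome.map(new File(_)).getOrElse(workerHome)

    def main(args: Array[String]): Unit = {
      val workerHome = new File("/opt/spark")
      println(executorHome(Some("/custom/spark"), workerHome))  // app-specified path
      println(executorHome(None, workerHome))                   // worker default
    }
  }
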
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
index faa6e1e..33aac52 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/SparkDeploySchedulerBackend.scala
@@ -50,7 +50,7 @@ private[spark] class SparkDeploySchedulerBackend(
     val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
     val command = Command(
       "org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs)
-    val sparkHome = sc.getSparkHome().getOrElse(null)
+    val sparkHome = sc.getSparkHome()
     val appDesc = new ApplicationDescription(appName, maxCores, sc.executorMemory, command, sparkHome,
         "http://" + sc.ui.appUIAddress)
 

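A minimal sketch of the anti-pattern this hunk removes: flattening an Option to null to fit a String parameter, only for downstream code to re-wrap it (values are illustrative):

  object FlattenToNull {
    def main(args: Array[String]): Unit = {
      val maybeHome: Option[String] = None               // e.g. from sc.getSparkHome()
      val flattened: String = maybeHome.getOrElse(null)  // hides the absence in a null
      val rewrapped = Option(flattened)                  // downstream must undo it
      println(rewrapped)  // None again; the round trip was pointless
    }
  }
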
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
index d05bbd6..693b1ab 100644
--- a/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/JsonProtocolSuite.scala
@@ -74,7 +74,7 @@ class JsonProtocolSuite extends FunSuite {
 
   def createAppDesc(): ApplicationDescription = {
     val cmd = new Command("mainClass", List("arg1", "arg2"), Map())
-    new ApplicationDescription("name", Some(4), 1234, cmd, "sparkHome", "appUiUrl")
+    new ApplicationDescription("name", Some(4), 1234, cmd, Some("sparkHome"), "appUiUrl")
   }
 
   def createAppInfo() : ApplicationInfo = {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/29f4b6a2/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
index a79ee69..4baa656 100644
--- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala
@@ -26,11 +26,11 @@ import org.apache.spark.deploy.{ExecutorState, Command, ApplicationDescription}
 class ExecutorRunnerTest extends FunSuite {
   test("command includes appId") {
     def f(s:String) = new File(s)
-    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home")).get
+    val sparkHome = sys.env.get("SPARK_HOME").orElse(sys.props.get("spark.home"))
     val appDesc = new ApplicationDescription("app name", Some(8), 500, Command("foo", Seq(),Map()),
       sparkHome, "appUiUrl")
     val appId = "12345-worker321-9876"
-    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
+    val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome.getOrElse(".")),
       f("ooga"), "blah", ExecutorState.RUNNING)
 
     assert(er.getCommandSeq.last === appId)


[4/4] git commit: Merge pull request #447 from CodingCat/SPARK-1027

Posted by pw...@apache.org.
Merge pull request #447 from CodingCat/SPARK-1027

fix for SPARK-1027 (https://spark-project.atlassian.net/browse/SPARK-1027)

FIXES

1. change sparkHome from String to Option[String] in ApplicationDescription

2. remove the sparkHome parameter from the LaunchExecutor message (see the sketch after this list)

3. adjust the involved files accordingly
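
Fix 2 in isolation, as a hedged stand-in (the real message also carries the masterUrl and the full application description):

  object SlimMessage {
    // before: case class LaunchExecutor(..., memory: Int, sparkHome: String)
    // after:  the field is gone; the Worker resolves sparkHome itself
    case class LaunchExecutor(appId: String, execId: Int, cores: Int, memory: Int)

    def main(args: Array[String]): Unit =
      println(LaunchExecutor("12345-worker321-9876", 1, 8, 512))
  }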


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/034dce2a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/034dce2a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/034dce2a

Branch: refs/heads/master
Commit: 034dce2a7e0537836e15b9bfecc333b28fc76a6a
Parents: 3184fac 2b3c461
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Jan 22 18:58:02 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Jan 22 18:58:02 2014 -0800

----------------------------------------------------------------------
 .../spark/deploy/ApplicationDescription.scala   |  2 +-
 .../org/apache/spark/deploy/DeployMessage.scala |  3 +-
 .../apache/spark/deploy/client/TestClient.scala |  2 +-
 .../org/apache/spark/deploy/master/Master.scala |  8 ++--
 .../org/apache/spark/deploy/worker/Worker.scala | 41 +++++++++++++-------
 .../cluster/SparkDeploySchedulerBackend.scala   |  2 +-
 .../apache/spark/deploy/JsonProtocolSuite.scala |  2 +-
 .../deploy/worker/ExecutorRunnerTest.scala      |  4 +-
 8 files changed, 37 insertions(+), 27 deletions(-)
----------------------------------------------------------------------



[2/4] git commit: executor creation failure should not make the worker restart

Posted by pw...@apache.org.
executor creation failure should not make the worker restart


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/f9a95d67
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/f9a95d67
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/f9a95d67

Branch: refs/heads/master
Commit: f9a95d67365509cdd260858e858e7a9b120c1d1b
Parents: 792d908
Author: CodingCat <zh...@gmail.com>
Authored: Wed Jan 15 19:32:50 2014 -0500
Committer: CodingCat <zh...@gmail.com>
Committed: Mon Jan 20 02:50:30 2014 -0500

----------------------------------------------------------------------
 .../org/apache/spark/deploy/worker/Worker.scala | 32 ++++++++++++--------
 1 file changed, 20 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/f9a95d67/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 312560d..c9e4fc2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -208,18 +208,26 @@ private[spark] class Worker(
       if (masterUrl != activeMasterUrl) {
         logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
       } else {
-        logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
-        // TODO (pwendell): We shuld make sparkHome an Option[String] in
-        // ApplicationDescription to be more explicit about this.
-        val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
-        val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-          self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
-        executors(appId + "/" + execId) = manager
-        manager.start()
-        coresUsed += cores_
-        memoryUsed += memory_
-        masterLock.synchronized {
-          master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+        try {
+          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
+          // TODO (pwendell): We shuld make sparkHome an Option[String] in
+          // ApplicationDescription to be more explicit about this.
+          val effectiveSparkHome = Option(execSparkHome_).getOrElse(sparkHome.getAbsolutePath)
+          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
+            self, workerId, host, new File(effectiveSparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
+          executors(appId + "/" + execId) = manager
+          manager.start()
+          coresUsed += cores_
+          memoryUsed += memory_
+          masterLock.synchronized {
+            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
+          }
+        } catch {
+          case e: Exception => {
+            masterLock.synchronized {
+              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
+            }
+          }
         }
       }
 

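The reason the try/catch matters: with Akka's default supervision strategy, an exception escaping receive restarts the actor, i.e. the whole Worker. A hedged, self-contained sketch of the pattern (toy names, not Spark code):

  import akka.actor.{Actor, ActorSystem, Props}

  class ToyWorker extends Actor {
    def receive = {
      case cmd: String =>
        try {
          if (cmd == "bad") throw new RuntimeException("launch failed")
          println("launched " + cmd)
        } catch {
          // Catch locally and report failure, instead of letting the
          // exception propagate and trigger an actor restart.
          case e: Exception => println("FAILED: " + e.getMessage)
        }
    }
  }

  object ToyWorkerDemo extends App {
    val system = ActorSystem("demo")
    val worker = system.actorOf(Props[ToyWorker], "toy")
    worker ! "good"
    worker ! "bad"
    worker ! "good"  // still handled: the actor was never restarted
  }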

[3/4] git commit: refactor sparkHome to val

Posted by pw...@apache.org.
refactor sparkHome to val

clean code


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/2b3c4614
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/2b3c4614
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/2b3c4614

Branch: refs/heads/master
Commit: 2b3c461451ac2c163956af258dfbf3f208596dbf
Parents: 29f4b6a
Author: CodingCat <zh...@gmail.com>
Authored: Wed Jan 22 19:32:51 2014 -0500
Committer: CodingCat <zh...@gmail.com>
Committed: Wed Jan 22 20:20:46 2014 -0500

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala    | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/2b3c4614/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index de45da2..fbf2e0f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -76,7 +76,7 @@ private[spark] class Worker(
   @volatile var registered = false
   @volatile var connected = false
   val workerId = generateWorkerId()
-  var sparkHome: File = null
+  val sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
   val finishedExecutors = new HashMap[String, ExecutorRunner]
@@ -120,7 +120,6 @@ private[spark] class Worker(
     assert(!registered)
     logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
       host, port, cores, Utils.megabytesToString(memory)))
-    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
     logInfo("Spark home: " + sparkHome)
     createWorkDir()
     webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
@@ -211,7 +210,8 @@ private[spark] class Worker(
         try {
           logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
           val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
-            self, workerId, host, new File(appDesc.sparkHome.getOrElse(sparkHome.getAbsolutePath)),
+            self, workerId, host,
+            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
             workDir, akkaUrl, ExecutorState.RUNNING)
           executors(appId + "/" + execId) = manager
           manager.start()
@@ -225,6 +225,7 @@ private[spark] class Worker(
             logError("Failed to launch exector %s/%d for %s".format(appId, execId, appDesc.name))
             if (executors.contains(appId + "/" + execId)) {
               executors(appId + "/" + execId).kill()
+              executors -= appId + "/" + execId
             }
             masterLock.synchronized {
               master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
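
A minimal sketch of the var-to-val refactor applied above (illustrative, not Spark code):

  import java.io.File

  object VarVsVal {
    // before: declared mutable and filled in later during startup
    var sparkHomeVar: File = null

    // after: initialized eagerly, immutable from construction on
    val sparkHomeVal: File =
      new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))

    def main(args: Array[String]): Unit = {
      sparkHomeVar = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
      println(sparkHomeVal == sparkHomeVar)  // true: equal paths, but the val cannot be reassigned
    }
  }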