You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:31 UTC

[26/37] git commit: Fixes after merge

Fixes after merge


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6a3daead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6a3daead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6a3daead

Branch: refs/heads/master
Commit: 6a3daead2d5c82136fefa2de9bced036d1ccb759
Parents: c0498f9
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 6 20:12:45 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 6 20:12:45 2014 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/deploy/client/DriverClient.scala   | 7 ++++---
 .../scala/org/apache/spark/deploy/worker/DriverWrapper.scala  | 5 +++--
 .../main/scala/org/apache/spark/deploy/worker/Worker.scala    | 2 +-
 3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a3daead/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index e319e75..1cd5d99 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -23,7 +23,7 @@ import scala.concurrent._
 
 import akka.actor._
 
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
 import org.apache.spark.deploy.{Command, DriverDescription}
 import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.deploy.master.Master
@@ -59,11 +59,12 @@ object DriverClient extends Logging {
 
   def main(args: Array[String]) {
     val driverArgs = new DriverClientArguments(args)
+    val conf = new SparkConf()
 
     // TODO: See if we can initialize akka so return messages are sent back using the same TCP
     //       flow. Else, this (sadly) requires the DriverClient be routable from the Master.
     val (actorSystem, _) = AkkaUtils.createActorSystem(
-      "driverClient", Utils.localHostName(), 0)
+      "driverClient", Utils.localHostName(), 0, false, conf)
     val master = driverArgs.master
     val response = promise[(Boolean, String)]
     val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
@@ -95,7 +96,7 @@ object DriverClient extends Logging {
 
     val (success, message) =
       try {
-        Await.result(response.future, AkkaUtils.askTimeout)
+        Await.result(response.future, AkkaUtils.askTimeout(conf))
       } catch {
         case e: TimeoutException => (false, s"Master $master failed to respond in time")
       }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a3daead/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 2deb21a..1640d5f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -2,6 +2,7 @@ package org.apache.spark.deploy.worker
 
 import akka.actor._
 
+import org.apache.spark.SparkConf
 import org.apache.spark.util.{AkkaUtils, Utils}
 
 /**
@@ -12,7 +13,7 @@ object DriverWrapper {
     args.toList match {
       case workerUrl :: mainClass :: extraArgs =>
         val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
-          Utils.localHostName(), 0)
+          Utils.localHostName(), 0, false, new SparkConf())
         actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
 
         // Delegate to supplied main class
@@ -20,7 +21,7 @@ object DriverWrapper {
         val mainMethod = clazz.getMethod("main", classOf[Array[String]])
         mainMethod.invoke(null, extraArgs.toArray[String])
 
-        actorSystem.awaitTermination()
+        actorSystem.shutdown()
 
       case _ =>
         System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a3daead/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2072f00..4546e38 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -335,7 +335,7 @@ private[spark] object Worker {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
       conf = conf)
     actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
-      masterUrls, workDir, conf), name = actorName)
+      masterUrls, systemName, actorName,  workDir, conf), name = actorName)
     (actorSystem, boundPort)
   }