You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:31 UTC
[26/37] git commit: Fixes after merge
Fixes after merge
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/6a3daead
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/6a3daead
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/6a3daead
Branch: refs/heads/master
Commit: 6a3daead2d5c82136fefa2de9bced036d1ccb759
Parents: c0498f9
Author: Patrick Wendell <pw...@gmail.com>
Authored: Mon Jan 6 20:12:45 2014 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Mon Jan 6 20:12:45 2014 -0800
----------------------------------------------------------------------
.../scala/org/apache/spark/deploy/client/DriverClient.scala | 7 ++++---
.../scala/org/apache/spark/deploy/worker/DriverWrapper.scala | 5 +++--
.../main/scala/org/apache/spark/deploy/worker/Worker.scala | 2 +-
3 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a3daead/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index e319e75..1cd5d99 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -23,7 +23,7 @@ import scala.concurrent._
import akka.actor._
-import org.apache.spark.Logging
+import org.apache.spark.{SparkConf, Logging}
import org.apache.spark.deploy.{Command, DriverDescription}
import org.apache.spark.deploy.DeployMessages._
import org.apache.spark.deploy.master.Master
@@ -59,11 +59,12 @@ object DriverClient extends Logging {
def main(args: Array[String]) {
val driverArgs = new DriverClientArguments(args)
+ val conf = new SparkConf()
// TODO: See if we can initialize akka so return messages are sent back using the same TCP
// flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, _) = AkkaUtils.createActorSystem(
- "driverClient", Utils.localHostName(), 0)
+ "driverClient", Utils.localHostName(), 0, false, conf)
val master = driverArgs.master
val response = promise[(Boolean, String)]
val driver: ActorRef = actorSystem.actorOf(Props(new DriverActor(driverArgs.master, response)))
@@ -95,7 +96,7 @@ object DriverClient extends Logging {
val (success, message) =
try {
- Await.result(response.future, AkkaUtils.askTimeout)
+ Await.result(response.future, AkkaUtils.askTimeout(conf))
} catch {
case e: TimeoutException => (false, s"Master $master failed to respond in time")
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a3daead/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index 2deb21a..1640d5f 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -2,6 +2,7 @@ package org.apache.spark.deploy.worker
import akka.actor._
+import org.apache.spark.SparkConf
import org.apache.spark.util.{AkkaUtils, Utils}
/**
@@ -12,7 +13,7 @@ object DriverWrapper {
args.toList match {
case workerUrl :: mainClass :: extraArgs =>
val (actorSystem, _) = AkkaUtils.createActorSystem("Driver",
- Utils.localHostName(), 0)
+ Utils.localHostName(), 0, false, new SparkConf())
actorSystem.actorOf(Props(classOf[WorkerWatcher], workerUrl), name = "workerWatcher")
// Delegate to supplied main class
@@ -20,7 +21,7 @@ object DriverWrapper {
val mainMethod = clazz.getMethod("main", classOf[Array[String]])
mainMethod.invoke(null, extraArgs.toArray[String])
- actorSystem.awaitTermination()
+ actorSystem.shutdown()
case _ =>
System.err.println("Usage: DriverWrapper <workerUrl> <driverMainClass> [options]")
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/6a3daead/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index 2072f00..4546e38 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -335,7 +335,7 @@ private[spark] object Worker {
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(systemName, host, port,
conf = conf)
actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory,
- masterUrls, workDir, conf), name = actorName)
+ masterUrls, systemName, actorName, workDir, conf), name = actorName)
(actorSystem, boundPort)
}