You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:11 UTC
[06/37] git commit: Minor style clean-up
Minor style clean-up
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/c9c0f745
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/c9c0f745
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/c9c0f745
Branch: refs/heads/master
Commit: c9c0f745afcf00c17fa073e4ca6dd9433400be95
Parents: b2b7514
Author: Patrick Wendell <pw...@gmail.com>
Authored: Wed Dec 25 00:54:34 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Wed Dec 25 01:19:25 2013 -0800
----------------------------------------------------------------------
.../spark/deploy/client/DriverClient.scala | 2 ++
.../org/apache/spark/deploy/master/Master.scala | 3 +--
.../spark/deploy/master/ui/IndexPage.scala | 22 ++++++++++----------
.../spark/deploy/worker/DriverRunner.scala | 4 ++--
.../org/apache/spark/deploy/worker/Worker.scala | 3 ---
.../spark/examples/DriverSubmissionTest.scala | 1 +
6 files changed, 17 insertions(+), 18 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9c0f745/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index 9c0a626..28c851b 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -101,6 +101,8 @@ object DriverClient {
def main(args: Array[String]) {
val driverArgs = new DriverClientArguments(args)
+ // TODO: See if we can initialize akka so return messages are sent back using the same TCP
+ // flow. Else, this (sadly) requires the DriverClient be routable from the Master.
val (actorSystem, boundPort) = AkkaUtils.createActorSystem(
"driverClient", Utils.localHostName(), 0)
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9c0f745/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index f5d6fda..0528ef4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -185,7 +185,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
schedule()
// TODO: It might be good to instead have the submission client poll the master to determine
- // the current status of the driver. Since we may already want to expose this.
+ // the current status of the driver. For now it's simply "fire and forget".
sender ! SubmitDriverResponse(true, "Driver successfully submitted")
}
@@ -611,7 +611,6 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
}
- /** Generate a new driver ID given a driver's submission date */
def newDriverId(submitDate: Date): String = {
val appId = "driver-%s-%04d".format(DATE_FORMAT.format(submitDate), nextDriverNumber)
nextDriverNumber += 1
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9c0f745/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
index 6a99d7a..3c6fca3 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/ui/IndexPage.scala
@@ -106,20 +106,20 @@ private[spark] class IndexPage(parent: MasterWebUI) {
</div>
</div>
- <div class="row-fluid">
- <div class="span12">
- <h4> Active Drivers </h4>
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Active Drivers </h4>
- {activeDriversTable}
- </div>
+ {activeDriversTable}
</div>
+ </div>
- <div class="row-fluid">
- <div class="span12">
- <h4> Completed Drivers </h4>
- {completedDriversTable}
- </div>
- </div>;
+ <div class="row-fluid">
+ <div class="span12">
+ <h4> Completed Drivers </h4>
+ {completedDriversTable}
+ </div>
+ </div>;
UIUtils.basicSparkPage(content, "Spark Master at " + state.uri)
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9c0f745/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
index b030d60..28d4297 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala
@@ -32,7 +32,7 @@ import org.apache.spark.deploy.master.DriverState
import org.apache.spark.util.Utils
/**
- * Manages the execution of one driver process.
+ * Manages the execution of one driver, including automatically restarting the driver on failure.
*/
private[spark] class DriverRunner(
val driverId: String,
@@ -133,7 +133,7 @@ private[spark] class DriverRunner(
localJarFilename
}
- /** Continue launching the supplied command until it exits zero. */
+ /** Continue launching the supplied command until it exits zero or is killed. */
def runCommandWithRetry(command: Seq[String], envVars: Seq[(String, String)], baseDir: File) = {
// Time to wait between submission retries.
var waitSeconds = 1
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9c0f745/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
index b6a84fc..42c28cf 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala
@@ -254,17 +254,14 @@ private[spark] class Worker(
case KillDriver(driverId) => {
logInfo(s"Asked to kill driver $driverId")
-
drivers.find(_._1 == driverId) match {
case Some((id, runner)) =>
runner.kill()
case None =>
logError(s"Asked to kill unknown driver $driverId")
}
-
}
-
case DriverStateChanged(driverId, state, exception) => {
state match {
case DriverState.FAILED =>
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/c9c0f745/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
index 9055ce7..65251e9 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala
@@ -39,6 +39,7 @@ object DriverSubmissionTest {
properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println)
for (i <- 1 until numSecondsToSleep) {
+ println(s"Alive for $i out of $numSecondsToSleep seconds")
Thread.sleep(1000)
}
}