You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:20 UTC
[15/37] git commit: Minor fixes
Minor fixes
Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5c1b4f64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5c1b4f64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5c1b4f64
Branch: refs/heads/master
Commit: 5c1b4f64052e8fae0d942def4d6085a971faee4e
Parents: c23d640
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Dec 26 14:14:49 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Dec 26 14:39:39 2013 -0800
----------------------------------------------------------------------
.../spark/deploy/client/DriverClient.scala | 1 -
.../org/apache/spark/deploy/master/Master.scala | 36 ++++++++++++--------
.../spark/deploy/worker/ui/IndexPage.scala | 8 ++---
3 files changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c1b4f64/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index d2f3c09..8f19294 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -90,7 +90,6 @@ object DriverClient extends Logging {
case e: TimeoutException => (false, s"Master $master failed to respond in time")
}
if (success) logInfo(message) else logError(message)
- actorSystem.stop(driver)
actorSystem.shutdown()
actorSystem.awaitTermination()
}
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c1b4f64/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7f9ad8a..a0db2a2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.master.MasterMessages._
import org.apache.spark.deploy.master.ui.MasterWebUI
import org.apache.spark.metrics.MetricsSystem
import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.deploy.master.DriverState.DriverState
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
import context.dispatcher
@@ -268,21 +269,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
}
case DriverStateChanged(driverId, state, exception) => {
- if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
- state == DriverState.KILLED)) {
- throw new Exception(s"Received unexpected state update for driver $driverId: $state")
- }
- drivers.find(_.id == driverId) match {
- case Some(driver) => {
- drivers -= driver
- completedDrivers += driver
- persistenceEngine.removeDriver(driver)
- driver.state = state
- driver.exception = exception
- driver.worker.foreach(w => w.removeDriver(driver))
- }
- case None =>
- logWarning(s"Got driver update for unknown driver $driverId")
+ state match {
+ case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED =>
+ removeDriver(driverId, state, exception)
+ case _ =>
+ throw new Exception(s"Received unexpected state update for driver $driverId: $state")
}
}
@@ -638,6 +629,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
worker.actor ! LaunchDriver(driver.id, driver.desc)
driver.state = DriverState.RUNNING
}
+
+ def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
+ drivers.find(d => d.id == driverId) match {
+ case Some(driver) =>
+ logInfo(s"Removing driver: $driverId")
+ drivers -= driver
+ completedDrivers += driver
+ persistenceEngine.removeDriver(driver)
+ driver.state = finalState
+ driver.exception = exception
+ driver.worker.foreach(w => w.removeDriver(driver))
+ case None =>
+ logWarning(s"Asked to remove unknown driver: $driverId")
+ }
+ }
}
private[spark] object Master {
http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c1b4f64/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index c8cafac..35a1507 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -53,10 +53,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs")
- val runningDriverTable =
- UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
- def finishedDriverTable =
- UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
+ val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
+ val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
+ val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
+ def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
val content =
<div class="row-fluid"> <!-- Worker Details -->