You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/01/10 03:38:20 UTC

[15/37] git commit: Minor fixes

Minor fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-spark/commit/5c1b4f64
Tree: http://git-wip-us.apache.org/repos/asf/incubator-spark/tree/5c1b4f64
Diff: http://git-wip-us.apache.org/repos/asf/incubator-spark/diff/5c1b4f64

Branch: refs/heads/master
Commit: 5c1b4f64052e8fae0d942def4d6085a971faee4e
Parents: c23d640
Author: Patrick Wendell <pw...@gmail.com>
Authored: Thu Dec 26 14:14:49 2013 -0800
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Thu Dec 26 14:39:39 2013 -0800

----------------------------------------------------------------------
 .../spark/deploy/client/DriverClient.scala      |  1 -
 .../org/apache/spark/deploy/master/Master.scala | 36 ++++++++++++--------
 .../spark/deploy/worker/ui/IndexPage.scala      |  8 ++---
 3 files changed, 25 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c1b4f64/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
index d2f3c09..8f19294 100644
--- a/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/client/DriverClient.scala
@@ -90,7 +90,6 @@ object DriverClient extends Logging {
         case e: TimeoutException => (false, s"Master $master failed to respond in time")
       }
     if (success) logInfo(message) else logError(message)
-    actorSystem.stop(driver)
     actorSystem.shutdown()
     actorSystem.awaitTermination()
   }

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c1b4f64/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 7f9ad8a..a0db2a2 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -37,6 +37,7 @@ import org.apache.spark.deploy.master.MasterMessages._
 import org.apache.spark.deploy.master.ui.MasterWebUI
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.deploy.master.DriverState.DriverState
 
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   import context.dispatcher
@@ -268,21 +269,11 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     }
 
     case DriverStateChanged(driverId, state, exception) => {
-      if (!(state == DriverState.FAILED || state == DriverState.FINISHED ||
-          state == DriverState.KILLED)) {
-        throw new Exception(s"Received unexpected state update for driver $driverId: $state")
-      }
-      drivers.find(_.id == driverId) match {
-        case Some(driver) => {
-          drivers -= driver
-          completedDrivers += driver
-          persistenceEngine.removeDriver(driver)
-          driver.state = state
-          driver.exception = exception
-          driver.worker.foreach(w => w.removeDriver(driver))
-        }
-        case None =>
-          logWarning(s"Got driver update for unknown driver $driverId")
+      state match {
+        case DriverState.FAILED | DriverState.FINISHED | DriverState.KILLED =>
+          removeDriver(driverId, state, exception)
+        case _ =>
+          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
       }
     }
 
@@ -638,6 +629,21 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
     worker.actor ! LaunchDriver(driver.id, driver.desc)
     driver.state = DriverState.RUNNING
   }
+
+  def removeDriver(driverId: String, finalState: DriverState, exception: Option[Exception]) {
+    drivers.find(d => d.id == driverId) match {
+      case Some(driver) =>
+        logInfo(s"Removing driver: $driverId")
+        drivers -= driver
+        completedDrivers += driver
+        persistenceEngine.removeDriver(driver)
+        driver.state = finalState
+        driver.exception = exception
+        driver.worker.foreach(w => w.removeDriver(driver))
+      case None =>
+        logWarning(s"Asked to remove unknown driver: $driverId")
+    }
+  }
 }
 
 private[spark] object Master {

http://git-wip-us.apache.org/repos/asf/incubator-spark/blob/5c1b4f64/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
index c8cafac..35a1507 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/ui/IndexPage.scala
@@ -53,10 +53,10 @@ private[spark] class IndexPage(parent: WorkerWebUI) {
       UIUtils.listingTable(executorHeaders, executorRow, workerState.finishedExecutors)
 
     val driverHeaders = Seq("DriverID", "Main Class", "Cores", "Memory", "Logs")
-    val runningDriverTable =
-      UIUtils.listingTable(driverHeaders, driverRow, workerState.drivers)
-    def finishedDriverTable =
-      UIUtils.listingTable(driverHeaders, driverRow, workerState.finishedDrivers)
+    val runningDrivers = workerState.drivers.sortBy(_.driverId).reverse
+    val runningDriverTable = UIUtils.listingTable(driverHeaders, driverRow, runningDrivers)
+    val finishedDrivers = workerState.finishedDrivers.sortBy(_.driverId).reverse
+    def finishedDriverTable = UIUtils.listingTable(driverHeaders, driverRow, finishedDrivers)
 
     val content =
         <div class="row-fluid"> <!-- Worker Details -->