You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by jo...@apache.org on 2015/01/28 20:03:03 UTC

spark git commit: [SPARK-5291][CORE] Add timestamp and reason why an executor is removed to SparkListenerExecutorAdded and SparkListenerExecutorRemoved

Repository: spark
Updated Branches:
  refs/heads/master eeb53bf90 -> 0b35fcd7f


[SPARK-5291][CORE] Add timestamp and reason why an executor is removed to SparkListenerExecutorAdded and SparkListenerExecutorRemoved

`SparkListenerExecutorAdded` and `SparkListenerExecutorRemoved` were added recently.
It would be useful for both events to carry a timestamp, and for `SparkListenerExecutorRemoved` to also carry the reason why the executor was removed.

Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>

Closes #4082 from sarutak/SPARK-5291 and squashes the following commits:

a026ff2 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291
979dfe1 [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291
cf9f9080 [Kousuke Saruta] Fixed test case
1f2a89b [Kousuke Saruta] Merge branch 'master' of git://git.apache.org/spark into SPARK-5291
243f2a60 [Kousuke Saruta] Modified MesosSchedulerBackendSuite
a527c35 [Kousuke Saruta] Added timestamp to SparkListenerExecutorAdded


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0b35fcd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0b35fcd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0b35fcd7

Branch: refs/heads/master
Commit: 0b35fcd7f01044e86669bac93e9663277c86365b
Parents: eeb53bf
Author: Kousuke Saruta <sa...@oss.nttdata.co.jp>
Authored: Wed Jan 28 11:02:51 2015 -0800
Committer: Josh Rosen <jo...@databricks.com>
Committed: Wed Jan 28 11:02:51 2015 -0800

----------------------------------------------------------------------
 .../org/apache/spark/scheduler/SparkListener.scala  |  4 ++--
 .../cluster/CoarseGrainedSchedulerBackend.scala     |  6 ++++--
 .../cluster/mesos/MesosSchedulerBackend.scala       | 10 +++++-----
 .../scala/org/apache/spark/util/JsonProtocol.scala  | 12 +++++++++---
 .../mesos/MesosSchedulerBackendSuite.scala          |  4 ++--
 .../org/apache/spark/util/JsonProtocolSuite.scala   | 16 +++++++++++-----
 6 files changed, 33 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0b35fcd7/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index e5d1eb7..8f5ceaa 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -91,11 +91,11 @@ case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockMan
 case class SparkListenerUnpersistRDD(rddId: Int) extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerExecutorAdded(executorId: String, executorInfo: ExecutorInfo)
+case class SparkListenerExecutorAdded(time: Long, executorId: String, executorInfo: ExecutorInfo)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerExecutorRemoved(executorId: String)
+case class SparkListenerExecutorRemoved(time: Long, executorId: String, reason: String)
   extends SparkListenerEvent
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/0b35fcd7/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 5786d36..103a5c0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -108,7 +108,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
               logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
             }
           }
-          listenerBus.post(SparkListenerExecutorAdded(executorId, data))
+          listenerBus.post(
+            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
           makeOffers()
         }
 
@@ -216,7 +217,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val actorSyste
           totalCoreCount.addAndGet(-executorInfo.totalCores)
           totalRegisteredExecutors.addAndGet(-1)
           scheduler.executorLost(executorId, SlaveLost(reason))
-          listenerBus.post(SparkListenerExecutorRemoved(executorId))
+          listenerBus.post(
+            SparkListenerExecutorRemoved(System.currentTimeMillis(), executorId, reason))
         case None => logError(s"Asked to remove non-existent executor $executorId")
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b35fcd7/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
index 79c9051..c3c546b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala
@@ -269,7 +269,7 @@ private[spark] class MesosSchedulerBackend(
 
       mesosTasks.foreach { case (slaveId, tasks) =>
         slaveIdToWorkerOffer.get(slaveId).foreach(o =>
-          listenerBus.post(SparkListenerExecutorAdded(slaveId,
+          listenerBus.post(SparkListenerExecutorAdded(System.currentTimeMillis(), slaveId,
             new ExecutorInfo(o.host, o.cores)))
         )
         d.launchTasks(Collections.singleton(slaveIdToOffer(slaveId).getId), tasks, filters)
@@ -327,7 +327,7 @@ private[spark] class MesosSchedulerBackend(
       synchronized {
         if (status.getState == MesosTaskState.TASK_LOST && taskIdToSlaveId.contains(tid)) {
           // We lost the executor on this slave, so remember that it's gone
-          removeExecutor(taskIdToSlaveId(tid))
+          removeExecutor(taskIdToSlaveId(tid), "Lost executor")
         }
         if (isFinished(status.getState)) {
           taskIdToSlaveId.remove(tid)
@@ -359,9 +359,9 @@ private[spark] class MesosSchedulerBackend(
   /**
    * Remove executor associated with slaveId in a thread safe manner.
    */
-  private def removeExecutor(slaveId: String) = {
+  private def removeExecutor(slaveId: String, reason: String) = {
     synchronized {
-      listenerBus.post(SparkListenerExecutorRemoved(slaveId))
+      listenerBus.post(SparkListenerExecutorRemoved(System.currentTimeMillis(), slaveId, reason))
       slaveIdsWithExecutors -= slaveId
     }
   }
@@ -369,7 +369,7 @@ private[spark] class MesosSchedulerBackend(
   private def recordSlaveLost(d: SchedulerDriver, slaveId: SlaveID, reason: ExecutorLossReason) {
     inClassLoader() {
       logInfo("Mesos slave lost: " + slaveId.getValue)
-      removeExecutor(slaveId.getValue)
+      removeExecutor(slaveId.getValue, reason.toString)
       scheduler.executorLost(slaveId.getValue, reason)
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0b35fcd7/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f896b50..b5f736d 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -204,13 +204,16 @@ private[spark] object JsonProtocol {
 
   def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
     ("Event" -> Utils.getFormattedClassName(executorAdded)) ~
+    ("Timestamp" -> executorAdded.time) ~
     ("Executor ID" -> executorAdded.executorId) ~
     ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
   }
 
   def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
     ("Event" -> Utils.getFormattedClassName(executorRemoved)) ~
-    ("Executor ID" -> executorRemoved.executorId)
+    ("Timestamp" -> executorRemoved.time) ~
+    ("Executor ID" -> executorRemoved.executorId) ~
+    ("Removed Reason" -> executorRemoved.reason)
   }
 
   /** ------------------------------------------------------------------- *
@@ -554,14 +557,17 @@ private[spark] object JsonProtocol {
   }
 
   def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
+    val time = (json \ "Timestamp").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
     val executorInfo = executorInfoFromJson(json \ "Executor Info")
-    SparkListenerExecutorAdded(executorId, executorInfo)
+    SparkListenerExecutorAdded(time, executorId, executorInfo)
   }
 
   def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
+    val time = (json \ "Timestamp").extract[Long]
     val executorId = (json \ "Executor ID").extract[String]
-    SparkListenerExecutorRemoved(executorId)
+    val reason = (json \ "Removed Reason").extract[String]
+    SparkListenerExecutorRemoved(time, executorId, reason)
   }
 
   /** --------------------------------------------------------------------- *

http://git-wip-us.apache.org/repos/asf/spark/blob/0b35fcd7/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
index 073814c..f2ff98e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/mesos/MesosSchedulerBackendSuite.scala
@@ -43,7 +43,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     conf.set("spark.mesos.executor.home" , "/mesos-home")
 
     val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
     EasyMock.replay(listenerBus)
 
     val sc = EasyMock.createMock(classOf[SparkContext])
@@ -88,7 +88,7 @@ class MesosSchedulerBackendSuite extends FunSuite with LocalSparkContext with Ea
     val taskScheduler = EasyMock.createMock(classOf[TaskSchedulerImpl])
 
     val listenerBus = EasyMock.createMock(classOf[LiveListenerBus])
-    listenerBus.post(SparkListenerExecutorAdded("s1", new ExecutorInfo("host1", 2)))
+    listenerBus.post(SparkListenerExecutorAdded(EasyMock.anyLong, "s1", new ExecutorInfo("host1", 2)))
     EasyMock.replay(listenerBus)
 
     val sc = EasyMock.createMock(classOf[SparkContext])

http://git-wip-us.apache.org/repos/asf/spark/blob/0b35fcd7/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 0357fc6..6577eba 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -37,6 +37,9 @@ class JsonProtocolSuite extends FunSuite {
   val jobSubmissionTime = 1421191042750L
   val jobCompletionTime = 1421191296660L
 
+  val executorAddedTime = 1421458410000L
+  val executorRemovedTime = 1421458922000L
+
   test("SparkListenerEvent") {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
@@ -73,9 +76,9 @@ class JsonProtocolSuite extends FunSuite {
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
     val applicationStart = SparkListenerApplicationStart("The winner of all", None, 42L, "Garfield")
     val applicationEnd = SparkListenerApplicationEnd(42L)
-    val executorAdded = SparkListenerExecutorAdded("exec1",
+    val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
       new ExecutorInfo("Hostee.awesome.com", 11))
-    val executorRemoved = SparkListenerExecutorRemoved("exec2")
+    val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -1453,9 +1456,10 @@ class JsonProtocolSuite extends FunSuite {
     """
 
   private val executorAddedJsonString =
-    """
+    s"""
       |{
       |  "Event": "SparkListenerExecutorAdded",
+      |  "Timestamp": ${executorAddedTime},
       |  "Executor ID": "exec1",
       |  "Executor Info": {
       |    "Host": "Hostee.awesome.com",
@@ -1465,10 +1469,12 @@ class JsonProtocolSuite extends FunSuite {
     """
 
   private val executorRemovedJsonString =
-    """
+    s"""
       |{
       |  "Event": "SparkListenerExecutorRemoved",
-      |  "Executor ID": "exec2"
+      |  "Timestamp": ${executorRemovedTime},
+      |  "Executor ID": "exec2",
+      |  "Removed Reason": "test reason"
       |}
     """
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org