You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/09/03 23:47:15 UTC

git commit: [SPARK-2845] Add timestamps to block manager events.

Repository: spark
Updated Branches:
  refs/heads/master e5d376801 -> ccc69e26e


[SPARK-2845] Add timestamps to block manager events.

These timestamps are not used by the UI, but they are useful when analysing
the logs from a Spark job.

Author: Marcelo Vanzin <va...@cloudera.com>

Closes #654 from vanzin/bm-event-tstamp and squashes the following commits:

d5d6e66 [Marcelo Vanzin] Fix tests.
ec06218 [Marcelo Vanzin] Review feedback.
f134dbc [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
b495b7c [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
7d2fe9e [Marcelo Vanzin] Review feedback.
d6f381c [Marcelo Vanzin] Update tests added after patch was created.
45e3bf8 [Marcelo Vanzin] Fix unit test after merge.
b37a10f [Marcelo Vanzin] Use === in test assertions.
ef72824 [Marcelo Vanzin] Handle backwards compatibility with 1.0.0.
aca1151 [Marcelo Vanzin] Fix unit test to check new fields.
efdda8e [Marcelo Vanzin] Add timestamps to block manager events.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccc69e26
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccc69e26
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccc69e26

Branch: refs/heads/master
Commit: ccc69e26ec2fadd90886990b90a5a600efd08aba
Parents: e5d3768
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Sep 3 14:47:11 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Wed Sep 3 14:47:11 2014 -0700

----------------------------------------------------------------------
 .../apache/spark/scheduler/SparkListener.scala  |  4 +--
 .../spark/storage/BlockManagerMasterActor.scala |  7 ++--
 .../org/apache/spark/util/JsonProtocol.scala    | 12 ++++---
 .../storage/StorageStatusListenerSuite.scala    | 18 +++++-----
 .../spark/ui/storage/StorageTabSuite.scala      |  4 +--
 .../apache/spark/util/JsonProtocolSuite.scala   | 37 +++++++++++++++++---
 6 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 86ca844..f33c2e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
   extends SparkListenerEvent
 
 @DeveloperApi
-case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
   extends SparkListenerEvent
 
 @DeveloperApi

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 3ab0770..1a6c7cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -203,7 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
         blockLocations.remove(blockId)
       }
     }
-    listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+    listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
   }
 
   private def expireDeadHosts() {
@@ -325,6 +325,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
   }
 
   private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+    val time = System.currentTimeMillis()
     if (!blockManagerInfo.contains(id)) {
       blockManagerIdByExecutor.get(id.executorId) match {
         case Some(manager) =>
@@ -340,9 +341,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
         id.hostPort, Utils.bytesToString(maxMemSize)))
 
       blockManagerInfo(id) =
-        new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+        new BlockManagerInfo(id, time, maxMemSize, slaveActor)
     }
-    listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
+    listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
   }
 
   private def updateBlockInfo(

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a754345..1fc536b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -152,13 +152,15 @@ private[spark] object JsonProtocol {
     val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
     ("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
     ("Block Manager ID" -> blockManagerId) ~
-    ("Maximum Memory" -> blockManagerAdded.maxMem)
+    ("Maximum Memory" -> blockManagerAdded.maxMem) ~
+    ("Timestamp" -> blockManagerAdded.time)
   }
 
   def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
     val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
     ("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
-    ("Block Manager ID" -> blockManagerId)
+    ("Block Manager ID" -> blockManagerId) ~
+    ("Timestamp" -> blockManagerRemoved.time)
   }
 
   def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
@@ -466,12 +468,14 @@ private[spark] object JsonProtocol {
   def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
     val maxMem = (json \ "Maximum Memory").extract[Long]
-    SparkListenerBlockManagerAdded(blockManagerId, maxMem)
+    val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+    SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
   }
 
   def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
     val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    SparkListenerBlockManagerRemoved(blockManagerId)
+    val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+    SparkListenerBlockManagerRemoved(time, blockManagerId)
   }
 
   def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 4e022a6..3a45875 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -36,13 +36,13 @@ class StorageStatusListenerSuite extends FunSuite {
 
     // Block manager add
     assert(listener.executorIdToStorageStatus.size === 0)
-    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     assert(listener.executorIdToStorageStatus.size === 1)
     assert(listener.executorIdToStorageStatus.get("big").isDefined)
     assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
     assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
     assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
-    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     assert(listener.executorIdToStorageStatus.size === 2)
     assert(listener.executorIdToStorageStatus.get("fat").isDefined)
     assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
@@ -50,11 +50,11 @@ class StorageStatusListenerSuite extends FunSuite {
     assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
 
     // Block manager remove
-    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1))
     assert(listener.executorIdToStorageStatus.size === 1)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(listener.executorIdToStorageStatus.get("fat").isDefined)
-    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+    listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
     assert(listener.executorIdToStorageStatus.size === 0)
     assert(!listener.executorIdToStorageStatus.get("big").isDefined)
     assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
@@ -62,8 +62,8 @@ class StorageStatusListenerSuite extends FunSuite {
 
   test("task end without updated blocks") {
     val listener = new StorageStatusListener
-    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
-    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics = new TaskMetrics
 
     // Task end with no updated blocks
@@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite {
 
   test("task end with updated blocks") {
     val listener = new StorageStatusListener
-    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
-    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
     val taskMetrics1 = new TaskMetrics
     val taskMetrics2 = new TaskMetrics
     val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
@@ -128,7 +128,7 @@ class StorageStatusListenerSuite extends FunSuite {
 
   test("unpersist RDD") {
     val listener = new StorageStatusListener
-    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+    listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     val taskMetrics1 = new TaskMetrics
     val taskMetrics2 = new TaskMetrics
     val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index d9e9c70..e1bc137 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -108,7 +108,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val myRddInfo1 = rddInfo1
     val myRddInfo2 = rddInfo2
     val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
-    bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+    bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener._rddInfoMap.size === 3)
     assert(storageListener.rddInfoList.size === 0) // not cached
@@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
     val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
     taskMetrics0.updatedBlocks = Some(Seq(block0))
     taskMetrics1.updatedBlocks = Some(Seq(block1))
-    bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+    bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
     bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
     assert(storageListener.rddInfoList.size === 0)
     bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))

http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 66a17de..c84bafc 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -21,6 +21,9 @@ import java.util.Properties
 
 import scala.collection.Map
 
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST._
 import org.json4s.jackson.JsonMethods._
 import org.scalatest.FunSuite
 
@@ -52,9 +55,9 @@ class JsonProtocolSuite extends FunSuite {
       "System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
       "Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
     ))
-    val blockManagerAdded = SparkListenerBlockManagerAdded(
+    val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
       BlockManagerId("Stars", "In your multitude...", 300), 500)
-    val blockManagerRemoved = SparkListenerBlockManagerRemoved(
+    val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
       BlockManagerId("Scarce", "to be counted...", 100))
     val unpersistRdd = SparkListenerUnpersistRDD(12345)
     val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
@@ -151,6 +154,28 @@ class JsonProtocolSuite extends FunSuite {
     assert(newMetrics.inputMetrics.isEmpty)
   }
 
+  test("BlockManager events backward compatibility") {
+    // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
+    val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
+      BlockManagerId("Stars", "In your multitude...", 300), 500)
+    val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
+      BlockManagerId("Scarce", "to be counted...", 100))
+
+    val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded)
+      .removeField({ _._1 == "Timestamp" })
+
+    val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded)
+    assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId,
+      blockManagerAdded.maxMem) === deserializedBmAdded)
+
+    val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved)
+      .removeField({ _._1 == "Timestamp" })
+
+    val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved)
+    assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) ===
+      deserializedBmRemoved)
+  }
+
 
   /** -------------------------- *
    | Helper test running methods |
@@ -242,8 +267,10 @@ class JsonProtocolSuite extends FunSuite {
         assertEquals(e1.environmentDetails, e2.environmentDetails)
       case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) =>
         assert(e1.maxMem === e2.maxMem)
+        assert(e1.time === e2.time)
         assertEquals(e1.blockManagerId, e2.blockManagerId)
       case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
+        assert(e1.time === e2.time)
         assertEquals(e1.blockManagerId, e2.blockManagerId)
       case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
         assert(e1.rddId == e2.rddId)
@@ -945,7 +972,8 @@ class JsonProtocolSuite extends FunSuite {
       |    "Host": "In your multitude...",
       |    "Port": 300
       |  },
-      |  "Maximum Memory": 500
+      |  "Maximum Memory": 500,
+      |  "Timestamp": 1
       |}
     """
 
@@ -957,7 +985,8 @@ class JsonProtocolSuite extends FunSuite {
       |    "Executor ID": "Scarce",
       |    "Host": "to be counted...",
       |    "Port": 100
-      |  }
+      |  },
+      |  "Timestamp": 2
       |}
     """
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org