You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by an...@apache.org on 2014/09/03 23:47:15 UTC
git commit: [SPARK-2845] Add timestamps to block manager events.
Repository: spark
Updated Branches:
refs/heads/master e5d376801 -> ccc69e26e
[SPARK-2845] Add timestamps to block manager events.
These are not used by the UI but are useful when analysing the
logs from a Spark job.
Author: Marcelo Vanzin <va...@cloudera.com>
Closes #654 from vanzin/bm-event-tstamp and squashes the following commits:
d5d6e66 [Marcelo Vanzin] Fix tests.
ec06218 [Marcelo Vanzin] Review feedback.
f134dbc [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
b495b7c [Marcelo Vanzin] Merge branch 'master' into bm-event-tstamp
7d2fe9e [Marcelo Vanzin] Review feedback.
d6f381c [Marcelo Vanzin] Update tests added after patch was created.
45e3bf8 [Marcelo Vanzin] Fix unit test after merge.
b37a10f [Marcelo Vanzin] Use === in test assertions.
ef72824 [Marcelo Vanzin] Handle backwards compatibility with 1.0.0.
aca1151 [Marcelo Vanzin] Fix unit test to check new fields.
efdda8e [Marcelo Vanzin] Add timestamps to block manager events.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccc69e26
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccc69e26
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccc69e26
Branch: refs/heads/master
Commit: ccc69e26ec2fadd90886990b90a5a600efd08aba
Parents: e5d3768
Author: Marcelo Vanzin <va...@cloudera.com>
Authored: Wed Sep 3 14:47:11 2014 -0700
Committer: Andrew Or <an...@gmail.com>
Committed: Wed Sep 3 14:47:11 2014 -0700
----------------------------------------------------------------------
.../apache/spark/scheduler/SparkListener.scala | 4 +--
.../spark/storage/BlockManagerMasterActor.scala | 7 ++--
.../org/apache/spark/util/JsonProtocol.scala | 12 ++++---
.../storage/StorageStatusListenerSuite.scala | 18 +++++-----
.../spark/ui/storage/StorageTabSuite.scala | 4 +--
.../apache/spark/util/JsonProtocolSuite.scala | 37 +++++++++++++++++---
6 files changed, 58 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
index 86ca844..f33c2e0 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala
@@ -67,11 +67,11 @@ case class SparkListenerEnvironmentUpdate(environmentDetails: Map[String, Seq[(S
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerAdded(blockManagerId: BlockManagerId, maxMem: Long)
+case class SparkListenerBlockManagerAdded(time: Long, blockManagerId: BlockManagerId, maxMem: Long)
extends SparkListenerEvent
@DeveloperApi
-case class SparkListenerBlockManagerRemoved(blockManagerId: BlockManagerId)
+case class SparkListenerBlockManagerRemoved(time: Long, blockManagerId: BlockManagerId)
extends SparkListenerEvent
@DeveloperApi
http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
index 3ab0770..1a6c7cb 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterActor.scala
@@ -203,7 +203,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
blockLocations.remove(blockId)
}
}
- listenerBus.post(SparkListenerBlockManagerRemoved(blockManagerId))
+ listenerBus.post(SparkListenerBlockManagerRemoved(System.currentTimeMillis(), blockManagerId))
}
private def expireDeadHosts() {
@@ -325,6 +325,7 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
}
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
+ val time = System.currentTimeMillis()
if (!blockManagerInfo.contains(id)) {
blockManagerIdByExecutor.get(id.executorId) match {
case Some(manager) =>
@@ -340,9 +341,9 @@ class BlockManagerMasterActor(val isLocal: Boolean, conf: SparkConf, listenerBus
id.hostPort, Utils.bytesToString(maxMemSize)))
blockManagerInfo(id) =
- new BlockManagerInfo(id, System.currentTimeMillis(), maxMemSize, slaveActor)
+ new BlockManagerInfo(id, time, maxMemSize, slaveActor)
}
- listenerBus.post(SparkListenerBlockManagerAdded(id, maxMemSize))
+ listenerBus.post(SparkListenerBlockManagerAdded(time, id, maxMemSize))
}
private def updateBlockInfo(
http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index a754345..1fc536b 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -152,13 +152,15 @@ private[spark] object JsonProtocol {
val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerAdded)) ~
("Block Manager ID" -> blockManagerId) ~
- ("Maximum Memory" -> blockManagerAdded.maxMem)
+ ("Maximum Memory" -> blockManagerAdded.maxMem) ~
+ ("Timestamp" -> blockManagerAdded.time)
}
def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
("Event" -> Utils.getFormattedClassName(blockManagerRemoved)) ~
- ("Block Manager ID" -> blockManagerId)
+ ("Block Manager ID" -> blockManagerId) ~
+ ("Timestamp" -> blockManagerRemoved.time)
}
def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
@@ -466,12 +468,14 @@ private[spark] object JsonProtocol {
def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
val maxMem = (json \ "Maximum Memory").extract[Long]
- SparkListenerBlockManagerAdded(blockManagerId, maxMem)
+ val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ SparkListenerBlockManagerAdded(time, blockManagerId, maxMem)
}
def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
- SparkListenerBlockManagerRemoved(blockManagerId)
+ val time = Utils.jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+ SparkListenerBlockManagerRemoved(time, blockManagerId)
}
def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
index 4e022a6..3a45875 100644
--- a/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/StorageStatusListenerSuite.scala
@@ -36,13 +36,13 @@ class StorageStatusListenerSuite extends FunSuite {
// Block manager add
assert(listener.executorIdToStorageStatus.size === 0)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
assert(listener.executorIdToStorageStatus.size === 1)
assert(listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus("big").blockManagerId === bm1)
assert(listener.executorIdToStorageStatus("big").maxMem === 1000L)
assert(listener.executorIdToStorageStatus("big").numBlocks === 0)
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
assert(listener.executorIdToStorageStatus.size === 2)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
assert(listener.executorIdToStorageStatus("fat").blockManagerId === bm2)
@@ -50,11 +50,11 @@ class StorageStatusListenerSuite extends FunSuite {
assert(listener.executorIdToStorageStatus("fat").numBlocks === 0)
// Block manager remove
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm1))
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm1))
assert(listener.executorIdToStorageStatus.size === 1)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(listener.executorIdToStorageStatus.get("fat").isDefined)
- listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(bm2))
+ listener.onBlockManagerRemoved(SparkListenerBlockManagerRemoved(1L, bm2))
assert(listener.executorIdToStorageStatus.size === 0)
assert(!listener.executorIdToStorageStatus.get("big").isDefined)
assert(!listener.executorIdToStorageStatus.get("fat").isDefined)
@@ -62,8 +62,8 @@ class StorageStatusListenerSuite extends FunSuite {
test("task end without updated blocks") {
val listener = new StorageStatusListener
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics = new TaskMetrics
// Task end with no updated blocks
@@ -79,8 +79,8 @@ class StorageStatusListenerSuite extends FunSuite {
test("task end with updated blocks") {
val listener = new StorageStatusListener
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm2, 2000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm2, 2000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
@@ -128,7 +128,7 @@ class StorageStatusListenerSuite extends FunSuite {
test("unpersist RDD") {
val listener = new StorageStatusListener
- listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(bm1, 1000L))
+ listener.onBlockManagerAdded(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
val taskMetrics1 = new TaskMetrics
val taskMetrics2 = new TaskMetrics
val block1 = (RDDBlockId(1, 1), BlockStatus(StorageLevel.DISK_ONLY, 0L, 100L, 0L))
http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
index d9e9c70..e1bc137 100644
--- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala
@@ -108,7 +108,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val myRddInfo1 = rddInfo1
val myRddInfo2 = rddInfo2
val stageInfo0 = new StageInfo(0, 0, "0", 100, Seq(myRddInfo0, myRddInfo1, myRddInfo2), "details")
- bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+ bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener._rddInfoMap.size === 3)
assert(storageListener.rddInfoList.size === 0) // not cached
@@ -175,7 +175,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter {
val block1 = (RDDBlockId(1, 1), BlockStatus(memOnly, 200L, 0L, 0L))
taskMetrics0.updatedBlocks = Some(Seq(block0))
taskMetrics1.updatedBlocks = Some(Seq(block1))
- bus.postToAll(SparkListenerBlockManagerAdded(bm1, 1000L))
+ bus.postToAll(SparkListenerBlockManagerAdded(1L, bm1, 1000L))
bus.postToAll(SparkListenerStageSubmitted(stageInfo0))
assert(storageListener.rddInfoList.size === 0)
bus.postToAll(SparkListenerTaskEnd(0, 0, "big", Success, taskInfo, taskMetrics0))
http://git-wip-us.apache.org/repos/asf/spark/blob/ccc69e26/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 66a17de..c84bafc 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -21,6 +21,9 @@ import java.util.Properties
import scala.collection.Map
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.scalatest.FunSuite
@@ -52,9 +55,9 @@ class JsonProtocolSuite extends FunSuite {
"System Properties" -> Seq(("Username", "guest"), ("Password", "guest")),
"Classpath Entries" -> Seq(("Super library", "/tmp/super_library"))
))
- val blockManagerAdded = SparkListenerBlockManagerAdded(
+ val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
BlockManagerId("Stars", "In your multitude...", 300), 500)
- val blockManagerRemoved = SparkListenerBlockManagerRemoved(
+ val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
BlockManagerId("Scarce", "to be counted...", 100))
val unpersistRdd = SparkListenerUnpersistRDD(12345)
val applicationStart = SparkListenerApplicationStart("The winner of all", 42L, "Garfield")
@@ -151,6 +154,28 @@ class JsonProtocolSuite extends FunSuite {
assert(newMetrics.inputMetrics.isEmpty)
}
+ test("BlockManager events backward compatibility") {
+ // SparkListenerBlockManagerAdded/Removed in Spark 1.0.0 do not have a "time" property.
+ val blockManagerAdded = SparkListenerBlockManagerAdded(1L,
+ BlockManagerId("Stars", "In your multitude...", 300), 500)
+ val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
+ BlockManagerId("Scarce", "to be counted...", 100))
+
+ val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded)
+ .removeField({ _._1 == "Timestamp" })
+
+ val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded)
+ assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId,
+ blockManagerAdded.maxMem) === deserializedBmAdded)
+
+ val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved)
+ .removeField({ _._1 == "Timestamp" })
+
+ val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved)
+ assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) ===
+ deserializedBmRemoved)
+ }
+
/** -------------------------- *
| Helper test running methods |
@@ -242,8 +267,10 @@ class JsonProtocolSuite extends FunSuite {
assertEquals(e1.environmentDetails, e2.environmentDetails)
case (e1: SparkListenerBlockManagerAdded, e2: SparkListenerBlockManagerAdded) =>
assert(e1.maxMem === e2.maxMem)
+ assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerBlockManagerRemoved, e2: SparkListenerBlockManagerRemoved) =>
+ assert(e1.time === e2.time)
assertEquals(e1.blockManagerId, e2.blockManagerId)
case (e1: SparkListenerUnpersistRDD, e2: SparkListenerUnpersistRDD) =>
assert(e1.rddId == e2.rddId)
@@ -945,7 +972,8 @@ class JsonProtocolSuite extends FunSuite {
| "Host": "In your multitude...",
| "Port": 300
| },
- | "Maximum Memory": 500
+ | "Maximum Memory": 500,
+ | "Timestamp": 1
|}
"""
@@ -957,7 +985,8 @@ class JsonProtocolSuite extends FunSuite {
| "Executor ID": "Scarce",
| "Host": "to be counted...",
| "Port": 100
- | }
+ | },
+ | "Timestamp": 2
|}
"""
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org