Posted to commits@spark.apache.org by an...@apache.org on 2015/07/21 18:51:16 UTC

spark git commit: [SPARK-9036] [CORE] SparkListenerExecutorMetricsUpdate messages not included in JsonProtocol

Repository: spark
Updated Branches:
  refs/heads/master 6592a6058 -> f67da43c3


[SPARK-9036] [CORE] SparkListenerExecutorMetricsUpdate messages not included in JsonProtocol

This PR implements a JSON serializer and deserializer in JsonProtocol to handle the (de)serialization of SparkListenerExecutorMetricsUpdate events. It also adds a round-trip unit test in JsonProtocolSuite. This was implemented to satisfy the improvement request in JIRA issue SPARK-9036.
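
For reference, a minimal round-trip sketch of the new code path (hypothetical usage, not part of this patch: the caller must sit under the org.apache.spark package since JsonProtocol is private[spark], and someTaskMetrics stands in for a populated TaskMetrics such as the one the suite's makeTaskMetrics helper builds):

    import org.apache.spark.scheduler.SparkListenerExecutorMetricsUpdate
    import org.apache.spark.util.JsonProtocol

    // taskMetrics entries are (taskId, stageId, stageAttemptId, TaskMetrics) tuples
    val event = SparkListenerExecutorMetricsUpdate("exec1", Seq((1L, 2, 3, someTaskMetrics)))
    val json = JsonProtocol.sparkEventToJson(event)      // previously returned JNothing for this event
    val restored = JsonProtocol.sparkEventFromJson(json) // now dispatches to executorMetricsUpdateFromJson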

Author: Ben <be...@gmail.com>

Closes #7555 from NamelessAnalyst/master and squashes the following commits:

fb4e3cc [Ben] Update JSON Protocol and tests
aa69517 [Ben] Update JSON Protocol and tests --Corrected Stage Attempt to Stage Attempt ID
33e5774 [Ben] Update JSON Protocol Tests
3f237e7 [Ben] Update JSON Protocol Tests
84ca798 [Ben] Update JSON Protocol Tests
cde57a0 [Ben] Update JSON Protocol Tests
8049600 [Ben] Update JSON Protocol Tests
c5bc061 [Ben] Update JSON Protocol Tests
6f25785 [Ben] Merge remote-tracking branch 'origin/master'
df2a609 [Ben] Update JSON Protocol
dcda80b [Ben] Update JSON Protocol


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f67da43c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f67da43c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f67da43c

Branch: refs/heads/master
Commit: f67da43c394c27ceb4e6bfd49e81be05e406aa29
Parents: 6592a60
Author: Ben <be...@gmail.com>
Authored: Tue Jul 21 09:51:13 2015 -0700
Committer: Andrew Or <an...@databricks.com>
Committed: Tue Jul 21 09:51:13 2015 -0700

----------------------------------------------------------------------
 .../org/apache/spark/util/JsonProtocol.scala    | 31 ++++++++-
 .../apache/spark/util/JsonProtocolSuite.scala   | 69 +++++++++++++++++++-
 2 files changed, 96 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/f67da43c/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index adf69a4..a078f14 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -92,8 +92,8 @@ private[spark] object JsonProtocol {
         executorRemovedToJson(executorRemoved)
       case logStart: SparkListenerLogStart =>
         logStartToJson(logStart)
-      // These aren't used, but keeps compiler happy
-      case SparkListenerExecutorMetricsUpdate(_, _) => JNothing
+      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
+        executorMetricsUpdateToJson(metricsUpdate)
     }
   }
 
@@ -224,6 +224,19 @@ private[spark] object JsonProtocol {
     ("Spark Version" -> SPARK_VERSION)
   }
 
+  def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
+    val execId = metricsUpdate.execId
+    val taskMetrics = metricsUpdate.taskMetrics
+    ("Event" -> Utils.getFormattedClassName(metricsUpdate)) ~
+    ("Executor ID" -> execId) ~
+    ("Metrics Updated" -> taskMetrics.map { case (taskId, stageId, stageAttemptId, metrics) =>
+      ("Task ID" -> taskId) ~
+      ("Stage ID" -> stageId) ~
+      ("Stage Attempt ID" -> stageAttemptId) ~
+      ("Task Metrics" -> taskMetricsToJson(metrics))
+    })
+  }
+
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
@@ -463,6 +476,7 @@ private[spark] object JsonProtocol {
     val executorAdded = Utils.getFormattedClassName(SparkListenerExecutorAdded)
     val executorRemoved = Utils.getFormattedClassName(SparkListenerExecutorRemoved)
     val logStart = Utils.getFormattedClassName(SparkListenerLogStart)
+    val metricsUpdate = Utils.getFormattedClassName(SparkListenerExecutorMetricsUpdate)
 
     (json \ "Event").extract[String] match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
@@ -481,6 +495,7 @@ private[spark] object JsonProtocol {
       case `executorAdded` => executorAddedFromJson(json)
       case `executorRemoved` => executorRemovedFromJson(json)
       case `logStart` => logStartFromJson(json)
+      case `metricsUpdate` => executorMetricsUpdateFromJson(json)
     }
   }
 
@@ -598,6 +613,18 @@ private[spark] object JsonProtocol {
     SparkListenerLogStart(sparkVersion)
   }
 
+  def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
+    val execInfo = (json \ "Executor ID").extract[String]
+    val taskMetrics = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
+      val taskId = (json \ "Task ID").extract[Long]
+      val stageId = (json \ "Stage ID").extract[Int]
+      val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+      val metrics = taskMetricsFromJson(json \ "Task Metrics")
+      (taskId, stageId, stageAttemptId, metrics)
+    }
+    SparkListenerExecutorMetricsUpdate(execInfo, taskMetrics)
+  }
+
   /** --------------------------------------------------------------------- *
    * JSON deserialization methods for classes SparkListenerEvents depend on |
    * ---------------------------------------------------------------------- */

http://git-wip-us.apache.org/repos/asf/spark/blob/f67da43c/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index e0ef9c7..dde95f3 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -83,6 +83,9 @@ class JsonProtocolSuite extends SparkFunSuite {
     val executorAdded = SparkListenerExecutorAdded(executorAddedTime, "exec1",
       new ExecutorInfo("Hostee.awesome.com", 11, logUrlMap))
     val executorRemoved = SparkListenerExecutorRemoved(executorRemovedTime, "exec2", "test reason")
+    val executorMetricsUpdate = SparkListenerExecutorMetricsUpdate("exec3", Seq(
+      (1L, 2, 3, makeTaskMetrics(300L, 400L, 500L, 600L, 700, 800,
+        hasHadoopInput = true, hasOutput = true))))
 
     testEvent(stageSubmitted, stageSubmittedJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
@@ -102,6 +105,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testEvent(applicationEnd, applicationEndJsonString)
     testEvent(executorAdded, executorAddedJsonString)
     testEvent(executorRemoved, executorRemovedJsonString)
+    testEvent(executorMetricsUpdate, executorMetricsUpdateJsonString)
   }
 
   test("Dependent Classes") {
@@ -440,10 +444,20 @@ class JsonProtocolSuite extends SparkFunSuite {
       case (e1: SparkListenerEnvironmentUpdate, e2: SparkListenerEnvironmentUpdate) =>
         assertEquals(e1.environmentDetails, e2.environmentDetails)
       case (e1: SparkListenerExecutorAdded, e2: SparkListenerExecutorAdded) =>
-        assert(e1.executorId == e1.executorId)
+        assert(e1.executorId === e2.executorId)
         assertEquals(e1.executorInfo, e2.executorInfo)
       case (e1: SparkListenerExecutorRemoved, e2: SparkListenerExecutorRemoved) =>
-        assert(e1.executorId == e1.executorId)
+        assert(e1.executorId === e2.executorId)
+      case (e1: SparkListenerExecutorMetricsUpdate, e2: SparkListenerExecutorMetricsUpdate) =>
+        assert(e1.execId === e2.execId)
+        assertSeqEquals[(Long, Int, Int, TaskMetrics)](e1.taskMetrics, e2.taskMetrics, (a, b) => {
+          val (taskId1, stageId1, stageAttemptId1, metrics1) = a
+          val (taskId2, stageId2, stageAttemptId2, metrics2) = b
+          assert(taskId1 === taskId2)
+          assert(stageId1 === stageId2)
+          assert(stageAttemptId1 === stageAttemptId2)
+          assertEquals(metrics1, metrics2)
+        })
       case (e1, e2) =>
         assert(e1 === e2)
       case _ => fail("Events don't match in types!")
@@ -1598,4 +1612,55 @@ class JsonProtocolSuite extends SparkFunSuite {
       |  "Removed Reason": "test reason"
       |}
     """
+
+  private val executorMetricsUpdateJsonString =
+  s"""
+     |{
+     |  "Event": "SparkListenerExecutorMetricsUpdate",
+     |  "Executor ID": "exec3",
+     |  "Metrics Updated": [
+     |  {
+     |    "Task ID": 1,
+     |    "Stage ID": 2,
+     |    "Stage Attempt ID": 3,
+     |    "Task Metrics": {
+     |    "Host Name": "localhost",
+     |    "Executor Deserialize Time": 300,
+     |    "Executor Run Time": 400,
+     |    "Result Size": 500,
+     |    "JVM GC Time": 600,
+     |    "Result Serialization Time": 700,
+     |    "Memory Bytes Spilled": 800,
+     |    "Disk Bytes Spilled": 0,
+     |    "Input Metrics": {
+     |      "Data Read Method": "Hadoop",
+     |      "Bytes Read": 2100,
+     |      "Records Read": 21
+     |    },
+     |    "Output Metrics": {
+     |      "Data Write Method": "Hadoop",
+     |      "Bytes Written": 1200,
+     |      "Records Written": 12
+     |    },
+     |    "Updated Blocks": [
+     |      {
+     |        "Block ID": "rdd_0_0",
+     |        "Status": {
+     |          "Storage Level": {
+     |            "Use Disk": true,
+     |            "Use Memory": true,
+     |            "Use ExternalBlockStore": false,
+     |            "Deserialized": false,
+     |            "Replication": 2
+     |          },
+     |          "Memory Size": 0,
+     |          "ExternalBlockStore Size": 0,
+     |          "Disk Size": 0
+     |        }
+     |      }
+     |    ]
+     |  }
+     |  }]
+     |}
+   """.stripMargin
 }
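
Beyond the suite, a consumer-side sketch of the new deserializer, assuming only json4s (which Spark already bundles) and the same package-private access to JsonProtocol; the payload below is a hypothetical minimal example, not taken from a real event log:

    import org.json4s.jackson.JsonMethods.parse
    import org.apache.spark.util.JsonProtocol

    // Hypothetical minimal payload; "Metrics Updated" may legitimately be empty.
    val line =
      """{"Event":"SparkListenerExecutorMetricsUpdate","Executor ID":"exec3","Metrics Updated":[]}"""
    val event = JsonProtocol.sparkEventFromJson(parse(line))
    // event == SparkListenerExecutorMetricsUpdate("exec3", Nil)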

