Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/06/17 07:28:24 UTC

[GitHub] [spark] dcoliversun commented on a diff in pull request #36885: [SPARK-39489][CORE] Improve event logging JsonProtocol performance by using Jackson instead of Json4s

dcoliversun commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r899841475


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -57,298 +57,398 @@ import org.apache.spark.util.Utils.weakIntern
 private[spark] object JsonProtocol {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
-  private implicit val format = DefaultFormats
-
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
     .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
 
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
-  def sparkEventToJson(event: SparkListenerEvent): JValue = {
+  def sparkEventToJsonString(event: SparkListenerEvent): String = {
+    toJsonString { generator =>
+      writeSparkEventToJson(event, generator)
+    }
+  }
+
+  def toJsonString(block: JsonGenerator => Unit): String = {
+    val baos = new ByteArrayOutputStream()
+    val generator = mapper.createGenerator(baos, JsonEncoding.UTF8)
+    block(generator)
+    generator.close()
+    new String(baos.toByteArray, StandardCharsets.UTF_8)
+  }
+
+  def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted)
+        stageSubmittedToJson(stageSubmitted, g)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted)
+        stageCompletedToJson(stageCompleted, g)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart)
+        taskStartToJson(taskStart, g)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult)
+        taskGettingResultToJson(taskGettingResult, g)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd)
+        taskEndToJson(taskEnd, g)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart)
+        jobStartToJson(jobStart, g)
       case jobEnd: SparkListenerJobEnd =>
-        jobEndToJson(jobEnd)
+        jobEndToJson(jobEnd, g)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        environmentUpdateToJson(environmentUpdate)
+        environmentUpdateToJson(environmentUpdate, g)
       case blockManagerAdded: SparkListenerBlockManagerAdded =>
-        blockManagerAddedToJson(blockManagerAdded)
+        blockManagerAddedToJson(blockManagerAdded, g)
       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
-        blockManagerRemovedToJson(blockManagerRemoved)
+        blockManagerRemovedToJson(blockManagerRemoved, g)
       case unpersistRDD: SparkListenerUnpersistRDD =>
-        unpersistRDDToJson(unpersistRDD)
+        unpersistRDDToJson(unpersistRDD, g)
       case applicationStart: SparkListenerApplicationStart =>
-        applicationStartToJson(applicationStart)
+        applicationStartToJson(applicationStart, g)
       case applicationEnd: SparkListenerApplicationEnd =>
-        applicationEndToJson(applicationEnd)
+        applicationEndToJson(applicationEnd, g)
       case executorAdded: SparkListenerExecutorAdded =>
-        executorAddedToJson(executorAdded)
+        executorAddedToJson(executorAdded, g)
       case executorRemoved: SparkListenerExecutorRemoved =>
-        executorRemovedToJson(executorRemoved)
+        executorRemovedToJson(executorRemoved, g)
       case logStart: SparkListenerLogStart =>
-        logStartToJson(logStart)
+        logStartToJson(logStart, g)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
-        executorMetricsUpdateToJson(metricsUpdate)
+        executorMetricsUpdateToJson(metricsUpdate, g)
       case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
-        stageExecutorMetricsToJson(stageExecutorMetrics)
+        stageExecutorMetricsToJson(stageExecutorMetrics, g)
       case blockUpdate: SparkListenerBlockUpdated =>
-        blockUpdateToJson(blockUpdate)
+        blockUpdateToJson(blockUpdate, g)
       case resourceProfileAdded: SparkListenerResourceProfileAdded =>
-        resourceProfileAddedToJson(resourceProfileAdded)
-      case _ => parse(mapper.writeValueAsString(event))
+        resourceProfileAddedToJson(resourceProfileAdded, g)
+      case _ =>
+        mapper.writeValue(g, event)
     }
   }
 
-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
-    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
-    val properties = propertiesToJson(stageSubmitted.properties)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
-    ("Stage Info" -> stageInfo) ~
-    ("Properties" -> properties)
+  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
+    g.writeFieldName("Stage Info")
+    stageInfoToJson(stageSubmitted.stageInfo, g)
+    g.writeFieldName("Properties")
+    propertiesToJson(stageSubmitted.properties, g)
+    g.writeEndObject()
   }
 
-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
-    val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
-    ("Stage Info" -> stageInfo)
+  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
+    g.writeFieldName("Stage Info")
+    stageInfoToJson(stageCompleted.stageInfo, g)
+    g.writeEndObject()
   }
 
-  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
-    val taskInfo = taskStart.taskInfo
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
-    ("Stage ID" -> taskStart.stageId) ~
-    ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
+  def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
+    g.writeNumberField("Stage ID", taskStart.stageId)
+    g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskStart.taskInfo, g)
+    g.writeEndObject()
   }
 
-  def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
+  def taskGettingResultToJson(
+      taskGettingResult: SparkListenerTaskGettingResult,
+      g: JsonGenerator): Unit = {
     val taskInfo = taskGettingResult.taskInfo
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
-  }
-
-  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
-    val taskEndReason = taskEndReasonToJson(taskEnd.reason)
-    val taskInfo = taskEnd.taskInfo
-    val executorMetrics = taskEnd.taskExecutorMetrics
-    val taskMetrics = taskEnd.taskMetrics
-    val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
-    ("Stage ID" -> taskEnd.stageId) ~
-    ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
-    ("Task Type" -> taskEnd.taskType) ~
-    ("Task End Reason" -> taskEndReason) ~
-    ("Task Info" -> taskInfoToJson(taskInfo)) ~
-    ("Task Executor Metrics" -> executorMetricsToJson(executorMetrics)) ~
-    ("Task Metrics" -> taskMetricsJson)
-  }
-
-  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
-    val properties = propertiesToJson(jobStart.properties)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
-    ("Job ID" -> jobStart.jobId) ~
-    ("Submission Time" -> jobStart.time) ~
-    ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
-    ("Stage IDs" -> jobStart.stageIds) ~
-    ("Properties" -> properties)
-  }
-
-  def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
-    val jobResult = jobResultToJson(jobEnd.jobResult)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
-    ("Job ID" -> jobEnd.jobId) ~
-    ("Completion Time" -> jobEnd.time) ~
-    ("Job Result" -> jobResult)
-  }
-
-  def environmentUpdateToJson(environmentUpdate: SparkListenerEnvironmentUpdate): JValue = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskInfo, g)
+    g.writeEndObject()
+  }
+
+  def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
+    g.writeNumberField("Stage ID", taskEnd.stageId)
+    g.writeNumberField("Stage Attempt ID", taskEnd.stageAttemptId)
+    g.writeStringField("Task Type", taskEnd.taskType)
+    g.writeFieldName("Task End Reason")
+    taskEndReasonToJson(taskEnd.reason, g)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskEnd.taskInfo, g)
+    g.writeFieldName("Task Executor Metrics")
+    executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
+    Option(taskEnd.taskMetrics).foreach { m =>
+      g.writeFieldName("Task Metrics")
+      taskMetricsToJson(m, g)
+    }

Review Comment:
   Correct me if I'm wrong. Before this change, when `taskEnd.taskMetrics` was null, the `Task Metrics` field was set to `JNothing`. With this PR, when `taskEnd.taskMetrics` is null, the `Task Metrics` field is omitted from the output entirely. Is that by design?
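
   For context on the pre-change behavior, here is a minimal standalone sketch (assuming only `json4s-jackson` on the classpath; the object name and event string are illustrative, not Spark code). In Json4s, a field whose value is `JNothing` is dropped when the `JValue` is rendered, so the old code also emitted no `Task Metrics` key when the metrics were null:

```scala
import org.json4s.JNothing
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods.{compact, render}

object JNothingDemo {
  def main(args: Array[String]): Unit = {
    // Mimics the old code path: the field is bound to JNothing when metrics are null.
    val json = ("Event" -> "SparkListenerTaskEnd") ~ ("Task Metrics" -> JNothing)
    // render() strips JNothing fields, so the key disappears from the output.
    println(compact(render(json))) // {"Event":"SparkListenerTaskEnd"}
  }
}
```

   If that reading of Json4s is right, omitting the field on the Jackson side would keep the serialized output unchanged.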
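
   And the Jackson side of the comparison, a sketch that mirrors the PR's `toJsonString` helper in simplified form (plain `JsonFactory`, no Scala module; the null `taskMetrics` stand-in is hypothetical):

```scala
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets
import com.fasterxml.jackson.core.{JsonEncoding, JsonFactory, JsonGenerator}

object GeneratorDemo {
  // Simplified stand-in for the PR's toJsonString helper.
  def toJsonString(block: JsonGenerator => Unit): String = {
    val baos = new ByteArrayOutputStream()
    val generator = new JsonFactory().createGenerator(baos, JsonEncoding.UTF8)
    block(generator)
    generator.close()
    new String(baos.toByteArray, StandardCharsets.UTF_8)
  }

  def main(args: Array[String]): Unit = {
    val taskMetrics: AnyRef = null // pretend taskEnd.taskMetrics is null
    val out = toJsonString { g =>
      g.writeStartObject()
      g.writeStringField("Event", "SparkListenerTaskEnd")
      // Option(null) is None, so the field is never written at all.
      Option(taskMetrics).foreach { _ =>
        g.writeFieldName("Task Metrics")
        g.writeStartObject(); g.writeEndObject() // placeholder for taskMetricsToJson
      }
      g.writeEndObject()
    }
    println(out) // {"Event":"SparkListenerTaskEnd"}
  }
}
```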



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

