Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/06/16 07:59:26 UTC

[GitHub] [spark] LuciferYang commented on a diff in pull request #36885: [SPARK-39489][CORE] Improve event logging JsonProtocol performance by using Jackson instead of Json4s

LuciferYang commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r898806559


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -731,183 +925,180 @@ private[spark] object JsonProtocol {
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
 
-  def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
-    val jobId = (json \ "Job ID").extract[Int]
+  def jobEndFromJson(json: JsonNode): SparkListenerJobEnd = {
+    val jobId = json.get("Job ID").intValue
     val completionTime =
-      jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
-    val jobResult = jobResultFromJson(json \ "Job Result")
+      jsonOption(json.get("Completion Time")).map(_.longValue).getOrElse(-1L)
+    val jobResult = jobResultFromJson(json.get("Job Result"))
     SparkListenerJobEnd(jobId, completionTime, jobResult)
   }
 
-  def resourceProfileAddedFromJson(json: JValue): SparkListenerResourceProfileAdded = {
-    val profId = (json \ "Resource Profile Id").extract[Int]
-    val executorReqs = executorResourceRequestMapFromJson(json \ "Executor Resource Requests")
-    val taskReqs = taskResourceRequestMapFromJson(json \ "Task Resource Requests")
+  def resourceProfileAddedFromJson(json: JsonNode): SparkListenerResourceProfileAdded = {
+    val profId = json.get("Resource Profile Id").intValue
+    val executorReqs = executorResourceRequestMapFromJson(json.get("Executor Resource Requests"))
+    val taskReqs = taskResourceRequestMapFromJson(json.get("Task Resource Requests"))
     val rp = new ResourceProfile(executorReqs.toMap, taskReqs.toMap)
     rp.setResourceProfileId(profId)
     SparkListenerResourceProfileAdded(rp)
   }
 
-  def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = {
-    val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Int]
-    val discoveryScript = (json \ "Discovery Script").extract[String]
-    val vendor = (json \ "Vendor").extract[String]
+  def executorResourceRequestFromJson(json: JsonNode): ExecutorResourceRequest = {
+    val rName = json.get("Resource Name").textValue
+    val amount = json.get("Amount").intValue
+    val discoveryScript = json.get("Discovery Script").textValue
+    val vendor = json.get("Vendor").textValue
     new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
   }
 
-  def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
-    val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Int]
+  def taskResourceRequestFromJson(json: JsonNode): TaskResourceRequest = {
+    val rName = json.get("Resource Name").textValue
+    val amount = json.get("Amount").intValue
     new TaskResourceRequest(rName, amount)
   }
 
-  def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, v) =>
-      val req = taskResourceRequestFromJson(v)
-      (k, req)
+  def taskResourceRequestMapFromJson(json: JsonNode): Map[String, TaskResourceRequest] = {
+    json.fields().asScala.collect { case field =>
+      val req = taskResourceRequestFromJson(field.getValue)
+      (field.getKey, req)
     }.toMap
   }
 
-  def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, v) =>
-      val req = executorResourceRequestFromJson(v)
-      (k, req)
+  def executorResourceRequestMapFromJson(json: JsonNode): Map[String, ExecutorResourceRequest] = {
+    json.fields().asScala.collect { case field =>
+      val req = executorResourceRequestFromJson(field.getValue)
+      (field.getKey, req)
     }.toMap
   }
 
-  def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
+  def environmentUpdateFromJson(json: JsonNode): SparkListenerEnvironmentUpdate = {
     // For compatible with previous event logs
-    val hadoopProperties = jsonOption(json \ "Hadoop Properties").map(mapFromJson(_).toSeq)
+    val hadoopProperties = jsonOption(json.get("Hadoop Properties")).map(mapFromJson(_).toSeq)
       .getOrElse(Seq.empty)
-    val metricsProperties = jsonOption(json \ "Metrics Properties").map(mapFromJson(_).toSeq)
+    val metricsProperties = jsonOption(json.get("Metrics Properties")).map(mapFromJson(_).toSeq)
       .getOrElse(Seq.empty)
     val environmentDetails = Map[String, Seq[(String, String)]](
-      "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
-      "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
+      "JVM Information" -> mapFromJson(json.get("JVM Information")).toSeq,
+      "Spark Properties" -> mapFromJson(json.get("Spark Properties")).toSeq,
       "Hadoop Properties" -> hadoopProperties,
-      "System Properties" -> mapFromJson(json \ "System Properties").toSeq,
+      "System Properties" -> mapFromJson(json.get("System Properties")).toSeq,
       "Metrics Properties" -> metricsProperties,
-      "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
+      "Classpath Entries" -> mapFromJson(json.get("Classpath Entries")).toSeq)
     SparkListenerEnvironmentUpdate(environmentDetails)
   }
 
-  def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
-    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val maxMem = (json \ "Maximum Memory").extract[Long]
-    val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
-    val maxOnHeapMem = jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
-    val maxOffHeapMem = jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+  def blockManagerAddedFromJson(json: JsonNode): SparkListenerBlockManagerAdded = {
+    val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+    val maxMem = json.get("Maximum Memory").longValue
+    val time = jsonOption(json.get("Timestamp")).map(_.longValue).getOrElse(-1L)
+    val maxOnHeapMem = jsonOption(json.get("Maximum Onheap Memory")).map(_.longValue)
+    val maxOffHeapMem = jsonOption(json.get("Maximum Offheap Memory")).map(_.longValue)
     SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
   }
 
-  def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
-    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+  def blockManagerRemovedFromJson(json: JsonNode): SparkListenerBlockManagerRemoved = {
+    val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+    val time = jsonOption(json.get("Timestamp")).map(_.longValue).getOrElse(-1L)
     SparkListenerBlockManagerRemoved(time, blockManagerId)
   }
 
-  def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
-    SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
+  def unpersistRDDFromJson(json: JsonNode): SparkListenerUnpersistRDD = {
+    SparkListenerUnpersistRDD(json.get("RDD ID").intValue)
   }
 
-  def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
-    val appName = (json \ "App Name").extract[String]
-    val appId = jsonOption(json \ "App ID").map(_.extract[String])
-    val time = (json \ "Timestamp").extract[Long]
-    val sparkUser = (json \ "User").extract[String]
-    val appAttemptId = jsonOption(json \ "App Attempt ID").map(_.extract[String])
-    val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson)
-    val driverAttributes = jsonOption(json \ "Driver Attributes").map(mapFromJson)
+  def applicationStartFromJson(json: JsonNode): SparkListenerApplicationStart = {
+    val appName = json.get("App Name").textValue
+    val appId = jsonOption(json.get("App ID")).map(_.asText())
+    val time = json.get("Timestamp").longValue
+    val sparkUser = json.get("User").textValue
+    val appAttemptId = jsonOption(json.get("App Attempt ID")).map(_.asText())
+    val driverLogs = jsonOption(json.get("Driver Logs")).map(mapFromJson)
+    val driverAttributes = jsonOption(json.get("Driver Attributes")).map(mapFromJson)
     SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs,
       driverAttributes)
   }
 
-  def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
-    SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
+  def applicationEndFromJson(json: JsonNode): SparkListenerApplicationEnd = {
+    SparkListenerApplicationEnd(json.get("Timestamp").longValue)
   }
 
-  def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
-    val time = (json \ "Timestamp").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val executorInfo = executorInfoFromJson(json \ "Executor Info")
+  def executorAddedFromJson(json: JsonNode): SparkListenerExecutorAdded = {
+    val time = json.get("Timestamp").longValue
+    val executorId = json.get("Executor ID").textValue
+    val executorInfo = executorInfoFromJson(json.get("Executor Info"))
     SparkListenerExecutorAdded(time, executorId, executorInfo)
   }
 
-  def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
-    val time = (json \ "Timestamp").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val reason = (json \ "Removed Reason").extract[String]
+  def executorRemovedFromJson(json: JsonNode): SparkListenerExecutorRemoved = {
+    val time = json.get("Timestamp").longValue
+    val executorId = json.get("Executor ID").textValue
+    val reason = json.get("Removed Reason").textValue
     SparkListenerExecutorRemoved(time, executorId, reason)
   }
 
-  def logStartFromJson(json: JValue): SparkListenerLogStart = {
-    val sparkVersion = (json \ "Spark Version").extract[String]
+  def logStartFromJson(json: JsonNode): SparkListenerLogStart = {
+    val sparkVersion = json.get("Spark Version").textValue
     SparkListenerLogStart(sparkVersion)
   }
 
-  def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
-    val execInfo = (json \ "Executor ID").extract[String]
-    val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
-      val taskId = (json \ "Task ID").extract[Long]
-      val stageId = (json \ "Stage ID").extract[Int]
-      val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+  def executorMetricsUpdateFromJson(json: JsonNode): SparkListenerExecutorMetricsUpdate = {
+    val execInfo = json.get("Executor ID").textValue
+    val accumUpdates = json.get("Metrics Updated").elements.asScala.map { json =>
+      val taskId = json.get("Task ID").longValue
+      val stageId = json.get("Stage ID").intValue
+      val stageAttemptId = json.get("Stage Attempt ID").intValue
       val updates =
-        (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
+        json.get("Accumulator Updates").elements.asScala.map(accumulableInfoFromJson).toArray.toSeq
       (taskId, stageId, stageAttemptId, updates)
-    }
-    val executorUpdates = (json \ "Executor Metrics Updated") match {
-      case JNothing => Map.empty[(Int, Int), ExecutorMetrics]
-      case value: JValue => value.extract[List[JValue]].map { json =>
-        val stageId = (json \ "Stage ID").extract[Int]
-        val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
-        val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics")
+    }.toArray.toSeq
+    val executorUpdates = jsonOption(json.get("Executor Metrics Updated")).map { value =>
+      value.elements.asScala.map { json =>
+        val stageId = json.get("Stage ID").intValue
+        val stageAttemptId = json.get("Stage Attempt ID").intValue
+        val executorMetrics = executorMetricsFromJson(json.get("Executor Metrics"))
         ((stageId, stageAttemptId) -> executorMetrics)
       }.toMap
-    }
+    }.getOrElse(Map.empty)

Review Comment:
   It seems this should be `Map.empty[(Int, Int), ExecutorMetrics]`, otherwise compilation will fail with Scala 2.13:
   
   ```
   [error] /home/runner/work/spark/spark/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:1061:64: type mismatch;
   [error]  found   : scala.collection.Map[_1,org.apache.spark.executor.ExecutorMetrics] where type _1 <: (Int, Int)
   [error]  required: scala.collection.Map[(Int, Int),org.apache.spark.executor.ExecutorMetrics]
   [error] Note: _1 <: (Int, Int), but trait Map is invariant in type K.
   [error] You may wish to investigate a wildcard type such as `_ <: (Int, Int)`. (SLS 3.2.10)
   [error]     SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, executorUpdates)
   [error]                                                                ^
   [error] one error found
   [error] (core / Compile / compileIncremental) Compilation failed
   [error] Total time: 136 s (02:16), completed Jun 16, 2022 2:07:49 AM
   ```
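   
   For context, here is a minimal, hypothetical sketch of the inference issue (not the actual `JsonProtocol` code; the `consume` helper and `String` value type are stand-ins). Because `scala.collection.Map` is invariant in its key type, an unannotated `Map.empty` passed to `getOrElse` can leave the key type as an existential `_ <: (Int, Int)` under Scala 2.13, and a later use site that expects `Map[(Int, Int), ...]` may then fail with a mismatch like the one above; pinning the element types on `Map.empty` avoids it:
   
   ```scala
   import scala.collection.Map
   
   object MapEmptyInference {
     // Stand-in for the SparkListenerExecutorMetricsUpdate parameter that
     // expects Map[(Int, Int), ExecutorMetrics]; String replaces ExecutorMetrics.
     def consume(updates: Map[(Int, Int), String]): Unit = println(updates)
   
     def main(args: Array[String]): Unit = {
       val maybeUpdates: Option[Map[(Int, Int), String]] =
         Some(Map((1, 0) -> "metrics-a", (2, 1) -> "metrics-b"))
   
       // Unannotated Map.empty can leave the key type as an existential
       // (_ <: (Int, Int)); since Map is invariant in its key type, the call
       // to consume() may then fail to compile with the mismatch quoted above:
       // val updates = maybeUpdates.getOrElse(Map.empty)
   
       // Annotating the element types keeps the key type at exactly (Int, Int).
       val updates = maybeUpdates.getOrElse(Map.empty[(Int, Int), String])
       consume(updates)
     }
   }
   ```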



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

