Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/06/18 19:43:55 UTC

[GitHub] [spark] JoshRosen commented on a diff in pull request #36885: [WIP][SPARK-39489][CORE] Improve event logging JsonProtocol performance by using Jackson instead of Json4s

JoshRosen commented on code in PR #36885:
URL: https://github.com/apache/spark/pull/36885#discussion_r901009402
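For readers skimming the diff below: the change swaps the Json4s AST/DSL for direct writes to a Jackson JsonGenerator. A minimal, standalone sketch of the two styles follows (the field names and object name here are placeholders for illustration, not taken from the PR):

    import java.io.StringWriter
    import com.fasterxml.jackson.core.JsonFactory
    import org.json4s.JsonDSL._
    import org.json4s.jackson.JsonMethods.{compact, render}

    object JsonStyleSketch {
      def main(args: Array[String]): Unit = {
        // Old style (Json4s): build an intermediate JValue AST, then render it.
        val oldStyle = ("Host" -> "worker-1") ~ ("Port" -> 7337)
        println(compact(render(oldStyle)))

        // New style (Jackson): stream fields straight into a generator,
        // avoiding the intermediate AST allocations.
        val out = new StringWriter()
        val g = new JsonFactory().createGenerator(out)
        g.writeStartObject()
        g.writeStringField("Host", "worker-1")
        g.writeNumberField("Port", 7337)
        g.writeEndObject()
        g.close()
        println(out.toString)
      }
    }

Both print the same JSON object; the second does it without allocating a Json4s tree per event, which is the performance win this PR is after.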


##########
core/src/main/scala/org/apache/spark/util/JsonProtocol.scala:
##########
@@ -360,255 +460,342 @@ private[spark] object JsonProtocol {
    *
    * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing.
    */
-  private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
+  private[util] def accumValueToJson(
+      name: Option[String],
+      value: Any,
+      g: JsonGenerator,
+      fieldName: Option[String] = None): Unit = {
     if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
       value match {
-        case v: Int => JInt(v)
-        case v: Long => JInt(v)
+        case v: Int =>
+          fieldName.foreach(g.writeFieldName)
+          g.writeNumber(v)
+        case v: Long =>
+          fieldName.foreach(g.writeFieldName)
+          g.writeNumber(v)
         // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
         // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
         case v: java.util.List[_] =>
-          JArray(v.asScala.toList.flatMap {
+          fieldName.foreach(g.writeFieldName)
+          g.writeStartArray()
+          v.asScala.foreach {
             case (id: BlockId, status: BlockStatus) =>
-              Some(
-                ("Block ID" -> id.toString) ~
-                ("Status" -> blockStatusToJson(status))
-              )
+              g.writeStartObject()
+              g.writeStringField("Block ID", id.toString)
+              g.writeFieldName("Status")
+              blockStatusToJson(status, g)
+              g.writeEndObject()
             case _ =>
               // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
               // not crash.
-              None
-          })
+          }
+          g.writeEndArray()
         case _ =>
           // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
           // crash.
-          JNothing
       }
     } else {
       // For all external accumulators, just use strings
-      JString(value.toString)
+      fieldName.foreach(g.writeFieldName)
+      g.writeString(value.toString)
     }
   }
 
-  def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
-    val shuffleReadMetrics: JValue =
-      ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
-        ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
-        ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
-        ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
-        ("Remote Bytes Read To Disk" -> taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk) ~
-        ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
-        ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
-    val shuffleWriteMetrics: JValue =
-      ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
-        ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
-        ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
-    val inputMetrics: JValue =
-      ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
-        ("Records Read" -> taskMetrics.inputMetrics.recordsRead)
-    val outputMetrics: JValue =
-      ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
-        ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
-    val updatedBlocks =
-      JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
-        ("Block ID" -> id.toString) ~
-          ("Status" -> blockStatusToJson(status))
-      })
-    ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
-    ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
-    ("Executor Run Time" -> taskMetrics.executorRunTime) ~
-    ("Executor CPU Time" -> taskMetrics.executorCpuTime) ~
-    ("Peak Execution Memory" -> taskMetrics.peakExecutionMemory) ~
-    ("Result Size" -> taskMetrics.resultSize) ~
-    ("JVM GC Time" -> taskMetrics.jvmGCTime) ~
-    ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
-    ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~
-    ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
-    ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
-    ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
-    ("Input Metrics" -> inputMetrics) ~
-    ("Output Metrics" -> outputMetrics) ~
-    ("Updated Blocks" -> updatedBlocks)
+  def taskMetricsToJson(taskMetrics: TaskMetrics, g: JsonGenerator): Unit = {
+    def writeShuffleReadMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField(
+        "Remote Blocks Fetched", taskMetrics.shuffleReadMetrics.remoteBlocksFetched)
+      g.writeNumberField("Local Blocks Fetched", taskMetrics.shuffleReadMetrics.localBlocksFetched)
+      g.writeNumberField("Fetch Wait Time", taskMetrics.shuffleReadMetrics.fetchWaitTime)
+      g.writeNumberField("Remote Bytes Read", taskMetrics.shuffleReadMetrics.remoteBytesRead)
+      g.writeNumberField(
+        "Remote Bytes Read To Disk", taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk)
+      g.writeNumberField("Local Bytes Read", taskMetrics.shuffleReadMetrics.localBytesRead)
+      g.writeNumberField("Total Records Read", taskMetrics.shuffleReadMetrics.recordsRead)
+      g.writeEndObject()
+    }
+    def writeShuffleWriteMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField("Shuffle Bytes Written", taskMetrics.shuffleWriteMetrics.bytesWritten)
+      g.writeNumberField("Shuffle Write Time", taskMetrics.shuffleWriteMetrics.writeTime)
+      g.writeNumberField("Shuffle Records Written", taskMetrics.shuffleWriteMetrics.recordsWritten)
+      g.writeEndObject()
+    }
+    def writeInputMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField("Bytes Read", taskMetrics.inputMetrics.bytesRead)
+      g.writeNumberField("Records Read", taskMetrics.inputMetrics.recordsRead)
+      g.writeEndObject()
+    }
+    def writeOutputMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField("Bytes Written", taskMetrics.outputMetrics.bytesWritten)
+      g.writeNumberField("Records Written", taskMetrics.outputMetrics.recordsWritten)
+      g.writeEndObject()
+    }
+    def writeUpdatedBlocks(): Unit = {
+      g.writeStartArray()
+      taskMetrics.updatedBlockStatuses.foreach { case (id, status) =>
+        g.writeStartObject()
+        g.writeStringField("Block ID", id.toString)
+        g.writeFieldName("Status")
+        blockStatusToJson(status, g)
+        g.writeEndObject()
+      }
+      g.writeEndArray()
+    }
+
+    g.writeStartObject()
+    g.writeNumberField("Executor Deserialize Time", taskMetrics.executorDeserializeTime)
+    g.writeNumberField("Executor Deserialize CPU Time", taskMetrics.executorDeserializeCpuTime)
+    g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime)
+    g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime)
+    g.writeNumberField("Peak Execution Memory", taskMetrics.peakExecutionMemory)
+    g.writeNumberField("Result Size", taskMetrics.resultSize)
+    g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime)
+    g.writeNumberField("Result Serialization Time", taskMetrics.resultSerializationTime)
+    g.writeNumberField("Memory Bytes Spilled", taskMetrics.memoryBytesSpilled)
+    g.writeNumberField("Disk Bytes Spilled", taskMetrics.diskBytesSpilled)
+    g.writeFieldName("Shuffle Read Metrics")
+    writeShuffleReadMetrics()
+    g.writeFieldName("Shuffle Write Metrics")
+    writeShuffleWriteMetrics()
+    g.writeFieldName("Input Metrics")
+    writeInputMetrics()
+    g.writeFieldName("Output Metrics")
+    writeOutputMetrics()
+    g.writeFieldName("Updated Blocks")
+    writeUpdatedBlocks()
+    g.writeEndObject()
   }
 
   /** Convert executor metrics to JSON. */
-  def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
-    val metrics = ExecutorMetricType.metricToOffset.map { case (m, _) =>
-      JField(m, executorMetrics.getMetricValue(m))
+  def executorMetricsToJson(executorMetrics: ExecutorMetrics, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    ExecutorMetricType.metricToOffset.foreach { case (m, _) =>
+      g.writeNumberField(m, executorMetrics.getMetricValue(m))
     }
-    JObject(metrics.toSeq: _*)
+    g.writeEndObject()
   }
 
-  def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
-    val reason = Utils.getFormattedClassName(taskEndReason)
-    val json: JObject = taskEndReason match {
+  def taskEndReasonToJson(taskEndReason: TaskEndReason, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Reason", Utils.getFormattedClassName(taskEndReason))
+    taskEndReason match {
       case fetchFailed: FetchFailed =>
-        val blockManagerAddress = Option(fetchFailed.bmAddress).
-          map(blockManagerIdToJson).getOrElse(JNothing)
-        ("Block Manager Address" -> blockManagerAddress) ~
-        ("Shuffle ID" -> fetchFailed.shuffleId) ~
-        ("Map ID" -> fetchFailed.mapId) ~
-        ("Map Index" -> fetchFailed.mapIndex) ~
-        ("Reduce ID" -> fetchFailed.reduceId) ~
-        ("Message" -> fetchFailed.message)
+        Option(fetchFailed.bmAddress).foreach { id =>
+          g.writeFieldName("Block Manager Address")
+          blockManagerIdToJson(id, g)
+        }
+        g.writeNumberField("Shuffle ID", fetchFailed.shuffleId)
+        g.writeNumberField("Map ID", fetchFailed.mapId)
+        g.writeNumberField("Map Index", fetchFailed.mapIndex)
+        g.writeNumberField("Reduce ID", fetchFailed.reduceId)
+        g.writeStringField("Message", fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
-        val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
-        val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
-        ("Class Name" -> exceptionFailure.className) ~
-        ("Description" -> exceptionFailure.description) ~
-        ("Stack Trace" -> stackTrace) ~
-        ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
-        ("Accumulator Updates" -> accumUpdates)
+        g.writeStringField("Class Name", exceptionFailure.className)
+        g.writeStringField("Description", exceptionFailure.description)
+        g.writeFieldName("Stack Trace")
+        stackTraceToJson(exceptionFailure.stackTrace, g)
+        g.writeStringField("Full Stack Trace", exceptionFailure.fullStackTrace)
+        g.writeFieldName("Accumulator Updates")
+        accumulablesToJson(exceptionFailure.accumUpdates, g)
       case taskCommitDenied: TaskCommitDenied =>
-        ("Job ID" -> taskCommitDenied.jobID) ~
-        ("Partition ID" -> taskCommitDenied.partitionID) ~
-        ("Attempt Number" -> taskCommitDenied.attemptNumber)
+        g.writeNumberField("Job ID", taskCommitDenied.jobID)
+        g.writeNumberField("Partition ID", taskCommitDenied.partitionID)
+        g.writeNumberField("Attempt Number", taskCommitDenied.attemptNumber)
       case ExecutorLostFailure(executorId, exitCausedByApp, reason) =>
-        ("Executor ID" -> executorId) ~
-        ("Exit Caused By App" -> exitCausedByApp) ~
-        ("Loss Reason" -> reason)
+        g.writeStringField("Executor ID", executorId)
+        g.writeBooleanField("Exit Caused By App", exitCausedByApp)
+        reason.foreach(g.writeStringField("Loss Reason", _))
       case taskKilled: TaskKilled =>
-        val accumUpdates = JArray(taskKilled.accumUpdates.map(accumulableInfoToJson).toList)
-        ("Kill Reason" -> taskKilled.reason) ~
-        ("Accumulator Updates" -> accumUpdates)
-      case _ => emptyJson
+        g.writeStringField("Kill Reason", taskKilled.reason)
+        g.writeArrayFieldStart("Accumulator Updates")
+        taskKilled.accumUpdates.foreach { info =>
+          accumulableInfoToJson(info, g)
+        }
+        g.writeEndArray()
+      case _ =>
+        // no extra fields to write
     }
-    ("Reason" -> reason) ~ json
+    g.writeEndObject()
   }
 
-  def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
-    ("Executor ID" -> blockManagerId.executorId) ~
-    ("Host" -> blockManagerId.host) ~
-    ("Port" -> blockManagerId.port)
+  def blockManagerIdToJson(blockManagerId: BlockManagerId, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Executor ID", blockManagerId.executorId)
+    g.writeStringField("Host", blockManagerId.host)
+    g.writeNumberField("Port", blockManagerId.port)
+    g.writeEndObject()
   }
 
-  def jobResultToJson(jobResult: JobResult): JValue = {
-    val result = Utils.getFormattedClassName(jobResult)
-    val json = jobResult match {
-      case JobSucceeded => emptyJson
+  def jobResultToJson(jobResult: JobResult, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Result", Utils.getFormattedClassName(jobResult))
+    jobResult match {
       case jobFailed: JobFailed =>
-        JObject("Exception" -> exceptionToJson(jobFailed.exception))
-    }
-    ("Result" -> result) ~ json
-  }
-
-  def rddInfoToJson(rddInfo: RDDInfo): JValue = {
-    val storageLevel = storageLevelToJson(rddInfo.storageLevel)
-    val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
-    ("RDD ID" -> rddInfo.id) ~
-    ("Name" -> rddInfo.name) ~
-    ("Scope" -> rddInfo.scope.map(_.toJson)) ~
-    ("Callsite" -> rddInfo.callSite) ~
-    ("Parent IDs" -> parentIds) ~
-    ("Storage Level" -> storageLevel) ~
-    ("Barrier" -> rddInfo.isBarrier) ~
-    ("DeterministicLevel" -> rddInfo.outputDeterministicLevel.toString) ~
-    ("Number of Partitions" -> rddInfo.numPartitions) ~
-    ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
-    ("Memory Size" -> rddInfo.memSize) ~
-    ("Disk Size" -> rddInfo.diskSize)
-  }
-
-  def storageLevelToJson(storageLevel: StorageLevel): JValue = {
-    ("Use Disk" -> storageLevel.useDisk) ~
-    ("Use Memory" -> storageLevel.useMemory) ~
-    ("Deserialized" -> storageLevel.deserialized) ~
-    ("Replication" -> storageLevel.replication)
-  }
-
-  def blockStatusToJson(blockStatus: BlockStatus): JValue = {
-    val storageLevel = storageLevelToJson(blockStatus.storageLevel)
-    ("Storage Level" -> storageLevel) ~
-    ("Memory Size" -> blockStatus.memSize) ~
-    ("Disk Size" -> blockStatus.diskSize)
-  }
-
-  def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
-    ("Host" -> executorInfo.executorHost) ~
-    ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
-    ("Attributes" -> mapToJson(executorInfo.attributes)) ~
-    ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
-    ("Resource Profile Id" -> executorInfo.resourceProfileId) ~
-    ("Registration Time" -> executorInfo.registrationTime) ~
-    ("Request Time" -> executorInfo.requestTime)
-  }
-
-  def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
-    val jsonFields = m.map {
-      case (k, v) => JField(k, v.toJson)
+        g.writeFieldName("Exception")
+        exceptionToJson(jobFailed.exception, g)
+      case JobSucceeded =>
+        // Nothing else to write in case of success
     }
-    JObject(jsonFields.toList)
+    g.writeEndObject()
   }
 
-  def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
-    ("Block Manager ID" -> blockManagerIdToJson(blockUpdatedInfo.blockManagerId)) ~
-    ("Block ID" -> blockUpdatedInfo.blockId.toString) ~
-    ("Storage Level" -> storageLevelToJson(blockUpdatedInfo.storageLevel)) ~
-    ("Memory Size" -> blockUpdatedInfo.memSize) ~
-    ("Disk Size" -> blockUpdatedInfo.diskSize)
-  }
-
-  def executorResourceRequestToJson(execReq: ExecutorResourceRequest): JValue = {
-    ("Resource Name" -> execReq.resourceName) ~
-    ("Amount" -> execReq.amount) ~
-    ("Discovery Script" -> execReq.discoveryScript) ~
-    ("Vendor" -> execReq.vendor)
-  }
-
-  def executorResourceRequestMapToJson(m: Map[String, ExecutorResourceRequest]): JValue = {
-    val jsonFields = m.map {
-      case (k, execReq) =>
-        JField(k, executorResourceRequestToJson(execReq))
+  def rddInfoToJson(rddInfo: RDDInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeNumberField("RDD ID", rddInfo.id)
+    g.writeStringField("Name", rddInfo.name)
+    rddInfo.scope.foreach { s =>
+      g.writeStringField("Scope", s.toJson)
+    }
+    g.writeStringField("Callsite", rddInfo.callSite)
+    g.writeArrayFieldStart("Parent IDs")
+    rddInfo.parentIds.foreach(g.writeNumber)
+    g.writeEndArray()
+    g.writeFieldName("Storage Level")
+    storageLevelToJson(rddInfo.storageLevel, g)
+    g.writeBooleanField("Barrier", rddInfo.isBarrier)
+    g.writeStringField("DeterministicLevel", rddInfo.outputDeterministicLevel.toString)
+    g.writeNumberField("Number of Partitions", rddInfo.numPartitions)
+    g.writeNumberField("Number of Cached Partitions", rddInfo.numCachedPartitions)
+    g.writeNumberField("Memory Size", rddInfo.memSize)
+    g.writeNumberField("Disk Size", rddInfo.diskSize)
+    g.writeEndObject()
+  }
+
+  def storageLevelToJson(storageLevel: StorageLevel, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeBooleanField("Use Disk", storageLevel.useDisk)
+    g.writeBooleanField("Use Memory", storageLevel.useMemory)
+    g.writeBooleanField("Deserialized", storageLevel.deserialized)
+    g.writeNumberField("Replication", storageLevel.replication)
+    g.writeEndObject()
+  }
+
+  def blockStatusToJson(blockStatus: BlockStatus, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeFieldName("Storage Level")
+    storageLevelToJson(blockStatus.storageLevel, g)
+    g.writeNumberField("Memory Size", blockStatus.memSize)
+    g.writeNumberField("Disk Size", blockStatus.diskSize)
+    g.writeEndObject()
+  }
+
+  def executorInfoToJson(executorInfo: ExecutorInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Host", executorInfo.executorHost)
+    g.writeNumberField("Total Cores", executorInfo.totalCores)
+    writeMapField("Log Urls", executorInfo.logUrlMap, g)
+    writeMapField("Attributes", executorInfo.attributes, g)
+    g.writeObjectFieldStart("Resources")
+    executorInfo.resourcesInfo.foreach { case (k, v) =>
+      g.writeFieldName(k)
+      g.writeRawValue(compact(v.toJson()))
     }
-    JObject(jsonFields.toList)
+    g.writeEndObject()
+    g.writeNumberField("Resource Profile Id", executorInfo.resourceProfileId)
+    executorInfo.registrationTime.foreach(g.writeNumberField("Registration Time", _))
+    executorInfo.requestTime.foreach(g.writeNumberField("Request Time", _))
+    g.writeEndObject()
+  }
+
+  def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeFieldName("Block Manager ID")
+    blockManagerIdToJson(blockUpdatedInfo.blockManagerId, g)
+    g.writeStringField("Block ID", blockUpdatedInfo.blockId.toString)
+    g.writeFieldName("Storage Level")
+    storageLevelToJson(blockUpdatedInfo.storageLevel, g)
+    g.writeNumberField("Memory Size", blockUpdatedInfo.memSize)
+    g.writeNumberField("Disk Size", blockUpdatedInfo.diskSize)
+    g.writeEndObject()
+  }
+
+  def executorResourceRequestToJson(execReq: ExecutorResourceRequest, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Resource Name", execReq.resourceName)
+    g.writeNumberField("Amount", execReq.amount)
+    g.writeStringField("Discovery Script", execReq.discoveryScript)
+    g.writeStringField("Vendor", execReq.vendor)
+    g.writeEndObject()
+  }
+
+  def executorResourceRequestMapToJson(
+      m: Map[String, ExecutorResourceRequest],
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    m.foreach { case (k, execReq) =>
+      g.writeFieldName(k)
+      executorResourceRequestToJson(execReq, g)
+    }
+    g.writeEndObject()
   }
 
-  def taskResourceRequestToJson(taskReq: TaskResourceRequest): JValue = {
-    ("Resource Name" -> taskReq.resourceName) ~
-    ("Amount" -> taskReq.amount)
+  def taskResourceRequestToJson(taskReq: TaskResourceRequest, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Resource Name", taskReq.resourceName)
+    g.writeNumberField("Amount", taskReq.amount)
+    g.writeEndObject()
   }
 
-  def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest]): JValue = {
-    val jsonFields = m.map {
-      case (k, taskReq) =>
-        JField(k, taskResourceRequestToJson(taskReq))
+  def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest], g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    m.foreach { case (k, taskReq) =>
+      g.writeFieldName(k)
+      taskResourceRequestToJson(taskReq, g)
     }
-    JObject(jsonFields.toList)
+    g.writeEndObject()
   }
 
   /** ------------------------------ *
    * Util JSON serialization methods |
    * ------------------------------- */
 
-  def mapToJson(m: Map[String, String]): JValue = {
-    val jsonFields = m.map { case (k, v) => JField(k, JString(v)) }
-    JObject(jsonFields.toList)
+  def writeMapField(name: String, m: Map[String, String], g: JsonGenerator): Unit = {
+    g.writeObjectFieldStart(name)
+    m.foreach { case (k, v) => g.writeStringField(k, v) }
+    g.writeEndObject()
   }
 
-  def propertiesToJson(properties: Properties): JValue = {
-    Option(properties).map { p =>
-      mapToJson(p.asScala)
-    }.getOrElse(JNothing)

Review Comment:
   Fixed in https://github.com/apache/spark/pull/36885/commits/c5e59b6d14a3b0f649df3c554e97f5ae7312b74c
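Context for anyone reading this hunk outside the PR: the converted writers now emit into a caller-supplied JsonGenerator instead of returning a JValue, so the caller owns the generator's lifecycle. The entry point the PR actually uses is not visible in this hunk, so the wiring below is only an assumed sketch (it also has to sit under the org.apache.spark package, since JsonProtocol is private[spark]):

    package org.apache.spark.util

    import java.io.StringWriter
    import com.fasterxml.jackson.core.JsonFactory
    import org.apache.spark.storage.StorageLevel

    object GeneratorDrivingSketch {
      def main(args: Array[String]): Unit = {
        val out = new StringWriter()
        val g = new JsonFactory().createGenerator(out)
        // Each writer emits one complete JSON value; the caller opens,
        // flushes, and closes the generator.
        JsonProtocol.storageLevelToJson(StorageLevel.MEMORY_AND_DISK, g)
        g.close()
        // Roughly: {"Use Disk":true,"Use Memory":true,"Deserialized":true,"Replication":1}
        println(out.toString)
      }
    }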



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org