Posted to commits@spark.apache.org by jo...@apache.org on 2022/07/03 20:50:33 UTC

[spark] branch master updated: [SPARK-39489][CORE] Improve event logging JsonProtocol performance by using Jackson instead of Json4s

This is an automated email from the ASF dual-hosted git repository.

joshrosen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f0c4f32293 [SPARK-39489][CORE] Improve event logging JsonProtocol performance by using Jackson instead of Json4s
6f0c4f32293 is described below

commit 6f0c4f322937139a42217aeee596628891f28f60
Author: Josh Rosen <jo...@databricks.com>
AuthorDate: Sun Jul 3 13:49:18 2022 -0700

    [SPARK-39489][CORE] Improve event logging JsonProtocol performance by using Jackson instead of Json4s
    
    ### What changes were proposed in this pull request?
    
    This PR improves the performance of `org.apache.spark.util.JsonProtocol` by replacing all uses of Json4s with uses of Jackson `JsonGenerator`. In my benchmarking so far, I have seen ~2x speedups for reading events and ~3x speedups for writing them.
    
    JsonProtocol is used by `EventLoggingListener` to emit JSON event logs capturing the firehose of SparkListener events. The history server uses `ReplayListenerBus` to parse the event logs and replay the event stream in order to rebuild and display a terminated Spark application's web UI.
    
    Today, JsonProtocol uses the Json4s library's ASTs when writing and reading JSON. This existing approach was chosen because we require fine-grained control over how the JSON is generated (so the listener events themselves can't be directly passed to an ObjectMapper). JsonProtocol needs to be backwards- and forwards-compatible so that a single history server application can display UIs from multiple versions of Spark: as a result, the code has special logic for handling fields which mi [...]
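
    As a concrete illustration of that kind of compatibility handling (a minimal sketch only; the helper and usage below are illustrative, not the exact code in this PR), a field that older Spark versions may not have written can be read defensively from a Jackson `JsonNode`:

    ```scala
    import com.fasterxml.jackson.databind.JsonNode

    // Illustrative helper: treat a missing or JSON-null field as None instead of failing.
    def jsonOption(node: JsonNode): Option[JsonNode] =
      if (node == null || node.isNull) None else Some(node)

    // Example: "App Attempt ID" is optional, so older logs may simply omit it.
    def readAppAttemptId(event: JsonNode): Option[String] =
      jsonOption(event.get("App Attempt ID")).map(_.asText)
    ```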
    
    Performance profiling revealed that JsonProtocol spends a significant proportion of its time in Json4s AST calls. Much of this overhead comes from temporary Scala collections which are allocated when constructing the ASTs or when extracting fields from them.
    
    This PR aims to improve performance by replacing all direct uses of Json4s with calls to Jackson's `JsonGenerator`, a low-level imperative API for generating JSON.
    
    As a simple example, let's consider the `SparkListenerApplicationStart` event: this produces a JSON record like:
    
    ```json
    {"Event":"SparkListenerApplicationStart","App Name":"Spark shell","App ID":"app-20220615090229-0000","Timestamp":1655308948551,"User":"joshrosen"}
    ```
    
    With Json4s, this is expressed as:
    
    ```scala
    def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
        ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
        ("App Name" -> applicationStart.appName) ~
        ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
        ("Timestamp" -> applicationStart.time) ~
        ("User" -> applicationStart.sparkUser) ~
        ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
        ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) ~
        ("Driver Attributes" -> applicationStart.driverAttributes.map(mapToJson).getOrElse(JNothing))
      }
    ```
    
    With Jackson JsonGenerator, this is expressed as:
    
    ```scala
    def applicationStartToJson(
          applicationStart: SparkListenerApplicationStart,
          g: JsonGenerator): Unit = {
        g.writeStartObject()
        g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart)
        g.writeStringField("App Name", applicationStart.appName)
        applicationStart.appId.foreach(g.writeStringField("App ID", _))
        g.writeNumberField("Timestamp", applicationStart.time)
        g.writeStringField("User", applicationStart.sparkUser)
        applicationStart.appAttemptId.foreach(g.writeStringField("App Attempt ID", _))
        applicationStart.driverLogs.foreach(writeMapField("Driver Logs", _, g))
        applicationStart.driverAttributes.foreach(writeMapField("Driver Attributes", _, g))
        g.writeEndObject()
      }
    ```
    
    The JsonGenerator code is more verbose but is significantly faster. This example is a relatively straightforward conversion, but there's a bit of trickiness in more complex cases (such as serialization of task metrics or resource profiles). I will leave comments in the GitHub PR review to highlight some of the less straightforward cases.
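
    For context, each of these `JsonGenerator`-based writer functions is driven by a small `toJsonString` helper that renders whatever the passed block writes into a single UTF-8 JSON string. A condensed sketch of that helper (mirroring the code added in this PR; see the full diff below):

    ```scala
    import java.io.ByteArrayOutputStream
    import java.nio.charset.StandardCharsets

    import com.fasterxml.jackson.core.{JsonEncoding, JsonGenerator}
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)

    // Run the block against a generator backed by an in-memory buffer, then decode to a String.
    def toJsonString(block: JsonGenerator => Unit): String = {
      val baos = new ByteArrayOutputStream()
      val generator = mapper.createGenerator(baos, JsonEncoding.UTF8)
      block(generator)
      generator.close()
      baos.close()
      new String(baos.toByteArray, StandardCharsets.UTF_8)
    }

    // Usage (as in the new sparkEventToJsonString): render one event to a single JSON line.
    // val json = toJsonString(writeSparkEventToJson(event, _))
    ```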
    
    This is a big PR, but most of the changes were made in a semi-automated fashion with regex find-replace macros.
    
    ### Why are the changes needed?
    
    Improving JsonProtocol performance has several benefits:
    
    - Decreased history server load times, especially for long-running Spark applications.
    - Improved EventLoggingListener throughput: this reduces the likelihood that the listener will fall behind and drop events (which can lead to confusing and inconsistent Spark UIs).
    - Reduced resource consumption on the driver, which may improve the throughput of task scheduling.
    
    In addition, this is a stepping-stone towards eventually being able to remove our Json4s dependency:
    
    - Today, Spark uses Json4s 3.x, which causes library conflicts for end users who want to upgrade to 4.x; see https://github.com/apache/spark/pull/33630 for one example.
    - To _completely_ remove Json4s we'll need to update several other parts of Spark (including code used for ML model serialization); this PR is just a first step towards that goal if we decide to pursue it.
    - In this PR, I continue to use Json4s in test code; I think it's fine to keep Json4s as a test-only dependency.
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    
    ### Correctness
    
    There are two aspects to consider:
    
    - Ability for old HistoryServer versions to read new logs: I performed tests using randomly-generated events and confirmed that the old and new implementations produce JSON output that is character-for-character identical: see a [review comment](https://github.com/apache/spark/pull/36885#issuecomment-1167855113) for full details.
    - Ability to read event logs produced by old Spark versions: I believe this is covered by backwards-compatibility tests in `JsonProtocolSuite`, `SQLJsonProtocolSuite`, and `HistoryServerSuite`. I looked through the git history of `JsonProtocol` and found a few cases where PRs that modified it did not add compatibility tests: for each of these cases, I added new test cases in `JsonProtocolSuite`.
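
    A minimal sketch of the style of backwards-compatibility check used in those suites (simplified; `JsonProtocol` is `private[spark]`, so this only runs from Spark's own test code):

    ```scala
    import org.apache.spark.scheduler.SparkListenerApplicationEnd
    import org.apache.spark.util.JsonProtocol

    // A JSON line as an older Spark version could have written it, with no newer optional fields.
    val oldJson = """{"Event":"SparkListenerApplicationEnd","Timestamp":1655308948551}"""

    // The new string-based parser should still decode it into the expected event.
    assert(JsonProtocol.sparkEventFromJson(oldJson) == SparkListenerApplicationEnd(1655308948551L))
    ```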
    
    ### Performance
    
    So far, I have only measured performance via laptop microbenchmarks.
    
    To measure event parsing performance, I generated a large event log by running `(1 to 5).foreach(_ => spark.range(1, 100 * 1000, 1, 100 * 1000).foreach(_ => ()))` then used the history server's logs to measure the time taken to load the UI. This PR's improvements cut the loading time from ~34 seconds to ~18 seconds.
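
    For a rough, standalone approximation of the parse-side cost (this is not the measurement above, which timed the history server's UI load), one can time `sparkEventFromJson` directly over an event log's lines. The path below is hypothetical, and as before this needs to run from a `private[spark]` context:

    ```scala
    import scala.io.Source

    import org.apache.spark.util.JsonProtocol

    // Hypothetical path to an uncompressed event log produced by the workload above.
    val logPath = "/tmp/spark-events/app-20220615090229-0000"

    val lines = Source.fromFile(logPath).getLines().toArray
    val start = System.nanoTime()
    lines.foreach(JsonProtocol.sparkEventFromJson)
    println(s"Parsed ${lines.length} events in ${(System.nanoTime() - start) / 1e9} seconds")
    ```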
    
    To measure event logging overheads, I used YourKit asynchronous CPU profiling to measure `spark.range(1, 100 * 1000, 1, 100 * 1000).foreach(_ => ())`. For this benchmark, this PR's changes cut total time in `logEvent` (including callees) from ~9.5 seconds to ~2.9 seconds.
    
    Closes #36885 from JoshRosen/remove-json4s-from-jsonprotocol.
    
    Authored-by: Josh Rosen <jo...@databricks.com>
    Signed-off-by: Josh Rosen <jo...@databricks.com>
---
 .../apache/spark/deploy/history/EventFilter.scala  |    3 +-
 .../spark/resource/ResourceInformation.scala       |    4 +-
 .../spark/scheduler/EventLoggingListener.scala     |   13 +-
 .../apache/spark/scheduler/ReplayListenerBus.scala |    3 +-
 .../scala/org/apache/spark/util/JsonProtocol.scala | 1734 +++++++++++---------
 .../history/EventLogFileCompactorSuite.scala       |    3 +-
 .../spark/deploy/history/EventLogTestHelper.scala  |    3 +-
 .../deploy/history/FsHistoryProviderSuite.scala    |    7 +-
 .../scheduler/EventLoggingListenerSuite.scala      |   21 +-
 .../spark/scheduler/ReplayListenerSuite.scala      |   30 +-
 .../org/apache/spark/util/JsonProtocolSuite.scala  |  395 ++++-
 .../scala/org/apache/spark/ml/MLEventsSuite.scala  |   12 +-
 .../spark/sql/execution/SQLJsonProtocolSuite.scala |    6 +-
 .../sql/execution/metric/SQLMetricsSuite.scala     |    7 +-
 .../execution/ui/SQLAppStatusListenerSuite.scala   |    8 +-
 .../streaming/StreamingQueryListenerSuite.scala    |    6 +-
 16 files changed, 1357 insertions(+), 898 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
index a5f2394960b..02c01a5598e 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/EventFilter.scala
@@ -21,7 +21,6 @@ import scala.io.{Codec, Source}
 import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.deploy.history.EventFilter.FilterStatistics
 import org.apache.spark.internal.Logging
@@ -81,7 +80,7 @@ private[spark] object EventFilter extends Logging {
       lines.zipWithIndex.foreach { case (line, lineNum) =>
         try {
           val event = try {
-            Some(JsonProtocol.sparkEventFromJson(parse(line)))
+            Some(JsonProtocol.sparkEventFromJson(line))
           } catch {
             // ignore any exception occurred from unidentified json
             case NonFatal(_) =>
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
index be056e15b6d..7f7bb36512d 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceInformation.scala
@@ -55,7 +55,9 @@ class ResourceInformation(
 
   override def hashCode(): Int = Seq(name, addresses.toSeq).hashCode()
 
-  def toJson(): JValue = ResourceInformationJson(name, addresses).toJValue
+  // TODO(SPARK-39658): reconsider whether we want to expose a third-party library's
+  // symbols as part of a public API:
+  final def toJson(): JValue = ResourceInformationJson(name, addresses).toJValue
 }
 
 private[spark] object ResourceInformation {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index cfbaa46ab68..b52a0f2f999 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -24,8 +24,6 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 import org.apache.hadoop.conf.Configuration
-import org.json4s.JsonAST.JValue
-import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.{SPARK_VERSION, SparkConf, SparkContext}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -66,7 +64,7 @@ private[spark] class EventLoggingListener(
     EventLogFileWriter(appId, appAttemptId, logBaseDir, sparkConf, hadoopConf)
 
   // For testing. Keep track of all JSON serialized events that have been logged.
-  private[scheduler] val loggedEvents = new mutable.ArrayBuffer[JValue]
+  private[scheduler] val loggedEvents = new mutable.ArrayBuffer[String]
 
   private val shouldLogBlockUpdates = sparkConf.get(EVENT_LOG_BLOCK_UPDATES)
   private val shouldLogStageExecutorMetrics = sparkConf.get(EVENT_LOG_STAGE_EXECUTOR_METRICS)
@@ -86,9 +84,8 @@ private[spark] class EventLoggingListener(
 
   private def initEventLog(): Unit = {
     val metadata = SparkListenerLogStart(SPARK_VERSION)
-    val eventJson = JsonProtocol.logStartToJson(metadata)
-    val metadataJson = compact(eventJson)
-    logWriter.writeEvent(metadataJson, flushLogger = true)
+    val eventJson = JsonProtocol.sparkEventToJsonString(metadata)
+    logWriter.writeEvent(eventJson, flushLogger = true)
     if (testing && loggedEvents != null) {
       loggedEvents += eventJson
     }
@@ -96,8 +93,8 @@ private[spark] class EventLoggingListener(
 
   /** Log the event as JSON. */
   private def logEvent(event: SparkListenerEvent, flushLogger: Boolean = false): Unit = {
-    val eventJson = JsonProtocol.sparkEventToJson(event)
-    logWriter.writeEvent(compact(render(eventJson)), flushLogger)
+    val eventJson = JsonProtocol.sparkEventToJsonString(event)
+    logWriter.writeEvent(eventJson, flushLogger)
     if (testing) {
       loggedEvents += eventJson
     }
diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 60b6fe7a609..dbb4fa74ded 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -23,7 +23,6 @@ import scala.io.{Codec, Source}
 
 import com.fasterxml.jackson.core.JsonParseException
 import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
-import org.json4s.jackson.JsonMethods._
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.scheduler.ReplayListenerBus._
@@ -86,7 +85,7 @@ private[spark] class ReplayListenerBus extends SparkListenerBus with Logging {
           currentLine = entry._1
           lineNumber = entry._2 + 1
 
-          postToAll(JsonProtocol.sparkEventFromJson(parse(currentLine)))
+          postToAll(JsonProtocol.sparkEventFromJson(currentLine))
         } catch {
           case e: ClassNotFoundException =>
             // Ignore unknown events, parse through the event log file.
diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f0755b04bef..5820a50fb7c 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -17,17 +17,17 @@
 
 package org.apache.spark.util
 
+import java.io.ByteArrayOutputStream
+import java.nio.charset.StandardCharsets
 import java.util.{Properties, UUID}
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
-import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.core.{JsonEncoding, JsonGenerator}
+import com.fasterxml.jackson.databind.{DeserializationFeature, JsonNode, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
-import org.json4s.DefaultFormats
-import org.json4s.JsonAST._
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
+import org.json4s.jackson.JsonMethods.compact
 
 import org.apache.spark._
 import org.apache.spark.executor._
@@ -57,8 +57,6 @@ import org.apache.spark.util.Utils.weakIntern
 private[spark] object JsonProtocol {
   // TODO: Remove this file and put JSON serialization into each individual class.
 
-  private implicit val format = DefaultFormats
-
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
     .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
 
@@ -66,289 +64,397 @@ private[spark] object JsonProtocol {
    * JSON serialization methods for SparkListenerEvents |
    * -------------------------------------------------- */
 
-  def sparkEventToJson(event: SparkListenerEvent): JValue = {
+  def sparkEventToJsonString(event: SparkListenerEvent): String = {
+    toJsonString { generator =>
+      writeSparkEventToJson(event, generator)
+    }
+  }
+
+  def toJsonString(block: JsonGenerator => Unit): String = {
+    val baos = new ByteArrayOutputStream()
+    val generator = mapper.createGenerator(baos, JsonEncoding.UTF8)
+    block(generator)
+    generator.close()
+    baos.close()
+    new String(baos.toByteArray, StandardCharsets.UTF_8)
+  }
+
+  def writeSparkEventToJson(event: SparkListenerEvent, g: JsonGenerator): Unit = {
     event match {
       case stageSubmitted: SparkListenerStageSubmitted =>
-        stageSubmittedToJson(stageSubmitted)
+        stageSubmittedToJson(stageSubmitted, g)
       case stageCompleted: SparkListenerStageCompleted =>
-        stageCompletedToJson(stageCompleted)
+        stageCompletedToJson(stageCompleted, g)
       case taskStart: SparkListenerTaskStart =>
-        taskStartToJson(taskStart)
+        taskStartToJson(taskStart, g)
       case taskGettingResult: SparkListenerTaskGettingResult =>
-        taskGettingResultToJson(taskGettingResult)
+        taskGettingResultToJson(taskGettingResult, g)
       case taskEnd: SparkListenerTaskEnd =>
-        taskEndToJson(taskEnd)
+        taskEndToJson(taskEnd, g)
       case jobStart: SparkListenerJobStart =>
-        jobStartToJson(jobStart)
+        jobStartToJson(jobStart, g)
       case jobEnd: SparkListenerJobEnd =>
-        jobEndToJson(jobEnd)
+        jobEndToJson(jobEnd, g)
       case environmentUpdate: SparkListenerEnvironmentUpdate =>
-        environmentUpdateToJson(environmentUpdate)
+        environmentUpdateToJson(environmentUpdate, g)
       case blockManagerAdded: SparkListenerBlockManagerAdded =>
-        blockManagerAddedToJson(blockManagerAdded)
+        blockManagerAddedToJson(blockManagerAdded, g)
       case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
-        blockManagerRemovedToJson(blockManagerRemoved)
+        blockManagerRemovedToJson(blockManagerRemoved, g)
       case unpersistRDD: SparkListenerUnpersistRDD =>
-        unpersistRDDToJson(unpersistRDD)
+        unpersistRDDToJson(unpersistRDD, g)
       case applicationStart: SparkListenerApplicationStart =>
-        applicationStartToJson(applicationStart)
+        applicationStartToJson(applicationStart, g)
       case applicationEnd: SparkListenerApplicationEnd =>
-        applicationEndToJson(applicationEnd)
+        applicationEndToJson(applicationEnd, g)
       case executorAdded: SparkListenerExecutorAdded =>
-        executorAddedToJson(executorAdded)
+        executorAddedToJson(executorAdded, g)
       case executorRemoved: SparkListenerExecutorRemoved =>
-        executorRemovedToJson(executorRemoved)
+        executorRemovedToJson(executorRemoved, g)
       case logStart: SparkListenerLogStart =>
-        logStartToJson(logStart)
+        logStartToJson(logStart, g)
       case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
-        executorMetricsUpdateToJson(metricsUpdate)
+        executorMetricsUpdateToJson(metricsUpdate, g)
       case stageExecutorMetrics: SparkListenerStageExecutorMetrics =>
-        stageExecutorMetricsToJson(stageExecutorMetrics)
+        stageExecutorMetricsToJson(stageExecutorMetrics, g)
       case blockUpdate: SparkListenerBlockUpdated =>
-        blockUpdateToJson(blockUpdate)
+        blockUpdateToJson(blockUpdate, g)
       case resourceProfileAdded: SparkListenerResourceProfileAdded =>
-        resourceProfileAddedToJson(resourceProfileAdded)
-      case _ => parse(mapper.writeValueAsString(event))
+        resourceProfileAddedToJson(resourceProfileAdded, g)
+      case _ =>
+        mapper.writeValue(g, event)
     }
   }
 
-  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted): JValue = {
-    val stageInfo = stageInfoToJson(stageSubmitted.stageInfo)
-    val properties = propertiesToJson(stageSubmitted.properties)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted) ~
-    ("Stage Info" -> stageInfo) ~
-    ("Properties" -> properties)
+  def stageSubmittedToJson(stageSubmitted: SparkListenerStageSubmitted, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageSubmitted)
+    g.writeFieldName("Stage Info")
+    stageInfoToJson(stageSubmitted.stageInfo, g)
+    Option(stageSubmitted.properties).foreach { properties =>
+      g.writeFieldName("Properties")
+      propertiesToJson(properties, g)
+    }
+    g.writeEndObject()
   }
 
-  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted): JValue = {
-    val stageInfo = stageInfoToJson(stageCompleted.stageInfo)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted) ~
-    ("Stage Info" -> stageInfo)
+  def stageCompletedToJson(stageCompleted: SparkListenerStageCompleted, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageCompleted)
+    g.writeFieldName("Stage Info")
+    stageInfoToJson(stageCompleted.stageInfo, g)
+    g.writeEndObject()
   }
 
-  def taskStartToJson(taskStart: SparkListenerTaskStart): JValue = {
-    val taskInfo = taskStart.taskInfo
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart) ~
-    ("Stage ID" -> taskStart.stageId) ~
-    ("Stage Attempt ID" -> taskStart.stageAttemptId) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
+  def taskStartToJson(taskStart: SparkListenerTaskStart, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskStart)
+    g.writeNumberField("Stage ID", taskStart.stageId)
+    g.writeNumberField("Stage Attempt ID", taskStart.stageAttemptId)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskStart.taskInfo, g)
+    g.writeEndObject()
   }
 
-  def taskGettingResultToJson(taskGettingResult: SparkListenerTaskGettingResult): JValue = {
+  def taskGettingResultToJson(
+      taskGettingResult: SparkListenerTaskGettingResult,
+      g: JsonGenerator): Unit = {
     val taskInfo = taskGettingResult.taskInfo
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult) ~
-    ("Task Info" -> taskInfoToJson(taskInfo))
-  }
-
-  def taskEndToJson(taskEnd: SparkListenerTaskEnd): JValue = {
-    val taskEndReason = taskEndReasonToJson(taskEnd.reason)
-    val taskInfo = taskEnd.taskInfo
-    val executorMetrics = taskEnd.taskExecutorMetrics
-    val taskMetrics = taskEnd.taskMetrics
-    val taskMetricsJson = if (taskMetrics != null) taskMetricsToJson(taskMetrics) else JNothing
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd) ~
-    ("Stage ID" -> taskEnd.stageId) ~
-    ("Stage Attempt ID" -> taskEnd.stageAttemptId) ~
-    ("Task Type" -> taskEnd.taskType) ~
-    ("Task End Reason" -> taskEndReason) ~
-    ("Task Info" -> taskInfoToJson(taskInfo)) ~
-    ("Task Executor Metrics" -> executorMetricsToJson(executorMetrics)) ~
-    ("Task Metrics" -> taskMetricsJson)
-  }
-
-  def jobStartToJson(jobStart: SparkListenerJobStart): JValue = {
-    val properties = propertiesToJson(jobStart.properties)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart) ~
-    ("Job ID" -> jobStart.jobId) ~
-    ("Submission Time" -> jobStart.time) ~
-    ("Stage Infos" -> jobStart.stageInfos.map(stageInfoToJson)) ~  // Added in Spark 1.2.0
-    ("Stage IDs" -> jobStart.stageIds) ~
-    ("Properties" -> properties)
-  }
-
-  def jobEndToJson(jobEnd: SparkListenerJobEnd): JValue = {
-    val jobResult = jobResultToJson(jobEnd.jobResult)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd) ~
-    ("Job ID" -> jobEnd.jobId) ~
-    ("Completion Time" -> jobEnd.time) ~
-    ("Job Result" -> jobResult)
-  }
-
-  def environmentUpdateToJson(environmentUpdate: SparkListenerEnvironmentUpdate): JValue = {
-    val environmentDetails = environmentUpdate.environmentDetails
-    val jvmInformation = mapToJson(environmentDetails("JVM Information").toMap)
-    val sparkProperties = mapToJson(environmentDetails("Spark Properties").toMap)
-    val hadoopProperties = mapToJson(environmentDetails("Hadoop Properties").toMap)
-    val systemProperties = mapToJson(environmentDetails("System Properties").toMap)
-    val metricsProperties = mapToJson(environmentDetails("Metrics Properties").toMap)
-    val classpathEntries = mapToJson(environmentDetails("Classpath Entries").toMap)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate) ~
-    ("JVM Information" -> jvmInformation) ~
-    ("Spark Properties" -> sparkProperties) ~
-    ("Hadoop Properties" -> hadoopProperties) ~
-    ("System Properties" -> systemProperties) ~
-    ("Metrics Properties"-> metricsProperties) ~
-    ("Classpath Entries" -> classpathEntries)
-  }
-
-  def blockManagerAddedToJson(blockManagerAdded: SparkListenerBlockManagerAdded): JValue = {
-    val blockManagerId = blockManagerIdToJson(blockManagerAdded.blockManagerId)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded) ~
-    ("Block Manager ID" -> blockManagerId) ~
-    ("Maximum Memory" -> blockManagerAdded.maxMem) ~
-    ("Timestamp" -> blockManagerAdded.time) ~
-    ("Maximum Onheap Memory" -> blockManagerAdded.maxOnHeapMem) ~
-    ("Maximum Offheap Memory" -> blockManagerAdded.maxOffHeapMem)
-  }
-
-  def blockManagerRemovedToJson(blockManagerRemoved: SparkListenerBlockManagerRemoved): JValue = {
-    val blockManagerId = blockManagerIdToJson(blockManagerRemoved.blockManagerId)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved) ~
-    ("Block Manager ID" -> blockManagerId) ~
-    ("Timestamp" -> blockManagerRemoved.time)
-  }
-
-  def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD) ~
-    ("RDD ID" -> unpersistRDD.rddId)
-  }
-
-  def applicationStartToJson(applicationStart: SparkListenerApplicationStart): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart) ~
-    ("App Name" -> applicationStart.appName) ~
-    ("App ID" -> applicationStart.appId.map(JString(_)).getOrElse(JNothing)) ~
-    ("Timestamp" -> applicationStart.time) ~
-    ("User" -> applicationStart.sparkUser) ~
-    ("App Attempt ID" -> applicationStart.appAttemptId.map(JString(_)).getOrElse(JNothing)) ~
-    ("Driver Logs" -> applicationStart.driverLogs.map(mapToJson).getOrElse(JNothing)) ~
-    ("Driver Attributes" -> applicationStart.driverAttributes.map(mapToJson).getOrElse(JNothing))
-  }
-
-  def applicationEndToJson(applicationEnd: SparkListenerApplicationEnd): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd) ~
-    ("Timestamp" -> applicationEnd.time)
-  }
-
-  def resourceProfileAddedToJson(profileAdded: SparkListenerResourceProfileAdded): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.resourceProfileAdded) ~
-      ("Resource Profile Id" -> profileAdded.resourceProfile.id) ~
-      ("Executor Resource Requests" ->
-        executorResourceRequestMapToJson(profileAdded.resourceProfile.executorResources)) ~
-      ("Task Resource Requests" ->
-        taskResourceRequestMapToJson(profileAdded.resourceProfile.taskResources))
-  }
-
-  def executorAddedToJson(executorAdded: SparkListenerExecutorAdded): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded) ~
-    ("Timestamp" -> executorAdded.time) ~
-    ("Executor ID" -> executorAdded.executorId) ~
-    ("Executor Info" -> executorInfoToJson(executorAdded.executorInfo))
-  }
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskGettingResult)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskInfo, g)
+    g.writeEndObject()
+  }
+
+  def taskEndToJson(taskEnd: SparkListenerTaskEnd, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.taskEnd)
+    g.writeNumberField("Stage ID", taskEnd.stageId)
+    g.writeNumberField("Stage Attempt ID", taskEnd.stageAttemptId)
+    g.writeStringField("Task Type", taskEnd.taskType)
+    g.writeFieldName("Task End Reason")
+    taskEndReasonToJson(taskEnd.reason, g)
+    g.writeFieldName("Task Info")
+    taskInfoToJson(taskEnd.taskInfo, g)
+    g.writeFieldName("Task Executor Metrics")
+    executorMetricsToJson(taskEnd.taskExecutorMetrics, g)
+    Option(taskEnd.taskMetrics).foreach { m =>
+      g.writeFieldName("Task Metrics")
+      taskMetricsToJson(m, g)
+    }
+    g.writeEndObject()
+  }
+
+  def jobStartToJson(jobStart: SparkListenerJobStart, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobStart)
+    g.writeNumberField("Job ID", jobStart.jobId)
+    g.writeNumberField("Submission Time", jobStart.time)
+    g.writeArrayFieldStart("Stage Infos")  // Added in Spark 1.2.0
+    jobStart.stageInfos.foreach(stageInfoToJson(_, g))
+    g.writeEndArray()
+    g.writeArrayFieldStart("Stage IDs")
+    jobStart.stageIds.foreach(g.writeNumber)
+    g.writeEndArray()
+    Option(jobStart.properties).foreach { properties =>
+      g.writeFieldName("Properties")
+      propertiesToJson(properties, g)
+    }
 
-  def executorRemovedToJson(executorRemoved: SparkListenerExecutorRemoved): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved) ~
-    ("Timestamp" -> executorRemoved.time) ~
-    ("Executor ID" -> executorRemoved.executorId) ~
-    ("Removed Reason" -> executorRemoved.reason)
+    g.writeEndObject()
   }
 
-  def logStartToJson(logStart: SparkListenerLogStart): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart) ~
-    ("Spark Version" -> SPARK_VERSION)
+  def jobEndToJson(jobEnd: SparkListenerJobEnd, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.jobEnd)
+    g.writeNumberField("Job ID", jobEnd.jobId)
+    g.writeNumberField("Completion Time", jobEnd.time)
+    g.writeFieldName("Job Result")
+    jobResultToJson(jobEnd.jobResult, g)
+    g.writeEndObject()
   }
 
-  def executorMetricsUpdateToJson(metricsUpdate: SparkListenerExecutorMetricsUpdate): JValue = {
+  def environmentUpdateToJson(
+      environmentUpdate: SparkListenerEnvironmentUpdate,
+      g: JsonGenerator): Unit = {
+    val environmentDetails = environmentUpdate.environmentDetails
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.environmentUpdate)
+    writeMapField("JVM Information", environmentDetails("JVM Information").toMap, g)
+    writeMapField("Spark Properties", environmentDetails("Spark Properties").toMap, g)
+    writeMapField("Hadoop Properties", environmentDetails("Hadoop Properties").toMap, g)
+    writeMapField("System Properties", environmentDetails("System Properties").toMap, g)
+    writeMapField("Metrics Properties", environmentDetails("Metrics Properties").toMap, g)
+    writeMapField("Classpath Entries", environmentDetails("Classpath Entries").toMap, g)
+    g.writeEndObject()
+  }
+
+  def blockManagerAddedToJson(
+      blockManagerAdded: SparkListenerBlockManagerAdded,
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerAdded)
+    g.writeFieldName("Block Manager ID")
+    blockManagerIdToJson(blockManagerAdded.blockManagerId, g)
+    g.writeNumberField("Maximum Memory", blockManagerAdded.maxMem)
+    g.writeNumberField("Timestamp", blockManagerAdded.time)
+    blockManagerAdded.maxOnHeapMem.foreach(g.writeNumberField("Maximum Onheap Memory", _))
+    blockManagerAdded.maxOffHeapMem.foreach(g.writeNumberField("Maximum Offheap Memory", _))
+    g.writeEndObject()
+  }
+
+  def blockManagerRemovedToJson(
+      blockManagerRemoved: SparkListenerBlockManagerRemoved,
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockManagerRemoved)
+    g.writeFieldName("Block Manager ID")
+    blockManagerIdToJson(blockManagerRemoved.blockManagerId, g)
+    g.writeNumberField("Timestamp", blockManagerRemoved.time)
+    g.writeEndObject()
+  }
+
+  def unpersistRDDToJson(unpersistRDD: SparkListenerUnpersistRDD, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.unpersistRDD)
+    g.writeNumberField("RDD ID", unpersistRDD.rddId)
+    g.writeEndObject()
+  }
+
+  def applicationStartToJson(
+      applicationStart: SparkListenerApplicationStart,
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationStart)
+    g.writeStringField("App Name", applicationStart.appName)
+    applicationStart.appId.foreach(g.writeStringField("App ID", _))
+    g.writeNumberField("Timestamp", applicationStart.time)
+    g.writeStringField("User", applicationStart.sparkUser)
+    applicationStart.appAttemptId.foreach(g.writeStringField("App Attempt ID", _))
+    applicationStart.driverLogs.foreach(writeMapField("Driver Logs", _, g))
+    applicationStart.driverAttributes.foreach(writeMapField("Driver Attributes", _, g))
+    g.writeEndObject()
+  }
+
+  def applicationEndToJson(
+      applicationEnd: SparkListenerApplicationEnd,
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.applicationEnd)
+    g.writeNumberField("Timestamp", applicationEnd.time)
+    g.writeEndObject()
+  }
+
+  def resourceProfileAddedToJson(
+      profileAdded: SparkListenerResourceProfileAdded,
+      g: JsonGenerator
+    ): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.resourceProfileAdded)
+    g.writeNumberField("Resource Profile Id", profileAdded.resourceProfile.id)
+    g.writeFieldName("Executor Resource Requests")
+    executorResourceRequestMapToJson(profileAdded.resourceProfile.executorResources, g)
+    g.writeFieldName("Task Resource Requests")
+    taskResourceRequestMapToJson(profileAdded.resourceProfile.taskResources, g)
+    g.writeEndObject()
+  }
+
+  def executorAddedToJson(executorAdded: SparkListenerExecutorAdded, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorAdded)
+    g.writeNumberField("Timestamp", executorAdded.time)
+    g.writeStringField("Executor ID", executorAdded.executorId)
+    g.writeFieldName("Executor Info")
+    executorInfoToJson(executorAdded.executorInfo, g)
+    g.writeEndObject()
+  }
+
+  def executorRemovedToJson(
+      executorRemoved: SparkListenerExecutorRemoved,
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.executorRemoved)
+    g.writeNumberField("Timestamp", executorRemoved.time)
+    g.writeStringField("Executor ID", executorRemoved.executorId)
+    g.writeStringField("Removed Reason", executorRemoved.reason)
+    g.writeEndObject()
+  }
+
+  def logStartToJson(logStart: SparkListenerLogStart, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.logStart)
+    g.writeStringField("Spark Version", SPARK_VERSION)
+    g.writeEndObject()
+  }
+
+  def executorMetricsUpdateToJson(
+      metricsUpdate: SparkListenerExecutorMetricsUpdate,
+      g: JsonGenerator): Unit = {
     val execId = metricsUpdate.execId
     val accumUpdates = metricsUpdate.accumUpdates
     val executorUpdates = metricsUpdate.executorUpdates
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate) ~
-    ("Executor ID" -> execId) ~
-    ("Metrics Updated" -> accumUpdates.map { case (taskId, stageId, stageAttemptId, updates) =>
-      ("Task ID" -> taskId) ~
-      ("Stage ID" -> stageId) ~
-      ("Stage Attempt ID" -> stageAttemptId) ~
-      ("Accumulator Updates" -> JArray(updates.map(accumulableInfoToJson).toList))
-    }) ~
-    ("Executor Metrics Updated" -> executorUpdates.map {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.metricsUpdate)
+    g.writeStringField("Executor ID", execId)
+    g.writeArrayFieldStart("Metrics Updated")
+    accumUpdates.foreach { case (taskId, stageId, stageAttemptId, updates) =>
+      g.writeStartObject()
+      g.writeNumberField("Task ID", taskId)
+      g.writeNumberField("Stage ID", stageId)
+      g.writeNumberField("Stage Attempt ID", stageAttemptId)
+      g.writeArrayFieldStart("Accumulator Updates")
+      updates.foreach(accumulableInfoToJson(_, g))
+      g.writeEndArray()
+      g.writeEndObject()
+    }
+    g.writeEndArray()
+    g.writeArrayFieldStart("Executor Metrics Updated")
+    executorUpdates.foreach {
       case ((stageId, stageAttemptId), metrics) =>
-        ("Stage ID" -> stageId) ~
-        ("Stage Attempt ID" -> stageAttemptId) ~
-        ("Executor Metrics" -> executorMetricsToJson(metrics))
-    })
+        g.writeStartObject()
+        g.writeNumberField("Stage ID", stageId)
+        g.writeNumberField("Stage Attempt ID", stageAttemptId)
+        g.writeFieldName("Executor Metrics")
+        executorMetricsToJson(metrics, g)
+        g.writeEndObject()
+    }
+    g.writeEndArray()
+    g.writeEndObject()
   }
 
-  def stageExecutorMetricsToJson(metrics: SparkListenerStageExecutorMetrics): JValue = {
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageExecutorMetrics) ~
-    ("Executor ID" -> metrics.execId) ~
-    ("Stage ID" -> metrics.stageId) ~
-    ("Stage Attempt ID" -> metrics.stageAttemptId) ~
-    ("Executor Metrics" -> executorMetricsToJson(metrics.executorMetrics))
+  def stageExecutorMetricsToJson(
+      metrics: SparkListenerStageExecutorMetrics,
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.stageExecutorMetrics)
+    g.writeStringField("Executor ID", metrics.execId)
+    g.writeNumberField("Stage ID", metrics.stageId)
+    g.writeNumberField("Stage Attempt ID", metrics.stageAttemptId)
+    g.writeFieldName("Executor Metrics")
+    executorMetricsToJson(metrics.executorMetrics, g)
+    g.writeEndObject()
   }
 
-  def blockUpdateToJson(blockUpdate: SparkListenerBlockUpdated): JValue = {
-    val blockUpdatedInfo = blockUpdatedInfoToJson(blockUpdate.blockUpdatedInfo)
-    ("Event" -> SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockUpdate) ~
-    ("Block Updated Info" -> blockUpdatedInfo)
+  def blockUpdateToJson(blockUpdate: SparkListenerBlockUpdated, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Event", SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES.blockUpdate)
+    g.writeFieldName("Block Updated Info")
+    blockUpdatedInfoToJson(blockUpdate.blockUpdatedInfo, g)
+    g.writeEndObject()
   }
 
   /** ------------------------------------------------------------------- *
    * JSON serialization methods for classes SparkListenerEvents depend on |
    * -------------------------------------------------------------------- */
 
-  def stageInfoToJson(stageInfo: StageInfo): JValue = {
-    val rddInfo = JArray(stageInfo.rddInfos.map(rddInfoToJson).toList)
-    val parentIds = JArray(stageInfo.parentIds.map(JInt(_)).toList)
-    val submissionTime = stageInfo.submissionTime.map(JInt(_)).getOrElse(JNothing)
-    val completionTime = stageInfo.completionTime.map(JInt(_)).getOrElse(JNothing)
-    val failureReason = stageInfo.failureReason.map(JString(_)).getOrElse(JNothing)
-    ("Stage ID" -> stageInfo.stageId) ~
-    ("Stage Attempt ID" -> stageInfo.attemptNumber) ~
-    ("Stage Name" -> stageInfo.name) ~
-    ("Number of Tasks" -> stageInfo.numTasks) ~
-    ("RDD Info" -> rddInfo) ~
-    ("Parent IDs" -> parentIds) ~
-    ("Details" -> stageInfo.details) ~
-    ("Submission Time" -> submissionTime) ~
-    ("Completion Time" -> completionTime) ~
-    ("Failure Reason" -> failureReason) ~
-    ("Accumulables" -> accumulablesToJson(stageInfo.accumulables.values)) ~
-    ("Resource Profile Id" -> stageInfo.resourceProfileId)
-  }
-
-  def taskInfoToJson(taskInfo: TaskInfo): JValue = {
-    ("Task ID" -> taskInfo.taskId) ~
-    ("Index" -> taskInfo.index) ~
-    ("Attempt" -> taskInfo.attemptNumber) ~
-    ("Partition ID" -> taskInfo.partitionId) ~
-    ("Launch Time" -> taskInfo.launchTime) ~
-    ("Executor ID" -> taskInfo.executorId) ~
-    ("Host" -> taskInfo.host) ~
-    ("Locality" -> taskInfo.taskLocality.toString) ~
-    ("Speculative" -> taskInfo.speculative) ~
-    ("Getting Result Time" -> taskInfo.gettingResultTime) ~
-    ("Finish Time" -> taskInfo.finishTime) ~
-    ("Failed" -> taskInfo.failed) ~
-    ("Killed" -> taskInfo.killed) ~
-    ("Accumulables" -> accumulablesToJson(taskInfo.accumulables))
+  def stageInfoToJson(stageInfo: StageInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeNumberField("Stage ID", stageInfo.stageId)
+    g.writeNumberField("Stage Attempt ID", stageInfo.attemptNumber)
+    g.writeStringField("Stage Name", stageInfo.name)
+    g.writeNumberField ("Number of Tasks", stageInfo.numTasks)
+    g.writeArrayFieldStart("RDD Info")
+    stageInfo.rddInfos.foreach(rddInfoToJson(_, g))
+    g.writeEndArray()
+    g.writeArrayFieldStart("Parent IDs")
+    stageInfo.parentIds.foreach(g.writeNumber)
+    g.writeEndArray()
+    g.writeStringField("Details", stageInfo.details)
+    stageInfo.submissionTime.foreach(g.writeNumberField("Submission Time", _))
+    stageInfo.completionTime.foreach(g.writeNumberField("Completion Time", _))
+    stageInfo.failureReason.foreach(g.writeStringField("Failure Reason", _))
+    g.writeFieldName("Accumulables")
+    accumulablesToJson(stageInfo.accumulables.values, g)
+    g.writeNumberField("Resource Profile Id", stageInfo.resourceProfileId)
+    g.writeEndObject()
+  }
+
+  def taskInfoToJson(taskInfo: TaskInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeNumberField("Task ID", taskInfo.taskId)
+    g.writeNumberField("Index", taskInfo.index)
+    g.writeNumberField("Attempt", taskInfo.attemptNumber)
+    g.writeNumberField("Partition ID", taskInfo.partitionId)
+    g.writeNumberField("Launch Time", taskInfo.launchTime)
+    g.writeStringField("Executor ID", taskInfo.executorId)
+    g.writeStringField("Host", taskInfo.host)
+    g.writeStringField("Locality", taskInfo.taskLocality.toString)
+    g.writeBooleanField("Speculative", taskInfo.speculative)
+    g.writeNumberField("Getting Result Time", taskInfo.gettingResultTime)
+    g.writeNumberField("Finish Time", taskInfo.finishTime)
+    g.writeBooleanField("Failed", taskInfo.failed)
+    g.writeBooleanField("Killed", taskInfo.killed)
+    g.writeFieldName("Accumulables")
+    accumulablesToJson(taskInfo.accumulables, g)
+    g.writeEndObject()
   }
 
   private lazy val accumulableExcludeList = Set("internal.metrics.updatedBlockStatuses")
 
-  def accumulablesToJson(accumulables: Iterable[AccumulableInfo]): JArray = {
-    JArray(accumulables
+  def accumulablesToJson(accumulables: Iterable[AccumulableInfo], g: JsonGenerator): Unit = {
+    g.writeStartArray()
+    accumulables
         .filterNot(_.name.exists(accumulableExcludeList.contains))
-        .toList.sortBy(_.id).map(accumulableInfoToJson))
+        .toList.sortBy(_.id).foreach(a => accumulableInfoToJson(a, g))
+    g.writeEndArray()
   }
 
-  def accumulableInfoToJson(accumulableInfo: AccumulableInfo): JValue = {
+  def accumulableInfoToJson(accumulableInfo: AccumulableInfo, g: JsonGenerator): Unit = {
     val name = accumulableInfo.name
-    ("ID" -> accumulableInfo.id) ~
-    ("Name" -> name) ~
-    ("Update" -> accumulableInfo.update.map { v => accumValueToJson(name, v) }) ~
-    ("Value" -> accumulableInfo.value.map { v => accumValueToJson(name, v) }) ~
-    ("Internal" -> accumulableInfo.internal) ~
-    ("Count Failed Values" -> accumulableInfo.countFailedValues) ~
-    ("Metadata" -> accumulableInfo.metadata)
+    g.writeStartObject()
+    g.writeNumberField("ID", accumulableInfo.id)
+    name.foreach(g.writeStringField("Name", _))
+    accumulableInfo.update.foreach { v =>
+      accumValueToJson(name, v, g, fieldName = Some("Update"))
+    }
+    accumulableInfo.value.foreach { v =>
+      accumValueToJson(name, v, g, fieldName = Some("Value"))
+    }
+    g.writeBooleanField("Internal", accumulableInfo.internal)
+    g.writeBooleanField("Count Failed Values", accumulableInfo.countFailedValues)
+    accumulableInfo.metadata.foreach(g.writeStringField("Metadata", _))
+    g.writeEndObject()
   }
 
   /**
@@ -360,256 +466,343 @@ private[spark] object JsonProtocol {
    *
    * The behavior here must match that of [[accumValueFromJson]]. Exposed for testing.
    */
-  private[util] def accumValueToJson(name: Option[String], value: Any): JValue = {
+  private[util] def accumValueToJson(
+      name: Option[String],
+      value: Any,
+      g: JsonGenerator,
+      fieldName: Option[String] = None): Unit = {
     if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
       value match {
-        case v: Int => JInt(v)
-        case v: Long => JInt(v)
+        case v: Int =>
+          fieldName.foreach(g.writeFieldName)
+          g.writeNumber(v)
+        case v: Long =>
+          fieldName.foreach(g.writeFieldName)
+          g.writeNumber(v)
         // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
         // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
         case v: java.util.List[_] =>
-          JArray(v.asScala.toList.flatMap {
+          fieldName.foreach(g.writeFieldName)
+          g.writeStartArray()
+          v.asScala.foreach {
             case (id: BlockId, status: BlockStatus) =>
-              Some(
-                ("Block ID" -> id.toString) ~
-                ("Status" -> blockStatusToJson(status))
-              )
+              g.writeStartObject()
+              g.writeStringField("Block ID", id.toString)
+              g.writeFieldName("Status")
+              blockStatusToJson(status, g)
+              g.writeEndObject()
             case _ =>
               // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
               // not crash.
-              None
-          })
+          }
+          g.writeEndArray()
         case _ =>
           // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
           // crash.
-          JNothing
       }
     } else {
       // For all external accumulators, just use strings
-      JString(value.toString)
+      fieldName.foreach(g.writeFieldName)
+      g.writeString(value.toString)
     }
   }
 
-  def taskMetricsToJson(taskMetrics: TaskMetrics): JValue = {
-    val shuffleReadMetrics: JValue =
-      ("Remote Blocks Fetched" -> taskMetrics.shuffleReadMetrics.remoteBlocksFetched) ~
-        ("Local Blocks Fetched" -> taskMetrics.shuffleReadMetrics.localBlocksFetched) ~
-        ("Fetch Wait Time" -> taskMetrics.shuffleReadMetrics.fetchWaitTime) ~
-        ("Remote Bytes Read" -> taskMetrics.shuffleReadMetrics.remoteBytesRead) ~
-        ("Remote Bytes Read To Disk" -> taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk) ~
-        ("Local Bytes Read" -> taskMetrics.shuffleReadMetrics.localBytesRead) ~
-        ("Total Records Read" -> taskMetrics.shuffleReadMetrics.recordsRead)
-    val shuffleWriteMetrics: JValue =
-      ("Shuffle Bytes Written" -> taskMetrics.shuffleWriteMetrics.bytesWritten) ~
-        ("Shuffle Write Time" -> taskMetrics.shuffleWriteMetrics.writeTime) ~
-        ("Shuffle Records Written" -> taskMetrics.shuffleWriteMetrics.recordsWritten)
-    val inputMetrics: JValue =
-      ("Bytes Read" -> taskMetrics.inputMetrics.bytesRead) ~
-        ("Records Read" -> taskMetrics.inputMetrics.recordsRead)
-    val outputMetrics: JValue =
-      ("Bytes Written" -> taskMetrics.outputMetrics.bytesWritten) ~
-        ("Records Written" -> taskMetrics.outputMetrics.recordsWritten)
-    val updatedBlocks =
-      JArray(taskMetrics.updatedBlockStatuses.toList.map { case (id, status) =>
-        ("Block ID" -> id.toString) ~
-          ("Status" -> blockStatusToJson(status))
-      })
-    ("Executor Deserialize Time" -> taskMetrics.executorDeserializeTime) ~
-    ("Executor Deserialize CPU Time" -> taskMetrics.executorDeserializeCpuTime) ~
-    ("Executor Run Time" -> taskMetrics.executorRunTime) ~
-    ("Executor CPU Time" -> taskMetrics.executorCpuTime) ~
-    ("Peak Execution Memory" -> taskMetrics.peakExecutionMemory) ~
-    ("Result Size" -> taskMetrics.resultSize) ~
-    ("JVM GC Time" -> taskMetrics.jvmGCTime) ~
-    ("Result Serialization Time" -> taskMetrics.resultSerializationTime) ~
-    ("Memory Bytes Spilled" -> taskMetrics.memoryBytesSpilled) ~
-    ("Disk Bytes Spilled" -> taskMetrics.diskBytesSpilled) ~
-    ("Shuffle Read Metrics" -> shuffleReadMetrics) ~
-    ("Shuffle Write Metrics" -> shuffleWriteMetrics) ~
-    ("Input Metrics" -> inputMetrics) ~
-    ("Output Metrics" -> outputMetrics) ~
-    ("Updated Blocks" -> updatedBlocks)
+  def taskMetricsToJson(taskMetrics: TaskMetrics, g: JsonGenerator): Unit = {
+    def writeShuffleReadMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField(
+        "Remote Blocks Fetched", taskMetrics.shuffleReadMetrics.remoteBlocksFetched)
+      g.writeNumberField("Local Blocks Fetched", taskMetrics.shuffleReadMetrics.localBlocksFetched)
+      g.writeNumberField("Fetch Wait Time", taskMetrics.shuffleReadMetrics.fetchWaitTime)
+      g.writeNumberField("Remote Bytes Read", taskMetrics.shuffleReadMetrics.remoteBytesRead)
+      g.writeNumberField(
+        "Remote Bytes Read To Disk", taskMetrics.shuffleReadMetrics.remoteBytesReadToDisk)
+      g.writeNumberField("Local Bytes Read", taskMetrics.shuffleReadMetrics.localBytesRead)
+      g.writeNumberField("Total Records Read", taskMetrics.shuffleReadMetrics.recordsRead)
+      g.writeEndObject()
+    }
+    def writeShuffleWriteMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField("Shuffle Bytes Written", taskMetrics.shuffleWriteMetrics.bytesWritten)
+      g.writeNumberField("Shuffle Write Time", taskMetrics.shuffleWriteMetrics.writeTime)
+      g.writeNumberField("Shuffle Records Written", taskMetrics.shuffleWriteMetrics.recordsWritten)
+      g.writeEndObject()
+    }
+    def writeInputMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField("Bytes Read", taskMetrics.inputMetrics.bytesRead)
+      g.writeNumberField("Records Read", taskMetrics.inputMetrics.recordsRead)
+      g.writeEndObject()
+    }
+    def writeOutputMetrics(): Unit = {
+      g.writeStartObject()
+      g.writeNumberField("Bytes Written", taskMetrics.outputMetrics.bytesWritten)
+      g.writeNumberField("Records Written", taskMetrics.outputMetrics.recordsWritten)
+      g.writeEndObject()
+    }
+    def writeUpdatedBlocks(): Unit = {
+      g.writeStartArray()
+      taskMetrics.updatedBlockStatuses.foreach { case (id, status) =>
+        g.writeStartObject()
+        g.writeStringField("Block ID", id.toString)
+        g.writeFieldName("Status")
+        blockStatusToJson(status, g)
+        g.writeEndObject()
+      }
+      g.writeEndArray()
+    }
+
+    g.writeStartObject()
+    g.writeNumberField("Executor Deserialize Time", taskMetrics.executorDeserializeTime)
+    g.writeNumberField("Executor Deserialize CPU Time", taskMetrics.executorDeserializeCpuTime)
+    g.writeNumberField("Executor Run Time", taskMetrics.executorRunTime)
+    g.writeNumberField("Executor CPU Time", taskMetrics.executorCpuTime)
+    g.writeNumberField("Peak Execution Memory", taskMetrics.peakExecutionMemory)
+    g.writeNumberField("Result Size", taskMetrics.resultSize)
+    g.writeNumberField("JVM GC Time", taskMetrics.jvmGCTime)
+    g.writeNumberField("Result Serialization Time", taskMetrics.resultSerializationTime)
+    g.writeNumberField("Memory Bytes Spilled", taskMetrics.memoryBytesSpilled)
+    g.writeNumberField("Disk Bytes Spilled", taskMetrics.diskBytesSpilled)
+    g.writeFieldName("Shuffle Read Metrics")
+    writeShuffleReadMetrics()
+    g.writeFieldName("Shuffle Write Metrics")
+    writeShuffleWriteMetrics()
+    g.writeFieldName("Input Metrics")
+    writeInputMetrics()
+    g.writeFieldName("Output Metrics")
+    writeOutputMetrics()
+    g.writeFieldName("Updated Blocks")
+    writeUpdatedBlocks()
+    g.writeEndObject()
   }
 
   /** Convert executor metrics to JSON. */
-  def executorMetricsToJson(executorMetrics: ExecutorMetrics): JValue = {
-    val metrics = ExecutorMetricType.metricToOffset.map { case (m, _) =>
-      JField(m, executorMetrics.getMetricValue(m))
+  def executorMetricsToJson(executorMetrics: ExecutorMetrics, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    ExecutorMetricType.metricToOffset.foreach { case (m, _) =>
+      g.writeNumberField(m, executorMetrics.getMetricValue(m))
     }
-    JObject(metrics.toSeq: _*)
+    g.writeEndObject()
   }
 
-  def taskEndReasonToJson(taskEndReason: TaskEndReason): JValue = {
-    val reason = Utils.getFormattedClassName(taskEndReason)
-    val json: JObject = taskEndReason match {
+  def taskEndReasonToJson(taskEndReason: TaskEndReason, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Reason", Utils.getFormattedClassName(taskEndReason))
+    taskEndReason match {
       case fetchFailed: FetchFailed =>
-        val blockManagerAddress = Option(fetchFailed.bmAddress).
-          map(blockManagerIdToJson).getOrElse(JNothing)
-        ("Block Manager Address" -> blockManagerAddress) ~
-        ("Shuffle ID" -> fetchFailed.shuffleId) ~
-        ("Map ID" -> fetchFailed.mapId) ~
-        ("Map Index" -> fetchFailed.mapIndex) ~
-        ("Reduce ID" -> fetchFailed.reduceId) ~
-        ("Message" -> fetchFailed.message)
+        Option(fetchFailed.bmAddress).foreach { id =>
+          g.writeFieldName("Block Manager Address")
+          blockManagerIdToJson(id, g)
+        }
+        g.writeNumberField("Shuffle ID", fetchFailed.shuffleId)
+        g.writeNumberField("Map ID", fetchFailed.mapId)
+        g.writeNumberField("Map Index", fetchFailed.mapIndex)
+        g.writeNumberField("Reduce ID", fetchFailed.reduceId)
+        g.writeStringField("Message", fetchFailed.message)
       case exceptionFailure: ExceptionFailure =>
-        val stackTrace = stackTraceToJson(exceptionFailure.stackTrace)
-        val accumUpdates = accumulablesToJson(exceptionFailure.accumUpdates)
-        ("Class Name" -> exceptionFailure.className) ~
-        ("Description" -> exceptionFailure.description) ~
-        ("Stack Trace" -> stackTrace) ~
-        ("Full Stack Trace" -> exceptionFailure.fullStackTrace) ~
-        ("Accumulator Updates" -> accumUpdates)
+        g.writeStringField("Class Name", exceptionFailure.className)
+        g.writeStringField("Description", exceptionFailure.description)
+        g.writeFieldName("Stack Trace")
+        stackTraceToJson(exceptionFailure.stackTrace, g)
+        g.writeStringField("Full Stack Trace", exceptionFailure.fullStackTrace)
+        g.writeFieldName("Accumulator Updates")
+        accumulablesToJson(exceptionFailure.accumUpdates, g)
       case taskCommitDenied: TaskCommitDenied =>
-        ("Job ID" -> taskCommitDenied.jobID) ~
-        ("Partition ID" -> taskCommitDenied.partitionID) ~
-        ("Attempt Number" -> taskCommitDenied.attemptNumber)
+        g.writeNumberField("Job ID", taskCommitDenied.jobID)
+        g.writeNumberField("Partition ID", taskCommitDenied.partitionID)
+        g.writeNumberField("Attempt Number", taskCommitDenied.attemptNumber)
       case ExecutorLostFailure(executorId, exitCausedByApp, reason) =>
-        ("Executor ID" -> executorId) ~
-        ("Exit Caused By App" -> exitCausedByApp) ~
-        ("Loss Reason" -> reason)
+        g.writeStringField("Executor ID", executorId)
+        g.writeBooleanField("Exit Caused By App", exitCausedByApp)
+        reason.foreach(g.writeStringField("Loss Reason", _))
       case taskKilled: TaskKilled =>
-        val accumUpdates = JArray(taskKilled.accumUpdates.map(accumulableInfoToJson).toList)
-        ("Kill Reason" -> taskKilled.reason) ~
-        ("Accumulator Updates" -> accumUpdates)
-      case _ => emptyJson
+        g.writeStringField("Kill Reason", taskKilled.reason)
+        g.writeArrayFieldStart("Accumulator Updates")
+        taskKilled.accumUpdates.foreach { info =>
+          accumulableInfoToJson(info, g)
+        }
+        g.writeEndArray()
+      case _ =>
+        // no extra fields to write
     }
-    ("Reason" -> reason) ~ json
+    g.writeEndObject()
   }
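+  // Illustrative example of the shape written above for a TaskCommitDenied reason
+  // (field values are made up):
+  //   {"Reason":"TaskCommitDenied","Job ID":0,"Partition ID":5,"Attempt Number":1}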
 
-  def blockManagerIdToJson(blockManagerId: BlockManagerId): JValue = {
-    ("Executor ID" -> blockManagerId.executorId) ~
-    ("Host" -> blockManagerId.host) ~
-    ("Port" -> blockManagerId.port)
+  def blockManagerIdToJson(blockManagerId: BlockManagerId, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Executor ID", blockManagerId.executorId)
+    g.writeStringField("Host", blockManagerId.host)
+    g.writeNumberField("Port", blockManagerId.port)
+    g.writeEndObject()
   }
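+  // Illustrative example of the shape written above (values are made up):
+  //   {"Executor ID":"1","Host":"10.0.0.1","Port":7337}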
 
-  def jobResultToJson(jobResult: JobResult): JValue = {
-    val result = Utils.getFormattedClassName(jobResult)
-    val json = jobResult match {
-      case JobSucceeded => emptyJson
+  def jobResultToJson(jobResult: JobResult, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Result", Utils.getFormattedClassName(jobResult))
+    jobResult match {
       case jobFailed: JobFailed =>
-        JObject("Exception" -> exceptionToJson(jobFailed.exception))
+        g.writeFieldName("Exception")
+        exceptionToJson(jobFailed.exception, g)
+      case JobSucceeded =>
+        // Nothing else to write in case of success
     }
-    ("Result" -> result) ~ json
-  }
-
-  def rddInfoToJson(rddInfo: RDDInfo): JValue = {
-    val storageLevel = storageLevelToJson(rddInfo.storageLevel)
-    val parentIds = JArray(rddInfo.parentIds.map(JInt(_)).toList)
-    ("RDD ID" -> rddInfo.id) ~
-    ("Name" -> rddInfo.name) ~
-    ("Scope" -> rddInfo.scope.map(_.toJson)) ~
-    ("Callsite" -> rddInfo.callSite) ~
-    ("Parent IDs" -> parentIds) ~
-    ("Storage Level" -> storageLevel) ~
-    ("Barrier" -> rddInfo.isBarrier) ~
-    ("DeterministicLevel" -> rddInfo.outputDeterministicLevel.toString) ~
-    ("Number of Partitions" -> rddInfo.numPartitions) ~
-    ("Number of Cached Partitions" -> rddInfo.numCachedPartitions) ~
-    ("Memory Size" -> rddInfo.memSize) ~
-    ("Disk Size" -> rddInfo.diskSize)
-  }
-
-  def storageLevelToJson(storageLevel: StorageLevel): JValue = {
-    ("Use Disk" -> storageLevel.useDisk) ~
-    ("Use Memory" -> storageLevel.useMemory) ~
-    ("Use Off Heap" -> storageLevel.useOffHeap) ~
-    ("Deserialized" -> storageLevel.deserialized) ~
-    ("Replication" -> storageLevel.replication)
-  }
-
-  def blockStatusToJson(blockStatus: BlockStatus): JValue = {
-    val storageLevel = storageLevelToJson(blockStatus.storageLevel)
-    ("Storage Level" -> storageLevel) ~
-    ("Memory Size" -> blockStatus.memSize) ~
-    ("Disk Size" -> blockStatus.diskSize)
-  }
-
-  def executorInfoToJson(executorInfo: ExecutorInfo): JValue = {
-    ("Host" -> executorInfo.executorHost) ~
-    ("Total Cores" -> executorInfo.totalCores) ~
-    ("Log Urls" -> mapToJson(executorInfo.logUrlMap)) ~
-    ("Attributes" -> mapToJson(executorInfo.attributes)) ~
-    ("Resources" -> resourcesMapToJson(executorInfo.resourcesInfo)) ~
-    ("Resource Profile Id" -> executorInfo.resourceProfileId) ~
-    ("Registration Time" -> executorInfo.registrationTime) ~
-    ("Request Time" -> executorInfo.requestTime)
-  }
-
-  def resourcesMapToJson(m: Map[String, ResourceInformation]): JValue = {
-    val jsonFields = m.map {
-      case (k, v) => JField(k, v.toJson)
-    }
-    JObject(jsonFields.toList)
-  }
-
-  def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo): JValue = {
-    ("Block Manager ID" -> blockManagerIdToJson(blockUpdatedInfo.blockManagerId)) ~
-    ("Block ID" -> blockUpdatedInfo.blockId.toString) ~
-    ("Storage Level" -> storageLevelToJson(blockUpdatedInfo.storageLevel)) ~
-    ("Memory Size" -> blockUpdatedInfo.memSize) ~
-    ("Disk Size" -> blockUpdatedInfo.diskSize)
+    g.writeEndObject()
   }
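+  // A successful job carries only the "Result" field, e.g. {"Result":"JobSucceeded"};
+  // a failed job additionally carries an "Exception" object (see exceptionToJson below).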
 
-  def executorResourceRequestToJson(execReq: ExecutorResourceRequest): JValue = {
-    ("Resource Name" -> execReq.resourceName) ~
-    ("Amount" -> execReq.amount) ~
-    ("Discovery Script" -> execReq.discoveryScript) ~
-    ("Vendor" -> execReq.vendor)
-  }
-
-  def executorResourceRequestMapToJson(m: Map[String, ExecutorResourceRequest]): JValue = {
-    val jsonFields = m.map {
-      case (k, execReq) =>
-        JField(k, executorResourceRequestToJson(execReq))
+  def rddInfoToJson(rddInfo: RDDInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeNumberField("RDD ID", rddInfo.id)
+    g.writeStringField("Name", rddInfo.name)
+    rddInfo.scope.foreach { s =>
+      g.writeStringField("Scope", s.toJson)
+    }
+    g.writeStringField("Callsite", rddInfo.callSite)
+    g.writeArrayFieldStart("Parent IDs")
+    rddInfo.parentIds.foreach(g.writeNumber)
+    g.writeEndArray()
+    g.writeFieldName("Storage Level")
+    storageLevelToJson(rddInfo.storageLevel, g)
+    g.writeBooleanField("Barrier", rddInfo.isBarrier)
+    g.writeStringField("DeterministicLevel", rddInfo.outputDeterministicLevel.toString)
+    g.writeNumberField("Number of Partitions", rddInfo.numPartitions)
+    g.writeNumberField("Number of Cached Partitions", rddInfo.numCachedPartitions)
+    g.writeNumberField("Memory Size", rddInfo.memSize)
+    g.writeNumberField("Disk Size", rddInfo.diskSize)
+    g.writeEndObject()
+  }
+
+  def storageLevelToJson(storageLevel: StorageLevel, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeBooleanField("Use Disk", storageLevel.useDisk)
+    g.writeBooleanField("Use Memory", storageLevel.useMemory)
+    g.writeBooleanField("Use Off Heap", storageLevel.useOffHeap)
+    g.writeBooleanField("Deserialized", storageLevel.deserialized)
+    g.writeNumberField("Replication", storageLevel.replication)
+    g.writeEndObject()
+  }
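+  // Illustrative example of the shape written above (this combination corresponds to
+  // StorageLevel.MEMORY_AND_DISK):
+  //   {"Use Disk":true,"Use Memory":true,"Use Off Heap":false,"Deserialized":true,
+  //    "Replication":1}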
+
+  def blockStatusToJson(blockStatus: BlockStatus, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeFieldName("Storage Level")
+    storageLevelToJson(blockStatus.storageLevel, g)
+    g.writeNumberField("Memory Size", blockStatus.memSize)
+    g.writeNumberField("Disk Size", blockStatus.diskSize)
+    g.writeEndObject()
+  }
+
+  def executorInfoToJson(executorInfo: ExecutorInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Host", executorInfo.executorHost)
+    g.writeNumberField("Total Cores", executorInfo.totalCores)
+    writeMapField("Log Urls", executorInfo.logUrlMap, g)
+    writeMapField("Attributes", executorInfo.attributes, g)
+    g.writeObjectFieldStart("Resources")
+    // TODO(SPARK-39658): here we take a Json4s JValue, convert it to a JSON string, and
+    // then combine that string with Jackson-generated JSON. This is done because
+    // ResourceInformation is a public class whose toJson method exposes Json4s JValues
+    // as part of its public API. We should reconsider the design of that interface and
+    // explore whether we can avoid exposing third-party symbols in this public API.
+    executorInfo.resourcesInfo.foreach { case (k, v) =>
+      g.writeFieldName(k)
+      g.writeRawValue(compact(v.toJson()))
     }
-    JObject(jsonFields.toList)
+    g.writeEndObject()
+    g.writeNumberField("Resource Profile Id", executorInfo.resourceProfileId)
+    executorInfo.registrationTime.foreach(g.writeNumberField("Registration Time", _))
+    executorInfo.requestTime.foreach(g.writeNumberField("Request Time", _))
+    g.writeEndObject()
+  }
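+  // Illustrative example of what the raw-embedded "Resources" field written above can
+  // look like for a GPU resource (the inner shape comes from ResourceInformation.toJson,
+  // so it is shown here only roughly):
+  //   "Resources":{"gpu":{"name":"gpu","addresses":["0","1"]}}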
+
+  def blockUpdatedInfoToJson(blockUpdatedInfo: BlockUpdatedInfo, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeFieldName("Block Manager ID")
+    blockManagerIdToJson(blockUpdatedInfo.blockManagerId, g)
+    g.writeStringField("Block ID", blockUpdatedInfo.blockId.toString)
+    g.writeFieldName("Storage Level")
+    storageLevelToJson(blockUpdatedInfo.storageLevel, g)
+    g.writeNumberField("Memory Size", blockUpdatedInfo.memSize)
+    g.writeNumberField("Disk Size", blockUpdatedInfo.diskSize)
+    g.writeEndObject()
+  }
+
+  def executorResourceRequestToJson(execReq: ExecutorResourceRequest, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Resource Name", execReq.resourceName)
+    g.writeNumberField("Amount", execReq.amount)
+    g.writeStringField("Discovery Script", execReq.discoveryScript)
+    g.writeStringField("Vendor", execReq.vendor)
+    g.writeEndObject()
+  }
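+  // Illustrative example of the shape written above (values are made up):
+  //   {"Resource Name":"gpu","Amount":2,"Discovery Script":"/opt/getGpus.sh",
+  //    "Vendor":"nvidia"}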
+
+  def executorResourceRequestMapToJson(
+      m: Map[String, ExecutorResourceRequest],
+      g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    m.foreach { case (k, execReq) =>
+      g.writeFieldName(k)
+      executorResourceRequestToJson(execReq, g)
+    }
+    g.writeEndObject()
   }
 
-  def taskResourceRequestToJson(taskReq: TaskResourceRequest): JValue = {
-    ("Resource Name" -> taskReq.resourceName) ~
-    ("Amount" -> taskReq.amount)
+  def taskResourceRequestToJson(taskReq: TaskResourceRequest, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Resource Name", taskReq.resourceName)
+    g.writeNumberField("Amount", taskReq.amount)
+    g.writeEndObject()
   }
 
-  def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest]): JValue = {
-    val jsonFields = m.map {
-      case (k, taskReq) =>
-        JField(k, taskResourceRequestToJson(taskReq))
+  def taskResourceRequestMapToJson(m: Map[String, TaskResourceRequest], g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    m.foreach { case (k, taskReq) =>
+      g.writeFieldName(k)
+      taskResourceRequestToJson(taskReq, g)
     }
-    JObject(jsonFields.toList)
+    g.writeEndObject()
   }
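+  // Illustrative example of the shape written above (values are made up):
+  //   {"gpu":{"Resource Name":"gpu","Amount":1.0}}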
 
   /** ------------------------------ *
    * Util JSON serialization methods |
    * ------------------------------- */
 
-  def mapToJson(m: Map[String, String]): JValue = {
-    val jsonFields = m.map { case (k, v) => JField(k, JString(v)) }
-    JObject(jsonFields.toList)
+  def writeMapField(name: String, m: Map[String, String], g: JsonGenerator): Unit = {
+    g.writeObjectFieldStart(name)
+    m.foreach { case (k, v) => g.writeStringField(k, v) }
+    g.writeEndObject()
   }
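+  // Illustrative example (values are made up): writeMapField("Log Urls",
+  // Map("stdout" -> "http://host:8081/logs/stdout"), g) writes
+  //   "Log Urls":{"stdout":"http://host:8081/logs/stdout"}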
 
-  def propertiesToJson(properties: Properties): JValue = {
-    Option(properties).map { p =>
-      mapToJson(p.asScala)
-    }.getOrElse(JNothing)
+  def propertiesToJson(properties: Properties, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    properties.asScala.foreach { case (k, v) => g.writeStringField(k, v) }
+    g.writeEndObject()
   }
 
-  def UUIDToJson(id: UUID): JValue = {
-    ("Least Significant Bits" -> id.getLeastSignificantBits) ~
-    ("Most Significant Bits" -> id.getMostSignificantBits)
+  def UUIDToJson(id: UUID, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeNumberField("Least Significant Bits", id.getLeastSignificantBits)
+    g.writeNumberField("Most Significant Bits", id.getMostSignificantBits)
+    g.writeEndObject()
   }
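+  // Illustrative example of the shape written above (values are made up):
+  //   {"Least Significant Bits":-1234567890123456789,"Most Significant Bits":987654321}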
 
-  def stackTraceToJson(stackTrace: Array[StackTraceElement]): JValue = {
-    JArray(stackTrace.map { case line =>
-      ("Declaring Class" -> line.getClassName) ~
-      ("Method Name" -> line.getMethodName) ~
-      ("File Name" -> line.getFileName) ~
-      ("Line Number" -> line.getLineNumber)
-    }.toList)
+  def stackTraceToJson(stackTrace: Array[StackTraceElement], g: JsonGenerator): Unit = {
+    g.writeStartArray()
+    stackTrace.foreach { line =>
+      g.writeStartObject()
+      g.writeStringField("Declaring Class", line.getClassName)
+      g.writeStringField("Method Name", line.getMethodName)
+      g.writeStringField("File Name", line.getFileName)
+      g.writeNumberField("Line Number", line.getLineNumber)
+      g.writeEndObject()
+    }
+    g.writeEndArray()
   }
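+  // Illustrative example of the shape written above, one object per stack frame
+  // (values are made up):
+  //   [{"Declaring Class":"org.apache.spark.SomeTask","Method Name":"run",
+  //     "File Name":"SomeTask.scala","Line Number":42}]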
 
-  def exceptionToJson(exception: Exception): JValue = {
-    ("Message" -> exception.getMessage) ~
-    ("Stack Trace" -> stackTraceToJson(exception.getStackTrace))
+  def exceptionToJson(exception: Exception, g: JsonGenerator): Unit = {
+    g.writeStartObject()
+    g.writeStringField("Message", exception.getMessage)
+    g.writeFieldName("Stack Trace")
+    stackTraceToJson(exception.getStackTrace, g)
+    g.writeEndObject()
   }
 
 
@@ -640,10 +833,14 @@ private[spark] object JsonProtocol {
     val resourceProfileAdded = Utils.getFormattedClassName(SparkListenerResourceProfileAdded)
   }
 
-  def sparkEventFromJson(json: JValue): SparkListenerEvent = {
+  def sparkEventFromJson(json: String): SparkListenerEvent = {
+    sparkEventFromJson(mapper.readTree(json))
+  }
+
+  def sparkEventFromJson(json: JsonNode): SparkListenerEvent = {
     import SPARK_LISTENER_EVENT_FORMATTED_CLASS_NAMES._
 
-    (json \ "Event").extract[String] match {
+    json.get("Event").asText match {
       case `stageSubmitted` => stageSubmittedFromJson(json)
       case `stageCompleted` => stageCompletedFromJson(json)
       case `taskStart` => taskStartFromJson(json)
@@ -664,66 +861,69 @@ private[spark] object JsonProtocol {
       case `stageExecutorMetrics` => stageExecutorMetricsFromJson(json)
       case `blockUpdate` => blockUpdateFromJson(json)
       case `resourceProfileAdded` => resourceProfileAddedFromJson(json)
-      case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
+      case other => mapper.readValue(json.toString, Utils.classForName(other))
         .asInstanceOf[SparkListenerEvent]
     }
   }
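+  // Usage sketch for the new String overload above (the event JSON is illustrative; any
+  // single line of an event log file can be passed in the same way):
+  //   val event = sparkEventFromJson(
+  //     """{"Event":"SparkListenerApplicationEnd","Timestamp":1655308948551}""")
+  //   assert(event == SparkListenerApplicationEnd(1655308948551L))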
 
-  def stageSubmittedFromJson(json: JValue): SparkListenerStageSubmitted = {
-    val stageInfo = stageInfoFromJson(json \ "Stage Info")
-    val properties = propertiesFromJson(json \ "Properties")
+  def stageSubmittedFromJson(json: JsonNode): SparkListenerStageSubmitted = {
+    val stageInfo = stageInfoFromJson(json.get("Stage Info"))
+    val properties = propertiesFromJson(json.get("Properties"))
     SparkListenerStageSubmitted(stageInfo, properties)
   }
 
-  def stageCompletedFromJson(json: JValue): SparkListenerStageCompleted = {
-    val stageInfo = stageInfoFromJson(json \ "Stage Info")
+  def stageCompletedFromJson(json: JsonNode): SparkListenerStageCompleted = {
+    val stageInfo = stageInfoFromJson(json.get("Stage Info"))
     SparkListenerStageCompleted(stageInfo)
   }
 
-  def taskStartFromJson(json: JValue): SparkListenerTaskStart = {
-    val stageId = (json \ "Stage ID").extract[Int]
+  def taskStartFromJson(json: JsonNode): SparkListenerTaskStart = {
+    val stageId = json.get("Stage ID").extractInt
     val stageAttemptId =
-      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
+      jsonOption(json.get("Stage Attempt ID")).map(_.extractInt).getOrElse(0)
+    val taskInfo = taskInfoFromJson(json.get("Task Info"))
     SparkListenerTaskStart(stageId, stageAttemptId, taskInfo)
   }
 
-  def taskGettingResultFromJson(json: JValue): SparkListenerTaskGettingResult = {
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
+  def taskGettingResultFromJson(json: JsonNode): SparkListenerTaskGettingResult = {
+    val taskInfo = taskInfoFromJson(json.get("Task Info"))
     SparkListenerTaskGettingResult(taskInfo)
   }
 
   /** Extract the executor metrics from JSON. */
-  def executorMetricsFromJson(json: JValue): ExecutorMetrics = {
+  def executorMetricsFromJson(maybeJson: JsonNode): ExecutorMetrics = {
+    // Executor metrics might be absent in JSON from very old Spark versions.
+    // In this case we return zero values for each metric.
     val metrics =
       ExecutorMetricType.metricToOffset.map { case (metric, _) =>
-        metric -> jsonOption(json \ metric).map(_.extract[Long]).getOrElse(0L)
+        val metricValueJson = jsonOption(maybeJson).flatMap(json => jsonOption(json.get(metric)))
+        metric -> metricValueJson.map(_.extractLong).getOrElse(0L)
       }
     new ExecutorMetrics(metrics.toMap)
   }
 
-  def taskEndFromJson(json: JValue): SparkListenerTaskEnd = {
-    val stageId = (json \ "Stage ID").extract[Int]
+  def taskEndFromJson(json: JsonNode): SparkListenerTaskEnd = {
+    val stageId = json.get("Stage ID").extractInt
     val stageAttemptId =
-      jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
-    val taskType = (json \ "Task Type").extract[String]
-    val taskEndReason = taskEndReasonFromJson(json \ "Task End Reason")
-    val taskInfo = taskInfoFromJson(json \ "Task Info")
-    val executorMetrics = executorMetricsFromJson(json \ "Task Executor Metrics")
-    val taskMetrics = taskMetricsFromJson(json \ "Task Metrics")
+      jsonOption(json.get("Stage Attempt ID")).map(_.extractInt).getOrElse(0)
+    val taskType = json.get("Task Type").extractString
+    val taskEndReason = taskEndReasonFromJson(json.get("Task End Reason"))
+    val taskInfo = taskInfoFromJson(json.get("Task Info"))
+    val executorMetrics = executorMetricsFromJson(json.get("Task Executor Metrics"))
+    val taskMetrics = taskMetricsFromJson(json.get("Task Metrics"))
     SparkListenerTaskEnd(stageId, stageAttemptId, taskType, taskEndReason, taskInfo,
       executorMetrics, taskMetrics)
   }
 
-  def jobStartFromJson(json: JValue): SparkListenerJobStart = {
-    val jobId = (json \ "Job ID").extract[Int]
+  def jobStartFromJson(json: JsonNode): SparkListenerJobStart = {
+    val jobId = json.get("Job ID").extractInt
     val submissionTime =
-      jsonOption(json \ "Submission Time").map(_.extract[Long]).getOrElse(-1L)
-    val stageIds = (json \ "Stage IDs").extract[List[JValue]].map(_.extract[Int])
-    val properties = propertiesFromJson(json \ "Properties")
+      jsonOption(json.get("Submission Time")).map(_.extractLong).getOrElse(-1L)
+    val stageIds = json.get("Stage IDs").extractElements.map(_.extractInt).toArray.toSeq
+    val properties = propertiesFromJson(json.get("Properties"))
     // The "Stage Infos" field was added in Spark 1.2.0
-    val stageInfos = jsonOption(json \ "Stage Infos")
-      .map(_.extract[Seq[JValue]].map(stageInfoFromJson)).getOrElse {
+    val stageInfos = jsonOption(json.get("Stage Infos"))
+      .map(_.extractElements.map(stageInfoFromJson).toArray.toSeq).getOrElse {
         stageIds.map { id =>
           new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown",
             resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
@@ -732,155 +932,153 @@ private[spark] object JsonProtocol {
     SparkListenerJobStart(jobId, submissionTime, stageInfos, properties)
   }
 
-  def jobEndFromJson(json: JValue): SparkListenerJobEnd = {
-    val jobId = (json \ "Job ID").extract[Int]
+  def jobEndFromJson(json: JsonNode): SparkListenerJobEnd = {
+    val jobId = json.get("Job ID").extractInt
     val completionTime =
-      jsonOption(json \ "Completion Time").map(_.extract[Long]).getOrElse(-1L)
-    val jobResult = jobResultFromJson(json \ "Job Result")
+      jsonOption(json.get("Completion Time")).map(_.extractLong).getOrElse(-1L)
+    val jobResult = jobResultFromJson(json.get("Job Result"))
     SparkListenerJobEnd(jobId, completionTime, jobResult)
   }
 
-  def resourceProfileAddedFromJson(json: JValue): SparkListenerResourceProfileAdded = {
-    val profId = (json \ "Resource Profile Id").extract[Int]
-    val executorReqs = executorResourceRequestMapFromJson(json \ "Executor Resource Requests")
-    val taskReqs = taskResourceRequestMapFromJson(json \ "Task Resource Requests")
+  def resourceProfileAddedFromJson(json: JsonNode): SparkListenerResourceProfileAdded = {
+    val profId = json.get("Resource Profile Id").extractInt
+    val executorReqs = executorResourceRequestMapFromJson(json.get("Executor Resource Requests"))
+    val taskReqs = taskResourceRequestMapFromJson(json.get("Task Resource Requests"))
     val rp = new ResourceProfile(executorReqs.toMap, taskReqs.toMap)
     rp.setResourceProfileId(profId)
     SparkListenerResourceProfileAdded(rp)
   }
 
-  def executorResourceRequestFromJson(json: JValue): ExecutorResourceRequest = {
-    val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Long]
-    val discoveryScript = (json \ "Discovery Script").extract[String]
-    val vendor = (json \ "Vendor").extract[String]
+  def executorResourceRequestFromJson(json: JsonNode): ExecutorResourceRequest = {
+    val rName = json.get("Resource Name").extractString
+    val amount = json.get("Amount").extractLong
+    val discoveryScript = json.get("Discovery Script").extractString
+    val vendor = json.get("Vendor").extractString
     new ExecutorResourceRequest(rName, amount, discoveryScript, vendor)
   }
 
-  def taskResourceRequestFromJson(json: JValue): TaskResourceRequest = {
-    val rName = (json \ "Resource Name").extract[String]
-    val amount = (json \ "Amount").extract[Double]
+  def taskResourceRequestFromJson(json: JsonNode): TaskResourceRequest = {
+    val rName = json.get("Resource Name").extractString
+    val amount = json.get("Amount").extractDouble
     new TaskResourceRequest(rName, amount)
   }
 
-  def taskResourceRequestMapFromJson(json: JValue): Map[String, TaskResourceRequest] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, v) =>
-      val req = taskResourceRequestFromJson(v)
-      (k, req)
+  def taskResourceRequestMapFromJson(json: JsonNode): Map[String, TaskResourceRequest] = {
+    json.fields().asScala.map { field =>
+      val req = taskResourceRequestFromJson(field.getValue)
+      (field.getKey, req)
     }.toMap
   }
 
-  def executorResourceRequestMapFromJson(json: JValue): Map[String, ExecutorResourceRequest] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, v) =>
-      val req = executorResourceRequestFromJson(v)
-      (k, req)
+  def executorResourceRequestMapFromJson(json: JsonNode): Map[String, ExecutorResourceRequest] = {
+    json.fields().asScala.map { field =>
+      val req = executorResourceRequestFromJson(field.getValue)
+      (field.getKey, req)
     }.toMap
   }
 
-  def environmentUpdateFromJson(json: JValue): SparkListenerEnvironmentUpdate = {
+  def environmentUpdateFromJson(json: JsonNode): SparkListenerEnvironmentUpdate = {
     // For compatibility with previous event logs
-    val hadoopProperties = jsonOption(json \ "Hadoop Properties").map(mapFromJson(_).toSeq)
+    val hadoopProperties = jsonOption(json.get("Hadoop Properties")).map(mapFromJson(_).toSeq)
       .getOrElse(Seq.empty)
-    val metricsProperties = jsonOption(json \ "Metrics Properties").map(mapFromJson(_).toSeq)
+    // The "Metrics Properties" field was added in Spark 3.4.0:
+    val metricsProperties = jsonOption(json.get("Metrics Properties")).map(mapFromJson(_).toSeq)
       .getOrElse(Seq.empty)
     val environmentDetails = Map[String, Seq[(String, String)]](
-      "JVM Information" -> mapFromJson(json \ "JVM Information").toSeq,
-      "Spark Properties" -> mapFromJson(json \ "Spark Properties").toSeq,
+      "JVM Information" -> mapFromJson(json.get("JVM Information")).toSeq,
+      "Spark Properties" -> mapFromJson(json.get("Spark Properties")).toSeq,
       "Hadoop Properties" -> hadoopProperties,
-      "System Properties" -> mapFromJson(json \ "System Properties").toSeq,
+      "System Properties" -> mapFromJson(json.get("System Properties")).toSeq,
       "Metrics Properties" -> metricsProperties,
-      "Classpath Entries" -> mapFromJson(json \ "Classpath Entries").toSeq)
+      "Classpath Entries" -> mapFromJson(json.get("Classpath Entries")).toSeq)
     SparkListenerEnvironmentUpdate(environmentDetails)
   }
 
-  def blockManagerAddedFromJson(json: JValue): SparkListenerBlockManagerAdded = {
-    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val maxMem = (json \ "Maximum Memory").extract[Long]
-    val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
-    val maxOnHeapMem = jsonOption(json \ "Maximum Onheap Memory").map(_.extract[Long])
-    val maxOffHeapMem = jsonOption(json \ "Maximum Offheap Memory").map(_.extract[Long])
+  def blockManagerAddedFromJson(json: JsonNode): SparkListenerBlockManagerAdded = {
+    val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+    val maxMem = json.get("Maximum Memory").extractLong
+    val time = jsonOption(json.get("Timestamp")).map(_.extractLong).getOrElse(-1L)
+    val maxOnHeapMem = jsonOption(json.get("Maximum Onheap Memory")).map(_.extractLong)
+    val maxOffHeapMem = jsonOption(json.get("Maximum Offheap Memory")).map(_.extractLong)
     SparkListenerBlockManagerAdded(time, blockManagerId, maxMem, maxOnHeapMem, maxOffHeapMem)
   }
 
-  def blockManagerRemovedFromJson(json: JValue): SparkListenerBlockManagerRemoved = {
-    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val time = jsonOption(json \ "Timestamp").map(_.extract[Long]).getOrElse(-1L)
+  def blockManagerRemovedFromJson(json: JsonNode): SparkListenerBlockManagerRemoved = {
+    val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+    val time = jsonOption(json.get("Timestamp")).map(_.extractLong).getOrElse(-1L)
     SparkListenerBlockManagerRemoved(time, blockManagerId)
   }
 
-  def unpersistRDDFromJson(json: JValue): SparkListenerUnpersistRDD = {
-    SparkListenerUnpersistRDD((json \ "RDD ID").extract[Int])
+  def unpersistRDDFromJson(json: JsonNode): SparkListenerUnpersistRDD = {
+    SparkListenerUnpersistRDD(json.get("RDD ID").extractInt)
   }
 
-  def applicationStartFromJson(json: JValue): SparkListenerApplicationStart = {
-    val appName = (json \ "App Name").extract[String]
-    val appId = jsonOption(json \ "App ID").map(_.extract[String])
-    val time = (json \ "Timestamp").extract[Long]
-    val sparkUser = (json \ "User").extract[String]
-    val appAttemptId = jsonOption(json \ "App Attempt ID").map(_.extract[String])
-    val driverLogs = jsonOption(json \ "Driver Logs").map(mapFromJson)
-    val driverAttributes = jsonOption(json \ "Driver Attributes").map(mapFromJson)
+  def applicationStartFromJson(json: JsonNode): SparkListenerApplicationStart = {
+    val appName = json.get("App Name").extractString
+    val appId = jsonOption(json.get("App ID")).map(_.asText())
+    val time = json.get("Timestamp").extractLong
+    val sparkUser = json.get("User").extractString
+    val appAttemptId = jsonOption(json.get("App Attempt ID")).map(_.asText())
+    val driverLogs = jsonOption(json.get("Driver Logs")).map(mapFromJson)
+    val driverAttributes = jsonOption(json.get("Driver Attributes")).map(mapFromJson)
     SparkListenerApplicationStart(appName, appId, time, sparkUser, appAttemptId, driverLogs,
       driverAttributes)
   }
 
-  def applicationEndFromJson(json: JValue): SparkListenerApplicationEnd = {
-    SparkListenerApplicationEnd((json \ "Timestamp").extract[Long])
+  def applicationEndFromJson(json: JsonNode): SparkListenerApplicationEnd = {
+    SparkListenerApplicationEnd(json.get("Timestamp").extractLong)
   }
 
-  def executorAddedFromJson(json: JValue): SparkListenerExecutorAdded = {
-    val time = (json \ "Timestamp").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val executorInfo = executorInfoFromJson(json \ "Executor Info")
+  def executorAddedFromJson(json: JsonNode): SparkListenerExecutorAdded = {
+    val time = json.get("Timestamp").extractLong
+    val executorId = json.get("Executor ID").extractString
+    val executorInfo = executorInfoFromJson(json.get("Executor Info"))
     SparkListenerExecutorAdded(time, executorId, executorInfo)
   }
 
-  def executorRemovedFromJson(json: JValue): SparkListenerExecutorRemoved = {
-    val time = (json \ "Timestamp").extract[Long]
-    val executorId = (json \ "Executor ID").extract[String]
-    val reason = (json \ "Removed Reason").extract[String]
+  def executorRemovedFromJson(json: JsonNode): SparkListenerExecutorRemoved = {
+    val time = json.get("Timestamp").extractLong
+    val executorId = json.get("Executor ID").extractString
+    val reason = json.get("Removed Reason").extractString
     SparkListenerExecutorRemoved(time, executorId, reason)
   }
 
-  def logStartFromJson(json: JValue): SparkListenerLogStart = {
-    val sparkVersion = (json \ "Spark Version").extract[String]
+  def logStartFromJson(json: JsonNode): SparkListenerLogStart = {
+    val sparkVersion = json.get("Spark Version").extractString
     SparkListenerLogStart(sparkVersion)
   }
 
-  def executorMetricsUpdateFromJson(json: JValue): SparkListenerExecutorMetricsUpdate = {
-    val execInfo = (json \ "Executor ID").extract[String]
-    val accumUpdates = (json \ "Metrics Updated").extract[List[JValue]].map { json =>
-      val taskId = (json \ "Task ID").extract[Long]
-      val stageId = (json \ "Stage ID").extract[Int]
-      val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
+  def executorMetricsUpdateFromJson(json: JsonNode): SparkListenerExecutorMetricsUpdate = {
+    val execInfo = json.get("Executor ID").extractString
+    val accumUpdates = json.get("Metrics Updated").extractElements.map { json =>
+      val taskId = json.get("Task ID").extractLong
+      val stageId = json.get("Stage ID").extractInt
+      val stageAttemptId = json.get("Stage Attempt ID").extractInt
       val updates =
-        (json \ "Accumulator Updates").extract[List[JValue]].map(accumulableInfoFromJson)
+        json.get("Accumulator Updates").extractElements.map(accumulableInfoFromJson).toArray.toSeq
       (taskId, stageId, stageAttemptId, updates)
-    }
-    val executorUpdates = (json \ "Executor Metrics Updated") match {
-      case JNothing => Map.empty[(Int, Int), ExecutorMetrics]
-      case value: JValue => value.extract[List[JValue]].map { json =>
-        val stageId = (json \ "Stage ID").extract[Int]
-        val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
-        val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics")
+    }.toArray.toSeq
+    val executorUpdates = jsonOption(json.get("Executor Metrics Updated")).map { value =>
+      value.extractElements.map { json =>
+        val stageId = json.get("Stage ID").extractInt
+        val stageAttemptId = json.get("Stage Attempt ID").extractInt
+        val executorMetrics = executorMetricsFromJson(json.get("Executor Metrics"))
         ((stageId, stageAttemptId) -> executorMetrics)
       }.toMap
-    }
+    }.getOrElse(Map.empty[(Int, Int), ExecutorMetrics])
     SparkListenerExecutorMetricsUpdate(execInfo, accumUpdates, executorUpdates)
   }
 
-  def stageExecutorMetricsFromJson(json: JValue): SparkListenerStageExecutorMetrics = {
-    val execId = (json \ "Executor ID").extract[String]
-    val stageId = (json \ "Stage ID").extract[Int]
-    val stageAttemptId = (json \ "Stage Attempt ID").extract[Int]
-    val executorMetrics = executorMetricsFromJson(json \ "Executor Metrics")
+  def stageExecutorMetricsFromJson(json: JsonNode): SparkListenerStageExecutorMetrics = {
+    val execId = json.get("Executor ID").extractString
+    val stageId = json.get("Stage ID").extractInt
+    val stageAttemptId = json.get("Stage Attempt ID").extractInt
+    val executorMetrics = executorMetricsFromJson(json.get("Executor Metrics"))
     SparkListenerStageExecutorMetrics(execId, stageId, stageAttemptId, executorMetrics)
   }
 
-  def blockUpdateFromJson(json: JValue): SparkListenerBlockUpdated = {
-    val blockUpdatedInfo = blockUpdatedInfoFromJson(json \ "Block Updated Info")
+  def blockUpdateFromJson(json: JsonNode): SparkListenerBlockUpdated = {
+    val blockUpdatedInfo = blockUpdatedInfoFromJson(json.get("Block Updated Info"))
     SparkListenerBlockUpdated(blockUpdatedInfo)
   }
 
@@ -888,27 +1086,27 @@ private[spark] object JsonProtocol {
    * JSON deserialization methods for classes SparkListenerEvents depend on |
    * ---------------------------------------------------------------------- */
 
-  def stageInfoFromJson(json: JValue): StageInfo = {
-    val stageId = (json \ "Stage ID").extract[Int]
-    val attemptId = jsonOption(json \ "Stage Attempt ID").map(_.extract[Int]).getOrElse(0)
-    val stageName = (json \ "Stage Name").extract[String]
-    val numTasks = (json \ "Number of Tasks").extract[Int]
-    val rddInfos = (json \ "RDD Info").extract[List[JValue]].map(rddInfoFromJson)
-    val parentIds = jsonOption(json \ "Parent IDs")
-      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+  def stageInfoFromJson(json: JsonNode): StageInfo = {
+    val stageId = json.get("Stage ID").extractInt
+    val attemptId = jsonOption(json.get("Stage Attempt ID")).map(_.extractInt).getOrElse(0)
+    val stageName = json.get("Stage Name").extractString
+    val numTasks = json.get("Number of Tasks").extractInt
+    val rddInfos = json.get("RDD Info").extractElements.map(rddInfoFromJson).toArray
+    val parentIds = jsonOption(json.get("Parent IDs"))
+      .map { l => l.extractElements.map(_.extractInt).toArray.toSeq }
       .getOrElse(Seq.empty)
-    val details = jsonOption(json \ "Details").map(_.extract[String]).getOrElse("")
-    val submissionTime = jsonOption(json \ "Submission Time").map(_.extract[Long])
-    val completionTime = jsonOption(json \ "Completion Time").map(_.extract[Long])
-    val failureReason = jsonOption(json \ "Failure Reason").map(_.extract[String])
+    val details = jsonOption(json.get("Details")).map(_.asText).getOrElse("")
+    val submissionTime = jsonOption(json.get("Submission Time")).map(_.extractLong)
+    val completionTime = jsonOption(json.get("Completion Time")).map(_.extractLong)
+    val failureReason = jsonOption(json.get("Failure Reason")).map(_.asText)
     val accumulatedValues = {
-      jsonOption(json \ "Accumulables").map(_.extract[List[JValue]]) match {
+      jsonOption(json.get("Accumulables")).map(_.extractElements) match {
         case Some(values) => values.map(accumulableInfoFromJson)
         case None => Seq.empty[AccumulableInfo]
       }
     }
 
-    val rpId = jsonOption(json \ "Resource Profile Id").map(_.extract[Int])
+    val rpId = jsonOption(json.get("Resource Profile Id")).map(_.extractInt)
     val stageProf = rpId.getOrElse(ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     val stageInfo = new StageInfo(stageId, attemptId, stageName, numTasks, rddInfos,
       parentIds, details, resourceProfileId = stageProf)
@@ -921,22 +1119,23 @@ private[spark] object JsonProtocol {
     stageInfo
   }
 
-  def taskInfoFromJson(json: JValue): TaskInfo = {
-    val taskId = (json \ "Task ID").extract[Long]
-    val index = (json \ "Index").extract[Int]
-    val attempt = jsonOption(json \ "Attempt").map(_.extract[Int]).getOrElse(1)
-    val partitionId = jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
-    val launchTime = (json \ "Launch Time").extract[Long]
-    val executorId = weakIntern((json \ "Executor ID").extract[String])
-    val host = weakIntern((json \ "Host").extract[String])
-    val taskLocality = TaskLocality.withName((json \ "Locality").extract[String])
-    val speculative = jsonOption(json \ "Speculative").exists(_.extract[Boolean])
-    val gettingResultTime = (json \ "Getting Result Time").extract[Long]
-    val finishTime = (json \ "Finish Time").extract[Long]
-    val failed = (json \ "Failed").extract[Boolean]
-    val killed = jsonOption(json \ "Killed").exists(_.extract[Boolean])
-    val accumulables = jsonOption(json \ "Accumulables").map(_.extract[Seq[JValue]]) match {
-      case Some(values) => values.map(accumulableInfoFromJson)
+  def taskInfoFromJson(json: JsonNode): TaskInfo = {
+    val taskId = json.get("Task ID").extractLong
+    val index = json.get("Index").extractInt
+    val attempt = jsonOption(json.get("Attempt")).map(_.extractInt).getOrElse(1)
+    // The "Partition ID" field was added in Spark 3.3.0:
+    val partitionId = jsonOption(json.get("Partition ID")).map(_.extractInt).getOrElse(-1)
+    val launchTime = json.get("Launch Time").extractLong
+    val executorId = weakIntern(json.get("Executor ID").extractString)
+    val host = weakIntern(json.get("Host").extractString)
+    val taskLocality = TaskLocality.withName(json.get("Locality").extractString)
+    val speculative = jsonOption(json.get("Speculative")).exists(_.extractBoolean)
+    val gettingResultTime = json.get("Getting Result Time").extractLong
+    val finishTime = json.get("Finish Time").extractLong
+    val failed = json.get("Failed").extractBoolean
+    val killed = jsonOption(json.get("Killed")).exists(_.extractBoolean)
+    val accumulables = jsonOption(json.get("Accumulables")).map(_.extractElements) match {
+      case Some(values) => values.map(accumulableInfoFromJson).toArray.toSeq
       case None => Seq.empty[AccumulableInfo]
     }
 
@@ -951,15 +1150,15 @@ private[spark] object JsonProtocol {
     taskInfo
   }
 
-  def accumulableInfoFromJson(json: JValue): AccumulableInfo = {
-    val id = (json \ "ID").extract[Long]
-    val name = jsonOption(json \ "Name").map(_.extract[String])
-    val update = jsonOption(json \ "Update").map { v => accumValueFromJson(name, v) }
-    val value = jsonOption(json \ "Value").map { v => accumValueFromJson(name, v) }
-    val internal = jsonOption(json \ "Internal").exists(_.extract[Boolean])
+  def accumulableInfoFromJson(json: JsonNode): AccumulableInfo = {
+    val id = json.get("ID").extractLong
+    val name = jsonOption(json.get("Name")).map(_.asText)
+    val update = jsonOption(json.get("Update")).map { v => accumValueFromJson(name, v) }
+    val value = jsonOption(json.get("Value")).map { v => accumValueFromJson(name, v) }
+    val internal = jsonOption(json.get("Internal")).exists(_.extractBoolean)
     val countFailedValues =
-      jsonOption(json \ "Count Failed Values").exists(_.extract[Boolean])
-    val metadata = jsonOption(json \ "Metadata").map(_.extract[String])
+      jsonOption(json.get("Count Failed Values")).exists(_.extractBoolean)
+    val metadata = jsonOption(json.get("Metadata")).map(_.asText)
     new AccumulableInfo(id, name, update, value, internal, countFailedValues, metadata)
   }
 
@@ -972,98 +1171,96 @@ private[spark] object JsonProtocol {
    *
    * The behavior here must match that of [[accumValueToJson]]. Exposed for testing.
    */
-  private[util] def accumValueFromJson(name: Option[String], value: JValue): Any = {
+  private[util] def accumValueFromJson(name: Option[String], value: JsonNode): Any = {
     if (name.exists(_.startsWith(InternalAccumulator.METRICS_PREFIX))) {
-      value match {
-        case JInt(v) => v.toLong
-        case JArray(v) =>
-          v.map { blockJson =>
-            val id = BlockId((blockJson \ "Block ID").extract[String])
-            val status = blockStatusFromJson(blockJson \ "Status")
-            (id, status)
-          }.asJava
-        case _ => throw new IllegalArgumentException(s"unexpected json value $value for " +
+      if (value.isIntegralNumber) {
+        value.extractLong
+      } else if (value.isArray) {
+        value.extractElements.map { blockJson =>
+          val id = BlockId(blockJson.get("Block ID").extractString)
+          val status = blockStatusFromJson(blockJson.get("Status"))
+          (id, status)
+        }.toArray.toSeq.asJava
+      } else {
+        throw new IllegalArgumentException(s"unexpected json value $value for " +
           "accumulator " + name.get)
       }
     } else {
-      value.extract[String]
+      value.asText
     }
   }
 
-  def taskMetricsFromJson(json: JValue): TaskMetrics = {
+  def taskMetricsFromJson(json: JsonNode): TaskMetrics = {
     val metrics = TaskMetrics.empty
-    if (json == JNothing) {
+    if (json == null || json.isNull) {
       return metrics
     }
-    metrics.setExecutorDeserializeTime((json \ "Executor Deserialize Time").extract[Long])
-    metrics.setExecutorDeserializeCpuTime((json \ "Executor Deserialize CPU Time") match {
-      case JNothing => 0
-      case x => x.extract[Long]
-    })
-    metrics.setExecutorRunTime((json \ "Executor Run Time").extract[Long])
-    metrics.setExecutorCpuTime((json \ "Executor CPU Time") match {
-      case JNothing => 0
-      case x => x.extract[Long]
-    })
-    metrics.setPeakExecutionMemory((json \ "Peak Execution Memory") match {
-      case JNothing => 0
-      case x => x.extract[Long]
-    })
-    metrics.setResultSize((json \ "Result Size").extract[Long])
-    metrics.setJvmGCTime((json \ "JVM GC Time").extract[Long])
-    metrics.setResultSerializationTime((json \ "Result Serialization Time").extract[Long])
-    metrics.incMemoryBytesSpilled((json \ "Memory Bytes Spilled").extract[Long])
-    metrics.incDiskBytesSpilled((json \ "Disk Bytes Spilled").extract[Long])
+    metrics.setExecutorDeserializeTime(json.get("Executor Deserialize Time").extractLong)
+    // The "Executor Deserialize CPU Time" field was added in Spark 2.1.0:
+    metrics.setExecutorDeserializeCpuTime(
+      jsonOption(json.get("Executor Deserialize CPU Time")).map(_.extractLong).getOrElse(0))
+    metrics.setExecutorRunTime(json.get("Executor Run Time").extractLong)
+    // The "Executor CPU Time" field was added in Spark 2.1.0:
+    metrics.setExecutorCpuTime(
+      jsonOption(json.get("Executor CPU Time")).map(_.extractLong).getOrElse(0))
+    // The "Peak Execution Memory" field was added in Spark 3.0.0:
+    metrics.setPeakExecutionMemory(
+      jsonOption(json.get("Peak Execution Memory")).map(_.extractLong).getOrElse(0))
+    metrics.setResultSize(json.get("Result Size").extractLong)
+    metrics.setJvmGCTime(json.get("JVM GC Time").extractLong)
+    metrics.setResultSerializationTime(json.get("Result Serialization Time").extractLong)
+    metrics.incMemoryBytesSpilled(json.get("Memory Bytes Spilled").extractLong)
+    metrics.incDiskBytesSpilled(json.get("Disk Bytes Spilled").extractLong)
 
     // Shuffle read metrics
-    jsonOption(json \ "Shuffle Read Metrics").foreach { readJson =>
+    jsonOption(json.get("Shuffle Read Metrics")).foreach { readJson =>
       val readMetrics = metrics.createTempShuffleReadMetrics()
-      readMetrics.incRemoteBlocksFetched((readJson \ "Remote Blocks Fetched").extract[Int])
-      readMetrics.incLocalBlocksFetched((readJson \ "Local Blocks Fetched").extract[Int])
-      readMetrics.incRemoteBytesRead((readJson \ "Remote Bytes Read").extract[Long])
-      jsonOption(readJson \ "Remote Bytes Read To Disk")
-        .foreach { v => readMetrics.incRemoteBytesReadToDisk(v.extract[Long])}
+      readMetrics.incRemoteBlocksFetched(readJson.get("Remote Blocks Fetched").extractInt)
+      readMetrics.incLocalBlocksFetched(readJson.get("Local Blocks Fetched").extractInt)
+      readMetrics.incRemoteBytesRead(readJson.get("Remote Bytes Read").extractLong)
+      jsonOption(readJson.get("Remote Bytes Read To Disk"))
+        .foreach { v => readMetrics.incRemoteBytesReadToDisk(v.extractLong)}
       readMetrics.incLocalBytesRead(
-        jsonOption(readJson \ "Local Bytes Read").map(_.extract[Long]).getOrElse(0L))
-      readMetrics.incFetchWaitTime((readJson \ "Fetch Wait Time").extract[Long])
+        jsonOption(readJson.get("Local Bytes Read")).map(_.extractLong).getOrElse(0L))
+      readMetrics.incFetchWaitTime(readJson.get("Fetch Wait Time").extractLong)
       readMetrics.incRecordsRead(
-        jsonOption(readJson \ "Total Records Read").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(readJson.get("Total Records Read")).map(_.extractLong).getOrElse(0L))
       metrics.mergeShuffleReadMetrics()
     }
 
     // Shuffle write metrics
     // TODO: Drop the redundant "Shuffle" since it's inconsistent with related classes.
-    jsonOption(json \ "Shuffle Write Metrics").foreach { writeJson =>
+    jsonOption(json.get("Shuffle Write Metrics")).foreach { writeJson =>
       val writeMetrics = metrics.shuffleWriteMetrics
-      writeMetrics.incBytesWritten((writeJson \ "Shuffle Bytes Written").extract[Long])
+      writeMetrics.incBytesWritten(writeJson.get("Shuffle Bytes Written").extractLong)
       writeMetrics.incRecordsWritten(
-        jsonOption(writeJson \ "Shuffle Records Written").map(_.extract[Long]).getOrElse(0L))
-      writeMetrics.incWriteTime((writeJson \ "Shuffle Write Time").extract[Long])
+        jsonOption(writeJson.get("Shuffle Records Written")).map(_.extractLong).getOrElse(0L))
+      writeMetrics.incWriteTime(writeJson.get("Shuffle Write Time").extractLong)
     }
 
     // Output metrics
-    jsonOption(json \ "Output Metrics").foreach { outJson =>
+    jsonOption(json.get("Output Metrics")).foreach { outJson =>
       val outputMetrics = metrics.outputMetrics
-      outputMetrics.setBytesWritten((outJson \ "Bytes Written").extract[Long])
+      outputMetrics.setBytesWritten(outJson.get("Bytes Written").extractLong)
       outputMetrics.setRecordsWritten(
-        jsonOption(outJson \ "Records Written").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(outJson.get("Records Written")).map(_.extractLong).getOrElse(0L))
     }
 
     // Input metrics
-    jsonOption(json \ "Input Metrics").foreach { inJson =>
+    jsonOption(json.get("Input Metrics")).foreach { inJson =>
       val inputMetrics = metrics.inputMetrics
-      inputMetrics.incBytesRead((inJson \ "Bytes Read").extract[Long])
+      inputMetrics.incBytesRead(inJson.get("Bytes Read").extractLong)
       inputMetrics.incRecordsRead(
-        jsonOption(inJson \ "Records Read").map(_.extract[Long]).getOrElse(0L))
+        jsonOption(inJson.get("Records Read")).map(_.extractLong).getOrElse(0L))
     }
 
     // Updated blocks
-    jsonOption(json \ "Updated Blocks").foreach { blocksJson =>
-      metrics.setUpdatedBlockStatuses(blocksJson.extract[List[JValue]].map { blockJson =>
-        val id = BlockId((blockJson \ "Block ID").extract[String])
-        val status = blockStatusFromJson(blockJson \ "Status")
+    jsonOption(json.get("Updated Blocks")).foreach { blocksJson =>
+      metrics.setUpdatedBlockStatuses(blocksJson.extractElements.map { blockJson =>
+        val id = BlockId(blockJson.get("Block ID").extractString)
+        val status = blockStatusFromJson(blockJson.get("Status"))
         (id, status)
-      })
+      }.toArray.toSeq)
     }
 
     metrics
@@ -1081,61 +1278,61 @@ private[spark] object JsonProtocol {
     val unknownReason = Utils.getFormattedClassName(UnknownReason)
   }
 
-  def taskEndReasonFromJson(json: JValue): TaskEndReason = {
+  def taskEndReasonFromJson(json: JsonNode): TaskEndReason = {
     import TASK_END_REASON_FORMATTED_CLASS_NAMES._
 
-    (json \ "Reason").extract[String] match {
+    json.get("Reason").extractString match {
       case `success` => Success
       case `resubmitted` => Resubmitted
       case `fetchFailed` =>
-        val blockManagerAddress = blockManagerIdFromJson(json \ "Block Manager Address")
-        val shuffleId = (json \ "Shuffle ID").extract[Int]
-        val mapId = (json \ "Map ID").extract[Long]
-        val mapIndex = json \ "Map Index" match {
-          case JNothing =>
-            // Note, we use the invalid value Int.MinValue here to fill the map index for backward
-            // compatibility. Otherwise, the fetch failed event will be dropped when the history
-            // server loads the event log written by the Spark version before 3.0.
-            Int.MinValue
-          case x => x.extract[Int]
+        val blockManagerAddress = blockManagerIdFromJson(json.get("Block Manager Address"))
+        val shuffleId = json.get("Shuffle ID").extractInt
+        val mapId = json.get("Map ID").extractLong
+        val mapIndex = jsonOption(json.get("Map Index")).map(_.extractInt).getOrElse {
+          // Note, we use the invalid value Int.MinValue here to fill the map index for backward
+          // compatibility. Otherwise, the fetch failed event will be dropped when the history
+          // server loads the event log written by the Spark version before 3.0.
+          Int.MinValue
         }
-        val reduceId = (json \ "Reduce ID").extract[Int]
-        val message = jsonOption(json \ "Message").map(_.extract[String])
+        val reduceId = json.get("Reduce ID").extractInt
+        val message = jsonOption(json.get("Message")).map(_.asText)
         new FetchFailed(blockManagerAddress, shuffleId, mapId, mapIndex, reduceId,
           message.getOrElse("Unknown reason"))
       case `exceptionFailure` =>
-        val className = (json \ "Class Name").extract[String]
-        val description = (json \ "Description").extract[String]
-        val stackTrace = stackTraceFromJson(json \ "Stack Trace")
+        val className = json.get("Class Name").extractString
+        val description = json.get("Description").extractString
+        val stackTrace = stackTraceFromJson(json.get("Stack Trace"))
         val fullStackTrace =
-          jsonOption(json \ "Full Stack Trace").map(_.extract[String]).orNull
+          jsonOption(json.get("Full Stack Trace")).map(_.asText).orNull
         // Fallback on getting accumulator updates from TaskMetrics, which was logged in Spark 1.x
-        val accumUpdates = jsonOption(json \ "Accumulator Updates")
-          .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
-          .getOrElse(taskMetricsFromJson(json \ "Metrics").accumulators().map(acc => {
+        val accumUpdates = jsonOption(json.get("Accumulator Updates"))
+          .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
+          .getOrElse(taskMetricsFromJson(json.get("Metrics")).accumulators().map(acc => {
             acc.toInfo(Some(acc.value), None)
-          }))
+          }).toArray.toSeq)
         ExceptionFailure(className, description, stackTrace, fullStackTrace, None, accumUpdates)
       case `taskResultLost` => TaskResultLost
       case `taskKilled` =>
-        val killReason = jsonOption(json \ "Kill Reason")
-          .map(_.extract[String]).getOrElse("unknown reason")
-        val accumUpdates = jsonOption(json \ "Accumulator Updates")
-          .map(_.extract[List[JValue]].map(accumulableInfoFromJson))
+      // The "Kill Reason" field was added in Spark 2.2.0:
+        val killReason = jsonOption(json.get("Kill Reason"))
+          .map(_.asText).getOrElse("unknown reason")
+        // The "Accumulator Updates" field was added in Spark 2.4.0:
+        val accumUpdates = jsonOption(json.get("Accumulator Updates"))
+          .map(_.extractElements.map(accumulableInfoFromJson).toArray.toSeq)
           .getOrElse(Seq[AccumulableInfo]())
         TaskKilled(killReason, accumUpdates)
       case `taskCommitDenied` =>
         // Unfortunately, the `TaskCommitDenied` message was introduced in 1.3.0 but the JSON
         // de/serialization logic was not added until 1.5.1. To provide backward compatibility
         // for reading those logs, we need to provide default values for all the fields.
-        val jobId = jsonOption(json \ "Job ID").map(_.extract[Int]).getOrElse(-1)
-        val partitionId = jsonOption(json \ "Partition ID").map(_.extract[Int]).getOrElse(-1)
-        val attemptNo = jsonOption(json \ "Attempt Number").map(_.extract[Int]).getOrElse(-1)
+        val jobId = jsonOption(json.get("Job ID")).map(_.extractInt).getOrElse(-1)
+        val partitionId = jsonOption(json.get("Partition ID")).map(_.extractInt).getOrElse(-1)
+        val attemptNo = jsonOption(json.get("Attempt Number")).map(_.extractInt).getOrElse(-1)
         TaskCommitDenied(jobId, partitionId, attemptNo)
       case `executorLostFailure` =>
-        val exitCausedByApp = jsonOption(json \ "Exit Caused By App").map(_.extract[Boolean])
-        val executorId = jsonOption(json \ "Executor ID").map(_.extract[String])
-        val reason = jsonOption(json \ "Loss Reason").map(_.extract[String])
+        val exitCausedByApp = jsonOption(json.get("Exit Caused By App")).map(_.extractBoolean)
+        val executorId = jsonOption(json.get("Executor ID")).map(_.asText)
+        val reason = jsonOption(json.get("Loss Reason")).map(_.asText)
         ExecutorLostFailure(
           executorId.getOrElse("Unknown"),
           exitCausedByApp.getOrElse(true),
@@ -1144,14 +1341,14 @@ private[spark] object JsonProtocol {
     }
   }
 
-  def blockManagerIdFromJson(json: JValue): BlockManagerId = {
+  def blockManagerIdFromJson(json: JsonNode): BlockManagerId = {
     // On metadata fetch fail, block manager ID can be null (SPARK-4471)
-    if (json == JNothing) {
+    if (json == null || json.isNull) {
       return null
     }
-    val executorId = weakIntern((json \ "Executor ID").extract[String])
-    val host = weakIntern((json \ "Host").extract[String])
-    val port = (json \ "Port").extract[Int]
+    val executorId = weakIntern(json.get("Executor ID").extractString)
+    val host = weakIntern(json.get("Host").extractString)
+    val port = json.get("Port").extractInt
     BlockManagerId(executorId, host, port)
   }
 
@@ -1160,36 +1357,37 @@ private[spark] object JsonProtocol {
     val jobFailed = Utils.getFormattedClassName(JobFailed)
   }
 
-  def jobResultFromJson(json: JValue): JobResult = {
+  def jobResultFromJson(json: JsonNode): JobResult = {
     import JOB_RESULT_FORMATTED_CLASS_NAMES._
 
-    (json \ "Result").extract[String] match {
+    json.get("Result").extractString match {
       case `jobSucceeded` => JobSucceeded
       case `jobFailed` =>
-        val exception = exceptionFromJson(json \ "Exception")
+        val exception = exceptionFromJson(json.get("Exception"))
         new JobFailed(exception)
     }
   }
 
-  def rddInfoFromJson(json: JValue): RDDInfo = {
-    val rddId = (json \ "RDD ID").extract[Int]
-    val name = (json \ "Name").extract[String]
-    val scope = jsonOption(json \ "Scope")
-      .map(_.extract[String])
+  def rddInfoFromJson(json: JsonNode): RDDInfo = {
+    val rddId = json.get("RDD ID").extractInt
+    val name = json.get("Name").extractString
+    val scope = jsonOption(json.get("Scope"))
+      .map(_.asText)
       .map(RDDOperationScope.fromJson)
-    val callsite = jsonOption(json \ "Callsite").map(_.extract[String]).getOrElse("")
-    val parentIds = jsonOption(json \ "Parent IDs")
-      .map { l => l.extract[List[JValue]].map(_.extract[Int]) }
+    val callsite = jsonOption(json.get("Callsite")).map(_.asText).getOrElse("")
+    val parentIds = jsonOption(json.get("Parent IDs"))
+      .map { l => l.extractElements.map(_.extractInt).toArray.toSeq }
       .getOrElse(Seq.empty)
-    val storageLevel = storageLevelFromJson(json \ "Storage Level")
-    val isBarrier = jsonOption(json \ "Barrier").map(_.extract[Boolean]).getOrElse(false)
-    val numPartitions = (json \ "Number of Partitions").extract[Int]
-    val numCachedPartitions = (json \ "Number of Cached Partitions").extract[Int]
-    val memSize = (json \ "Memory Size").extract[Long]
-    val diskSize = (json \ "Disk Size").extract[Long]
+    val storageLevel = storageLevelFromJson(json.get("Storage Level"))
+    // The "Barrier" field was added in Spark 3.0.0:
+    val isBarrier = jsonOption(json.get("Barrier")).map(_.extractBoolean).getOrElse(false)
+    val numPartitions = json.get("Number of Partitions").extractInt
+    val numCachedPartitions = json.get("Number of Cached Partitions").extractInt
+    val memSize = json.get("Memory Size").extractLong
+    val diskSize = json.get("Disk Size").extractLong
 
     val outputDeterministicLevel = DeterministicLevel.withName(
-      jsonOption(json \ "DeterministicLevel").map(_.extract[String]).getOrElse("DETERMINATE"))
+      jsonOption(json.get("DeterministicLevel")).map(_.asText).getOrElse("DETERMINATE"))
 
     val rddInfo =
       new RDDInfo(rddId, name, numPartitions, storageLevel, isBarrier, parentIds, callsite, scope,
@@ -1200,16 +1398,16 @@ private[spark] object JsonProtocol {
     rddInfo
   }
 
-  def storageLevelFromJson(json: JValue): StorageLevel = {
-    val useDisk = (json \ "Use Disk").extract[Boolean]
-    val useMemory = (json \ "Use Memory").extract[Boolean]
+  def storageLevelFromJson(json: JsonNode): StorageLevel = {
+    val useDisk = json.get("Use Disk").extractBoolean
+    val useMemory = json.get("Use Memory").extractBoolean
     // The "Use Off Heap" field was added in Spark 3.4.0
-    val useOffHeap = jsonOption(json \ "Use Off Heap") match {
-      case Some(value) => value.extract[Boolean]
+    val useOffHeap = jsonOption(json.get("Use Off Heap")) match {
+      case Some(value) => value.extractBoolean
       case None => false
     }
-    val deserialized = (json \ "Deserialized").extract[Boolean]
-    val replication = (json \ "Replication").extract[Int]
+    val deserialized = json.get("Deserialized").extractBoolean
+    val replication = json.get("Replication").extractInt
     StorageLevel(
       useDisk = useDisk,
       useMemory = useMemory,
@@ -1218,54 +1416,59 @@ private[spark] object JsonProtocol {
       replication = replication)
   }
 
-  def blockStatusFromJson(json: JValue): BlockStatus = {
-    val storageLevel = storageLevelFromJson(json \ "Storage Level")
-    val memorySize = (json \ "Memory Size").extract[Long]
-    val diskSize = (json \ "Disk Size").extract[Long]
+  def blockStatusFromJson(json: JsonNode): BlockStatus = {
+    val storageLevel = storageLevelFromJson(json.get("Storage Level"))
+    val memorySize = json.get("Memory Size").extractLong
+    val diskSize = json.get("Disk Size").extractLong
     BlockStatus(storageLevel, memorySize, diskSize)
   }
 
-  def executorInfoFromJson(json: JValue): ExecutorInfo = {
-    val executorHost = (json \ "Host").extract[String]
-    val totalCores = (json \ "Total Cores").extract[Int]
-    val logUrls = mapFromJson(json \ "Log Urls").toMap
-    val attributes = jsonOption(json \ "Attributes") match {
+  def executorInfoFromJson(json: JsonNode): ExecutorInfo = {
+    val executorHost = json.get("Host").extractString
+    val totalCores = json.get("Total Cores").extractInt
+    val logUrls = mapFromJson(json.get("Log Urls")).toMap
+    // The "Attributes" field was added in Spark 3.0.0:
+    val attributes = jsonOption(json.get("Attributes")) match {
       case Some(attr) => mapFromJson(attr).toMap
       case None => Map.empty[String, String]
     }
-    val resources = jsonOption(json \ "Resources") match {
+    // The "Resources" field was added in Spark 3.0.0:
+    val resources = jsonOption(json.get("Resources")) match {
       case Some(resources) => resourcesMapFromJson(resources).toMap
       case None => Map.empty[String, ResourceInformation]
     }
-    val resourceProfileId = jsonOption(json \ "Resource Profile Id") match {
-      case Some(id) => id.extract[Int]
+    // The "Resource Profile Id" field was added in Spark 3.4.0
+    val resourceProfileId = jsonOption(json.get("Resource Profile Id")) match {
+      case Some(id) => id.extractInt
       case None => ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
     }
-    val registrationTs = jsonOption(json \ "Registration Time") map { ts =>
-      ts.extract[Long]
+    // The "Registration Time" field was added in Spark 3.4.0
+    val registrationTs = jsonOption(json.get("Registration Time")) map { ts =>
+      ts.extractLong
     }
-    val requestTs = jsonOption(json \ "Request Time") map { ts =>
-      ts.extract[Long]
+    // The "Request Time" field was added in Spark 3.4.0
+    val requestTs = jsonOption(json.get("Request Time")) map { ts =>
+      ts.extractLong
     }
 
     new ExecutorInfo(executorHost, totalCores, logUrls, attributes.toMap, resources.toMap,
       resourceProfileId, registrationTs, requestTs)
   }
 
-  def blockUpdatedInfoFromJson(json: JValue): BlockUpdatedInfo = {
-    val blockManagerId = blockManagerIdFromJson(json \ "Block Manager ID")
-    val blockId = BlockId((json \ "Block ID").extract[String])
-    val storageLevel = storageLevelFromJson(json \ "Storage Level")
-    val memorySize = (json \ "Memory Size").extract[Long]
-    val diskSize = (json \ "Disk Size").extract[Long]
+  def blockUpdatedInfoFromJson(json: JsonNode): BlockUpdatedInfo = {
+    val blockManagerId = blockManagerIdFromJson(json.get("Block Manager ID"))
+    val blockId = BlockId(json.get("Block ID").extractString)
+    val storageLevel = storageLevelFromJson(json.get("Storage Level"))
+    val memorySize = json.get("Memory Size").extractLong
+    val diskSize = json.get("Disk Size").extractLong
     BlockUpdatedInfo(blockManagerId, blockId, storageLevel, memorySize, diskSize)
   }
 
-  def resourcesMapFromJson(json: JValue): Map[String, ResourceInformation] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, v) =>
-      val resourceInfo = ResourceInformation.parseJson(v)
-      (k, resourceInfo)
+  def resourcesMapFromJson(json: JsonNode): Map[String, ResourceInformation] = {
+    assert(json.isObject, s"expected object, got ${json.getNodeType}")
+    json.fields.asScala.map { field =>
+      val resourceInfo = ResourceInformation.parseJson(field.getValue.toString)
+      (field.getKey, resourceInfo)
     }.toMap
   }
 
@@ -1273,49 +1476,86 @@ private[spark] object JsonProtocol {
    * Util JSON deserialization methods |
    * --------------------------------- */
 
-  def mapFromJson(json: JValue): Map[String, String] = {
-    val jsonFields = json.asInstanceOf[JObject].obj
-    jsonFields.collect { case JField(k, JString(v)) => (k, v) }.toMap
+  def mapFromJson(json: JsonNode): Map[String, String] = {
+    assert(json.isObject, s"expected object, got ${json.getNodeType}")
+    json.fields.asScala.map { field =>
+      (field.getKey, field.getValue.extractString)
+    }.toMap
   }
 
-  def propertiesFromJson(json: JValue): Properties = {
+  def propertiesFromJson(json: JsonNode): Properties = {
     jsonOption(json).map { value =>
       val properties = new Properties
-      mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) }
+      mapFromJson(value).foreach { case (k, v) => properties.setProperty(k, v) }
       properties
     }.orNull
   }
 
-  def UUIDFromJson(json: JValue): UUID = {
-    val leastSignificantBits = (json \ "Least Significant Bits").extract[Long]
-    val mostSignificantBits = (json \ "Most Significant Bits").extract[Long]
+  def UUIDFromJson(json: JsonNode): UUID = {
+    val leastSignificantBits = json.get("Least Significant Bits").extractLong
+    val mostSignificantBits = json.get("Most Significant Bits").extractLong
     new UUID(leastSignificantBits, mostSignificantBits)
   }
 
-  def stackTraceFromJson(json: JValue): Array[StackTraceElement] = {
-    json.extract[List[JValue]].map { line =>
-      val declaringClass = (line \ "Declaring Class").extract[String]
-      val methodName = (line \ "Method Name").extract[String]
-      val fileName = (line \ "File Name").extract[String]
-      val lineNumber = (line \ "Line Number").extract[Int]
+  def stackTraceFromJson(json: JsonNode): Array[StackTraceElement] = {
+    json.extractElements.map { line =>
+      val declaringClass = line.get("Declaring Class").extractString
+      val methodName = line.get("Method Name").extractString
+      val fileName = line.get("File Name").extractString
+      val lineNumber = line.get("Line Number").extractInt
       new StackTraceElement(declaringClass, methodName, fileName, lineNumber)
     }.toArray
   }
 
-  def exceptionFromJson(json: JValue): Exception = {
-    val e = new Exception((json \ "Message").extract[String])
-    e.setStackTrace(stackTraceFromJson(json \ "Stack Trace"))
+  def exceptionFromJson(json: JsonNode): Exception = {
+    val e = new Exception(json.get("Message").extractString)
+    e.setStackTrace(stackTraceFromJson(json.get("Stack Trace")))
     e
   }
 
-  /** Return an option that translates JNothing to None */
-  private def jsonOption(json: JValue): Option[JValue] = {
-    json match {
-      case JNothing => None
-      case value: JValue => Some(value)
+  /** Return an option that translates NullNode to None */
+  private def jsonOption(json: JsonNode): Option[JsonNode] = {
+    if (json == null || json.isNull) {
+      None
+    } else {
+      Some(json)
     }
   }
 
-  private def emptyJson: JObject = JObject(List[JField]())
+  /**
+   * Implicit conversions to add methods to JsonNode that perform type-checking when
+   * reading fields. This ensures that JSON parsing will fail if we process JSON with
+   * unexpected input types (instead of silently falling back to default values).
+   */
+  private implicit class JsonNodeImplicits(json: JsonNode) {
+    def extractElements: Iterator[JsonNode] = {
+      require(json.isContainerNode, s"Expected container, got ${json.getNodeType}")
+      json.elements.asScala
+    }
+
+    def extractBoolean: Boolean = {
+      require(json.isBoolean, s"Expected boolean, got ${json.getNodeType}")
+      json.booleanValue
+    }
+
+    def extractInt: Int = {
+      require(json.isNumber, s"Expected number, got ${json.getNodeType}")
+      json.intValue
+    }
+
+    def extractLong: Long = {
+      require(json.isNumber, s"Expected number, got ${json.getNodeType}")
+      json.longValue
+    }
+
+    def extractDouble: Double = {
+      require(json.isNumber, s"Expected number, got ${json.getNodeType}")
+      json.doubleValue
+    }
 
+    def extractString: String = {
+      require(json.isTextual, s"Expected string, got ${json.getNodeType}")
+      json.textValue
+    }
+  }
 }
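
The `extract*` helpers and `jsonOption` added at the bottom of JsonProtocol.scala are what replace Json4s' `extract[T]` and `JNothing` handling on the read path: a field that is present but has the wrong JSON type fails loudly via `require`, while a missing field and an explicit JSON `null` both map to `None`. The following is a minimal, self-contained sketch of the same pattern; it is illustrative only — `ExtractorSketch` and the sample field values are invented for this example and are not part of the patch.

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import scala.collection.JavaConverters._

object ExtractorSketch {
  private val mapper = new ObjectMapper()

  // Fail fast when a field is present but has the wrong JSON type. Jackson's own
  // asInt()/asBoolean() would silently coerce a mismatched node to 0/false instead.
  private def extractInt(node: JsonNode): Int = {
    require(node.isNumber, s"Expected number, got ${node.getNodeType}")
    node.intValue
  }

  // Treat a missing field (get returns null) and an explicit JSON null the same way.
  private def jsonOption(node: JsonNode): Option[JsonNode] =
    if (node == null || node.isNull) None else Some(node)

  // Same shape as mapFromJson above: iterate an object's fields into a Scala Map.
  private def mapFromJson(node: JsonNode): Map[String, String] = {
    require(node.isObject, s"Expected object, got ${node.getNodeType}")
    node.fields.asScala.map(e => (e.getKey, e.getValue.textValue)).toMap
  }

  def main(args: Array[String]): Unit = {
    val json = mapper.readTree(
      """{"RDD ID": 7, "Callsite": null,
        | "Log Urls": {"stdout": "http://host/stdout"}}""".stripMargin)

    val rddId = extractInt(json.get("RDD ID"))          // 7
    val callsite = jsonOption(json.get("Callsite"))
      .map(_.asText).getOrElse("")                      // "" (explicit null)
    val isBarrier = jsonOption(json.get("Barrier"))
      .exists(_.booleanValue)                           // false (field absent)
    val logUrls = mapFromJson(json.get("Log Urls"))

    println((rddId, callsite, isBarrier, logUrls))
    // extractInt(json.get("Callsite")) would throw IllegalArgumentException here.
  }
}
```
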
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala
index 7d07af4d724..5297ac5aac8 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileCompactorSuite.scala
@@ -21,7 +21,6 @@ import scala.collection.mutable
 import scala.io.{Codec, Source}
 
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.json4s.jackson.JsonMethods.parse
 
 import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.deploy.SparkHadoopUtil
@@ -162,7 +161,7 @@ class EventLogFileCompactorSuite extends SparkFunSuite {
         val lines = Source.fromInputStream(is)(Codec.UTF8).getLines().toList
         assert(lines.length === 2, "Compacted file should have only two events being accepted")
         lines.foreach { line =>
-          val event = JsonProtocol.sparkEventFromJson(parse(line))
+          val event = JsonProtocol.sparkEventFromJson(line)
           assert(!event.isInstanceOf[SparkListenerJobStart] &&
             !event.isInstanceOf[SparkListenerJobEnd])
         }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
index 298fd65f293..a68086256d1 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogTestHelper.scala
@@ -22,7 +22,6 @@ import java.nio.charset.StandardCharsets
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
-import org.json4s.jackson.JsonMethods.{compact, render}
 
 import org.apache.spark.SparkConf
 import org.apache.spark.internal.config._
@@ -107,6 +106,6 @@ object EventLogTestHelper {
   }
 
   def convertEvent(event: SparkListenerEvent): String = {
-    compact(render(JsonProtocol.sparkEventToJson(event)))
+    JsonProtocol.sparkEventToJsonString(event)
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
index 541b283c13f..afe8c61a524 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala
@@ -30,7 +30,6 @@ import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path}
 import org.apache.hadoop.hdfs.{DFSInputStream, DistributedFileSystem}
 import org.apache.hadoop.security.AccessControlException
-import org.json4s.jackson.JsonMethods._
 import org.mockito.ArgumentMatchers.{any, argThat}
 import org.mockito.Mockito.{doThrow, mock, spy, verify, when}
 import org.scalatest.PrivateMethodTester
@@ -1730,13 +1729,13 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with L
     val bstream = new BufferedOutputStream(cstream)
 
     val metadata = SparkListenerLogStart(org.apache.spark.SPARK_VERSION)
-    val eventJson = JsonProtocol.logStartToJson(metadata)
-    val metadataJson = compact(eventJson) + "\n"
+    val eventJsonString = JsonProtocol.sparkEventToJsonString(metadata)
+    val metadataJson = eventJsonString + "\n"
     bstream.write(metadataJson.getBytes(StandardCharsets.UTF_8))
 
     val writer = new OutputStreamWriter(bstream, StandardCharsets.UTF_8)
     Utils.tryWithSafeFinally {
-      events.foreach(e => writer.write(compact(render(JsonProtocol.sparkEventToJson(e))) + "\n"))
+      events.foreach(e => writer.write(JsonProtocol.sparkEventToJsonString(e) + "\n"))
     } {
       writer.close()
     }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
index d790c35a330..2a5a44ad783 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/EventLoggingListenerSuite.scala
@@ -26,7 +26,6 @@ import scala.collection.mutable.Set
 import scala.io.{Codec, Source}
 
 import org.apache.hadoop.fs.Path
-import org.json4s.jackson.JsonMethods._
 import org.mockito.Mockito
 import org.scalatest.BeforeAndAfter
 
@@ -140,7 +139,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       assert(lines(2).contains("SparkListenerJobStart"))
 
       lines.foreach{
-        line => JsonProtocol.sparkEventFromJson(parse(line)) match {
+        line => JsonProtocol.sparkEventFromJson(line) match {
           case logStartEvent: SparkListenerLogStart =>
             assert(logStartEvent == logStart)
 
@@ -180,7 +179,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
     sc.stop()
 
     val eventLogStream = EventLogFileReader.openEventLog(new Path(testDirPath, appId), fileSystem)
-    val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(parse(line)))
+    val events = readLines(eventLogStream).map(line => JsonProtocol.sparkEventFromJson(line))
     val jobStartEvents = events
       .filter(event => event.isInstanceOf[SparkListenerJobStart])
       .map(_.asInstanceOf[SparkListenerJobStart])
@@ -248,9 +247,9 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       assert(lines(0).contains("SparkListenerLogStart"))
       assert(lines(1).contains("SparkListenerApplicationStart"))
       assert(lines(2).contains("SparkListenerApplicationEnd"))
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(1))) === applicationStart)
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(2))) === applicationEnd)
+      assert(JsonProtocol.sparkEventFromJson(lines(0)) === logStart)
+      assert(JsonProtocol.sparkEventFromJson(lines(1)) === applicationStart)
+      assert(JsonProtocol.sparkEventFromJson(lines(2)) === applicationEnd)
     } finally {
       logData.close()
     }
@@ -307,7 +306,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       lines.foreach { line =>
         eventSet.foreach { event =>
           if (line.contains(event)) {
-            val parsedEvent = JsonProtocol.sparkEventFromJson(parse(line))
+            val parsedEvent = JsonProtocol.sparkEventFromJson(line)
             val eventType = Utils.getFormattedClassName(parsedEvent)
             if (eventType == event) {
               eventSet.remove(event)
@@ -315,7 +314,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
           }
         }
       }
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      assert(JsonProtocol.sparkEventFromJson(lines(0)) === logStart)
       assert(eventSet.isEmpty, "The following events are missing: " + eventSet.toSeq)
     } {
       logData.close()
@@ -518,7 +517,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       assert(lines.size === 25)
       assert(lines(0).contains("SparkListenerLogStart"))
       assert(lines(1).contains("SparkListenerApplicationStart"))
-      assert(JsonProtocol.sparkEventFromJson(parse(lines(0))) === logStart)
+      assert(JsonProtocol.sparkEventFromJson(lines(0)) === logStart)
       var logIdx = 1
       events.foreach { event =>
         event match {
@@ -609,7 +608,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
   /** Check that the Spark history log line matches the expected event. */
   private def checkEvent(line: String, event: SparkListenerEvent): Unit = {
     assert(line.contains(event.getClass.toString.split("\\.").last))
-    val parsed = JsonProtocol.sparkEventFromJson(parse(line))
+    val parsed = JsonProtocol.sparkEventFromJson(line)
     assert(parsed.getClass === event.getClass)
     (event, parsed) match {
       case (expected: SparkListenerStageSubmitted, actual: SparkListenerStageSubmitted) =>
@@ -641,7 +640,7 @@ class EventLoggingListenerSuite extends SparkFunSuite with LocalSparkContext wit
       line: String,
       stageId: Int,
       expectedEvents: Map[(Int, String), SparkListenerStageExecutorMetrics]): String = {
-    JsonProtocol.sparkEventFromJson(parse(line)) match {
+    JsonProtocol.sparkEventFromJson(line) match {
       case executorMetrics: SparkListenerStageExecutorMetrics =>
           expectedEvents.get((stageId, executorMetrics.execId)) match {
             case Some(expectedMetrics) =>
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
index cb50c7c9597..77d9ae88fbc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/ReplayListenerSuite.scala
@@ -24,8 +24,6 @@ import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.fs.Path
-import org.json4s.JsonAST.JValue
-import org.json4s.jackson.JsonMethods._
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
@@ -60,8 +58,8 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     val applicationEnd = SparkListenerApplicationEnd(1000L)
     Utils.tryWithResource(new PrintWriter(fwriter)) { writer =>
       // scalastyle:off println
-      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      writer.println(JsonProtocol.sparkEventToJsonString(applicationStart))
+      writer.println(JsonProtocol.sparkEventToJsonString(applicationEnd))
       // scalastyle:on println
     }
 
@@ -76,8 +74,8 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
       logData.close()
     }
     assert(eventMonster.loggedEvents.size === 2)
-    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
-    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJsonString(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJsonString(applicationEnd))
   }
 
   /**
@@ -99,8 +97,8 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
       val applicationEnd = SparkListenerApplicationEnd(1000L)
 
       // scalastyle:off println
-      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
-      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      writer.println(JsonProtocol.sparkEventToJsonString(applicationStart))
+      writer.println(JsonProtocol.sparkEventToJsonString(applicationEnd))
       // scalastyle:on println
     }
 
@@ -144,9 +142,9 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     val applicationEnd = SparkListenerApplicationEnd(1000L)
     Utils.tryWithResource(new PrintWriter(fwriter)) { writer =>
       // scalastyle:off println
-      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationStart))))
+      writer.println(JsonProtocol.sparkEventToJsonString(applicationStart))
       writer.println("""{"Event":"UnrecognizedEventOnlyForTest","Timestamp":1477593059313}""")
-      writer.println(compact(render(JsonProtocol.sparkEventToJson(applicationEnd))))
+      writer.println(JsonProtocol.sparkEventToJsonString(applicationEnd))
       // scalastyle:on println
     }
 
@@ -161,8 +159,8 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
       logData.close()
     }
     assert(eventMonster.loggedEvents.size === 2)
-    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJson(applicationStart))
-    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJson(applicationEnd))
+    assert(eventMonster.loggedEvents(0) === JsonProtocol.sparkEventToJsonString(applicationStart))
+    assert(eventMonster.loggedEvents(1) === JsonProtocol.sparkEventToJsonString(applicationEnd))
   }
 
   // This assumes the correctness of EventLoggingListener
@@ -226,9 +224,9 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
     // Verify the same events are replayed in the same order
     assert(sc.eventLogger.isDefined)
     val originalEvents = sc.eventLogger.get.loggedEvents
-      .map(JsonProtocol.sparkEventFromJson(_))
+      .map(JsonProtocol.sparkEventFromJson)
     val replayedEvents = eventMonster.loggedEvents
-      .map(JsonProtocol.sparkEventFromJson(_))
+      .map(JsonProtocol.sparkEventFromJson)
     originalEvents.zip(replayedEvents).foreach { case (e1, e2) =>
      JsonProtocolSuite.assertEquals(e1, e2)
     }
@@ -245,10 +243,10 @@ class ReplayListenerSuite extends SparkFunSuite with BeforeAndAfter with LocalSp
    */
   private class EventBufferingListener extends SparkFirehoseListener {
 
-    private[scheduler] val loggedEvents = new ArrayBuffer[JValue]
+    private[scheduler] val loggedEvents = new ArrayBuffer[String]
 
     override def onEvent(event: SparkListenerEvent): Unit = {
-      val eventJson = JsonProtocol.sparkEventToJson(event)
+      val eventJson = JsonProtocol.sparkEventToJsonString(event)
       loggedEvents += eventJson
     }
   }
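
With `loggedEvents` now holding strings, the equality assertions above rely on `sparkEventToJsonString` producing identical output for equal events, which holds here because both sides of each assertion come from the same generator. When only semantic equality is needed (field order or whitespace may differ), comparing parsed trees is the safer check, which is what `assertJsonStringEquals` in JsonProtocolSuite below does. A small sketch of that check; illustrative only — `JsonEqualitySketch` and the sample strings are invented for this example.

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object JsonEqualitySketch {
  private val mapper = new ObjectMapper()

  // Structural comparison: JsonNode equality ignores field order and formatting.
  def jsonEquals(expected: String, actual: String): Boolean =
    mapper.readTree(expected) == mapper.readTree(actual)

  def main(args: Array[String]): Unit = {
    val a = """{"Event":"SparkListenerApplicationEnd","Timestamp":1000}"""
    val b = """{ "Timestamp": 1000, "Event": "SparkListenerApplicationEnd" }"""
    assert(jsonEquals(a, b)) // same content, different order and formatting
    assert(a != b)           // plain string equality would reject it
  }
}
```
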
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 7a18223ec5b..9c5d580d433 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -21,7 +21,10 @@ import java.util.Properties
 
 import scala.collection.JavaConverters._
 import scala.collection.Map
+import scala.language.implicitConversions
 
+import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
+import com.fasterxml.jackson.databind.node.{ObjectNode, TextNode}
 import org.json4s.JsonAST.{JArray, JInt, JString, JValue}
 import org.json4s.JsonDSL._
 import org.json4s.jackson.JsonMethods._
@@ -33,17 +36,21 @@ import org.apache.spark.executor._
 import org.apache.spark.metrics.ExecutorMetricType
 import org.apache.spark.rdd.{DeterministicLevel, RDDOperationScope}
 import org.apache.spark.resource._
+import org.apache.spark.resource.ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID
 import org.apache.spark.scheduler._
 import org.apache.spark.scheduler.cluster.ExecutorInfo
 import org.apache.spark.shuffle.MetadataFetchFailedException
 import org.apache.spark.storage._
 
 class JsonProtocolSuite extends SparkFunSuite {
+  import JsonProtocol.toJsonString
   import JsonProtocolSuite._
 
   test("SparkListenerEvent") {
     val stageSubmitted =
       SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties)
+    val stageSubmittedWithNullProperties =
+      SparkListenerStageSubmitted(makeStageInfo(100, 200, 300, 400L, 500L), properties = null)
     val stageCompleted = SparkListenerStageCompleted(makeStageInfo(101, 201, 301, 401L, 501L))
     val taskStart = SparkListenerTaskStart(111, 0, makeTaskInfo(222L, 333, 1, 333, 444L, false))
     val taskGettingResult =
@@ -72,6 +79,9 @@ class JsonProtocolSuite extends SparkFunSuite {
         makeStageInfo(x, x * 200, x * 300, x * 400L, x * 500L))
       SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
     }
+    val jobStartWithNullProperties = {
+      SparkListenerJobStart(10, jobSubmissionTime, stageInfos = Seq.empty, properties = null)
+    }
     val jobEnd = SparkListenerJobEnd(20, jobCompletionTime, JobSucceeded)
     val environmentUpdate = SparkListenerEnvironmentUpdate(Map[String, Seq[(String, String)]](
       "JVM Information" -> Seq(("GC speed", "9999 objects/s"), ("Java home", "Land of coffee")),
@@ -149,6 +159,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     resourceProfile.setResourceProfileId(21)
     val resourceProfileAdded = SparkListenerResourceProfileAdded(resourceProfile)
     testEvent(stageSubmitted, stageSubmittedJsonString)
+    testEvent(stageSubmittedWithNullProperties, stageSubmittedWithNullPropertiesJsonString)
     testEvent(stageCompleted, stageCompletedJsonString)
     testEvent(taskStart, taskStartJsonString)
     testEvent(taskGettingResult, taskGettingResultJsonString)
@@ -156,6 +167,7 @@ class JsonProtocolSuite extends SparkFunSuite {
     testEvent(taskEndWithHadoopInput, taskEndWithHadoopInputJsonString)
     testEvent(taskEndWithOutput, taskEndWithOutputJsonString)
     testEvent(jobStart, jobStartJsonString)
+    testEvent(jobStartWithNullProperties, jobStartWithNullPropertiesJsonString)
     testEvent(jobEnd, jobEndJsonString)
     testEvent(environmentUpdate, environmentUpdateJsonString)
     testEvent(blockManagerAdded, blockManagerAddedJsonString)
@@ -248,21 +260,21 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("ExceptionFailure backward compatibility: full stack trace") {
     val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, null, None)
-    val oldEvent = JsonProtocol.taskEndReasonToJson(exceptionFailure)
-      .removeField({ _._1 == "Full Stack Trace" })
+    val oldEvent = toJsonString(JsonProtocol.taskEndReasonToJson(exceptionFailure, _))
+      .removeField("Full Stack Trace")
     assertEquals(exceptionFailure, JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
 
   test("StageInfo backward compatibility (details, accumulables)") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L)
-    val newJson = JsonProtocol.stageInfoToJson(info)
+    val newJson = toJsonString(JsonProtocol.stageInfoToJson(info, _))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
     assert(info.accumulables.nonEmpty)
     val oldJson = newJson
-      .removeField { case (field, _) => field == "Details" }
-      .removeField { case (field, _) => field == "Accumulables" }
+      .removeField("Details")
+      .removeField("Accumulables")
 
     val newInfo = JsonProtocol.stageInfoFromJson(oldJson)
 
@@ -273,7 +285,7 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("StageInfo resourceProfileId") {
     val info = makeStageInfo(1, 2, 3, 4L, 5L, 5)
-    val json = JsonProtocol.stageInfoToJson(info)
+    val json = toJsonString(JsonProtocol.stageInfoToJson(info, _))
 
     // Fields added after 1.0.0.
     assert(info.details.nonEmpty)
@@ -288,18 +300,21 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("InputMetrics backward compatibility") {
     // InputMetrics were added after 1.0.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = false)
-    val newJson = JsonProtocol.taskMetricsToJson(metrics)
-    val oldJson = newJson.removeField { case (field, _) => field == "Input Metrics" }
+    val newJson = toJsonString(JsonProtocol.taskMetricsToJson(metrics, _))
+    val oldJson = newJson.removeField("Input Metrics")
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.inputMetrics.recordsRead == 0)
+    assert(newMetrics.inputMetrics.bytesRead == 0)
   }
 
   test("Input/Output records backwards compatibility") {
     // records read were added after 1.2
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = true, hasOutput = true, hasRecords = false)
-    val newJson = JsonProtocol.taskMetricsToJson(metrics)
-    val oldJson = newJson.removeField { case (field, _) => field == "Records Read" }
-                         .removeField { case (field, _) => field == "Records Written" }
+    val newJson = toJsonString(JsonProtocol.taskMetricsToJson(metrics, _))
+    val oldJson = newJson
+      .removeField("Records Read")
+      .removeField("Records Written")
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.inputMetrics.recordsRead == 0)
     assert(newMetrics.outputMetrics.recordsWritten == 0)
@@ -307,22 +322,46 @@ class JsonProtocolSuite extends SparkFunSuite {
 
   test("Shuffle Read/Write records backwards compatibility") {
     // records read were added after 1.2
+    // "Remote Bytes Read To Disk" was added in 2.3.0
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = false, hasOutput = false, hasRecords = false)
-    val newJson = JsonProtocol.taskMetricsToJson(metrics)
-    val oldJson = newJson.removeField { case (field, _) => field == "Total Records Read" }
-                         .removeField { case (field, _) => field == "Shuffle Records Written" }
+    val newJson = toJsonString(JsonProtocol.taskMetricsToJson(metrics, _))
+    val oldJson = newJson
+      .removeField("Total Records Read")
+      .removeField("Shuffle Records Written")
+      .removeField("Remote Bytes Read To Disk")
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.shuffleReadMetrics.recordsRead == 0)
+    assert(newMetrics.shuffleReadMetrics.remoteBytesReadToDisk == 0)
     assert(newMetrics.shuffleWriteMetrics.recordsWritten == 0)
   }
 
   test("OutputMetrics backward compatibility") {
     // OutputMetrics were added after 1.1
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
-    val newJson = JsonProtocol.taskMetricsToJson(metrics)
-    val oldJson = newJson.removeField { case (field, _) => field == "Output Metrics" }
+    val newJson = toJsonString(JsonProtocol.taskMetricsToJson(metrics, _))
+    val oldJson = newJson.removeField("Output Metrics")
+    val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.outputMetrics.recordsWritten == 0)
+    assert(newMetrics.outputMetrics.bytesWritten == 0)
+  }
+
+  test("TaskMetrics backward compatibility") {
+    // "Executor Deserialize CPU Time" and "Executor CPU Time" were introduced in Spark 2.1.0
+    // "Peak Execution Memory" was introduced in Spark 3.0.0
+    val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = false, hasOutput = true)
+    metrics.setExecutorDeserializeCpuTime(100L)
+    metrics.setExecutorCpuTime(100L)
+    metrics.setPeakExecutionMemory(100L)
+    val newJson = toJsonString(JsonProtocol.taskMetricsToJson(metrics, _))
+    val oldJson = newJson
+      .removeField("Executor Deserialize CPU Time")
+      .removeField("Executor CPU Time")
+      .removeField("Peak Execution Memory")
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
+    assert(newMetrics.executorDeserializeCpuTime == 0)
+    assert(newMetrics.executorCpuTime == 0)
+    assert(newMetrics.peakExecutionMemory == 0)
   }
 
   test("StorageLevel backward compatibility") {
@@ -334,8 +373,8 @@ class JsonProtocolSuite extends SparkFunSuite {
       deserialized = false,
       replication = 1
     )
-    val newJson = JsonProtocol.storageLevelToJson(level)
-    val oldJson = newJson.removeField { case (field, _) => field == "Use Off Heap" }
+    val newJson = toJsonString(JsonProtocol.storageLevelToJson(level, _))
+    val oldJson = newJson.removeField("Use Off Heap")
     val newLevel = JsonProtocol.storageLevelFromJson(oldJson)
     assert(newLevel.useOffHeap === false)
   }
@@ -347,15 +386,15 @@ class JsonProtocolSuite extends SparkFunSuite {
     val blockManagerRemoved = SparkListenerBlockManagerRemoved(2L,
       BlockManagerId("Scarce", "to be counted...", 100))
 
-    val oldBmAdded = JsonProtocol.blockManagerAddedToJson(blockManagerAdded)
-      .removeField({ _._1 == "Timestamp" })
+    val oldBmAdded = toJsonString(JsonProtocol.blockManagerAddedToJson(blockManagerAdded, _))
+      .removeField("Timestamp")
 
     val deserializedBmAdded = JsonProtocol.blockManagerAddedFromJson(oldBmAdded)
     assert(SparkListenerBlockManagerAdded(-1L, blockManagerAdded.blockManagerId,
       blockManagerAdded.maxMem) === deserializedBmAdded)
 
-    val oldBmRemoved = JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved)
-      .removeField({ _._1 == "Timestamp" })
+    val oldBmRemoved = toJsonString(JsonProtocol.blockManagerRemovedToJson(blockManagerRemoved, _))
+      .removeField("Timestamp")
 
     val deserializedBmRemoved = JsonProtocol.blockManagerRemovedFromJson(oldBmRemoved)
     assert(SparkListenerBlockManagerRemoved(-1L, blockManagerRemoved.blockManagerId) ===
@@ -366,8 +405,8 @@ class JsonProtocolSuite extends SparkFunSuite {
     // FetchFailed in Spark 1.1.0 does not have a "Message" property.
     val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, 18, 19,
       "ignored")
-    val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed)
-      .removeField({ _._1 == "Message" })
+    val oldEvent = toJsonString(JsonProtocol.taskEndReasonToJson(fetchFailed, _))
+      .removeField("Message")
     val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L,
       18, 19, "Unknown reason")
     assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
@@ -377,8 +416,8 @@ class JsonProtocolSuite extends SparkFunSuite {
     // FetchFailed in Spark 2.4.0 does not have "Map Index" property.
     val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L, 18, 19,
       "ignored")
-    val oldEvent = JsonProtocol.taskEndReasonToJson(fetchFailed)
-      .removeField({ _._1 == "Map Index" })
+    val oldEvent = toJsonString(JsonProtocol.taskEndReasonToJson(fetchFailed, _))
+      .removeField("Map Index")
     val expectedFetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 16L,
       Int.MinValue, 19, "ignored")
     assert(expectedFetchFailed === JsonProtocol.taskEndReasonFromJson(oldEvent))
@@ -388,8 +427,8 @@ class JsonProtocolSuite extends SparkFunSuite {
     // Metrics about local shuffle bytes read were added in 1.3.1.
     val metrics = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6,
       hasHadoopInput = false, hasOutput = false, hasRecords = false)
-    val newJson = JsonProtocol.taskMetricsToJson(metrics)
-    val oldJson = newJson.removeField { case (field, _) => field == "Local Bytes Read" }
+    val newJson = toJsonString(JsonProtocol.taskMetricsToJson(metrics, _))
+    val oldJson = newJson.removeField("Local Bytes Read")
     val newMetrics = JsonProtocol.taskMetricsFromJson(oldJson)
     assert(newMetrics.shuffleReadMetrics.localBytesRead == 0)
   }
@@ -399,18 +438,18 @@ class JsonProtocolSuite extends SparkFunSuite {
     // SparkListenerApplicationStart pre-Spark 1.4 does not have "appAttemptId".
     // SparkListenerApplicationStart pre-Spark 1.5 does not have "driverLogs".
     val applicationStart = SparkListenerApplicationStart("test", None, 1L, "user", None, None)
-    val oldEvent = JsonProtocol.applicationStartToJson(applicationStart)
-      .removeField({ _._1 == "App ID" })
-      .removeField({ _._1 == "App Attempt ID" })
-      .removeField({ _._1 == "Driver Logs"})
+    val oldEvent = toJsonString(JsonProtocol.applicationStartToJson(applicationStart, _))
+      .removeField("App ID")
+      .removeField("App Attempt ID")
+      .removeField( "Driver Logs")
     assert(applicationStart === JsonProtocol.applicationStartFromJson(oldEvent))
   }
 
   test("ExecutorLostFailure backward compatibility") {
     // ExecutorLostFailure in Spark 1.1.0 does not have an "Executor ID" property.
     val executorLostFailure = ExecutorLostFailure("100", true, Some("Induced failure"))
-    val oldEvent = JsonProtocol.taskEndReasonToJson(executorLostFailure)
-      .removeField({ _._1 == "Executor ID" })
+    val oldEvent = toJsonString(JsonProtocol.taskEndReasonToJson(executorLostFailure, _))
+      .removeField("Executor ID")
     val expectedExecutorLostFailure = ExecutorLostFailure("Unknown", true, Some("Induced failure"))
     assert(expectedExecutorLostFailure === JsonProtocol.taskEndReasonFromJson(oldEvent))
   }
@@ -423,7 +462,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       stageIds.map(id => new StageInfo(id, 0, "unknown", 0, Seq.empty, Seq.empty, "unknown",
         resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID))
     val jobStart = SparkListenerJobStart(10, jobSubmissionTime, stageInfos, properties)
-    val oldEvent = JsonProtocol.jobStartToJson(jobStart).removeField({_._1 == "Stage Infos"})
+    val oldEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, _)).removeField("Stage Infos")
     val expectedJobStart =
       SparkListenerJobStart(10, jobSubmissionTime, dummyStageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldEvent))
@@ -435,29 +474,34 @@ class JsonProtocolSuite extends SparkFunSuite {
     val stageIds = Seq[Int](1, 2, 3, 4)
     val stageInfos = stageIds.map(x => makeStageInfo(x * 10, x * 20, x * 30, x * 40L, x * 50L))
     val jobStart = SparkListenerJobStart(11, jobSubmissionTime, stageInfos, properties)
-    val oldStartEvent = JsonProtocol.jobStartToJson(jobStart)
-      .removeField({ _._1 == "Submission Time"})
+    val oldStartEvent = toJsonString(JsonProtocol.jobStartToJson(jobStart, _))
+      .removeField("Submission Time")
     val expectedJobStart = SparkListenerJobStart(11, -1, stageInfos, properties)
     assertEquals(expectedJobStart, JsonProtocol.jobStartFromJson(oldStartEvent))
 
     val jobEnd = SparkListenerJobEnd(11, jobCompletionTime, JobSucceeded)
-    val oldEndEvent = JsonProtocol.jobEndToJson(jobEnd)
-      .removeField({ _._1 == "Completion Time"})
+    val oldEndEvent = toJsonString(JsonProtocol.jobEndToJson(jobEnd, _))
+      .removeField("Completion Time")
     val expectedJobEnd = SparkListenerJobEnd(11, -1, JobSucceeded)
     assertEquals(expectedJobEnd, JsonProtocol.jobEndFromJson(oldEndEvent))
   }
 
-  test("RDDInfo backward compatibility (scope, parent IDs, callsite)") {
+  test("RDDInfo backward compatibility") {
     // "Scope" and "Parent IDs" were introduced in Spark 1.4.0
     // "Callsite" was introduced in Spark 1.6.0
-    val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, false, Seq(1, 6, 8),
-      "callsite", Some(new RDDOperationScope("fable")))
-    val oldRddInfoJson = JsonProtocol.rddInfoToJson(rddInfo)
-      .removeField({ _._1 == "Parent IDs"})
-      .removeField({ _._1 == "Scope"})
-      .removeField({ _._1 == "Callsite"})
+    // "Barrier" was introduced in Spark 3.0.0
+    // "DeterministicLevel" was introduced in Spark 3.2.0
+    val rddInfo = new RDDInfo(1, "one", 100, StorageLevel.NONE, true, Seq(1, 6, 8),
+      "callsite", Some(new RDDOperationScope("fable")), DeterministicLevel.INDETERMINATE)
+    val oldRddInfoJson = toJsonString(JsonProtocol.rddInfoToJson(rddInfo, _))
+      .removeField("Parent IDs")
+      .removeField("Scope")
+      .removeField("Callsite")
+      .removeField("Barrier")
+      .removeField("DeterministicLevel")
     val expectedRddInfo = new RDDInfo(
-      1, "one", 100, StorageLevel.NONE, false, Seq.empty, "", scope = None)
+      1, "one", 100, StorageLevel.NONE, false, Seq.empty, "", scope = None,
+      outputDeterministicLevel = DeterministicLevel.INDETERMINATE)
     assertEquals(expectedRddInfo, JsonProtocol.rddInfoFromJson(oldRddInfoJson))
   }
 
@@ -465,7 +509,8 @@ class JsonProtocolSuite extends SparkFunSuite {
     // Prior to Spark 1.4.0, StageInfo did not have the "Parent IDs" property
     val stageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq(1, 2, 3), "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
-    val oldStageInfo = JsonProtocol.stageInfoToJson(stageInfo).removeField({ _._1 == "Parent IDs"})
+    val oldStageInfo = toJsonString(JsonProtocol.stageInfoToJson(stageInfo, _))
+      .removeField("Parent IDs")
     val expectedStageInfo = new StageInfo(1, 1, "me-stage", 1, Seq.empty, Seq.empty, "details",
       resourceProfileId = ResourceProfile.DEFAULT_RESOURCE_PROFILE_ID)
     assertEquals(expectedStageInfo, JsonProtocol.stageInfoFromJson(oldStageInfo))
@@ -474,10 +519,10 @@ class JsonProtocolSuite extends SparkFunSuite {
   // `TaskCommitDenied` was added in 1.3.0 but JSON de/serialization logic was added in 1.5.1
   test("TaskCommitDenied backward compatibility") {
     val denied = TaskCommitDenied(1, 2, 3)
-    val oldDenied = JsonProtocol.taskEndReasonToJson(denied)
-      .removeField({ _._1 == "Job ID" })
-      .removeField({ _._1 == "Partition ID" })
-      .removeField({ _._1 == "Attempt Number" })
+    val oldDenied = toJsonString(JsonProtocol.taskEndReasonToJson(denied, _))
+      .removeField("Job ID")
+      .removeField("Partition ID")
+      .removeField("Attempt Number")
     val expectedDenied = TaskCommitDenied(-1, -1, -1)
     assertEquals(expectedDenied, JsonProtocol.taskEndReasonFromJson(oldDenied))
   }
@@ -485,16 +530,16 @@ class JsonProtocolSuite extends SparkFunSuite {
   test("AccumulableInfo backward compatibility") {
     // "Internal" property of AccumulableInfo was added in 1.5.1
     val accumulableInfo = makeAccumulableInfo(1, internal = true, countFailedValues = true)
-    val accumulableInfoJson = JsonProtocol.accumulableInfoToJson(accumulableInfo)
-    val oldJson = accumulableInfoJson.removeField({ _._1 == "Internal" })
+    val accumulableInfoJson = toJsonString(JsonProtocol.accumulableInfoToJson(accumulableInfo, _))
+    val oldJson = accumulableInfoJson.removeField("Internal")
     val oldInfo = JsonProtocol.accumulableInfoFromJson(oldJson)
     assert(!oldInfo.internal)
     // "Count Failed Values" property of AccumulableInfo was added in 2.0.0
-    val oldJson2 = accumulableInfoJson.removeField({ _._1 == "Count Failed Values" })
+    val oldJson2 = accumulableInfoJson.removeField("Count Failed Values")
     val oldInfo2 = JsonProtocol.accumulableInfoFromJson(oldJson2)
     assert(!oldInfo2.countFailedValues)
     // "Metadata" property of AccumulableInfo was added in 2.0.0
-    val oldJson3 = accumulableInfoJson.removeField({ _._1 == "Metadata" })
+    val oldJson3 = accumulableInfoJson.removeField("Metadata")
     val oldInfo3 = JsonProtocol.accumulableInfoFromJson(oldJson3)
     assert(oldInfo3.metadata.isEmpty)
   }
@@ -504,14 +549,15 @@ class JsonProtocolSuite extends SparkFunSuite {
     // we should still be able to fallback to constructing the accumulator updates from the
     // "Task Metrics" field, if it exists.
     val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
-    val tmJson = JsonProtocol.taskMetricsToJson(tm)
+    val tmJson = toJsonString(JsonProtocol.taskMetricsToJson(tm, _))
     val accumUpdates = tm.accumulators().map(AccumulatorSuite.makeInfo)
     val exception = new SparkException("sentimental")
     val exceptionFailure = new ExceptionFailure(exception, accumUpdates)
-    val exceptionFailureJson = JsonProtocol.taskEndReasonToJson(exceptionFailure)
-    val tmFieldJson: JValue = "Task Metrics" -> tmJson
-    val oldExceptionFailureJson: JValue =
-      exceptionFailureJson.removeField { _._1 == "Accumulator Updates" }.merge(tmFieldJson)
+    val exceptionFailureJson = toJsonString(JsonProtocol.taskEndReasonToJson(exceptionFailure, _))
+    val oldExceptionFailureJson =
+      exceptionFailureJson
+        .removeField("Accumulator Updates")
+        .addStringField("Task Metrics", tmJson)
     val oldExceptionFailure =
       JsonProtocol.taskEndReasonFromJson(oldExceptionFailureJson).asInstanceOf[ExceptionFailure]
     assert(exceptionFailure.className === oldExceptionFailure.className)
@@ -523,12 +569,30 @@ class JsonProtocolSuite extends SparkFunSuite {
       exceptionFailure.accumUpdates, oldExceptionFailure.accumUpdates, (x, y) => x == y)
   }
 
+  test("TaskKilled backward compatibility") {
+    // The "Kill Reason" field was added in Spark 2.2.0
+    // The "Accumulator Updates" field was added in Spark 2.4.0
+    val tm = makeTaskMetrics(1L, 2L, 3L, 4L, 5, 6, hasHadoopInput = true, hasOutput = true)
+    val accumUpdates = tm.accumulators().map(AccumulatorSuite.makeInfo)
+    val taskKilled = TaskKilled(reason = "test", accumUpdates)
+    val taskKilledJson = toJsonString(JsonProtocol.taskEndReasonToJson(taskKilled, _))
+    val oldExceptionFailureJson =
+      taskKilledJson
+        .removeField("Kill Reason")
+        .removeField("Accumulator Updates")
+    val oldTaskKilled =
+      JsonProtocol.taskEndReasonFromJson(oldExceptionFailureJson).asInstanceOf[TaskKilled]
+    assert(oldTaskKilled.reason === "unknown reason")
+    assert(oldTaskKilled.accums.isEmpty)
+    assert(oldTaskKilled.accumUpdates.isEmpty)
+  }
+
   test("ExecutorMetricsUpdate backward compatibility: executor metrics update") {
     // executorMetricsUpdate was added in 2.4.0.
     val executorMetricsUpdate = makeExecutorMetricsUpdate("1", true, true)
     val oldExecutorMetricsUpdateJson =
-      JsonProtocol.executorMetricsUpdateToJson(executorMetricsUpdate)
-        .removeField( _._1 == "Executor Metrics Updated")
+      toJsonString(JsonProtocol.executorMetricsUpdateToJson(executorMetricsUpdate, _))
+        .removeField("Executor Metrics Updated")
     val expectedExecutorMetricsUpdate = makeExecutorMetricsUpdate("1", true, false)
     assertEquals(expectedExecutorMetricsUpdate,
       JsonProtocol.executorMetricsUpdateFromJson(oldExecutorMetricsUpdateJson))
@@ -539,14 +603,91 @@ class JsonProtocolSuite extends SparkFunSuite {
     val executorMetrics = new ExecutorMetrics(Array(12L, 23L, 45L, 67L, 78L, 89L,
       90L, 123L, 456L, 789L, 40L, 20L, 20L, 10L, 20L, 10L, 301L))
     val oldExecutorMetricsJson =
-      JsonProtocol.executorMetricsToJson(executorMetrics)
-        .removeField( _._1 == "MappedPoolMemory")
+      toJsonString(JsonProtocol.executorMetricsToJson(executorMetrics, _))
+        .removeField("MappedPoolMemory")
     val expectedExecutorMetrics = new ExecutorMetrics(Array(12L, 23L, 45L, 67L,
       78L, 89L, 90L, 123L, 456L, 0L, 40L, 20L, 20L, 10L, 20L, 10L, 301L))
     assertEquals(expectedExecutorMetrics,
       JsonProtocol.executorMetricsFromJson(oldExecutorMetricsJson))
   }
 
+  test("EnvironmentUpdate backward compatibility: handle missing metrics properties") {
+    // The "Metrics Properties" field was added in Spark 3.4.0:
+    val expectedEvent: SparkListenerEnvironmentUpdate = {
+      val e = JsonProtocol.environmentUpdateFromJson(environmentUpdateJsonString)
+      e.copy(environmentDetails =
+        e.environmentDetails + ("Metrics Properties" -> Seq.empty[(String, String)]))
+    }
+    val oldEnvironmentUpdateJson = environmentUpdateJsonString
+      .removeField("Metrics Properties")
+    assertEquals(expectedEvent, JsonProtocol.environmentUpdateFromJson(oldEnvironmentUpdateJson))
+  }
+
+  test("ExecutorInfo backward compatibility") {
+    // The "Attributes" and "Resources" fields were added in Spark 3.0.0
+    // The "Resource Profile Id", "Registration Time", and "Request Time"
+    // fields were added in Spark 3.4.0
+    val resourcesInfo = Map(ResourceUtils.GPU ->
+      new ResourceInformation(ResourceUtils.GPU, Array("0", "1"))).toMap
+    val attributes = Map("ContainerId" -> "ct1", "User" -> "spark").toMap
+    val executorInfo =
+      new ExecutorInfo(
+        "Hostee.awesome.com",
+        11,
+        logUrlMap = Map.empty[String, String].toMap,
+        attributes = attributes,
+        resourcesInfo = resourcesInfo,
+        resourceProfileId = 123,
+        registrationTime = Some(2L),
+        requestTime = Some(1L))
+    val oldExecutorInfoJson = toJsonString(JsonProtocol.executorInfoToJson(executorInfo, _))
+      .removeField("Attributes")
+      .removeField("Resources")
+      .removeField("Resource Profile Id")
+      .removeField("Registration Time")
+      .removeField("Request Time")
+    val oldEvent = JsonProtocol.executorInfoFromJson(oldExecutorInfoJson)
+    assert(oldEvent.attributes.isEmpty)
+    assert(oldEvent.resourcesInfo.isEmpty)
+    assert(oldEvent.resourceProfileId == DEFAULT_RESOURCE_PROFILE_ID)
+    assert(oldEvent.registrationTime.isEmpty)
+    assert(oldEvent.requestTime.isEmpty)
+  }
+
+  test("TaskInfo backward compatibility: handle missing partition ID field") {
+    // The "Partition ID" field was added in Spark 3.3.0:
+    val newJson =
+      """
+        |{
+        |  "Task ID": 222,
+        |  "Index": 333,
+        |  "Attempt": 1,
+        |  "Partition ID": 333,
+        |  "Launch Time": 444,
+        |  "Executor ID": "executor",
+        |  "Host": "your kind sir",
+        |  "Locality": "NODE_LOCAL",
+        |  "Speculative": false,
+        |  "Getting Result Time": 0,
+        |  "Finish Time": 0,
+        |  "Failed": false,
+        |  "Killed": false,
+        |  "Accumulables": [
+        |    {
+        |      "ID": 1,
+        |      "Name": "Accumulable1",
+        |      "Update": "delta1",
+        |      "Value": "val1",
+        |      "Internal": false,
+        |      "Count Failed Values": false
+        |    }
+        |  ]
+        |}
+    """.stripMargin
+    val oldJson = newJson.removeField("Partition ID")
+    assert(JsonProtocol.taskInfoFromJson(oldJson).partitionId === -1)
+  }
+
   test("AccumulableInfo value de/serialization") {
     import InternalAccumulator._
     val blocks = Seq[(BlockId, BlockStatus)](
@@ -554,7 +695,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       (TestBlockId("feebo"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
     val blocksJson = JArray(blocks.toList.map { case (id, status) =>
       ("Block ID" -> id.toString) ~
-      ("Status" -> JsonProtocol.blockStatusToJson(status))
+      ("Status" -> parse(toJsonString(JsonProtocol.blockStatusToJson(status, _))))
     })
     testAccumValue(Some(RESULT_SIZE), 3L, JInt(3))
     testAccumValue(Some(shuffleRead.REMOTE_BLOCKS_FETCHED), 2, JInt(2))
@@ -577,7 +718,7 @@ class JsonProtocolSuite extends SparkFunSuite {
       value = value,
       internal = isInternal,
       countFailedValues = false)
-    val json = JsonProtocol.accumulableInfoToJson(accum)
+    val json = toJsonString(JsonProtocol.accumulableInfoToJson(accum, _))
     val newAccum = JsonProtocol.accumulableInfoFromJson(json)
     assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
   }
@@ -621,7 +762,7 @@ class JsonProtocolSuite extends SparkFunSuite {
         |  "bar" : 123,
         |  "unknown" : "unknown"
         |}""".stripMargin
-    assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
+    assert(JsonProtocol.sparkEventFromJson(unknownFieldsJson) === expected)
   }
 
   test("SPARK-30936: backwards compatibility - set default values for missing fields") {
@@ -631,13 +772,34 @@ class JsonProtocolSuite extends SparkFunSuite {
         |  "Event" : "org.apache.spark.util.TestListenerEvent",
         |  "foo" : "foo"
         |}""".stripMargin
-    assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
+    assert(JsonProtocol.sparkEventFromJson(unknownFieldsJson) === expected)
   }
 }
 
 
 private[spark] object JsonProtocolSuite extends Assertions {
   import InternalAccumulator._
+  import JsonProtocol.toJsonString
+
+  private val mapper = new ObjectMapper()
+
+  private implicit class JsonStringImplicits(json: String) {
+    def removeField(field: String): String = {
+      val tree = mapper.readTree(json)
+      Option(tree.asInstanceOf[ObjectNode].findParent(field)).foreach(_.remove(field))
+      tree.toString
+    }
+
+    def addStringField(k: String, v: String): String = {
+      val tree = mapper.readTree(json)
+      tree.asInstanceOf[ObjectNode].set(k, new TextNode(v))
+      tree.toString
+    }
+  }
+
+  private implicit def toJsonNode(json: String): JsonNode = {
+    mapper.readTree(json)
+  }
 
   private val jobSubmissionTime = 1421191042750L
   private val jobCompletionTime = 1421191296660L
@@ -648,50 +810,62 @@ private[spark] object JsonProtocolSuite extends Assertions {
   private val nodeExcludedTime = 1421458952000L
   private val nodeUnexcludedTime = 1421458962000L
 
+  implicit def jValueToJsonNode(value: JValue): JsonNode = {
+    mapper.readTree(pretty(value))
+  }
+
   private def testEvent(event: SparkListenerEvent, jsonString: String): Unit = {
-    val actualJsonString = compact(render(JsonProtocol.sparkEventToJson(event)))
-    val newEvent = JsonProtocol.sparkEventFromJson(parse(actualJsonString))
+    val actualJsonString = JsonProtocol.sparkEventToJsonString(event)
+    val newEvent = JsonProtocol.sparkEventFromJson(actualJsonString)
     assertJsonStringEquals(jsonString, actualJsonString, event.getClass.getSimpleName)
     assertEquals(event, newEvent)
   }
 
   private def testRDDInfo(info: RDDInfo): Unit = {
-    val newInfo = JsonProtocol.rddInfoFromJson(JsonProtocol.rddInfoToJson(info))
+    val newInfo = JsonProtocol.rddInfoFromJson(
+      toJsonString(JsonProtocol.rddInfoToJson(info, _)))
     assertEquals(info, newInfo)
   }
 
   private def testStageInfo(info: StageInfo): Unit = {
-    val newInfo = JsonProtocol.stageInfoFromJson(JsonProtocol.stageInfoToJson(info))
+    val newInfo = JsonProtocol.stageInfoFromJson(
+      toJsonString(JsonProtocol.stageInfoToJson(info, _)))
     assertEquals(info, newInfo)
   }
 
   private def testStorageLevel(level: StorageLevel): Unit = {
-    val newLevel = JsonProtocol.storageLevelFromJson(JsonProtocol.storageLevelToJson(level))
+    val newLevel = JsonProtocol.storageLevelFromJson(
+      toJsonString(JsonProtocol.storageLevelToJson(level, _)))
     assertEquals(level, newLevel)
   }
 
   private def testTaskMetrics(metrics: TaskMetrics): Unit = {
-    val newMetrics = JsonProtocol.taskMetricsFromJson(JsonProtocol.taskMetricsToJson(metrics))
+    val newMetrics = JsonProtocol.taskMetricsFromJson(
+      toJsonString(JsonProtocol.taskMetricsToJson(metrics, _)))
     assertEquals(metrics, newMetrics)
   }
 
   private def testBlockManagerId(id: BlockManagerId): Unit = {
-    val newId = JsonProtocol.blockManagerIdFromJson(JsonProtocol.blockManagerIdToJson(id))
+    val newId = JsonProtocol.blockManagerIdFromJson(
+      toJsonString(JsonProtocol.blockManagerIdToJson(id, _)))
     assert(id === newId)
   }
 
   private def testTaskInfo(info: TaskInfo): Unit = {
-    val newInfo = JsonProtocol.taskInfoFromJson(JsonProtocol.taskInfoToJson(info))
+    val newInfo = JsonProtocol.taskInfoFromJson(
+      toJsonString(JsonProtocol.taskInfoToJson(info, _)))
     assertEquals(info, newInfo)
   }
 
   private def testJobResult(result: JobResult): Unit = {
-    val newResult = JsonProtocol.jobResultFromJson(JsonProtocol.jobResultToJson(result))
+    val newResult = JsonProtocol.jobResultFromJson(
+      toJsonString(JsonProtocol.jobResultToJson(result, _)))
     assertEquals(result, newResult)
   }
 
   private def testTaskEndReason(reason: TaskEndReason): Unit = {
-    val newReason = JsonProtocol.taskEndReasonFromJson(JsonProtocol.taskEndReasonToJson(reason))
+    val newReason = JsonProtocol.taskEndReasonFromJson(
+      toJsonString(JsonProtocol.taskEndReasonToJson(reason, _)))
     assertEquals(reason, newReason)
   }
 
@@ -701,12 +875,13 @@ private[spark] object JsonProtocolSuite extends Assertions {
   }
 
   private def testExecutorInfo(info: ExecutorInfo): Unit = {
-    val newInfo = JsonProtocol.executorInfoFromJson(JsonProtocol.executorInfoToJson(info))
+    val newInfo = JsonProtocol.executorInfoFromJson(
+      toJsonString(JsonProtocol.executorInfoToJson(info, _)))
     assertEquals(info, newInfo)
   }
 
   private def testAccumValue(name: Option[String], value: Any, expectedJson: JValue): Unit = {
-    val json = JsonProtocol.accumValueToJson(name, value)
+    val json = parse(toJsonString(JsonProtocol.accumValueToJson(name, value, _)))
     assert(json === expectedJson)
     val newValue = JsonProtocol.accumValueFromJson(name, json)
     val expectedValue = if (name.exists(_.startsWith(METRICS_PREFIX))) value else value.toString
@@ -936,14 +1111,18 @@ private[spark] object JsonProtocolSuite extends Assertions {
     }
   }
 
+  private def prettyString(json: JsonNode): String = {
+    mapper.writerWithDefaultPrettyPrinter().writeValueAsString(json)
+  }
+
   private def assertJsonStringEquals(expected: String, actual: String, metadata: String): Unit = {
-    val expectedJson = parse(expected)
-    val actualJson = parse(actual)
+    val expectedJson = mapper.readTree(expected)
+    val actualJson = mapper.readTree(actual)
     if (expectedJson != actualJson) {
       // scalastyle:off
       // This prints something useful if the JSON strings don't match
-      println(s"=== EXPECTED ===\n${pretty(expectedJson)}\n")
-      println(s"=== ACTUAL ===\n${pretty(actualJson)}\n")
+      println(s"=== EXPECTED ===\n${prettyString(expectedJson)}\n")
+      println(s"=== ACTUAL ===\n${prettyString(actualJson)}\n")
       // scalastyle:on
       throw new TestFailedException(s"$metadata JSON did not equal", 1)
     }
@@ -1192,6 +1371,41 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |}
     """.stripMargin
 
+  private val stageSubmittedWithNullPropertiesJsonString =
+    """
+      |{
+      |  "Event": "SparkListenerStageSubmitted",
+      |  "Stage Info": {
+      |    "Stage ID": 100,
+      |    "Stage Attempt ID": 0,
+      |    "Stage Name": "greetings",
+      |    "Number of Tasks": 200,
+      |    "RDD Info": [],
+      |    "Parent IDs" : [100, 200, 300],
+      |    "Details": "details",
+      |    "Accumulables": [
+      |      {
+      |        "ID": 1,
+      |        "Name": "Accumulable1",
+      |        "Update": "delta1",
+      |        "Value": "val1",
+      |        "Internal": false,
+      |        "Count Failed Values": false
+      |      },
+      |      {
+      |        "ID": 2,
+      |        "Name": "Accumulable2",
+      |        "Update": "delta2",
+      |        "Value": "val2",
+      |        "Internal": false,
+      |        "Count Failed Values": false
+      |      }
+      |    ],
+      |    "Resource Profile Id" : 0
+      |  }
+      |}
+    """.stripMargin
+
   private val stageCompletedJsonString =
     """
       |{
@@ -2055,6 +2269,17 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |}
     """.stripMargin
 
+  private val jobStartWithNullPropertiesJsonString =
+    """
+      |{
+      |  "Event": "SparkListenerJobStart",
+      |  "Job ID": 10,
+      |  "Submission Time": 1421191042750,
+      |  "Stage Infos": [],
+      |  "Stage IDs": []
+      |}
+    """.stripMargin
+
   private val jobEndJsonString =
     """
       |{
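For readers tracing the new helper calls above (for example `toJsonString(JsonProtocol.rddInfoToJson(info, _))`): the `*ToJson` writers now take a Jackson `JsonGenerator` as their last argument, and `toJsonString` runs such a writer and returns whatever it emitted. The helper itself is not part of this excerpt, so the following is only a minimal sketch of the idea; the object name and the example writer are illustrative, not the real JsonProtocol code.

```scala
import java.io.StringWriter

import com.fasterxml.jackson.core.{JsonFactory, JsonGenerator}

object JsonWriterSketch {
  // Sketch of a toJsonString-style helper: hand a generator to the supplied
  // writer function and return the JSON it produced.
  def toJsonString(block: JsonGenerator => Unit): String = {
    val writer = new StringWriter()
    val generator = new JsonFactory().createGenerator(writer)
    block(generator)     // e.g. JsonProtocol.rddInfoToJson(info, _)
    generator.close()    // flushes buffered tokens into the writer
    writer.toString
  }

  // Illustrative writer in the new imperative style: fields go straight to the
  // generator instead of being assembled into a Json4s AST first.
  def storageLevelSketchToJson(useDisk: Boolean, replication: Int, g: JsonGenerator): Unit = {
    g.writeStartObject()
    g.writeBooleanField("Use Disk", useDisk)
    g.writeNumberField("Replication", replication)
    g.writeEndObject()
  }

  def main(args: Array[String]): Unit = {
    // Prints {"Use Disk":true,"Replication":2}
    println(toJsonString(storageLevelSketchToJson(true, 2, _)))
  }
}
```
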
diff --git a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
index 1226ad9be55..f7e7e7fe2a7 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/MLEventsSuite.scala
@@ -146,7 +146,7 @@ class MLEventsSuite
     }
     // Test if they can be ser/de via JSON protocol.
     assert(events.nonEmpty)
-    events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+    events.map(JsonProtocol.sparkEventToJsonString).foreach { event =>
       assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
     }
   }
@@ -204,7 +204,7 @@ class MLEventsSuite
     }
     // Test if they can be ser/de via JSON protocol.
     assert(events.nonEmpty)
-    events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+    events.map(JsonProtocol.sparkEventToJsonString).foreach { event =>
       assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
     }
   }
@@ -236,7 +236,7 @@ class MLEventsSuite
       // Test if they can be ser/de via JSON protocol.
       eventually(timeout(10.seconds), interval(1.second)) {
         assert(events.nonEmpty)
-        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+        events.map(JsonProtocol.sparkEventToJsonString).foreach { event =>
           assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
         }
       }
@@ -264,7 +264,7 @@ class MLEventsSuite
       // Test if they can be ser/de via JSON protocol.
       eventually(timeout(10.seconds), interval(1.second)) {
         assert(events.nonEmpty)
-        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+        events.map(JsonProtocol.sparkEventToJsonString).foreach { event =>
           assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
         }
       }
@@ -299,7 +299,7 @@ class MLEventsSuite
       // Test if they can be ser/de via JSON protocol.
       eventually(timeout(10.seconds), interval(1.second)) {
         assert(events.nonEmpty)
-        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+        events.map(JsonProtocol.sparkEventToJsonString).foreach { event =>
           assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
         }
       }
@@ -327,7 +327,7 @@ class MLEventsSuite
       // Test if they can be ser/de via JSON protocol.
       eventually(timeout(10.seconds), interval(1.second)) {
         assert(events.nonEmpty)
-        events.map(JsonProtocol.sparkEventToJson).foreach { event =>
+        events.map(JsonProtocol.sparkEventToJsonString).foreach { event =>
           assert(JsonProtocol.sparkEventFromJson(event).isInstanceOf[MLEvent])
         }
       }
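The MLEventsSuite changes above replace `sparkEventToJson`, which returned a Json4s `JValue`, with `sparkEventToJsonString`, so the round trip now goes directly from event to `String` and back. Below is a minimal sketch of that round trip, assuming only the signatures visible in the test calls; the package name and event instance are illustrative, and the code has to live under `org.apache.spark` because JsonProtocol is `private[spark]`.

```scala
package org.apache.spark.sketch  // hypothetical package, so the private[spark] API resolves

import org.apache.spark.scheduler.SparkListenerApplicationEnd
import org.apache.spark.util.JsonProtocol

object EventRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val event = SparkListenerApplicationEnd(1655308948551L)

    // Event -> JSON string, with no intermediate Json4s AST.
    val json: String = JsonProtocol.sparkEventToJsonString(event)

    // The reader now accepts the raw string and parses it with Jackson internally.
    val restored = JsonProtocol.sparkEventFromJson(json)
    assert(restored == event)
  }
}
```
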
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
index 55f17134224..4fd8341b3f5 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLJsonProtocolSuite.scala
@@ -57,7 +57,7 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
           |}
       """.stripMargin
 
-      val reconstructedEvent = JsonProtocol.sparkEventFromJson(parse(SQLExecutionStartJsonString))
+      val reconstructedEvent = JsonProtocol.sparkEventFromJson(SQLExecutionStartJsonString)
       if (newExecutionStartEvent) {
         val expectedEvent = SparkListenerSQLExecutionStart(0, "test desc", "test detail",
           "test plan", new SparkPlanInfo("TestNode", "test string", Nil, Map(), Nil), 0,
@@ -79,8 +79,8 @@ class SQLJsonProtocolSuite extends SparkFunSuite with LocalSparkSession {
     event.executionName = Some("test")
     event.qe = qe
     event.executionFailure = Some(new RuntimeException("test"))
-    val json = JsonProtocol.sparkEventToJson(event)
-    assert(json == parse(
+    val json = JsonProtocol.sparkEventToJsonString(event)
+    assert(parse(json) == parse(
       """
         |{
         |  "Event" : "org.apache.spark.sql.execution.ui.SparkListenerSQLExecutionEnd",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
index 80153b00aa9..c1720915812 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala
@@ -22,6 +22,7 @@ import java.io.File
 import scala.reflect.{classTag, ClassTag}
 import scala.util.Random
 
+import com.fasterxml.jackson.databind.ObjectMapper
 import org.apache.hadoop.mapreduce.{JobContext, TaskAttemptContext}
 
 import org.apache.spark.internal.io.FileCommitProtocol
@@ -580,8 +581,10 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils
     assert(metricInfo.metadata === Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
     // After serializing to JSON, the original value type is lost, but we can still
     // identify that it's a SQL metric from the metadata
-    val metricInfoJson = JsonProtocol.accumulableInfoToJson(metricInfo)
-    val metricInfoDeser = JsonProtocol.accumulableInfoFromJson(metricInfoJson)
+    val mapper = new ObjectMapper()
+    val metricInfoJson = JsonProtocol.toJsonString(
+      JsonProtocol.accumulableInfoToJson(metricInfo, _))
+    val metricInfoDeser = JsonProtocol.accumulableInfoFromJson(mapper.readTree(metricInfoJson))
     metricInfoDeser.update match {
       case Some(v: String) => assert(v.toLong === 10L)
       case Some(v) => fail(s"deserialized metric value was not a string: ${v.getClass.getName}")
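In the SQLMetricsSuite hunk above, the writer side goes through `toJsonString` while the reader side, `accumulableInfoFromJson`, now takes a Jackson `JsonNode`, which is why the test parses the string with an `ObjectMapper` first. Here is a small sketch of that tree-model access pattern; the field names mirror the "Accumulables" shape in the fixtures above, and the values are made up:

```scala
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}

object JsonNodeAccessSketch {
  def main(args: Array[String]): Unit = {
    val mapper = new ObjectMapper()
    val node: JsonNode = mapper.readTree(
      """{"ID": 1, "Name": "Accumulable1", "Update": "delta1", "Count Failed Values": false}""")

    // get() returns the child node; asLong/asText/asBoolean extract scalar values.
    assert(node.get("ID").asLong() == 1L)
    assert(node.get("Name").asText() == "Accumulable1")
    assert(!node.get("Count Failed Values").asBoolean())

    // Missing fields come back as null, so readers that need to stay compatible
    // with older event logs have to check before dereferencing.
    assert(node.get("Metadata") == null)
  }
}
```
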
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 2395f9164d6..226b6e47a96 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -608,8 +608,8 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
 
   test("roundtripping SparkListenerDriverAccumUpdates through JsonProtocol (SPARK-18462)") {
     val event = SparkListenerDriverAccumUpdates(1L, Seq((2L, 3L)))
-    val json = JsonProtocol.sparkEventToJson(event)
-    assertValidDataInJson(json,
+    val json = JsonProtocol.sparkEventToJsonString(event)
+    assertValidDataInJson(parse(json),
       parse("""
         |{
         |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
@@ -627,14 +627,14 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
     }
 
     // Test a case where the numbers in the JSON can only fit in longs:
-    val longJson = parse(
+    val longJson =
       """
         |{
         |  "Event": "org.apache.spark.sql.execution.ui.SparkListenerDriverAccumUpdates",
         |  "executionId": 4294967294,
         |  "accumUpdates": [[4294967294,3]]
         |}
-      """.stripMargin)
+      """.stripMargin
     JsonProtocol.sparkEventFromJson(longJson) match {
       case SparkListenerDriverAccumUpdates(executionId, accums) =>
         assert(executionId == 4294967294L)
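The "numbers can only fit in longs" case above exists because 4294967294 overflows a 32-bit Int, so the parser behind `sparkEventFromJson` has to surface the value as a Long rather than truncating it. A quick illustration with Jackson's tree model, reusing the same value:

```scala
import com.fasterxml.jackson.databind.ObjectMapper

object LongFieldSketch {
  def main(args: Array[String]): Unit = {
    val node = new ObjectMapper().readTree("""{"executionId": 4294967294}""")
    // Jackson stores the value as a long-backed node; it cannot be narrowed to Int.
    assert(node.get("executionId").asLong() == 4294967294L)
    assert(!node.get("executionId").canConvertToInt())
  }
}
```
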
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index e729fe32eba..7b53b4c7858 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -247,7 +247,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryStartedEvent serialization") {
     def testSerialization(event: QueryStartedEvent): Unit = {
-      val json = JsonProtocol.sparkEventToJson(event)
+      val json = JsonProtocol.sparkEventToJsonString(event)
       val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryStartedEvent]
       assert(newEvent.id === event.id)
       assert(newEvent.runId === event.runId)
@@ -263,7 +263,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   test("QueryProgressEvent serialization") {
     def testSerialization(event: QueryProgressEvent): Unit = {
       import scala.collection.JavaConverters._
-      val json = JsonProtocol.sparkEventToJson(event)
+      val json = JsonProtocol.sparkEventToJsonString(event)
       val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryProgressEvent]
       assert(newEvent.progress.json === event.progress.json)  // json as a proxy for equality
       assert(newEvent.progress.durationMs.asScala === event.progress.durationMs.asScala)
@@ -275,7 +275,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   test("QueryTerminatedEvent serialization") {
     def testSerialization(event: QueryTerminatedEvent): Unit = {
-      val json = JsonProtocol.sparkEventToJson(event)
+      val json = JsonProtocol.sparkEventToJsonString(event)
       val newEvent = JsonProtocol.sparkEventFromJson(json).asInstanceOf[QueryTerminatedEvent]
       assert(newEvent.id === event.id)
       assert(newEvent.runId === event.runId)

