Posted to commits@spark.apache.org by we...@apache.org on 2020/02/25 04:34:33 UTC

[spark] branch branch-3.0 updated: [SPARK-30936][CORE] Set FAIL_ON_UNKNOWN_PROPERTIES to false by default to parse Spark events

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b37b085  [SPARK-30936][CORE] Set FAIL_ON_UNKNOWN_PROPERTIES to false by default to parse Spark events
b37b085 is described below

commit b37b0855be0d7a4905ef3cb78e16a421e32c8ba0
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Tue Feb 25 12:28:31 2020 +0800

    [SPARK-30936][CORE] Set FAIL_ON_UNKNOWN_PROPERTIES to false by default to parse Spark events
    
    ### What changes were proposed in this pull request?
    
    Set `FAIL_ON_UNKNOWN_PROPERTIES` to `false` in `JsonProtocol` so that unknown fields in a Spark event are ignored. After this change, if we add new fields to a Spark event parsed by `ObjectMapper`, the event JSON string generated by a new Spark version can still be read by an old Spark History Server.
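
    For illustration, a minimal standalone sketch of the lenient Jackson configuration (the `DemoEvent` case class and JSON payload are hypothetical stand-ins, not part of Spark):

    ```scala
    import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    // Hypothetical stand-in for a Spark event deserialized via ObjectMapper.
    case class DemoEvent(foo: String, bar: Int)

    object LenientParseDemo extends App {
      val mapper = new ObjectMapper()
        .registerModule(DefaultScalaModule)
        // Skip JSON fields the target class does not declare instead of throwing,
        // so logs written by a newer Spark still deserialize here.
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

      // "unknown" is not a field of DemoEvent; with the feature disabled it is ignored.
      val json = """{"foo": "foo", "bar": 123, "unknown": "added in a newer version"}"""
      assert(mapper.readValue(json, classOf[DemoEvent]) == DemoEvent("foo", 123))
    }
    ```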
    
    Since the Spark History Server (SHS) is a separate service, it usually takes time to upgrade, so it's possible that a Spark application is upgraded before SHS. Forwards-compatibility allows an old SHS to support new Spark applications (some new features may be missing, but most functionality should still work).
    
    ### Why are the changes needed?
    
    `JsonProtocol` is supposed to provide strong backwards-compatibility and forwards-compatibility guarantees: any version of Spark should be able to read JSON output written by any other version, including newer versions.
    
    However, the forwards-compatibility guarantee is currently broken for events parsed by `ObjectMapper`. If a new field is added to such an event (e.g., https://github.com/apache/spark/commit/6dc5921e66d56885b95c07e56e687f9f6c1eaca7#diff-dc5c7a41fbb7479cef48b67eb41ad254R33), the event JSON string generated by a new Spark version cannot be parsed by an older SHS.
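
    To make the breakage concrete, here is a hypothetical sketch of what an old, strict SHS-side mapper does with such a payload (`DemoEvent` again stands in for a real event; `addedLater` is an invented field name):

    ```scala
    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    case class DemoEvent(foo: String, bar: Int)

    object StrictParseDemo extends App {
      // FAIL_ON_UNKNOWN_PROPERTIES defaults to true, so this mapper is strict.
      val strictMapper = new ObjectMapper().registerModule(DefaultScalaModule)

      // A field added by a newer Spark version that this build knows nothing about.
      val newerJson = """{"foo": "foo", "bar": 123, "addedLater": true}"""
      try {
        strictMapper.readValue(newerJson, classOf[DemoEvent])
      } catch {
        case e: UnrecognizedPropertyException =>
          // This is the failure an old SHS hits when replaying newer event logs.
          println(s"Replay fails: ${e.getMessage}")
      }
    }
    ```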
    
    ### Does this PR introduce any user-facing change?
    
    No.
    
    ### How was this patch tested?
    
    The newly added tests.
    
    Closes #27680 from zsxwing/SPARK-30936.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
    (cherry picked from commit 3126557b0763998ff893bf382089467f2a0d6ae5)
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala |  3 ++-
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 24 ++++++++++++++++++
 .../streaming/StreamingQueryListenerSuite.scala    | 29 +++++++++++-----------
 3 files changed, 41 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index 4d89c4f..aee9862 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -22,7 +22,7 @@ import java.util.{Properties, UUID}
 import scala.collection.JavaConverters._
 import scala.collection.Map
 
-import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.module.scala.DefaultScalaModule
 import org.json4s.DefaultFormats
 import org.json4s.JsonAST._
@@ -59,6 +59,7 @@ private[spark] object JsonProtocol {
   private implicit val format = DefaultFormats
 
   private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+    .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
 
   /** ------------------------------------------------- *
    * JSON serialization methods for SparkListenerEvents |
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index a2a4b3a..f2b9f97 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -481,6 +481,28 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), blocks, JString(blocks.toString))
     testAccumValue(Some("anything"), 123, JString("123"))
   }
+
+  test("SPARK-30936: forwards compatibility - ignore unknown fields") {
+    val expected = TestListenerEvent("foo", 123)
+    val unknownFieldsJson =
+      """{
+        |  "Event" : "org.apache.spark.util.TestListenerEvent",
+        |  "foo" : "foo",
+        |  "bar" : 123,
+        |  "unknown" : "unknown"
+        |}""".stripMargin
+    assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
+  }
+
+  test("SPARK-30936: backwards compatibility - set default values for missing fields") {
+    val expected = TestListenerEvent("foo", 0)
+    val unknownFieldsJson =
+      """{
+        |  "Event" : "org.apache.spark.util.TestListenerEvent",
+        |  "foo" : "foo"
+        |}""".stripMargin
+    assert(JsonProtocol.sparkEventFromJson(parse(unknownFieldsJson)) === expected)
+  }
 }
 
 
@@ -2310,3 +2332,5 @@ private[spark] object JsonProtocolSuite extends Assertions {
       |}
     """.stripMargin
 }
+
+case class TestListenerEvent(foo: String, bar: Int) extends SparkListenerEvent
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 9d0f829..6bb1646 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -382,28 +382,27 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
-    // Structured Streaming in Spark 2.0.0.
+    // Structured Streaming in Spark 2.0.0. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
     // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
+    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt", 1)
   }
 
-  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_1") {
     // query-event-logs-version-2.0.1.txt has all types of events generated by
-    // Structured Streaming in Spark 2.0.1.
+    // Structured Streaming in Spark 2.0.1. Because we renamed the classes,
     // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
     // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt")
+    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.1.txt", 1)
   }
 
-  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.2") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2_0_2") {
     // query-event-logs-version-2.0.2.txt has all types of events generated by
-    // Structured Streaming in Spark 2.0.2.
-    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
-    // to verify that we can skip broken jsons generated by Structured Streaming.
-    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt")
+    // Structured Streaming in Spark 2.0.2. SPARK-18516 refactored Structured Streaming query events
+    // in 2.1.0. This test is to verify we are able to load events generated by Spark 2.0.2.
+    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.2.txt", 5)
   }
 
   test("listener propagates observable metrics") {
@@ -463,7 +462,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  private def testReplayListenerBusWithBorkenEventJsons(fileName: String): Unit = {
+  private def testReplayListenerBusWithBorkenEventJsons(
+      fileName: String,
+      expectedEventSize: Int): Unit = {
     val input = getClass.getResourceAsStream(s"/structured-streaming/$fileName")
     val events = mutable.ArrayBuffer[SparkListenerEvent]()
     try {
@@ -479,8 +480,8 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       replayer.addListener(new SparkListener {})
       replayer.replay(input, fileName)
       // SparkListenerApplicationEnd is the only valid event
-      assert(events.size === 1)
-      assert(events(0).isInstanceOf[SparkListenerApplicationEnd])
+      assert(events.size === expectedEventSize)
+      assert(events.last.isInstanceOf[SparkListenerApplicationEnd])
     } finally {
       input.close()
     }

