Posted to commits@spark.apache.org by zs...@apache.org on 2020/06/08 19:11:41 UTC

[spark] branch branch-3.0 updated: [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing

This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new b00ac30  [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing
b00ac30 is described below

commit b00ac30dfb621962e5b39c52a3bb09440936a0ff
Author: Shixiong Zhu <zs...@gmail.com>
AuthorDate: Mon Jun 8 12:06:17 2020 -0700

    [SPARK-31923][CORE] Ignore internal accumulators that use unrecognized types rather than crashing
    
    ### What changes were proposed in this pull request?
    
    Ignore internal accumulators that use unrecognized types rather than crashing, so that events containing such accumulators can still be converted to JSON and written to the event log.
    
    ### Why are the changes needed?
    
    A user may create accumulators whose names start with the `internal.metrics.` prefix in order to hide sensitive information from the UI, since all accumulators except internal ones are shown in the Spark UI.
    
    However, `org.apache.spark.util.JsonProtocol.accumValueToJson` assumes an internal accumulator's value has only 3 possible types: `int`, `long`, and `java.util.List[(BlockId, BlockStatus)]`. When an internal accumulator holds a value of an unexpected type, the conversion crashes.
    
    An event that contains such an accumulator will be dropped because it cannot be converted to JSON, which causes weird UI issues when the event log is rendered in the Spark History Server. For example, if a `SparkListenerTaskEnd` event is dropped because of this issue, the user will see the task as still running even though it has finished.
    
    It's better to make `accumValueToJson` more robust, because the accumulator name is chosen by the user rather than by Spark.
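    
    For illustration, here is a minimal spark-shell sketch of how a user ends up on this code path (not from the PR; the accumulator name `internal.metrics.mySecret` is hypothetical, and it assumes event logging is enabled via `spark.eventLog.enabled=true`):
    
    ```scala
    // spark-shell provides `spark` (a SparkSession) out of the box.
    val sc = spark.sparkContext
    
    // The "internal.metrics." prefix (InternalAccumulator.METRICS_PREFIX) hides
    // the accumulator from the Spark UI, but it also routes its value through
    // the internal-accumulator branch of JsonProtocol.accumValueToJson.
    val acc = sc.collectionAccumulator[String]("internal.metrics.mySecret")
    
    // CollectionAccumulator's value is a java.util.List[String]; its elements
    // are not (BlockId, BlockStatus) pairs, so before this patch serializing
    // the resulting SparkListenerTaskEnd event crashed with a ClassCastException.
    sc.parallelize(1 to 10).foreach(i => acc.add(i.toString))
    ```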
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit tests in `JsonProtocolSuite`.
    
    Closes #28744 from zsxwing/fix-internal-accum.
    
    Authored-by: Shixiong Zhu <zs...@gmail.com>
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
    (cherry picked from commit b333ed0c4a5733a9c36ad79de1d4c13c6cf3c5d4)
    Signed-off-by: Shixiong Zhu <zs...@gmail.com>
---
 .../scala/org/apache/spark/util/JsonProtocol.scala | 20 ++++++---
 .../org/apache/spark/util/JsonProtocolSuite.scala  | 48 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
index f445fd4..d53ca0f 100644
--- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
+++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala
@@ -351,12 +351,22 @@ private[spark] object JsonProtocol {
         case v: Long => JInt(v)
         // We only have 3 kind of internal accumulator types, so if it's not int or long, it must be
         // the blocks accumulator, whose type is `java.util.List[(BlockId, BlockStatus)]`
-        case v =>
-          JArray(v.asInstanceOf[java.util.List[(BlockId, BlockStatus)]].asScala.toList.map {
-            case (id, status) =>
-              ("Block ID" -> id.toString) ~
-              ("Status" -> blockStatusToJson(status))
+        case v: java.util.List[_] =>
+          JArray(v.asScala.toList.flatMap {
+            case (id: BlockId, status: BlockStatus) =>
+              Some(
+                ("Block ID" -> id.toString) ~
+                ("Status" -> blockStatusToJson(status))
+              )
+            case _ =>
+              // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should
+              // not crash.
+              None
           })
+        case _ =>
+          // Ignore unsupported types. A user may put `METRICS_PREFIX` in the name. We should not
+          // crash.
+          JNothing
       }
     } else {
       // For all external accumulators, just use strings
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index d1f09d8..5f1c753 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -482,6 +482,54 @@ class JsonProtocolSuite extends SparkFunSuite {
     testAccumValue(Some("anything"), 123, JString("123"))
   }
 
+  /** Create an AccumulableInfo and verify we can serialize and deserialize it. */
+  private def testAccumulableInfo(
+      name: String,
+      value: Option[Any],
+      expectedValue: Option[Any]): Unit = {
+    val isInternal = name.startsWith(InternalAccumulator.METRICS_PREFIX)
+    val accum = AccumulableInfo(
+      123L,
+      Some(name),
+      update = value,
+      value = value,
+      internal = isInternal,
+      countFailedValues = false)
+    val json = JsonProtocol.accumulableInfoToJson(accum)
+    val newAccum = JsonProtocol.accumulableInfoFromJson(json)
+    assert(newAccum == accum.copy(update = expectedValue, value = expectedValue))
+  }
+
+  test("SPARK-31923: unexpected value type of internal accumulator") {
+    // Because a user may use `METRICS_PREFIX` in an accumulator name, we should test unexpected
+    // types to make sure we don't crash.
+    import InternalAccumulator.METRICS_PREFIX
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooString",
+      value = Some("foo"),
+      expectedValue = None)
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooList",
+      value = Some(java.util.Arrays.asList("string")),
+      expectedValue = Some(java.util.Collections.emptyList())
+    )
+    val blocks = Seq(
+      (TestBlockId("block1"), BlockStatus(StorageLevel.MEMORY_ONLY, 1L, 2L)),
+      (TestBlockId("block2"), BlockStatus(StorageLevel.DISK_ONLY, 3L, 4L)))
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooList",
+      value = Some(java.util.Arrays.asList(
+        "string",
+        blocks(0),
+        blocks(1))),
+      expectedValue = Some(blocks.asJava)
+    )
+    testAccumulableInfo(
+      METRICS_PREFIX + "fooSet",
+      value = Some(Set("foo")),
+      expectedValue = None)
+  }
+
   test("SPARK-30936: forwards compatibility - ignore unknown fields") {
     val expected = TestListenerEvent("foo", 123)
     val unknownFieldsJson =
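
For reference, a small sketch of the patched behavior (not part of the commit: the demo object is hypothetical, and it is placed in `org.apache.spark.util` because `JsonProtocol` is `private[spark]`; `accumValueToJson(name, value)` is the method exercised by the suite above):

```scala
package org.apache.spark.util

import org.json4s.JsonAST.{JNothing, JString}

// Hypothetical demo, not part of the commit.
object Spark31923Demo {
  def main(args: Array[String]): Unit = {
    // An internal-prefixed name with an unsupported value type is now
    // serialized as JNothing instead of throwing a ClassCastException.
    assert(JsonProtocol.accumValueToJson(
      Some("internal.metrics.fooSet"), Set("foo")) == JNothing)

    // External accumulators are unaffected: values still become strings.
    assert(JsonProtocol.accumValueToJson(
      Some("anything"), 123) == JString("123"))
  }
}
```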

