Posted to commits@spark.apache.org by vi...@apache.org on 2020/10/02 02:31:42 UTC

[spark] branch branch-3.0 updated: [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer

This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new 41e1919  [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
41e1919 is described below

commit 41e1919d24f573d5bef14df72729bdaccd35a82e
Author: Shruti Gumma <sh...@apple.com>
AuthorDate: Thu Oct 1 19:28:06 2020 -0700

    [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
    
    ### What changes were proposed in this pull request?
    This is a backport PR for branch-3.0. The change was originally made against the `master` branch in https://github.com/apache/spark/pull/29872
    
    When `peakMemoryMetrics` in `ExecutorSummary` is `Option.empty`, the `ExecutorMetricsJsonSerializer#serialize` method never calls `jsonGenerator.writeObject`. By that point Jackson has already written the `peakMemoryMetrics` field name, so the key ends up in the serialized string with no corresponding value.
    This causes an error to be thrown when the next key, `attributes`, is written to the JSON:
    `com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value`
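    
    The failure mode can be reproduced outside Spark. Below is a minimal, self-contained sketch (`BuggyOptionSerializer`, `FixedOptionSerializer`, `BuggySummary`, `FixedSummary` and `Repro` are illustrative names, not Spark's actual classes): a `JsonSerializer` that writes nothing for `None` leaves a dangling field name, while writing an explicit JSON null, as this patch does, keeps the generator in a consistent state:
    
    ```scala
    import com.fasterxml.jackson.core.JsonGenerator
    import com.fasterxml.jackson.databind.{JsonSerializer, ObjectMapper, SerializerProvider}
    import com.fasterxml.jackson.databind.annotation.JsonSerialize
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    
    // Buggy: when the Option is empty nothing is written, but Jackson has
    // already emitted the field name for this property.
    class BuggyOptionSerializer extends JsonSerializer[Option[Map[String, Long]]] {
      override def serialize(
          value: Option[Map[String, Long]],
          gen: JsonGenerator,
          provider: SerializerProvider): Unit = {
        value.foreach(gen.writeObject)
      }
    }
    
    // Fixed: pair the already-written field name with an explicit JSON null.
    class FixedOptionSerializer extends JsonSerializer[Option[Map[String, Long]]] {
      override def serialize(
          value: Option[Map[String, Long]],
          gen: JsonGenerator,
          provider: SerializerProvider): Unit = {
        if (value.isEmpty) gen.writeNull() else value.foreach(gen.writeObject)
      }
    }
    
    class BuggySummary(
        @JsonSerialize(using = classOf[BuggyOptionSerializer])
        val peakMemoryMetrics: Option[Map[String, Long]],
        val attributes: Map[String, String])
    
    class FixedSummary(
        @JsonSerialize(using = classOf[FixedOptionSerializer])
        val peakMemoryMetrics: Option[Map[String, Long]],
        val attributes: Map[String, String])
    
    object Repro extends App {
      val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
    
      // Throws JsonGenerationException ("Can not write a field name,
      // expecting a value"): "peakMemoryMetrics" was written with no value,
      // so the generator cannot start the next field, "attributes".
      try mapper.writeValueAsString(new BuggySummary(None, Map("a" -> "b")))
      catch { case e: Exception => println(e.getMessage) }
    
      // Prints: {"peakMemoryMetrics":null,"attributes":{"a":"b"}}
      println(mapper.writeValueAsString(new FixedSummary(None, Map("a" -> "b"))))
    }
    ```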
    
    ### Why are the changes needed?
    At the start of a Spark job, while `peakMemoryMetrics` is still `Option.empty`, navigating to the Executors tab in the Spark UI throws a `com.fasterxml.jackson.core.JsonGenerationException`.
    Complete stack trace:
    
    > com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value
    > 	at com.fasterxml.jackson.core.JsonGenerator._reportError(JsonGenerator.java:2080)
    > 	at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeFieldName(WriterBasedJsonGenerator.java:161)
    > 	at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:725)
    > 	at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
    > 	at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
    > 	at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
    > 	at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:26)
    > 	at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents$(IterableSerializerModule.scala:25)
    > 	at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
    > 	at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
    > 	at com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase.serialize(AsArraySerializerBase.java:250)
    > 	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
    > 	at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
    > 	at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:4094)
    > 	at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3404)
    > 	at org.apache.spark.ui.exec.ExecutorsPage.allExecutorsDataScript$1(ExecutorsTab.scala:64)
    > 	at org.apache.spark.ui.exec.ExecutorsPage.render(ExecutorsTab.scala:76)
    > 	at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:89)
    > 	at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:80)
    > 	at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
    > 	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
    > 	at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
    > 	at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
    > 	at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95)
    > 	at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
    > 	at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
    > 	at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
    > 	at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
    > 	at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
    > 	at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
    > 	at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
    > 	at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
    > 	at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
    > 	at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
    > 	at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
    > 	at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
    > 	at org.sparkproject.jetty.server.Server.handle(Server.java:505)
    > 	at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:370)
    > 	at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
    > 	at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
    > 	at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:103)
    > 	at org.sparkproject.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
    > 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
    > 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
    > 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
    > 	at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
    > 	at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
    > 	at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
    > 	at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
    > 	at java.base/java.lang.Thread.run(Thread.java:834)
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Unit test
    
    Closes #29914 from shrutig/SPARK-32996-3.0.
    
    Authored-by: Shruti Gumma <sh...@apple.com>
    Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
 .../scala/org/apache/spark/status/api/v1/api.scala | 16 +++++--
 .../spark/status/api/v1/ExecutorSummarySuite.scala | 51 ++++++++++++++++++++++
 2 files changed, 63 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5ec9b36..37db64a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -127,6 +127,10 @@ private[spark] class ExecutorMetricsJsonDeserializer
       new TypeReference[Option[Map[String, java.lang.Long]]] {})
     metricsMap.map(metrics => new ExecutorMetrics(metrics))
   }
+
+  override def getNullValue(ctxt: DeserializationContext): Option[ExecutorMetrics] = {
+    None
+  }
 }
 /** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */
 private[spark] class ExecutorMetricsJsonSerializer
@@ -135,11 +139,15 @@ private[spark] class ExecutorMetricsJsonSerializer
       metrics: Option[ExecutorMetrics],
       jsonGenerator: JsonGenerator,
       serializerProvider: SerializerProvider): Unit = {
-    metrics.foreach { m: ExecutorMetrics =>
-      val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
-        metric -> m.getMetricValue(metric)
+    if (metrics.isEmpty) {
+      jsonGenerator.writeNull()
+    } else {
+      metrics.foreach { m: ExecutorMetrics =>
+        val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
+          metric -> m.getMetricValue(metric)
+        }
+        jsonGenerator.writeObject(metricsMap)
       }
-      jsonGenerator.writeObject(metricsMap)
     }
   }
 
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala
new file mode 100644
index 0000000..2723af7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.Date
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.SparkFunSuite
+
+class ExecutorSummarySuite extends SparkFunSuite {
+
+  test("Check ExecutorSummary serialize and deserialize with empty peakMemoryMetrics") {
+    val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+    val executorSummary = new ExecutorSummary("id", "host:port", true, 1,
+      10, 10, 1, 1, 1,
+      0, 0, 1, 100,
+      1, 100, 100,
+      10, false, 20, new Date(1600984336352L),
+      Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map())
+    val expectedJson = "{\"id\":\"id\",\"hostPort\":\"host:port\",\"isActive\":true," +
+      "\"rddBlocks\":1,\"memoryUsed\":10,\"diskUsed\":10,\"totalCores\":1,\"maxTasks\":1," +
+      "\"activeTasks\":1,\"failedTasks\":0,\"completedTasks\":0,\"totalTasks\":1," +
+      "\"totalDuration\":100,\"totalGCTime\":1,\"totalInputBytes\":100," +
+      "\"totalShuffleRead\":100,\"totalShuffleWrite\":10,\"isBlacklisted\":false," +
+      "\"maxMemory\":20,\"addTime\":1600984336352,\"removeTime\":null,\"removeReason\":null," +
+      "\"executorLogs\":{},\"memoryMetrics\":null,\"blacklistedInStages\":[]," +
+      "\"peakMemoryMetrics\":null,\"attributes\":{},\"resources\":{}}"
+    val json = mapper.writeValueAsString(executorSummary)
+    assert(expectedJson.equals(json))
+    val deserializeExecutorSummary = mapper.readValue(json, new TypeReference[ExecutorSummary] {})
+    assert(deserializeExecutorSummary.peakMemoryMetrics == None)
+  }
+
+}
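
A note on the deserializer half of this change: once the serializer writes an explicit JSON `null`, reading the summary back must turn that `null` into `Option.empty`. Jackson routes explicit nulls through `JsonDeserializer#getNullValue` rather than `deserialize`, and the default `getNullValue` returns a Java `null`, which is why the patch overrides it to return `None`. A minimal sketch of the idea, using an illustrative `Option[Map[String, Long]]` deserializer rather than Spark's actual classes:

```scala
import com.fasterxml.jackson.core.JsonParser
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.{DeserializationContext, JsonDeserializer}

class OptionMapDeserializer extends JsonDeserializer[Option[Map[String, Long]]] {
  override def deserialize(
      p: JsonParser,
      ctxt: DeserializationContext): Option[Map[String, Long]] = {
    // Called for real JSON objects; wrap the parsed map in an Option.
    Option(p.readValueAs(new TypeReference[Map[String, Long]] {}))
  }

  // Called when the JSON value is an explicit null; without this override
  // the field would be set to Java null instead of None.
  override def getNullValue(ctxt: DeserializationContext): Option[Map[String, Long]] =
    None
}
```

This round trip is exactly what the new `ExecutorSummarySuite` exercises: serializing an `ExecutorSummary` with empty `peakMemoryMetrics` now yields `"peakMemoryMetrics":null`, and deserializing that string yields `peakMemoryMetrics == None`.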

