You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by vi...@apache.org on 2020/10/02 02:31:42 UTC
[spark] branch branch-3.0 updated: [SPARK-32996][WEB-UI][3.0]
Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
This is an automated email from the ASF dual-hosted git repository.
viirya pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 41e1919 [SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
41e1919 is described below
commit 41e1919d24f573d5bef14df72729bdaccd35a82e
Author: Shruti Gumma <sh...@apple.com>
AuthorDate: Thu Oct 1 19:28:06 2020 -0700
[SPARK-32996][WEB-UI][3.0] Handle empty ExecutorMetrics in ExecutorMetricsJsonSerializer
### What changes were proposed in this pull request?
This is a backport PR for branch-3.0. This change was raised to `master` branch in `https://github.com/apache/spark/pull/29872`
When `peakMemoryMetrics` in `ExecutorSummary` is `Option.empty`, then the `ExecutorMetricsJsonSerializer#serialize` method does not execute the `jsonGenerator.writeObject` method. This causes the json to be generated with `peakMemoryMetrics` key added to the serialized string, but no corresponding value.
This causes an error to be thrown when the next key, `attributes`, is about to be written to the JSON:
`com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value`
### Why are the changes needed?
At the start of the Spark job, if `peakMemoryMetrics` is `Option.empty`, then it causes
a `com.fasterxml.jackson.core.JsonGenerationException` to be thrown when we navigate to the Executors tab in Spark UI.
Complete stacktrace:
> com.fasterxml.jackson.core.JsonGenerationException: Can not write a field name, expecting a value
> at com.fasterxml.jackson.core.JsonGenerator._reportError(JsonGenerator.java:2080)
> at com.fasterxml.jackson.core.json.WriterBasedJsonGenerator.writeFieldName(WriterBasedJsonGenerator.java:161)
> at com.fasterxml.jackson.databind.ser.BeanPropertyWriter.serializeAsField(BeanPropertyWriter.java:725)
> at com.fasterxml.jackson.databind.ser.std.BeanSerializerBase.serializeFields(BeanSerializerBase.java:721)
> at com.fasterxml.jackson.databind.ser.BeanSerializer.serialize(BeanSerializer.java:166)
> at com.fasterxml.jackson.databind.ser.std.CollectionSerializer.serializeContents(CollectionSerializer.java:145)
> at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents(IterableSerializerModule.scala:26)
> at com.fasterxml.jackson.module.scala.ser.IterableSerializer.serializeContents$(IterableSerializerModule.scala:25)
> at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
> at com.fasterxml.jackson.module.scala.ser.UnresolvedIterableSerializer.serializeContents(IterableSerializerModule.scala:54)
> at com.fasterxml.jackson.databind.ser.std.AsArraySerializerBase.serialize(AsArraySerializerBase.java:250)
> at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider._serialize(DefaultSerializerProvider.java:480)
> at com.fasterxml.jackson.databind.ser.DefaultSerializerProvider.serializeValue(DefaultSerializerProvider.java:319)
> at com.fasterxml.jackson.databind.ObjectMapper._configAndWriteValue(ObjectMapper.java:4094)
> at com.fasterxml.jackson.databind.ObjectMapper.writeValueAsString(ObjectMapper.java:3404)
> at org.apache.spark.ui.exec.ExecutorsPage.allExecutorsDataScript$1(ExecutorsTab.scala:64)
> at org.apache.spark.ui.exec.ExecutorsPage.render(ExecutorsTab.scala:76)
> at org.apache.spark.ui.WebUI.$anonfun$attachPage$1(WebUI.scala:89)
> at org.apache.spark.ui.JettyUtils$$anon$1.doGet(JettyUtils.scala:80)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:687)
> at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
> at org.sparkproject.jetty.servlet.ServletHolder.handle(ServletHolder.java:873)
> at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1623)
> at org.apache.spark.ui.HttpSecurityFilter.doFilter(HttpSecurityFilter.scala:95)
> at org.sparkproject.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1610)
> at org.sparkproject.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:540)
> at org.sparkproject.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)
> at org.sparkproject.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1345)
> at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)
> at org.sparkproject.jetty.servlet.ServletHandler.doScope(ServletHandler.java:480)
> at org.sparkproject.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)
> at org.sparkproject.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1247)
> at org.sparkproject.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)
> at org.sparkproject.jetty.server.handler.gzip.GzipHandler.handle(GzipHandler.java:753)
> at org.sparkproject.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:220)
> at org.sparkproject.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)
> at org.sparkproject.jetty.server.Server.handle(Server.java:505)
> at org.sparkproject.jetty.server.HttpChannel.handle(HttpChannel.java:370)
> at org.sparkproject.jetty.server.HttpConnection.onFillable(HttpConnection.java:267)
> at org.sparkproject.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:305)
> at org.sparkproject.jetty.io.FillInterest.fillable(FillInterest.java:103)
> at org.sparkproject.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:117)
> at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:333)
> at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:310)
> at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:168)
> at org.sparkproject.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:126)
> at org.sparkproject.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:366)
> at org.sparkproject.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:698)
> at org.sparkproject.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:804)
> at java.base/java.lang.Thread.run(Thread.java:834)
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Unit test
Closes #29914 from shrutig/SPARK-32996-3.0.
Authored-by: Shruti Gumma <sh...@apple.com>
Signed-off-by: Liang-Chi Hsieh <vi...@gmail.com>
---
.../scala/org/apache/spark/status/api/v1/api.scala | 16 +++++--
.../spark/status/api/v1/ExecutorSummarySuite.scala | 51 ++++++++++++++++++++++
2 files changed, 63 insertions(+), 4 deletions(-)
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 5ec9b36..37db64a 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -127,6 +127,10 @@ private[spark] class ExecutorMetricsJsonDeserializer
new TypeReference[Option[Map[String, java.lang.Long]]] {})
metricsMap.map(metrics => new ExecutorMetrics(metrics))
}
+
+ override def getNullValue(ctxt: DeserializationContext): Option[ExecutorMetrics] = {
+ None
+ }
}
/** serializer for peakMemoryMetrics: convert ExecutorMetrics to map with metric name as key */
private[spark] class ExecutorMetricsJsonSerializer
@@ -135,11 +139,15 @@ private[spark] class ExecutorMetricsJsonSerializer
metrics: Option[ExecutorMetrics],
jsonGenerator: JsonGenerator,
serializerProvider: SerializerProvider): Unit = {
- metrics.foreach { m: ExecutorMetrics =>
- val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
- metric -> m.getMetricValue(metric)
+ if (metrics.isEmpty) {
+ jsonGenerator.writeNull()
+ } else {
+ metrics.foreach { m: ExecutorMetrics =>
+ val metricsMap = ExecutorMetricType.metricToOffset.map { case (metric, _) =>
+ metric -> m.getMetricValue(metric)
+ }
+ jsonGenerator.writeObject(metricsMap)
}
- jsonGenerator.writeObject(metricsMap)
}
}
diff --git a/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala
new file mode 100644
index 0000000..2723af7
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/status/api/v1/ExecutorSummarySuite.scala
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.status.api.v1
+
+import java.util.Date
+
+import com.fasterxml.jackson.core.`type`.TypeReference
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.SparkFunSuite
+
+class ExecutorSummarySuite extends SparkFunSuite {
+
+ test("Check ExecutorSummary serialize and deserialize with empty peakMemoryMetrics") {
+ val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
+ val executorSummary = new ExecutorSummary("id", "host:port", true, 1,
+ 10, 10, 1, 1, 1,
+ 0, 0, 1, 100,
+ 1, 100, 100,
+ 10, false, 20, new Date(1600984336352L),
+ Option.empty, Option.empty, Map(), Option.empty, Set(), Option.empty, Map(), Map())
+ val expectedJson = "{\"id\":\"id\",\"hostPort\":\"host:port\",\"isActive\":true," +
+ "\"rddBlocks\":1,\"memoryUsed\":10,\"diskUsed\":10,\"totalCores\":1,\"maxTasks\":1," +
+ "\"activeTasks\":1,\"failedTasks\":0,\"completedTasks\":0,\"totalTasks\":1," +
+ "\"totalDuration\":100,\"totalGCTime\":1,\"totalInputBytes\":100," +
+ "\"totalShuffleRead\":100,\"totalShuffleWrite\":10,\"isBlacklisted\":false," +
+ "\"maxMemory\":20,\"addTime\":1600984336352,\"removeTime\":null,\"removeReason\":null," +
+ "\"executorLogs\":{},\"memoryMetrics\":null,\"blacklistedInStages\":[]," +
+ "\"peakMemoryMetrics\":null,\"attributes\":{},\"resources\":{}}"
+ val json = mapper.writeValueAsString(executorSummary)
+ assert(expectedJson.equals(json))
+ val deserializeExecutorSummary = mapper.readValue(json, new TypeReference[ExecutorSummary] {})
+ assert(deserializeExecutorSummary.peakMemoryMetrics == None)
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org