You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/10/30 22:31:40 UTC
[spark] branch master updated: [SPARK-37147][SS] MetricsReporter
producing NullPointerException when element 'triggerExecution' not present
in Map[]
This is an automated email from the ASF dual-hosted git repository.
kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 547996e [SPARK-37147][SS] MetricsReporter producing NullPointerException when element 'triggerExecution' not present in Map[]
547996e is described below
commit 547996efed8e4593265aea2eb26b10f8b366e141
Author: Radek Busz <ex...@ef.com>
AuthorDate: Sun Oct 31 07:30:14 2021 +0900
[SPARK-37147][SS] MetricsReporter producing NullPointerException when element 'triggerExecution' not present in Map[]
### What changes were proposed in this pull request?
Bug Fix.
The problematic code is in `MetricsReporter`:
`registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)`
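Instead of `.getOrDefault(...).longValue()`, it uses `.get("triggerExecution").longValue()`, which throws a `NullPointerException` if "triggerExecution" is not in the `durationMs` map: `get` returns `null` for a missing key, and `longValue()` is then called on that `null`.
Solution: use `.getOrDefault("triggerExecution", 0L)` when accessing the map, so a missing entry yields 0 instead of `null`.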
### Why are the changes needed?
When `MetricsReporter.scala` registers a Gauge, reading that gauge's value occasionally throws an NPE. This breaks reporting of custom metrics via Dropwizard and logs the stack trace multiple times. It usually happens when using Structured Streaming with a slow data source, but I'm not able to reproduce it reliably.
### Does this PR introduce _any_ user-facing change?
Yes - fixes occasional failures when reporting metrics via Dropwizard.
### How was this patch tested?
Added a unit test.
Closes #34426 from gitplaneta/SPARK-37147.
Authored-by: Radek Busz <ex...@ef.com>
Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
.../spark/sql/execution/streaming/MetricsReporter.scala | 2 +-
.../apache/spark/sql/streaming/StreamingQuerySuite.scala | 13 +++++++++++++
2 files changed, 14 insertions(+), 1 deletion(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 8709822..600b16f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -40,7 +40,7 @@ class MetricsReporter(
// together in Ganglia as a single metric group
registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)
registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
- registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)
+ registerGauge("latency", _.durationMs.getOrDefault("triggerExecution", 0L).longValue(), 0L)
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9c2403d..21a0b24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -19,13 +19,16 @@ package org.apache.spark.sql.streaming
import java.io.File
import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Collections
import java.util.concurrent.CountDownLatch
import scala.collection.mutable
+import scala.util.{Success, Try}
import org.apache.commons.io.FileUtils
import org.apache.commons.lang3.RandomStringUtils
import org.apache.hadoop.fs.Path
+import org.mockito.Mockito.when
import org.scalactic.TolerantNumerics
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -465,6 +468,16 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
}
}
+ test("SPARK-37147: MetricsReporter does not fail when durationMs is empty") {
+ val stateOpProgressMock = mock[StreamingQueryProgress]
+ when(stateOpProgressMock.durationMs).thenReturn(Collections.emptyMap[String, java.lang.Long]())
+ val streamExecMock = mock[StreamExecution]
+ when(streamExecMock.lastProgress).thenReturn(stateOpProgressMock)
+
+ val gauges = new MetricsReporter(streamExecMock, "").metricRegistry.getGauges()
+ assert(Try(gauges.get("latency").getValue) == Success(0L))
+ }
+
test("input row calculation with same V1 source used twice in self-join") {
val streamingTriggerDF = spark.createDataset(1 to 10).toDF
val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org