You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by ka...@apache.org on 2021/10/30 22:31:40 UTC

[spark] branch master updated: [SPARK-37147][SS] MetricsReporter producing NullPointerException when element 'triggerExecution' not present in Map[]

This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 547996e  [SPARK-37147][SS] MetricsReporter producing NullPointerException when element 'triggerExecution' not present in Map[]
547996e is described below

commit 547996efed8e4593265aea2eb26b10f8b366e141
Author: Radek Busz <ex...@ef.com>
AuthorDate: Sun Oct 31 07:30:14 2021 +0900

    [SPARK-37147][SS] MetricsReporter producing NullPointerException when element 'triggerExecution' not present in Map[]
    
    ### What changes were proposed in this pull request?
    
    Bug Fix.
    
    The problematic code is in `MetricsReporter`:
    `registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)`
    It calls `.get("triggerExecution").longValue()` instead of `.getOrDefault(...).longValue()`, so it throws a NullPointerException when "triggerExecution" is not in the durationMs map.
    
    Solution: use `.getOrDefault` when accessing a map.
    
    ### Why are the changes needed?
    
    When `MetricsReporter.scala` registers a Gauge, it occasionally throws an NPE. This breaks the reporting of custom metrics via Dropwizard and logs the stack trace multiple times. It usually happens when using Structured Streaming on a slow data source, but I'm not able to reproduce it reliably.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes - fixes occasional failures when reporting metrics with Dropwizard
    
    ### How was this patch tested?
    
    Added a unit-test.
    
    Closes #34426 from gitplaneta/SPARK-37147.
    
    Authored-by: Radek Busz <ex...@ef.com>
    Signed-off-by: Jungtaek Lim <ka...@gmail.com>
---
 .../spark/sql/execution/streaming/MetricsReporter.scala     |  2 +-
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala    | 13 +++++++++++++
 2 files changed, 14 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
index 8709822..600b16f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -40,7 +40,7 @@ class MetricsReporter(
   // together in Ganglia as a single metric group
   registerGauge("inputRate-total", _.inputRowsPerSecond, 0.0)
   registerGauge("processingRate-total", _.processedRowsPerSecond, 0.0)
-  registerGauge("latency", _.durationMs.get("triggerExecution").longValue(), 0L)
+  registerGauge("latency", _.durationMs.getOrDefault("triggerExecution", 0L).longValue(), 0L)
 
   private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
   timestampFormat.setTimeZone(DateTimeUtils.getTimeZone("UTC"))
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 9c2403d..21a0b24 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -19,13 +19,16 @@ package org.apache.spark.sql.streaming
 
 import java.io.File
 import java.nio.charset.StandardCharsets.UTF_8
+import java.util.Collections
 import java.util.concurrent.CountDownLatch
 
 import scala.collection.mutable
+import scala.util.{Success, Try}
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.lang3.RandomStringUtils
 import org.apache.hadoop.fs.Path
+import org.mockito.Mockito.when
 import org.scalactic.TolerantNumerics
 import org.scalatest.BeforeAndAfter
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
@@ -465,6 +468,16 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     }
   }
 
+  test("SPARK-37147: MetricsReporter does not fail when durationMs is empty") {
+    val stateOpProgressMock = mock[StreamingQueryProgress]
+    when(stateOpProgressMock.durationMs).thenReturn(Collections.emptyMap[String, java.lang.Long]())
+    val streamExecMock = mock[StreamExecution]
+    when(streamExecMock.lastProgress).thenReturn(stateOpProgressMock)
+
+    val gauges = new MetricsReporter(streamExecMock, "").metricRegistry.getGauges()
+    assert(Try(gauges.get("latency").getValue) == Success(0L))
+  }
+
   test("input row calculation with same V1 source used twice in self-join") {
     val streamingTriggerDF = spark.createDataset(1 to 10).toDF
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF).toDF("value")

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org