You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/08 23:43:26 UTC

kafka git commit: MINOR: Fix metric collection NPE during shutdown

Repository: kafka
Updated Branches:
  refs/heads/trunk 1949a76bc -> 006630fd9


MINOR: Fix metric collection NPE during shutdown

Collecting socket server metrics during shutdown may throw NullPointerException

Author: Xavier Léauté <xa...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #2221 from xvrl/fix-metrics-npe-on-shutdown


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/006630fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/006630fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/006630fd

Branch: refs/heads/trunk
Commit: 006630fd93d8efb823e5b5f7d61584138df984a6
Parents: 1949a76
Author: Xavier Léauté <xa...@confluent.io>
Authored: Thu Dec 8 22:26:25 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Dec 8 23:42:48 2016 +0000

----------------------------------------------------------------------
 .../apache/kafka/common/metrics/Metrics.java    |  4 +++
 .../main/scala/kafka/network/SocketServer.scala |  7 ++--
 .../unit/kafka/network/SocketServerTest.scala   | 34 ++++++++++++++------
 3 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index bd20e13..78dad18 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -389,6 +389,10 @@ public class Metrics implements Closeable {
         return this.metrics;
     }
 
+    public KafkaMetric metric(MetricName metricName) {
+        return this.metrics.get(metricName);
+    }
+
     /**
      * This iterates over every Sensor and triggers a removeSensor if it has expired
      * Package private for testing

http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e98445f..55061ed 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -105,8 +105,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
 
     newGauge("NetworkProcessorAvgIdlePercent",
       new Gauge[Double] {
-        def value = allMetricNames.map( metricName =>
-          metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
+        def value = allMetricNames.map { metricName =>
+          Option(metrics.metric(metricName)).fold(0.0)(_.value)
+        }.sum / totalProcessorThreads
       }
     )
 
@@ -389,7 +390,7 @@ private[kafka] class Processor(val id: Int,
   newGauge("IdlePercent",
     new Gauge[Double] {
       def value = {
-        metrics.metrics().get(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
+        Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
       }
     },
     metricTags.asScala

http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7d0764b..c6f90ff 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -17,27 +17,29 @@
 
 package kafka.network
 
-import java.net._
-import javax.net.ssl._
 import java.io._
-import java.util.HashMap
-import java.util.Random
+import java.net._
 import java.nio.ByteBuffer
+import java.util.{HashMap, Random}
+import javax.net.ssl._
 
+import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.{Metrics => YammerMetrics}
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.NetworkSend
 import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.Time
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.common.record.MemoryRecords
 import org.junit.Assert._
 import org.junit._
 import org.scalatest.junit.JUnitSuite
 
+import scala.collection.JavaConverters.mapAsScalaMapConverter
 import scala.collection.mutable.ArrayBuffer
 
 class SocketServerTest extends JUnitSuite {
@@ -395,4 +397,18 @@ class SocketServerTest extends JUnitSuite {
 
   }
 
+  @Test
+  def testMetricCollectionAfterShutdown(): Unit = {
+    server.shutdown()
+
+    val sum = YammerMetrics
+      .defaultRegistry
+      .allMetrics.asScala
+      .filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
+      .collect { case (_, metric: Gauge[_]) => metric.value.asInstanceOf[Double] }
+      .sum
+
+    assertEquals(0, sum, 0)
+  }
+
 }