You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2016/12/08 23:43:26 UTC
kafka git commit: MINOR: Fix metric collection NPE during shutdown
Repository: kafka
Updated Branches:
refs/heads/trunk 1949a76bc -> 006630fd9
MINOR: Fix metric collection NPE during shutdown
Collecting socket server metrics during shutdown may throw NullPointerException
Author: Xavier L�aut� <xa...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #2221 from xvrl/fix-metrics-npe-on-shutdown
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/006630fd
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/006630fd
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/006630fd
Branch: refs/heads/trunk
Commit: 006630fd93d8efb823e5b5f7d61584138df984a6
Parents: 1949a76
Author: Xavier L�aut� <xa...@confluent.io>
Authored: Thu Dec 8 22:26:25 2016 +0000
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Dec 8 23:42:48 2016 +0000
----------------------------------------------------------------------
.../apache/kafka/common/metrics/Metrics.java | 4 +++
.../main/scala/kafka/network/SocketServer.scala | 7 ++--
.../unit/kafka/network/SocketServerTest.scala | 34 ++++++++++++++------
3 files changed, 33 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
index bd20e13..78dad18 100644
--- a/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
+++ b/clients/src/main/java/org/apache/kafka/common/metrics/Metrics.java
@@ -389,6 +389,10 @@ public class Metrics implements Closeable {
return this.metrics;
}
+ public KafkaMetric metric(MetricName metricName) {
+ return this.metrics.get(metricName);
+ }
+
/**
* This iterates over every Sensor and triggers a removeSensor if it has expired
* Package private for testing
http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index e98445f..55061ed 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -105,8 +105,9 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
newGauge("NetworkProcessorAvgIdlePercent",
new Gauge[Double] {
- def value = allMetricNames.map( metricName =>
- metrics.metrics().get(metricName).value()).sum / totalProcessorThreads
+ def value = allMetricNames.map { metricName =>
+ Option(metrics.metric(metricName)).fold(0.0)(_.value)
+ }.sum / totalProcessorThreads
}
)
@@ -389,7 +390,7 @@ private[kafka] class Processor(val id: Int,
newGauge("IdlePercent",
new Gauge[Double] {
def value = {
- metrics.metrics().get(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags)).value()
+ Option(metrics.metric(metrics.metricName("io-wait-ratio", "socket-server-metrics", metricTags))).fold(0.0)(_.value)
}
},
metricTags.asScala
http://git-wip-us.apache.org/repos/asf/kafka/blob/006630fd/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 7d0764b..c6f90ff 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -17,27 +17,29 @@
package kafka.network
-import java.net._
-import javax.net.ssl._
import java.io._
-import java.util.HashMap
-import java.util.Random
+import java.net._
import java.nio.ByteBuffer
+import java.util.{HashMap, Random}
+import javax.net.ssl._
+import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.{Metrics => YammerMetrics}
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.NetworkSend
import org.apache.kafka.common.protocol.{ApiKeys, SecurityProtocol}
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.MemoryRecords
import org.apache.kafka.common.requests.{ProduceRequest, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.Time
-import kafka.server.KafkaConfig
-import kafka.utils.TestUtils
-import org.apache.kafka.common.record.MemoryRecords
import org.junit.Assert._
import org.junit._
import org.scalatest.junit.JUnitSuite
+import scala.collection.JavaConverters.mapAsScalaMapConverter
import scala.collection.mutable.ArrayBuffer
class SocketServerTest extends JUnitSuite {
@@ -395,4 +397,18 @@ class SocketServerTest extends JUnitSuite {
}
+ @Test
+ def testMetricCollectionAfterShutdown(): Unit = {
+ server.shutdown()
+
+ val sum = YammerMetrics
+ .defaultRegistry
+ .allMetrics.asScala
+ .filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
+ .collect { case (_, metric: Gauge[_]) => metric.value.asInstanceOf[Double] }
+ .sum
+
+ assertEquals(0, sum, 0)
+ }
+
}