You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2017/09/29 14:11:44 UTC

kafka git commit: KAFKA-5968; Create/remove request metrics during broker startup/shutdown

Repository: kafka
Updated Branches:
  refs/heads/trunk 3107a6c5c -> 0ef960304


KAFKA-5968; Create/remove request metrics during broker startup/shutdown

Replaces the static `RequestMetrics` object with a class so that metrics
are created and removed during broker startup and shutdown to avoid metrics
tests being affected by metrics left behind by previous tests.

Also reinstates `kafka.api.MetricsTest` which was failing frequently earlier
due to tests removing the static request metrics.

Author: Rajini Sivaram <ra...@googlemail.com>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3991 from rajinisivaram/KAFKA-5968


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0ef96030
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0ef96030
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0ef96030

Branch: refs/heads/trunk
Commit: 0ef960304b6b81c1b725bf8e036f20d295045c4a
Parents: 3107a6c
Author: Rajini Sivaram <ra...@googlemail.com>
Authored: Fri Sep 29 15:09:55 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri Sep 29 15:11:29 2017 +0100

----------------------------------------------------------------------
 .../scala/kafka/network/RequestChannel.scala    | 111 ++++++++++++-------
 .../main/scala/kafka/network/SocketServer.scala |   3 +-
 .../integration/kafka/api/MetricsTest.scala     |  18 +--
 .../unit/kafka/network/SocketServerTest.scala   |   4 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |   3 +-
 5 files changed, 89 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/main/scala/kafka/network/RequestChannel.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index e5f115c..c97e3af 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -49,11 +49,28 @@ object RequestChannel extends Logging {
     val sanitizedUser = Sanitizer.sanitize(principal.getName)
   }
 
+  /** Holds one [[RequestMetrics]] per API key, plus the consumer and follower fetch variants. */
+  class Metrics {
+    private val metricNames = ApiKeys.values.toSeq.map(_.name) ++
+      Seq(RequestMetrics.consumerFetchMetricName, RequestMetrics.followFetchMetricName)
+
+    // Built once at construction and never modified afterwards, hence an immutable map.
+    private val metricsMap: Map[String, RequestMetrics] =
+      metricNames.map(name => name -> new RequestMetrics(name)).toMap
+
+    /** Returns the metrics stored under `metricName`; throws NoSuchElementException if absent. */
+    def apply(metricName: String): RequestMetrics = metricsMap(metricName)
+
+    /** Calls `removeMetrics()` on every [[RequestMetrics]] held by this instance. */
+    def shutdown(): Unit = metricsMap.values.foreach(_.removeMetrics())
+  }
+
   class Request(val processor: Int,
                 val context: RequestContext,
                 val startTimeNanos: Long,
                 memoryPool: MemoryPool,
-                @volatile private var buffer: ByteBuffer) extends BaseRequest {
+                @volatile private var buffer: ByteBuffer,
+                metrics: RequestChannel.Metrics) extends BaseRequest {
     // These need to be volatile because the readers are in the network thread and the writers are in the request
     // handler threads or the purgatory threads
     @volatile var requestDequeueTimeNanos = -1L
@@ -137,7 +154,7 @@ object RequestChannel extends Logging {
         else Seq.empty
       val metricNames = fetchMetricNames :+ header.apiKey.name
       metricNames.foreach { metricName =>
-        val m = RequestMetrics(metricName)
+        val m = metrics(metricName)
         m.requestRate.mark()
         m.requestQueueTimeHist.update(Math.round(requestQueueTimeMs))
         m.localTimeHist.update(Math.round(apiLocalTimeMs))
@@ -221,6 +238,7 @@ object RequestChannel extends Logging {
 }
 
 class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
+  val metrics = new RequestChannel.Metrics
   private var responseListeners: List[(Int) => Unit] = Nil
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
@@ -294,74 +312,67 @@ class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMe
 
   def updateErrorMetrics(apiKey: ApiKeys, errors: collection.Map[Errors, Integer]) {
     errors.foreach { case (error, count) =>
-      RequestMetrics.markErrorMeter(apiKey.name, error, count)
+      metrics(apiKey.name).markErrorMeter(error, count)
     }
   }
 
   def shutdown() {
     requestQueue.clear()
+    metrics.shutdown()
   }
 
   def sendShutdownRequest(): Unit = requestQueue.put(ShutdownRequest)
-
 }
 
 object RequestMetrics {
-
-  private val metricsMap = mutable.Map[String, RequestMetrics]()
-
   val consumerFetchMetricName = ApiKeys.FETCH.name + "Consumer"
   val followFetchMetricName = ApiKeys.FETCH.name + "Follower"
 
-  (ApiKeys.values.toSeq.map(_.name) ++ Seq(consumerFetchMetricName, followFetchMetricName)).foreach { name =>
-    metricsMap.put(name, new RequestMetrics(name))
-  }
-
-  def apply(metricName: String) = metricsMap(metricName)
-
-  def markErrorMeter(name: String, error: Errors, count: Int) {
-      val errorMeter = metricsMap(name).errorMeters(error)
-      errorMeter.getOrCreateMeter().mark(count.toLong)
-  }
-
-  // Used for testing until these metrics are moved to a class
-  private[kafka] def clearErrorMeters(): Unit = {
-    metricsMap.values.foreach { requestMetrics =>
-      requestMetrics.errorMeters.values.foreach(_.removeMeter())
-    }
-  }
-
+  val RequestsPerSec = "RequestsPerSec"
+  val RequestQueueTimeMs = "RequestQueueTimeMs"
+  val LocalTimeMs = "LocalTimeMs"
+  val RemoteTimeMs = "RemoteTimeMs"
+  val ThrottleTimeMs = "ThrottleTimeMs"
+  val ResponseQueueTimeMs = "ResponseQueueTimeMs"
+  val ResponseSendTimeMs = "ResponseSendTimeMs"
+  val TotalTimeMs = "TotalTimeMs"
+  val RequestBytes = "RequestBytes"
+  val MessageConversionsTimeMs = "MessageConversionsTimeMs"
+  val TemporaryMemoryBytes = "TemporaryMemoryBytes"
+  val ErrorsPerSec = "ErrorsPerSec"
 }
 
 class RequestMetrics(name: String) extends KafkaMetricsGroup {
+  import RequestMetrics._
+
   val tags = Map("request" -> name)
-  val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+  val requestRate = newMeter(RequestsPerSec, "requests", TimeUnit.SECONDS, tags)
   // time a request spent in a request queue
-  val requestQueueTimeHist = newHistogram("RequestQueueTimeMs", biased = true, tags)
+  val requestQueueTimeHist = newHistogram(RequestQueueTimeMs, biased = true, tags)
   // time a request takes to be processed at the local broker
-  val localTimeHist = newHistogram("LocalTimeMs", biased = true, tags)
+  val localTimeHist = newHistogram(LocalTimeMs, biased = true, tags)
   // time a request takes to wait on remote brokers (currently only relevant to fetch and produce requests)
-  val remoteTimeHist = newHistogram("RemoteTimeMs", biased = true, tags)
+  val remoteTimeHist = newHistogram(RemoteTimeMs, biased = true, tags)
   // time a request is throttled
-  val throttleTimeHist = newHistogram("ThrottleTimeMs", biased = true, tags)
+  val throttleTimeHist = newHistogram(ThrottleTimeMs, biased = true, tags)
   // time a response spent in a response queue
-  val responseQueueTimeHist = newHistogram("ResponseQueueTimeMs", biased = true, tags)
+  val responseQueueTimeHist = newHistogram(ResponseQueueTimeMs, biased = true, tags)
   // time to send the response to the requester
-  val responseSendTimeHist = newHistogram("ResponseSendTimeMs", biased = true, tags)
-  val totalTimeHist = newHistogram("TotalTimeMs", biased = true, tags)
+  val responseSendTimeHist = newHistogram(ResponseSendTimeMs, biased = true, tags)
+  val totalTimeHist = newHistogram(TotalTimeMs, biased = true, tags)
   // request size in bytes
-  val requestBytesHist = newHistogram("RequestBytes", biased = true, tags)
+  val requestBytesHist = newHistogram(RequestBytes, biased = true, tags)
   // time for message conversions (only relevant to fetch and produce requests)
   val messageConversionsTimeHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram("MessageConversionsTimeMs", biased = true, tags))
+      Some(newHistogram(MessageConversionsTimeMs, biased = true, tags))
     else
       None
   // Temporary memory allocated for processing request (only populated for fetch and produce requests)
   // This shows the memory allocated for compression/conversions excluding the actual request size
   val tempMemoryBytesHist =
     if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name)
-      Some(newHistogram("TemporaryMemoryBytes", biased = true, tags))
+      Some(newHistogram(TemporaryMemoryBytes, biased = true, tags))
     else
       None
 
@@ -379,21 +390,43 @@ class RequestMetrics(name: String) extends KafkaMetricsGroup {
       else {
         synchronized {
           if (meter == null)
-             meter = newMeter("ErrorsPerSec", "requests", TimeUnit.SECONDS, tags)
+             meter = newMeter(ErrorsPerSec, "requests", TimeUnit.SECONDS, tags)
           meter
         }
       }
     }
 
-    // This is currently used only in tests.
     def removeMeter(): Unit = {
       synchronized {
         if (meter != null) {
-          removeMetric("ErrorsPerSec", tags)
+          removeMetric(ErrorsPerSec, tags)
           meter = null
         }
       }
     }
+  }
 
+  // Marks `count` occurrences on the meter for `error`, creating that meter on first use.
+  def markErrorMeter(error: Errors, count: Int): Unit =
+    errorMeters(error).getOrCreateMeter().mark(count.toLong)
+
+  // Removes each metric created by this instance exactly once, in the order the vals are
+  // declared above, followed by the conditional histograms and any error meters.
+  def removeMetrics(): Unit = {
+    removeMetric(RequestsPerSec, tags)
+    removeMetric(RequestQueueTimeMs, tags)
+    removeMetric(LocalTimeMs, tags)
+    removeMetric(RemoteTimeMs, tags)
+    removeMetric(ThrottleTimeMs, tags)
+    removeMetric(ResponseQueueTimeMs, tags)
+    removeMetric(ResponseSendTimeMs, tags)
+    removeMetric(TotalTimeMs, tags)
+    removeMetric(RequestBytes, tags)
+    if (name == ApiKeys.FETCH.name || name == ApiKeys.PRODUCE.name) {
+      removeMetric(MessageConversionsTimeMs, tags)
+      removeMetric(TemporaryMemoryBytes, tags)
+    }
+    errorMeters.values.foreach(_.removeMeter())
+    errorMeters.clear()
   }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/main/scala/kafka/network/SocketServer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 875652d..fa792fb 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -139,6 +139,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     this.synchronized {
       acceptors.values.foreach(_.shutdown)
       processors.foreach(_.shutdown)
+      requestChannel.shutdown()
     }
     info("Shutdown completed")
   }
@@ -549,7 +550,7 @@ private[kafka] class Processor(val id: Int,
             val context = new RequestContext(header, receive.source, channel.socketAddress,
               channel.principal, listenerName, securityProtocol)
             val req = new RequestChannel.Request(processor = id, context = context,
-              startTimeNanos = time.nanoseconds, memoryPool, receive.payload)
+              startTimeNanos = time.nanoseconds, memoryPool, receive.payload, requestChannel.metrics)
             requestChannel.sendRequest(req)
             selector.mute(receive.source)
           case None =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/test/scala/integration/kafka/api/MetricsTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/MetricsTest.scala b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
index 0bd5075..f1bedfd 100644
--- a/core/src/test/scala/integration/kafka/api/MetricsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/MetricsTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.common.config.SaslConfigs
 import org.apache.kafka.common.errors.InvalidTopicException
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.SecurityProtocol
-import org.junit.{After, Before, Ignore, Test}
+import org.junit.{After, Before, Test}
 import org.junit.Assert._
 
 import scala.collection.JavaConverters._
@@ -52,6 +52,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
 
   @Before
   override def setUp(): Unit = {
+    verifyNoRequestMetrics("Request metrics not removed in a previous test")
     startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism), KafkaSasl, kafkaServerJaasEntryName))
     super.setUp()
   }
@@ -60,13 +61,13 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
   override def tearDown(): Unit = {
     super.tearDown()
     closeSasl()
+    verifyNoRequestMetrics("Request metrics not removed in this test")
   }
 
   /**
    * Verifies some of the metrics of producer, consumer as well as server.
    */
   @Test
-  @Ignore
   def testMetrics(): Unit = {
     val topic = "topicWithOldMessageFormat"
     val props = new Properties
@@ -74,9 +75,6 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     TestUtils.createTopic(this.zkUtils, topic, numPartitions = 1, replicationFactor = 1, this.servers, props)
     val tp = new TopicPartition(topic, 0)
 
-    // Clear static state
-    RequestMetrics.clearErrorMeters()
-
     // Produce and consume some records
     val numRecords = 10
     val recordSize = 1000
@@ -201,8 +199,7 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=MessageConversionsTimeMs,request=Produce", value => value > 0.0)
 
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Fetch")
-    // Temporary size for fetch should be zero after KAFKA-5968 is fixed
-    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value >= 0.0)
+    verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=TemporaryMemoryBytes,request=Fetch", value => value == 0.0)
 
      // request size recorded for all request types, check one
     verifyYammerMetricRecorded(s"$requestMetricsPrefix,name=RequestBytes,request=Metadata")
@@ -285,4 +282,11 @@ class MetricsTest extends IntegrationTestHarness with SaslSetup {
     assertTrue(s"Broker metric not recorded correctly for $name value $metricValue", verify(metricValue))
     metricValue
   }
+
+  private def verifyNoRequestMetrics(errorMessage: String): Unit = {
+    val prefix = "kafka.network:type=RequestMetrics"
+    val leftover = Metrics.defaultRegistry.allMetrics.asScala.filter(_._1.getMBeanName.startsWith(prefix))
+    // Fails, listing the offending names, if any request metric is still in the default registry.
+    assertTrue(s"$errorMessage: ${leftover.keys}", leftover.isEmpty)
+  }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 077950d..46eb4ce 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -519,7 +519,7 @@ class SocketServerTest extends JUnitSuite {
       val channel = overrideServer.requestChannel
       val request = receiveRequest(channel)
 
-      val requestMetrics = RequestMetrics(request.header.apiKey.name)
+      val requestMetrics = channel.metrics(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 
@@ -561,7 +561,7 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
         s"Idle connection `${request.context.connectionId}` was not closed by selector")
 
-      val requestMetrics = RequestMetrics(request.header.apiKey.name)
+      val requestMetrics = channel.metrics(request.header.apiKey.name)
       def totalTimeHistCount(): Long = requestMetrics.totalTimeHist.count
       val expectedTotalTimeCount = totalTimeHistCount() + 1
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/0ef96030/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index fde2ae1..c3f492d 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -54,6 +54,7 @@ import scala.collection.Map
 class KafkaApisTest {
 
   private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+  private val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
   private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
   private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
   private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
@@ -402,7 +403,7 @@ class KafkaApisTest {
     val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
       new ListenerName(""), SecurityProtocol.PLAINTEXT)
     (request, new RequestChannel.Request(processor = 1, context = context, startTimeNanos =  0,
-      MemoryPool.NONE, buffer))
+      MemoryPool.NONE, buffer, requestChannelMetrics))
   }
 
   private def readResponse(api: ApiKeys, request: AbstractRequest, capturedResponse: Capture[RequestChannel.Response]): AbstractResponse = {