You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain text version.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/15 06:05:19 UTC

[kafka] branch trunk updated: KAFKA-3473; More Controller Health Metrics (KIP-237)

This is an automated email from the ASF dual-hosted git repository.

lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0bb48a1  KAFKA-3473; More Controller Health Metrics (KIP-237)
0bb48a1 is described below

commit 0bb48a1669d7fdc30d3768f087fd6e592530dde2
Author: Dong Lin <li...@gmail.com>
AuthorDate: Mon May 14 23:04:56 2018 -0700

    KAFKA-3473; More Controller Health Metrics (KIP-237)
    
    This patch adds a few metrics that are useful for monitoring controller health. See KIP-237 for more detail.
    
    Author: Dong Lin <li...@gmail.com>
    
    Reviewers: Jun Rao <ju...@gmail.com>
    
    Closes #4392 from lindong28/KAFKA-3473
---
 .../controller/ControllerChannelManager.scala      | 32 ++++++++++++++--------
 .../kafka/controller/ControllerEventManager.scala  | 23 ++++++++++++++--
 .../scala/kafka/controller/KafkaController.scala   |  2 ++
 .../kafka/controller/ControllerTestUtils.scala     |  2 +-
 4 files changed, 44 insertions(+), 15 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index aab4de2..addd88d 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,7 @@ package kafka.controller
 import java.net.SocketTimeoutException
 import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
 
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Timer}
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common.KafkaException
@@ -44,6 +44,7 @@ import scala.collection.{Set, mutable}
 
 object ControllerChannelManager {
   val QueueSizeMetricName = "QueueSize"
+  val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs"
 }
 
 class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
@@ -82,7 +83,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
+          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback, time.milliseconds()))
         case None =>
           warn(s"Not sending request $request to broker $brokerId, since it is offline.")
       }
@@ -151,8 +152,12 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       case Some(name) => s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
     }
 
+    val requestRateAndQueueTimeMetrics = newTimer(
+      RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTags(broker.id)
+    )
+
     val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
-      brokerNode, config, time, stateChangeLogger, threadName)
+      brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
     requestThread.setDaemon(false)
 
     val queueSizeGauge = newGauge(
@@ -160,14 +165,14 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       new Gauge[Int] {
         def value: Int = messageQueue.size
       },
-      queueSizeTags(broker.id)
+      brokerMetricTags(broker.id)
     )
 
-    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
-      requestThread, queueSizeGauge))
+    brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
+      requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics))
   }
 
-  private def queueSizeTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
+  private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
 
   private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
     try {
@@ -178,7 +183,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       brokerState.requestSendThread.shutdown()
       brokerState.networkClient.close()
       brokerState.messageQueue.clear()
-      removeMetric(QueueSizeMetricName, queueSizeTags(brokerState.brokerNode.id))
+      removeMetric(QueueSizeMetricName, brokerMetricTags(brokerState.brokerNode.id))
+      removeMetric(RequestRateAndQueueTimeMetricName, brokerMetricTags(brokerState.brokerNode.id))
       brokerStateInfo.remove(brokerState.brokerNode.id)
     } catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
@@ -193,7 +199,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
 }
 
 case class QueueItem(apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
-                     callback: AbstractResponse => Unit)
+                     callback: AbstractResponse => Unit, enqueueTimeMs: Long)
 
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
@@ -202,6 +208,7 @@ class RequestSendThread(val controllerId: Int,
                         val brokerNode: Node,
                         val config: KafkaConfig,
                         val time: Time,
+                        val requestRateAndQueueTimeMetrics: Timer,
                         val stateChangeLogger: StateChangeLogger,
                         name: String)
   extends ShutdownableThread(name = name) {
@@ -214,7 +221,9 @@ class RequestSendThread(val controllerId: Int,
 
     def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
 
-    val QueueItem(apiKey, requestBuilder, callback) = queue.take()
+    val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
+    requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)
+
     var clientResponse: ClientResponse = null
     try {
       var isSendSuccessful = false
@@ -496,7 +505,8 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                      brokerNode: Node,
                                      messageQueue: BlockingQueue[QueueItem],
                                      requestSendThread: RequestSendThread,
-                                     queueSizeGauge: Gauge[Int])
+                                     queueSizeGauge: Gauge[Int],
+                                     requestRateAndTimeMetrics: Timer)
 
 case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)
 
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index b880f07..13967e0 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -20,9 +20,11 @@ package kafka.controller
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.locks.ReentrantLock
 
-import kafka.metrics.KafkaTimer
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ShutdownableThread
+import org.apache.kafka.common.utils.Time
 
 import scala.collection._
 
@@ -30,12 +32,25 @@ object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
 class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
-                             eventProcessedListener: ControllerEvent => Unit) {
+                             eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[ControllerEvent]
   private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+  private val time = Time.SYSTEM
+
+  private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
+
+  newGauge(
+    "EventQueueSize",
+    new Gauge[Int] {
+      def value: Int = {
+        queue.size()
+      }
+    }
+  )
+
 
   def state: ControllerState = _state
 
@@ -52,7 +67,7 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
 
   def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
     queue.clear()
-    queue.put(event)
+    put(event)
   }
 
   class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
@@ -64,6 +79,8 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
         case controllerEvent =>
           _state = controllerEvent.state
 
+          eventQueueTimeHist.update(time.milliseconds() - controllerEvent.enqueueTimeMs)
+
           try {
             rateAndTimeMetrics(state).time {
               controllerEvent.process()
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d3d1a81..0b50e34 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1663,6 +1663,8 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {
 }
 
 sealed trait ControllerEvent {
+  val enqueueTimeMs: Long = Time.SYSTEM.milliseconds()
+
   def state: ControllerState
   def process(): Unit
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
index 407297a..b0413a7 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -22,7 +22,7 @@ object ControllerTestUtils {
 
   /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
   def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
-    val mockEvent = EasyMock.createMock(classOf[ControllerEvent])
+    val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
     EasyMock.expect(mockEvent.state).andReturn(controllerState)
     EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
       def answer(): Unit = {

-- 
To stop receiving notification emails like this one, please contact
lindong@apache.org.