You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/15 06:08:00 UTC

[jira] [Commented] (KAFKA-3473) KIP-237: More Controller Health Metrics

    [ https://issues.apache.org/jira/browse/KAFKA-3473?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16475316#comment-16475316 ] 

ASF GitHub Bot commented on KAFKA-3473:
---------------------------------------

lindong28 closed pull request #4392: KAFKA-3473; More Controller Health Metrics (KIP-237)
URL: https://github.com/apache/kafka/pull/4392
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index aab4de23606..addd88df3f0 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,7 @@ package kafka.controller
 import java.net.SocketTimeoutException
 import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
 
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Timer}
 import kafka.api._
 import kafka.cluster.Broker
 import kafka.common.KafkaException
@@ -44,6 +44,7 @@ import scala.collection.{Set, mutable}
 
 object ControllerChannelManager {
   val QueueSizeMetricName = "QueueSize"
+  val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs"
 }
 
 class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
@@ -82,7 +83,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       val stateInfoOpt = brokerStateInfo.get(brokerId)
       stateInfoOpt match {
         case Some(stateInfo) =>
-          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
+          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback, time.milliseconds()))
         case None =>
           warn(s"Not sending request $request to broker $brokerId, since it is offline.")
       }
@@ -151,8 +152,12 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       case Some(name) => s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
     }
 
+    val requestRateAndQueueTimeMetrics = newTimer(
+      RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTags(broker.id)
+    )
+
     val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
-      brokerNode, config, time, stateChangeLogger, threadName)
+      brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
     requestThread.setDaemon(false)
 
     val queueSizeGauge = newGauge(
@@ -160,14 +165,14 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       new Gauge[Int] {
         def value: Int = messageQueue.size
       },
-      queueSizeTags(broker.id)
+      brokerMetricTags(broker.id)
     )
 
-    brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
-      requestThread, queueSizeGauge))
+    brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
+      requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics))
   }
 
-  private def queueSizeTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
+  private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
 
   private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
     try {
@@ -178,7 +183,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
       brokerState.requestSendThread.shutdown()
       brokerState.networkClient.close()
       brokerState.messageQueue.clear()
-      removeMetric(QueueSizeMetricName, queueSizeTags(brokerState.brokerNode.id))
+      removeMetric(QueueSizeMetricName, brokerMetricTags(brokerState.brokerNode.id))
+      removeMetric(RequestRateAndQueueTimeMetricName, brokerMetricTags(brokerState.brokerNode.id))
       brokerStateInfo.remove(brokerState.brokerNode.id)
     } catch {
       case e: Throwable => error("Error while removing broker by the controller", e)
@@ -193,7 +199,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
 }
 
 case class QueueItem(apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
-                     callback: AbstractResponse => Unit)
+                     callback: AbstractResponse => Unit, enqueueTimeMs: Long)
 
 class RequestSendThread(val controllerId: Int,
                         val controllerContext: ControllerContext,
@@ -202,6 +208,7 @@ class RequestSendThread(val controllerId: Int,
                         val brokerNode: Node,
                         val config: KafkaConfig,
                         val time: Time,
+                        val requestRateAndQueueTimeMetrics: Timer,
                         val stateChangeLogger: StateChangeLogger,
                         name: String)
   extends ShutdownableThread(name = name) {
@@ -214,7 +221,9 @@ class RequestSendThread(val controllerId: Int,
 
     def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
 
-    val QueueItem(apiKey, requestBuilder, callback) = queue.take()
+    val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
+    requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)
+
     var clientResponse: ClientResponse = null
     try {
       var isSendSuccessful = false
@@ -496,7 +505,8 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
                                      brokerNode: Node,
                                      messageQueue: BlockingQueue[QueueItem],
                                      requestSendThread: RequestSendThread,
-                                     queueSizeGauge: Gauge[Int])
+                                     queueSizeGauge: Gauge[Int],
+                                     requestRateAndTimeMetrics: Timer)
 
 case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)
 
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index b880f07f2dd..13967e029ed 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -20,9 +20,11 @@ package kafka.controller
 import java.util.concurrent.LinkedBlockingQueue
 import java.util.concurrent.locks.ReentrantLock
 
-import kafka.metrics.KafkaTimer
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
 import kafka.utils.CoreUtils.inLock
 import kafka.utils.ShutdownableThread
+import org.apache.kafka.common.utils.Time
 
 import scala.collection._
 
@@ -30,12 +32,25 @@ object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
 }
 class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
-                             eventProcessedListener: ControllerEvent => Unit) {
+                             eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup {
 
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[ControllerEvent]
   private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+  private val time = Time.SYSTEM
+
+  private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
+
+  newGauge(
+    "EventQueueSize",
+    new Gauge[Int] {
+      def value: Int = {
+        queue.size()
+      }
+    }
+  )
+
 
   def state: ControllerState = _state
 
@@ -52,7 +67,7 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
 
   def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
     queue.clear()
-    queue.put(event)
+    put(event)
   }
 
   class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
@@ -64,6 +79,8 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
         case controllerEvent =>
           _state = controllerEvent.state
 
+          eventQueueTimeHist.update(time.milliseconds() - controllerEvent.enqueueTimeMs)
+
           try {
             rateAndTimeMetrics(state).time {
               controllerEvent.process()
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d3d1a81c41f..0b50e345518 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1663,6 +1663,8 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {
 }
 
 sealed trait ControllerEvent {
+  val enqueueTimeMs: Long = Time.SYSTEM.milliseconds()
+
   def state: ControllerState
   def process(): Unit
 }
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
index 407297a8713..b0413a71af6 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -22,7 +22,7 @@ object ControllerTestUtils {
 
   /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
   def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
-    val mockEvent = EasyMock.createMock(classOf[ControllerEvent])
+    val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
     EasyMock.expect(mockEvent.state).andReturn(controllerState)
     EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
       def answer(): Unit = {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to this message, please log in to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> KIP-237: More Controller Health Metrics
> ---------------------------------------
>
>                 Key: KAFKA-3473
>                 URL: https://issues.apache.org/jira/browse/KAFKA-3473
>             Project: Kafka
>          Issue Type: Improvement
>          Components: controller
>    Affects Versions: 1.0.1
>            Reporter: Jiangjie Qin
>            Assignee: Dong Lin
>            Priority: Major
>             Fix For: 2.0.0
>
>
> Currently, the controller appends requests to brokers into the controller channel manager queue during a state transition; i.e., state transitions are propagated asynchronously. We need to track the request queue time on the controller side to see how long state propagation is delayed after the state transition has finished on the controller.
> We also want metrics to monitor the ControllerEventManager queue size and the average time an event waits in this queue before being processed.
> See https://cwiki.apache.org/confluence/display/KAFKA/KIP-237%3A+More+Controller+Health+Metrics for more details.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)