You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by li...@apache.org on 2018/05/15 06:05:19 UTC
[kafka] branch trunk updated: KAFKA-3473; More Controller Health Metrics (KIP-237)
This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0bb48a1 KAFKA-3473; More Controller Health Metrics (KIP-237)
0bb48a1 is described below
commit 0bb48a1669d7fdc30d3768f087fd6e592530dde2
Author: Dong Lin <li...@gmail.com>
AuthorDate: Mon May 14 23:04:56 2018 -0700
KAFKA-3473; More Controller Health Metrics (KIP-237)
This patch adds a few metrics that are useful for monitoring controller health. See KIP-237 for more detail.
Author: Dong Lin <li...@gmail.com>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #4392 from lindong28/KAFKA-3473
---
.../controller/ControllerChannelManager.scala | 32 ++++++++++++++--------
.../kafka/controller/ControllerEventManager.scala | 23 ++++++++++++++--
.../scala/kafka/controller/KafkaController.scala | 2 ++
.../kafka/controller/ControllerTestUtils.scala | 2 +-
4 files changed, 44 insertions(+), 15 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index aab4de2..addd88d 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -19,7 +19,7 @@ package kafka.controller
import java.net.SocketTimeoutException
import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
-import com.yammer.metrics.core.Gauge
+import com.yammer.metrics.core.{Gauge, Timer}
import kafka.api._
import kafka.cluster.Broker
import kafka.common.KafkaException
@@ -44,6 +44,7 @@ import scala.collection.{Set, mutable}
object ControllerChannelManager {
val QueueSizeMetricName = "QueueSize"
+ val RequestRateAndQueueTimeMetricName = "RequestRateAndQueueTimeMs"
}
class ControllerChannelManager(controllerContext: ControllerContext, config: KafkaConfig, time: Time, metrics: Metrics,
@@ -82,7 +83,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
val stateInfoOpt = brokerStateInfo.get(brokerId)
stateInfoOpt match {
case Some(stateInfo) =>
- stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
+ stateInfo.messageQueue.put(QueueItem(apiKey, request, callback, time.milliseconds()))
case None =>
warn(s"Not sending request $request to broker $brokerId, since it is offline.")
}
@@ -151,8 +152,12 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
case Some(name) => s"$name:Controller-${config.brokerId}-to-broker-${broker.id}-send-thread"
}
+ val requestRateAndQueueTimeMetrics = newTimer(
+ RequestRateAndQueueTimeMetricName, TimeUnit.MILLISECONDS, TimeUnit.SECONDS, brokerMetricTags(broker.id)
+ )
+
val requestThread = new RequestSendThread(config.brokerId, controllerContext, messageQueue, networkClient,
- brokerNode, config, time, stateChangeLogger, threadName)
+ brokerNode, config, time, requestRateAndQueueTimeMetrics, stateChangeLogger, threadName)
requestThread.setDaemon(false)
val queueSizeGauge = newGauge(
@@ -160,14 +165,14 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
new Gauge[Int] {
def value: Int = messageQueue.size
},
- queueSizeTags(broker.id)
+ brokerMetricTags(broker.id)
)
- brokerStateInfo.put(broker.id, new ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
- requestThread, queueSizeGauge))
+ brokerStateInfo.put(broker.id, ControllerBrokerStateInfo(networkClient, brokerNode, messageQueue,
+ requestThread, queueSizeGauge, requestRateAndQueueTimeMetrics))
}
- private def queueSizeTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
+ private def brokerMetricTags(brokerId: Int) = Map("broker-id" -> brokerId.toString)
private def removeExistingBroker(brokerState: ControllerBrokerStateInfo) {
try {
@@ -178,7 +183,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
brokerState.requestSendThread.shutdown()
brokerState.networkClient.close()
brokerState.messageQueue.clear()
- removeMetric(QueueSizeMetricName, queueSizeTags(brokerState.brokerNode.id))
+ removeMetric(QueueSizeMetricName, brokerMetricTags(brokerState.brokerNode.id))
+ removeMetric(RequestRateAndQueueTimeMetricName, brokerMetricTags(brokerState.brokerNode.id))
brokerStateInfo.remove(brokerState.brokerNode.id)
} catch {
case e: Throwable => error("Error while removing broker by the controller", e)
@@ -193,7 +199,7 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
}
case class QueueItem(apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
- callback: AbstractResponse => Unit)
+ callback: AbstractResponse => Unit, enqueueTimeMs: Long)
class RequestSendThread(val controllerId: Int,
val controllerContext: ControllerContext,
@@ -202,6 +208,7 @@ class RequestSendThread(val controllerId: Int,
val brokerNode: Node,
val config: KafkaConfig,
val time: Time,
+ val requestRateAndQueueTimeMetrics: Timer,
val stateChangeLogger: StateChangeLogger,
name: String)
extends ShutdownableThread(name = name) {
@@ -214,7 +221,9 @@ class RequestSendThread(val controllerId: Int,
def backoff(): Unit = pause(100, TimeUnit.MILLISECONDS)
- val QueueItem(apiKey, requestBuilder, callback) = queue.take()
+ val QueueItem(apiKey, requestBuilder, callback, enqueueTimeMs) = queue.take()
+ requestRateAndQueueTimeMetrics.update(time.milliseconds() - enqueueTimeMs, TimeUnit.MILLISECONDS)
+
var clientResponse: ClientResponse = null
try {
var isSendSuccessful = false
@@ -496,7 +505,8 @@ case class ControllerBrokerStateInfo(networkClient: NetworkClient,
brokerNode: Node,
messageQueue: BlockingQueue[QueueItem],
requestSendThread: RequestSendThread,
- queueSizeGauge: Gauge[Int])
+ queueSizeGauge: Gauge[Int],
+ requestRateAndTimeMetrics: Timer)
case class StopReplicaRequestInfo(replica: PartitionAndReplica, deletePartition: Boolean, callback: AbstractResponse => Unit)
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index b880f07..13967e0 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -20,9 +20,11 @@ package kafka.controller
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.locks.ReentrantLock
-import kafka.metrics.KafkaTimer
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}
import kafka.utils.CoreUtils.inLock
import kafka.utils.ShutdownableThread
+import org.apache.kafka.common.utils.Time
import scala.collection._
@@ -30,12 +32,25 @@ object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"
}
class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
- eventProcessedListener: ControllerEvent => Unit) {
+ eventProcessedListener: ControllerEvent => Unit) extends KafkaMetricsGroup {
@volatile private var _state: ControllerState = ControllerState.Idle
private val putLock = new ReentrantLock()
private val queue = new LinkedBlockingQueue[ControllerEvent]
private val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+ private val time = Time.SYSTEM
+
+ private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
+
+ newGauge(
+ "EventQueueSize",
+ new Gauge[Int] {
+ def value: Int = {
+ queue.size()
+ }
+ }
+ )
+
def state: ControllerState = _state
@@ -52,7 +67,7 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
def clearAndPut(event: ControllerEvent): Unit = inLock(putLock) {
queue.clear()
- queue.put(event)
+ put(event)
}
class ControllerEventThread(name: String) extends ShutdownableThread(name = name, isInterruptible = false) {
@@ -64,6 +79,8 @@ class ControllerEventManager(controllerId: Int, rateAndTimeMetrics: Map[Controll
case controllerEvent =>
_state = controllerEvent.state
+ eventQueueTimeHist.update(time.milliseconds() - controllerEvent.enqueueTimeMs)
+
try {
rateAndTimeMetrics(state).time {
controllerEvent.process()
diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala
index d3d1a81..0b50e34 100644
--- a/core/src/main/scala/kafka/controller/KafkaController.scala
+++ b/core/src/main/scala/kafka/controller/KafkaController.scala
@@ -1663,6 +1663,8 @@ private[controller] class ControllerStats extends KafkaMetricsGroup {
}
sealed trait ControllerEvent {
+ val enqueueTimeMs: Long = Time.SYSTEM.milliseconds()
+
def state: ControllerState
def process(): Unit
}
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
index 407297a..b0413a7 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -22,7 +22,7 @@ object ControllerTestUtils {
/** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
- val mockEvent = EasyMock.createMock(classOf[ControllerEvent])
+ val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
EasyMock.expect(mockEvent.state).andReturn(controllerState)
EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
def answer(): Unit = {
--
To stop receiving notification emails like this one, please contact
lindong@apache.org.