You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/05/28 04:15:46 UTC
[kafka] branch 2.3 updated: MINOR: Remove ControllerEventManager metrics on close (#6788)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new 4356454 MINOR: Remove ControllerEventManager metrics on close (#6788)
4356454 is described below
commit 435645429c1052f9665c9cdcc53580ac83dee229
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed May 22 15:51:25 2019 -0700
MINOR: Remove ControllerEventManager metrics on close (#6788)
Remove the metrics created by `ControllerEventManager` when it shuts down. This fixes transient failures in `ControllerEventManagerTest.testEventQueueTime` and is generally good hygiene.
Reviewers: José Armando García Sancio <js...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
.../kafka/controller/ControllerEventManager.scala | 20 +++++++++----
.../controller/ControllerEventManagerTest.scala | 33 ++++++++++++++++++++--
2 files changed, 44 insertions(+), 9 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index f579c2b..1ce594d 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -32,6 +32,8 @@ import scala.collection.JavaConverters._
object ControllerEventManager {
val ControllerEventThreadName = "controller-event-thread"
+ val EventQueueTimeMetricName = "EventQueueTimeMs"
+ val EventQueueSizeMetricName = "EventQueueSize"
}
trait ControllerEventProcessor {
@@ -70,17 +72,18 @@ class ControllerEventManager(controllerId: Int,
processor: ControllerEventProcessor,
time: Time,
rateAndTimeMetrics: Map[ControllerState, KafkaTimer]) extends KafkaMetricsGroup {
+ import ControllerEventManager._
@volatile private var _state: ControllerState = ControllerState.Idle
private val putLock = new ReentrantLock()
private val queue = new LinkedBlockingQueue[QueuedEvent]
// Visible for test
- private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+ private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
- private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
+ private val eventQueueTimeHist = newHistogram(EventQueueTimeMetricName)
newGauge(
- "EventQueueSize",
+ EventQueueSizeMetricName,
new Gauge[Int] {
def value: Int = {
queue.size()
@@ -93,9 +96,14 @@ class ControllerEventManager(controllerId: Int,
def start(): Unit = thread.start()
def close(): Unit = {
- thread.initiateShutdown()
- clearAndPut(ShutdownEventThread)
- thread.awaitShutdown()
+ try {
+ thread.initiateShutdown()
+ clearAndPut(ShutdownEventThread)
+ thread.awaitShutdown()
+ } finally {
+ removeMetric(EventQueueTimeMetricName)
+ removeMetric(EventQueueSizeMetricName)
+ }
}
def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index fef9bd1..2c4f66d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -21,11 +21,11 @@ import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicInteger
import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Histogram, Timer}
+import com.yammer.metrics.core.{Histogram, MetricName, Timer}
import kafka.utils.TestUtils
import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert.{assertEquals, assertTrue, fail}
import org.junit.{After, Test}
-import org.junit.Assert.{assertEquals, fail}
import scala.collection.JavaConverters._
@@ -40,7 +40,32 @@ class ControllerEventManagerTest {
}
@Test
+ def testMetricsCleanedOnClose(): Unit = {
+ val time = new MockTime()
+ val controllerStats = new ControllerStats
+ val eventProcessor = new ControllerEventProcessor {
+ override def process(event: ControllerEvent): Unit = {}
+ override def preempt(event: ControllerEvent): Unit = {}
+ }
+
+ def allEventManagerMetrics: Set[MetricName] = {
+ Metrics.defaultRegistry.allMetrics.asScala.keySet
+ .filter(_.getMBeanName.startsWith("kafka.controller:type=ControllerEventManager"))
+ .toSet
+ }
+
+ controllerEventManager = new ControllerEventManager(0, eventProcessor,
+ time, controllerStats.rateAndTimeMetrics)
+ controllerEventManager.start()
+ assertTrue(allEventManagerMetrics.nonEmpty)
+
+ controllerEventManager.close()
+ assertTrue(allEventManagerMetrics.isEmpty)
+ }
+
+ @Test
def testEventQueueTime(): Unit = {
+ val metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs"
val controllerStats = new ControllerStats
val time = new MockTime()
val latch = new CountDownLatch(1)
@@ -53,6 +78,9 @@ class ControllerEventManagerTest {
override def preempt(event: ControllerEvent): Unit = {}
}
+ // The metric should not already exist
+ assertTrue(Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.isEmpty)
+
controllerEventManager = new ControllerEventManager(0, eventProcessor,
time, controllerStats.rateAndTimeMetrics)
controllerEventManager.start()
@@ -61,7 +89,6 @@ class ControllerEventManagerTest {
controllerEventManager.put(TopicChange)
latch.countDown()
- val metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs"
val queueTimeHistogram = Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption
.getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Histogram]