You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/05/28 04:15:46 UTC

[kafka] branch 2.3 updated: MINOR: Remove ControllerEventManager metrics on close (#6788)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new 4356454  MINOR: Remove ControllerEventManager metrics on close (#6788)
4356454 is described below

commit 435645429c1052f9665c9cdcc53580ac83dee229
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Wed May 22 15:51:25 2019 -0700

    MINOR: Remove ControllerEventManager metrics on close (#6788)
    
    Remove the metrics registered by `ControllerEventManager` (`EventQueueTimeMs` and `EventQueueSize`) when it is closed. This fixes transient failures in `ControllerEventManagerTest.testEventQueueTime` and is generally good hygiene.
    
    Reviewers: José Armando García Sancio <js...@gmail.com>, Ismael Juma <is...@juma.me.uk>
---
 .../kafka/controller/ControllerEventManager.scala  | 20 +++++++++----
 .../controller/ControllerEventManagerTest.scala    | 33 ++++++++++++++++++++--
 2 files changed, 44 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/kafka/controller/ControllerEventManager.scala b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
index f579c2b..1ce594d 100644
--- a/core/src/main/scala/kafka/controller/ControllerEventManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerEventManager.scala
@@ -32,6 +32,8 @@ import scala.collection.JavaConverters._
 
 object ControllerEventManager {
   val ControllerEventThreadName = "controller-event-thread"
+  val EventQueueTimeMetricName = "EventQueueTimeMs"
+  val EventQueueSizeMetricName = "EventQueueSize"
 }
 
 trait ControllerEventProcessor {
@@ -70,17 +72,18 @@ class ControllerEventManager(controllerId: Int,
                              processor: ControllerEventProcessor,
                              time: Time,
                              rateAndTimeMetrics: Map[ControllerState, KafkaTimer]) extends KafkaMetricsGroup {
+  import ControllerEventManager._
 
   @volatile private var _state: ControllerState = ControllerState.Idle
   private val putLock = new ReentrantLock()
   private val queue = new LinkedBlockingQueue[QueuedEvent]
   // Visible for test
-  private[controller] val thread = new ControllerEventThread(ControllerEventManager.ControllerEventThreadName)
+  private[controller] val thread = new ControllerEventThread(ControllerEventThreadName)
 
-  private val eventQueueTimeHist = newHistogram("EventQueueTimeMs")
+  private val eventQueueTimeHist = newHistogram(EventQueueTimeMetricName)
 
   newGauge(
-    "EventQueueSize",
+    EventQueueSizeMetricName,
     new Gauge[Int] {
       def value: Int = {
         queue.size()
@@ -93,9 +96,14 @@ class ControllerEventManager(controllerId: Int,
   def start(): Unit = thread.start()
 
   def close(): Unit = {
-    thread.initiateShutdown()
-    clearAndPut(ShutdownEventThread)
-    thread.awaitShutdown()
+    try {
+      thread.initiateShutdown()
+      clearAndPut(ShutdownEventThread)
+      thread.awaitShutdown()
+    } finally {
+      removeMetric(EventQueueTimeMetricName)
+      removeMetric(EventQueueSizeMetricName)
+    }
   }
 
   def put(event: ControllerEvent): QueuedEvent = inLock(putLock) {
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
index fef9bd1..2c4f66d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerEventManagerTest.scala
@@ -21,11 +21,11 @@ import java.util.concurrent.CountDownLatch
 import java.util.concurrent.atomic.AtomicInteger
 
 import com.yammer.metrics.Metrics
-import com.yammer.metrics.core.{Histogram, Timer}
+import com.yammer.metrics.core.{Histogram, MetricName, Timer}
 import kafka.utils.TestUtils
 import org.apache.kafka.common.utils.MockTime
+import org.junit.Assert.{assertEquals, assertTrue, fail}
 import org.junit.{After, Test}
-import org.junit.Assert.{assertEquals, fail}
 
 import scala.collection.JavaConverters._
 
@@ -40,7 +40,32 @@ class ControllerEventManagerTest {
   }
 
   @Test
+  def testMetricsCleanedOnClose(): Unit = {
+    val time = new MockTime()
+    val controllerStats = new ControllerStats
+    val eventProcessor = new ControllerEventProcessor {
+      override def process(event: ControllerEvent): Unit = {}
+      override def preempt(event: ControllerEvent): Unit = {}
+    }
+
+    def allEventManagerMetrics: Set[MetricName] = {
+      Metrics.defaultRegistry.allMetrics.asScala.keySet
+        .filter(_.getMBeanName.startsWith("kafka.controller:type=ControllerEventManager"))
+        .toSet
+    }
+
+    controllerEventManager = new ControllerEventManager(0, eventProcessor,
+      time, controllerStats.rateAndTimeMetrics)
+    controllerEventManager.start()
+    assertTrue(allEventManagerMetrics.nonEmpty)
+
+    controllerEventManager.close()
+    assertTrue(allEventManagerMetrics.isEmpty)
+  }
+
+  @Test
   def testEventQueueTime(): Unit = {
+    val metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs"
     val controllerStats = new ControllerStats
     val time = new MockTime()
     val latch = new CountDownLatch(1)
@@ -53,6 +78,9 @@ class ControllerEventManagerTest {
       override def preempt(event: ControllerEvent): Unit = {}
     }
 
+    // The metric should not already exist
+    assertTrue(Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.isEmpty)
+
     controllerEventManager = new ControllerEventManager(0, eventProcessor,
       time, controllerStats.rateAndTimeMetrics)
     controllerEventManager.start()
@@ -61,7 +89,6 @@ class ControllerEventManagerTest {
     controllerEventManager.put(TopicChange)
     latch.countDown()
 
-    val metricName = "kafka.controller:type=ControllerEventManager,name=EventQueueTimeMs"
     val queueTimeHistogram = Metrics.defaultRegistry.allMetrics.asScala.filterKeys(_.getMBeanName == metricName).values.headOption
       .getOrElse(fail(s"Unable to find metric $metricName")).asInstanceOf[Histogram]