You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/01/12 05:51:19 UTC

[kafka] branch trunk updated: MINOR: Fix EventQueueProcessingTimeMs metric #11668

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1aa26fa  MINOR: Fix EventQueueProcessingTimeMs metric #11668
1aa26fa is described below

commit 1aa26faf8889005d65e338540e888b60ef5a611f
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Tue Jan 11 22:48:39 2022 -0700

    MINOR: Fix EventQueueProcessingTimeMs metric #11668
    
    Make sure that the event queue processing time histogram gets updated
    and add tests that verify that the update methods modify the correct
    histogram.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../kafka/controller/QuorumControllerMetrics.java  |  2 +-
 .../controller/QuorumControllerMetricsTest.java    | 82 +++++++++++++++-------
 2 files changed, 58 insertions(+), 26 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index 9b3a4dd..03cd47c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -135,7 +135,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
 
     @Override
     public void updateEventQueueProcessingTime(long durationMs) {
-        eventQueueTime.update(durationMs);
+        eventQueueProcessingTime.update(durationMs);
     }
 
     @Override
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
index 74b24c7..4b0afb5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
@@ -17,13 +17,14 @@
 
 package org.apache.kafka.controller;
 
+import com.yammer.metrics.core.Histogram;
+import com.yammer.metrics.core.MetricName;
 import com.yammer.metrics.core.MetricsRegistry;
+import java.util.Set;
 import org.apache.kafka.common.utils.Utils;
 import org.junit.jupiter.api.Test;
-
-import java.util.Set;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 public class QuorumControllerMetricsTest {
@@ -49,36 +50,67 @@ public class QuorumControllerMetricsTest {
         assertMetricsCreatedAndRemovedUponClose(expectedType, expectedMetricNames);
     }
 
+    @Test
+    public void testUpdateEventQueueTime() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry)) {
+                quorumControllerMetrics.updateEventQueueTime(1000);
+                assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueTimeMs"), 1, 1000);
+            }
+        } finally {
+            registry.shutdown();
+        }
+    }
+
+    @Test
+    public void testUpdateEventQueueProcessingTime() {
+        MetricsRegistry registry = new MetricsRegistry();
+        try {
+            try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry)) {
+                quorumControllerMetrics.updateEventQueueProcessingTime(1000);
+                assertMetricHistogram(registry, metricName("ControllerEventManager", "EventQueueProcessingTimeMs"), 1, 1000);
+            }
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     private static void assertMetricsCreatedAndRemovedUponClose(String expectedType, Set<String> expectedMetricNames) {
         MetricsRegistry registry = new MetricsRegistry();
-        try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry)) {
-            assertMetricsCreated(registry, expectedMetricNames, expectedType);
+        try {
+            try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry)) {
+                assertMetricsCreated(registry, expectedMetricNames, expectedType);
+            }
+            assertMetricsRemoved(registry, expectedMetricNames, expectedType);
+        } finally {
+            registry.shutdown();
         }
-        assertMetricsRemoved(registry, expectedMetricNames, expectedType);
+    }
+
+    private static void assertMetricHistogram(MetricsRegistry registry, MetricName metricName, long count, double sum) {
+        Histogram histogram = (Histogram) registry.allMetrics().get(metricName);
+
+        assertEquals(count, histogram.count());
+        assertEquals(sum, histogram.sum(), .1);
+    }
+
+    private static MetricName metricName(String type, String name) {
+        String mBeanName = String.format("kafka.controller:type=%s,name=%s", type, name);
+        return new MetricName("kafka.controller", type, name, null, mBeanName);
     }
 
     private static void assertMetricsCreated(MetricsRegistry registry, Set<String> expectedMetricNames, String expectedType) {
-        expectedMetricNames.forEach(expectedMetricName -> assertTrue(
-            registry.allMetrics().keySet().stream().anyMatch(metricName -> {
-                if (metricName.getGroup().equals(EXPECTED_GROUP) && metricName.getType().equals(expectedType)
-                    && metricName.getScope() == null && metricName.getName().equals(expectedMetricName)) {
-                    // It has to exist AND the MBean name has to be correct;
-                    // fail right here if the MBean name doesn't match
-                    String expectedMBeanPrefix = EXPECTED_GROUP + ":type=" + expectedType + ",name=";
-                    assertEquals(expectedMBeanPrefix + expectedMetricName, metricName.getMBeanName(),
-                        "Incorrect MBean name");
-                    return true; // the metric name exists and the associated MBean name matches
-                } else {
-                    return false; // this one didn't match
-                }
-            }), "Missing metric: " + expectedMetricName));
+        expectedMetricNames.forEach(expectedName -> {
+            MetricName expectMetricName = metricName(expectedType, expectedName);
+            assertTrue(registry.allMetrics().containsKey(expectMetricName), "Missing metric: " + expectMetricName);
+        });
     }
 
     private static void assertMetricsRemoved(MetricsRegistry registry, Set<String> expectedMetricNames, String expectedType) {
-        expectedMetricNames.forEach(expectedMetricName -> assertTrue(
-            registry.allMetrics().keySet().stream().noneMatch(metricName ->
-                metricName.getGroup().equals(EXPECTED_GROUP) && metricName.getType().equals(expectedType)
-                    && metricName.getScope() == null && metricName.getName().equals(expectedMetricName)),
-            "Metric not removed when closed: " + expectedMetricName));
+        expectedMetricNames.forEach(expectedName -> {
+            MetricName expectMetricName = metricName(expectedType, expectedName);
+            assertFalse(registry.allMetrics().containsKey(expectMetricName), "Found metric: " + expectMetricName);
+        });
     }
 }