You are viewing a plain text version of this content. Its canonical link was a hyperlink in the original page and is not preserved in this plain-text version.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/22 19:59:40 UTC

[kafka] branch trunk updated: KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController (#10772)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3f433c0  KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController (#10772)
3f433c0 is described below

commit 3f433c0b4a3fc024d4ec48f7e8b5541336fca053
Author: Ryan Dielhenn <rd...@confluent.io>
AuthorDate: Fri Oct 22 12:57:36 2021 -0700

    KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController (#10772)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../kafka/controller/ClusterControlManager.java    | 39 +++++++++++++++++++-
 .../apache/kafka/controller/ControllerMetrics.java |  8 ++++
 .../apache/kafka/controller/QuorumController.java  |  2 +-
 .../kafka/controller/QuorumControllerMetrics.java  | 43 +++++++++++++++++++++-
 .../controller/ClusterControlManagerTest.java      |  8 ++--
 .../kafka/controller/MockControllerMetrics.java    | 24 ++++++++++++
 .../controller/ProducerIdControlManagerTest.java   |  2 +-
 .../controller/ReplicationControlManagerTest.java  | 39 +++++++++++++++++++-
 8 files changed, 155 insertions(+), 10 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index b2cdcc1..5916cdc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -117,6 +117,11 @@ public class ClusterControlManager {
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
 
     /**
+     * A reference to the controller's metrics registry.
+     */
+    private final ControllerMetrics controllerMetrics;
+
+    /**
      * The broker heartbeat manager, or null if this controller is on standby.
      */
     private BrokerHeartbeatManager heartbeatManager;
@@ -131,7 +136,8 @@ public class ClusterControlManager {
                           Time time,
                           SnapshotRegistry snapshotRegistry,
                           long sessionTimeoutNs,
-                          ReplicaPlacer replicaPlacer) {
+                          ReplicaPlacer replicaPlacer,
+                          ControllerMetrics metrics) {
         this.logContext = logContext;
         this.log = logContext.logger(ClusterControlManager.class);
         this.time = time;
@@ -140,6 +146,7 @@ public class ClusterControlManager {
         this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
         this.heartbeatManager = null;
         this.readyBrokersFuture = Optional.empty();
+        this.controllerMetrics = metrics;
     }
 
     /**
@@ -249,11 +256,13 @@ public class ClusterControlManager {
             features.put(feature.name(), new VersionRange(
                 feature.minSupportedVersion(), feature.maxSupportedVersion()));
         }
+       
         // Update broker registrations.
         BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
                 new BrokerRegistration(brokerId, record.brokerEpoch(),
                     record.incarnationId(), listeners, features,
                     Optional.ofNullable(record.rack()), record.fenced()));
+        updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
         if (prevRegistration == null) {
             log.info("Registered new broker: {}", record);
         } else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
@@ -274,6 +283,7 @@ public class ClusterControlManager {
                 "registration with that epoch found", record.toString()));
         } else {
             brokerRegistrations.remove(brokerId);
+            updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Unregistered broker: {}", record);
         }
     }
@@ -289,6 +299,7 @@ public class ClusterControlManager {
                 "registration with that epoch found", record.toString()));
         } else {
             brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
+            updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Fenced broker: {}", record);
         }
     }
@@ -304,6 +315,7 @@ public class ClusterControlManager {
                 "registration with that epoch found", record.toString()));
         } else {
             brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
+            updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Unfenced broker: {}", record);
         }
         if (readyBrokersFuture.isPresent()) {
@@ -314,6 +326,31 @@ public class ClusterControlManager {
         }
     }
 
+    /**
+     * Adjust the fenced and active broker count metrics for a registration change.
+     * Either argument may be null: the broker was absent before, or is absent after.
+     */
+    private void updateMetrics(BrokerRegistration prevRegistration, BrokerRegistration registration) {
+        int fencedDelta = 0;
+        int activeDelta = 0;
+        if (prevRegistration != null) {
+            if (prevRegistration.fenced()) {
+                fencedDelta--;
+            } else {
+                activeDelta--;
+            }
+        }
+        if (registration != null) {
+            if (registration.fenced()) {
+                fencedDelta++;
+            } else {
+                activeDelta++;
+            }
+        }
+        controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + fencedDelta);
+        controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + activeDelta);
+    }
+
     public List<List<Integer>> placeReplicas(int startPartition,
                                              int numPartitions,
                                              short numReplicas) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 3fd0e66..fa03e05 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -27,6 +27,14 @@ public interface ControllerMetrics extends AutoCloseable {
 
     void updateEventQueueProcessingTime(long durationMs);
 
+    void setFencedBrokerCount(int brokerCount);
+
+    int fencedBrokerCount();
+
+    void setActiveBrokerCount(int brokerCount);
+
+    int activeBrokerCount();
+
     void setGlobalTopicsCount(int topicCount);
 
     int globalTopicsCount();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0a35140..67d4eb0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1125,7 +1125,7 @@ public final class QuorumController implements Controller {
             snapshotRegistry, configDefs, alterConfigPolicy);
         this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
         this.clusterControl = new ClusterControlManager(logContext, time,
-            snapshotRegistry, sessionTimeoutNs, replicaPlacer);
+            snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
         this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index 3dd3336..9b3a4dd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -32,6 +32,10 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
         "ControllerEventManager", "EventQueueTimeMs");
     private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
         "ControllerEventManager", "EventQueueProcessingTimeMs");
+    private final static MetricName FENCED_BROKER_COUNT = getMetricName(
+        "KafkaController", "FencedBrokerCount");
+    private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
+        "KafkaController", "ActiveBrokerCount");
     private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
         "KafkaController", "GlobalTopicCount");
     private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
@@ -40,14 +44,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
         "KafkaController", "OfflinePartitionsCount");
     private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
         "KafkaController", "PreferredReplicaImbalanceCount");
-
+    
     private final MetricsRegistry registry;
     private volatile boolean active;
+    private volatile int fencedBrokerCount;
+    private volatile int activeBrokerCount;
     private volatile int globalTopicCount;
     private volatile int globalPartitionCount;
     private volatile int offlinePartitionCount;
     private volatile int preferredReplicaImbalanceCount;
     private final Gauge<Integer> activeControllerCount;
+    private final Gauge<Integer> fencedBrokerCountGauge;
+    private final Gauge<Integer> activeBrokerCountGauge;
     private final Gauge<Integer> globalPartitionCountGauge;
     private final Gauge<Integer> globalTopicCountGauge;
     private final Gauge<Integer> offlinePartitionCountGauge;
@@ -58,6 +66,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     public QuorumControllerMetrics(MetricsRegistry registry) {
         this.registry = Objects.requireNonNull(registry);
         this.active = false;
+        this.fencedBrokerCount = 0;
+        this.activeBrokerCount = 0;
         this.globalTopicCount = 0;
         this.globalPartitionCount = 0;
         this.offlinePartitionCount = 0;
@@ -70,6 +80,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
         });
         this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true);
         this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
+        this.fencedBrokerCountGauge = registry.newGauge(FENCED_BROKER_COUNT, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return fencedBrokerCount;
+            }
+        });
+        this.activeBrokerCountGauge = registry.newGauge(ACTIVE_BROKER_COUNT, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return activeBrokerCount;
+            }
+        });
         this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
             @Override
             public Integer value() {
@@ -117,6 +139,26 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     }
 
     @Override
+    public void setFencedBrokerCount(int brokerCount) {
+        this.fencedBrokerCount = brokerCount;
+    }
+
+    @Override
+    public int fencedBrokerCount() {
+        return this.fencedBrokerCount;
+    }
+
+    @Override
+    public void setActiveBrokerCount(int brokerCount) {
+        this.activeBrokerCount = brokerCount;
+    }
+
+    @Override
+    public int activeBrokerCount() {
+        return this.activeBrokerCount;
+    }
+
+    @Override
     public void setGlobalTopicsCount(int topicCount) {
         this.globalTopicCount = topicCount;
     }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 195b02a..16625b5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -58,7 +58,7 @@ public class ClusterControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), time, snapshotRegistry, 1000,
-                new StripedReplicaPlacer(new Random()));
+                new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
 
@@ -99,7 +99,7 @@ public class ClusterControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000,
-            new StripedReplicaPlacer(new Random()));
+            new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
         clusterControl.activate();
         clusterControl.replay(brokerRecord);
         assertEquals(new BrokerRegistration(1, 100,
@@ -122,7 +122,7 @@ public class ClusterControlManagerTest {
         MockRandom random = new MockRandom();
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), time, snapshotRegistry, 1000,
-            new StripedReplicaPlacer(random));
+            new StripedReplicaPlacer(random), new MockControllerMetrics());
         clusterControl.activate();
         for (int i = 0; i < numUsableBrokers; i++) {
             RegisterBrokerRecord brokerRecord =
@@ -159,7 +159,7 @@ public class ClusterControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
             new LogContext(), time, snapshotRegistry, 1000,
-            new StripedReplicaPlacer(new Random()));
+            new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
         for (int i = 0; i < 3; i++) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 3d3075e..0120f15 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -19,6 +19,8 @@ package org.apache.kafka.controller;
 
 public final class MockControllerMetrics implements ControllerMetrics {
     private volatile boolean active;
+    private volatile int fencedBrokers;
+    private volatile int activeBrokers;
     private volatile int topics;
     private volatile int partitions;
     private volatile int offlinePartitions;
@@ -27,6 +29,8 @@ public final class MockControllerMetrics implements ControllerMetrics {
 
     public MockControllerMetrics() {
         this.active = false;
+        this.fencedBrokers = 0;
+        this.activeBrokers = 0;
         this.topics = 0;
         this.partitions = 0;
         this.offlinePartitions = 0;
@@ -54,6 +58,26 @@ public final class MockControllerMetrics implements ControllerMetrics {
     }
 
     @Override
+    public void setFencedBrokerCount(int brokerCount) {
+        this.fencedBrokers = brokerCount;
+    }
+
+    @Override
+    public int fencedBrokerCount() {
+        return this.fencedBrokers;
+    }
+
+    @Override
+    public void setActiveBrokerCount(int brokerCount) {
+        this.activeBrokers = brokerCount;
+    }
+
+    @Override
+    public int activeBrokerCount() {
+        return this.activeBrokers;
+    }
+
+    @Override
     public void setGlobalTopicsCount(int topicCount) {
         this.topics = topicCount;
     }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index f96510d..990395b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -54,7 +54,7 @@ public class ProducerIdControlManagerTest {
         snapshotRegistry = new SnapshotRegistry(logContext);
         clusterControl = new ClusterControlManager(
             logContext, time, snapshotRegistry, 1000,
-            new StripedReplicaPlacer(random));
+            new StripedReplicaPlacer(random), new MockControllerMetrics());
 
         clusterControl.activate();
         for (int i = 0; i < 4; i++) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 3737f92..6543073 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -132,10 +132,10 @@ public class ReplicationControlManagerTest {
         final LogContext logContext = new LogContext();
         final MockTime time = new MockTime();
         final MockRandom random = new MockRandom();
+        final ControllerMetrics metrics = new MockControllerMetrics();
         final ClusterControlManager clusterControl = new ClusterControlManager(
             logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
-            new StripedReplicaPlacer(random));
-        final ControllerMetrics metrics = new MockControllerMetrics();
+            new StripedReplicaPlacer(random), metrics);
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
             new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty());
         final ReplicationControlManager replicationControl;
@@ -430,6 +430,41 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
+    public void testBrokerCountMetrics() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+        // A newly registered broker is counted as fenced until it is unfenced.
+        ctx.registerBrokers(0);
+        assertEquals(1, ctx.metrics.fencedBrokerCount());
+        assertEquals(0, ctx.metrics.activeBrokerCount());
+        ctx.unfenceBrokers(0);
+        assertEquals(0, ctx.metrics.fencedBrokerCount());
+        assertEquals(1, ctx.metrics.activeBrokerCount());
+        // Newly registered brokers stay in the fenced count until unfenced.
+        ctx.registerBrokers(1, 2);
+        assertEquals(2, ctx.metrics.fencedBrokerCount());
+        assertEquals(1, ctx.metrics.activeBrokerCount());
+        ctx.unfenceBrokers(1);
+        ctx.unfenceBrokers(2);
+        assertEquals(0, ctx.metrics.fencedBrokerCount());
+        assertEquals(3, ctx.metrics.activeBrokerCount());
+        // Unregistering active brokers decrements only the active count.
+        ControllerResult<Void> result = replicationControl.unregisterBroker(0);
+        ctx.replay(result.records());
+        result = replicationControl.unregisterBroker(2);
+        ctx.replay(result.records());
+        assertEquals(0, ctx.metrics.fencedBrokerCount());
+        assertEquals(1, ctx.metrics.activeBrokerCount());
+        // Unregistering a broker that is still fenced decrements only the fenced count.
+        ctx.registerBrokers(3);
+        assertEquals(1, ctx.metrics.fencedBrokerCount());
+        result = replicationControl.unregisterBroker(3);
+        ctx.replay(result.records());
+        assertEquals(0, ctx.metrics.fencedBrokerCount());
+        assertEquals(1, ctx.metrics.activeBrokerCount());
+    }
+
+    @Test
     public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
         ReplicationControlTestContext ctx = new ReplicationControlTestContext();
         ctx.registerBrokers(0, 1, 2);