You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/10/22 19:59:40 UTC
[kafka] branch trunk updated: KAFKA-12697: Add FencedBrokerCount
and ActiveBrokerCount metrics to the QuorumController (#10772)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3f433c0 KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController (#10772)
3f433c0 is described below
commit 3f433c0b4a3fc024d4ec48f7e8b5541336fca053
Author: Ryan Dielhenn <rd...@confluent.io>
AuthorDate: Fri Oct 22 12:57:36 2021 -0700
KAFKA-12697: Add FencedBrokerCount and ActiveBrokerCount metrics to the QuorumController (#10772)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../kafka/controller/ClusterControlManager.java | 39 +++++++++++++++++++-
.../apache/kafka/controller/ControllerMetrics.java | 8 ++++
.../apache/kafka/controller/QuorumController.java | 2 +-
.../kafka/controller/QuorumControllerMetrics.java | 43 +++++++++++++++++++++-
.../controller/ClusterControlManagerTest.java | 8 ++--
.../kafka/controller/MockControllerMetrics.java | 24 ++++++++++++
.../controller/ProducerIdControlManagerTest.java | 2 +-
.../controller/ReplicationControlManagerTest.java | 39 +++++++++++++++++++-
8 files changed, 155 insertions(+), 10 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index b2cdcc1..5916cdc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -117,6 +117,11 @@ public class ClusterControlManager {
private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
/**
+ * A reference to the controller's metrics registry.
+ */
+ private final ControllerMetrics controllerMetrics;
+
+ /**
* The broker heartbeat manager, or null if this controller is on standby.
*/
private BrokerHeartbeatManager heartbeatManager;
@@ -131,7 +136,8 @@ public class ClusterControlManager {
Time time,
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
- ReplicaPlacer replicaPlacer) {
+ ReplicaPlacer replicaPlacer,
+ ControllerMetrics metrics) {
this.logContext = logContext;
this.log = logContext.logger(ClusterControlManager.class);
this.time = time;
@@ -140,6 +146,7 @@ public class ClusterControlManager {
this.brokerRegistrations = new TimelineHashMap<>(snapshotRegistry, 0);
this.heartbeatManager = null;
this.readyBrokersFuture = Optional.empty();
+ this.controllerMetrics = metrics;
}
/**
@@ -249,11 +256,13 @@ public class ClusterControlManager {
features.put(feature.name(), new VersionRange(
feature.minSupportedVersion(), feature.maxSupportedVersion()));
}
+
// Update broker registrations.
BrokerRegistration prevRegistration = brokerRegistrations.put(brokerId,
new BrokerRegistration(brokerId, record.brokerEpoch(),
record.incarnationId(), listeners, features,
Optional.ofNullable(record.rack()), record.fenced()));
+ updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
if (prevRegistration == null) {
log.info("Registered new broker: {}", record);
} else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
@@ -274,6 +283,7 @@ public class ClusterControlManager {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.remove(brokerId);
+ updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unregistered broker: {}", record);
}
}
@@ -289,6 +299,7 @@ public class ClusterControlManager {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
+ updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Fenced broker: {}", record);
}
}
@@ -304,6 +315,7 @@ public class ClusterControlManager {
"registration with that epoch found", record.toString()));
} else {
brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
+ updateMetrics(registration, brokerRegistrations.get(brokerId));
log.info("Unfenced broker: {}", record);
}
if (readyBrokersFuture.isPresent()) {
@@ -314,6 +326,31 @@ public class ClusterControlManager {
}
}
+ private void updateMetrics(BrokerRegistration prevRegistration, BrokerRegistration registration) {
+ if (registration == null) {
+ if (prevRegistration.fenced()) {
+ controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
+ } else {
+ controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
+ }
+ } else if (prevRegistration == null) {
+ if (registration.fenced()) {
+ controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
+ } else {
+ controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
+ }
+ } else {
+ if (prevRegistration.fenced() && !registration.fenced()) {
+ controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() - 1);
+ controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() + 1);
+ } else if (!prevRegistration.fenced() && registration.fenced()) {
+ controllerMetrics.setFencedBrokerCount(controllerMetrics.fencedBrokerCount() + 1);
+ controllerMetrics.setActiveBrokerCount(controllerMetrics.activeBrokerCount() - 1);
+ }
+ }
+ }
+
+
public List<List<Integer>> placeReplicas(int startPartition,
int numPartitions,
short numReplicas) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 3fd0e66..fa03e05 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -27,6 +27,14 @@ public interface ControllerMetrics extends AutoCloseable {
void updateEventQueueProcessingTime(long durationMs);
+ void setFencedBrokerCount(int brokerCount);
+
+ int fencedBrokerCount();
+
+ void setActiveBrokerCount(int brokerCount);
+
+ int activeBrokerCount();
+
void setGlobalTopicsCount(int topicCount);
int globalTopicsCount();
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0a35140..67d4eb0 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -1125,7 +1125,7 @@ public final class QuorumController implements Controller {
snapshotRegistry, configDefs, alterConfigPolicy);
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
this.clusterControl = new ClusterControlManager(logContext, time,
- snapshotRegistry, sessionTimeoutNs, replicaPlacer);
+ snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index 3dd3336..9b3a4dd 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -32,6 +32,10 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"ControllerEventManager", "EventQueueTimeMs");
private final static MetricName EVENT_QUEUE_PROCESSING_TIME_MS = getMetricName(
"ControllerEventManager", "EventQueueProcessingTimeMs");
+ private final static MetricName FENCED_BROKER_COUNT = getMetricName(
+ "KafkaController", "FencedBrokerCount");
+ private final static MetricName ACTIVE_BROKER_COUNT = getMetricName(
+ "KafkaController", "ActiveBrokerCount");
private final static MetricName GLOBAL_TOPIC_COUNT = getMetricName(
"KafkaController", "GlobalTopicCount");
private final static MetricName GLOBAL_PARTITION_COUNT = getMetricName(
@@ -40,14 +44,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"KafkaController", "OfflinePartitionsCount");
private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
"KafkaController", "PreferredReplicaImbalanceCount");
-
+
private final MetricsRegistry registry;
private volatile boolean active;
+ private volatile int fencedBrokerCount;
+ private volatile int activeBrokerCount;
private volatile int globalTopicCount;
private volatile int globalPartitionCount;
private volatile int offlinePartitionCount;
private volatile int preferredReplicaImbalanceCount;
private final Gauge<Integer> activeControllerCount;
+ private final Gauge<Integer> fencedBrokerCountGauge;
+ private final Gauge<Integer> activeBrokerCountGauge;
private final Gauge<Integer> globalPartitionCountGauge;
private final Gauge<Integer> globalTopicCountGauge;
private final Gauge<Integer> offlinePartitionCountGauge;
@@ -58,6 +66,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
public QuorumControllerMetrics(MetricsRegistry registry) {
this.registry = Objects.requireNonNull(registry);
this.active = false;
+ this.fencedBrokerCount = 0;
+ this.activeBrokerCount = 0;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
this.offlinePartitionCount = 0;
@@ -70,6 +80,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
});
this.eventQueueTime = registry.newHistogram(EVENT_QUEUE_TIME_MS, true);
this.eventQueueProcessingTime = registry.newHistogram(EVENT_QUEUE_PROCESSING_TIME_MS, true);
+ this.fencedBrokerCountGauge = registry.newGauge(FENCED_BROKER_COUNT, new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return fencedBrokerCount;
+ }
+ });
+ this.activeBrokerCountGauge = registry.newGauge(ACTIVE_BROKER_COUNT, new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return activeBrokerCount;
+ }
+ });
this.globalTopicCountGauge = registry.newGauge(GLOBAL_TOPIC_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
@@ -117,6 +139,25 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
}
@Override
+ public void setFencedBrokerCount(int brokerCount) {
+ this.fencedBrokerCount = brokerCount;
+ }
+
+ @Override
+ public int fencedBrokerCount() {
+ return this.fencedBrokerCount;
+ }
+
+ public void setActiveBrokerCount(int brokerCount) {
+ this.activeBrokerCount = brokerCount;
+ }
+
+ @Override
+ public int activeBrokerCount() {
+ return this.activeBrokerCount;
+ }
+
+ @Override
public void setGlobalTopicsCount(int topicCount) {
this.globalTopicCount = topicCount;
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 195b02a..16625b5 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -58,7 +58,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
- new StripedReplicaPlacer(new Random()));
+ new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));
@@ -99,7 +99,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000,
- new StripedReplicaPlacer(new Random()));
+ new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
clusterControl.replay(brokerRecord);
assertEquals(new BrokerRegistration(1, 100,
@@ -122,7 +122,7 @@ public class ClusterControlManagerTest {
MockRandom random = new MockRandom();
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
- new StripedReplicaPlacer(random));
+ new StripedReplicaPlacer(random), new MockControllerMetrics());
clusterControl.activate();
for (int i = 0; i < numUsableBrokers; i++) {
RegisterBrokerRecord brokerRecord =
@@ -159,7 +159,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
new LogContext(), time, snapshotRegistry, 1000,
- new StripedReplicaPlacer(new Random()));
+ new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));
for (int i = 0; i < 3; i++) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 3d3075e..0120f15 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -19,6 +19,8 @@ package org.apache.kafka.controller;
public final class MockControllerMetrics implements ControllerMetrics {
private volatile boolean active;
+ private volatile int fencedBrokers;
+ private volatile int activeBrokers;
private volatile int topics;
private volatile int partitions;
private volatile int offlinePartitions;
@@ -27,6 +29,8 @@ public final class MockControllerMetrics implements ControllerMetrics {
public MockControllerMetrics() {
this.active = false;
+ this.fencedBrokers = 0;
+ this.activeBrokers = 0;
this.topics = 0;
this.partitions = 0;
this.offlinePartitions = 0;
@@ -54,6 +58,26 @@ public final class MockControllerMetrics implements ControllerMetrics {
}
@Override
+ public void setFencedBrokerCount(int brokerCount) {
+ this.fencedBrokers = brokerCount;
+ }
+
+ @Override
+ public int fencedBrokerCount() {
+ return this.fencedBrokers;
+ }
+
+ @Override
+ public void setActiveBrokerCount(int brokerCount) {
+ this.activeBrokers = brokerCount;
+ }
+
+ @Override
+ public int activeBrokerCount() {
+ return activeBrokers;
+ }
+
+ @Override
public void setGlobalTopicsCount(int topicCount) {
this.topics = topicCount;
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index f96510d..990395b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -54,7 +54,7 @@ public class ProducerIdControlManagerTest {
snapshotRegistry = new SnapshotRegistry(logContext);
clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, 1000,
- new StripedReplicaPlacer(random));
+ new StripedReplicaPlacer(random), new MockControllerMetrics());
clusterControl.activate();
for (int i = 0; i < 4; i++) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 3737f92..6543073 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -132,10 +132,10 @@ public class ReplicationControlManagerTest {
final LogContext logContext = new LogContext();
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
+ final ControllerMetrics metrics = new MockControllerMetrics();
final ClusterControlManager clusterControl = new ClusterControlManager(
logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
- new StripedReplicaPlacer(random));
- final ControllerMetrics metrics = new MockControllerMetrics();
+ new StripedReplicaPlacer(random), metrics);
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty());
final ReplicationControlManager replicationControl;
@@ -430,6 +430,41 @@ public class ReplicationControlManagerTest {
}
@Test
+ public void testBrokerCountMetrics() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+
+ ctx.registerBrokers(0);
+
+ assertEquals(1, ctx.metrics.fencedBrokerCount());
+ assertEquals(0, ctx.metrics.activeBrokerCount());
+
+ ctx.unfenceBrokers(0);
+
+ assertEquals(0, ctx.metrics.fencedBrokerCount());
+ assertEquals(1, ctx.metrics.activeBrokerCount());
+
+ ctx.registerBrokers(1);
+ ctx.unfenceBrokers(1);
+
+ assertEquals(2, ctx.metrics.activeBrokerCount());
+
+ ctx.registerBrokers(2);
+ ctx.unfenceBrokers(2);
+
+ assertEquals(0, ctx.metrics.fencedBrokerCount());
+ assertEquals(3, ctx.metrics.activeBrokerCount());
+
+ ControllerResult<Void> result = replicationControl.unregisterBroker(0);
+ ctx.replay(result.records());
+ result = replicationControl.unregisterBroker(2);
+ ctx.replay(result.records());
+
+ assertEquals(0, ctx.metrics.fencedBrokerCount());
+ assertEquals(1, ctx.metrics.activeBrokerCount());
+ }
+
+ @Test
public void testCreateTopicsWithValidateOnlyFlag() throws Exception {
ReplicationControlTestContext ctx = new ReplicationControlTestContext();
ctx.registerBrokers(0, 1, 2);