You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/05/20 23:30:12 UTC
[kafka] branch trunk updated: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 80ec8fb KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)
80ec8fb is described below
commit 80ec8fbcd560a7068cfa7baea72846f0ffd98b72
Author: Ryan Dielhenn <35...@users.noreply.github.com>
AuthorDate: Thu May 20 16:28:32 2021 -0700
KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)
Reviewers: Colin P. McCabe <cm...@apache.org>
---
.../org/apache/kafka/controller/BrokersToIsrs.java | 17 ++++++
.../apache/kafka/controller/ControllerMetrics.java | 8 +++
.../kafka/controller/QuorumControllerMetrics.java | 44 +++++++++++++++-
.../controller/ReplicationControlManager.java | 26 ++++++++++
.../kafka/controller/MockControllerMetrics.java | 25 +++++++++
.../controller/ReplicationControlManagerTest.java | 60 ++++++++++++++++++++++
6 files changed, 179 insertions(+), 1 deletion(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index d8e0319..42f3c4e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -20,6 +20,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.timeline.SnapshotRegistry;
import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
import java.util.Arrays;
import java.util.Collections;
@@ -139,10 +140,13 @@ public class BrokersToIsrs {
* Partitions with no isr members appear in this map under id NO_LEADER.
*/
private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;
+
+ private final TimelineInteger offlinePartitionCount;
BrokersToIsrs(SnapshotRegistry snapshotRegistry) {
this.snapshotRegistry = snapshotRegistry;
this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.offlinePartitionCount = new TimelineInteger(snapshotRegistry);
}
/**
@@ -163,6 +167,9 @@ public class BrokersToIsrs {
} else {
if (prevLeader == NO_LEADER) {
prev = Replicas.copyWith(prevIsr, NO_LEADER);
+ if (nextLeader != NO_LEADER) {
+ offlinePartitionCount.decrement();
+ }
} else {
prev = Replicas.clone(prevIsr);
}
@@ -174,6 +181,9 @@ public class BrokersToIsrs {
} else {
if (nextLeader == NO_LEADER) {
next = Replicas.copyWith(nextIsr, NO_LEADER);
+ if (prevLeader != NO_LEADER) {
+ offlinePartitionCount.increment();
+ }
} else {
next = Replicas.clone(nextIsr);
}
@@ -217,6 +227,9 @@ public class BrokersToIsrs {
void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
if (topicMap != null) {
+ if (brokerId == NO_LEADER) {
+ offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
+ }
topicMap.remove(topicId);
}
}
@@ -326,4 +339,8 @@ public class BrokersToIsrs {
boolean hasLeaderships(int brokerId) {
return iterator(brokerId, true).hasNext();
}
+
+ int offlinePartitionCount() {
+ return offlinePartitionCount.get();
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 406a533..7c862bc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -34,4 +34,12 @@ public interface ControllerMetrics {
void setGlobalPartitionCount(int partitionCount);
int globalPartitionCount();
+
+ void setOfflinePartitionCount(int offlinePartitions);
+
+ int offlinePartitionCount();
+
+ void setPreferredReplicaImbalanceCount(int replicaImbalances);
+
+ int preferredReplicaImbalanceCount();
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index a9de1ff..52abc8c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -34,13 +34,21 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
"kafka.controller", "KafkaController", "GlobalTopicCount", null);
private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName(
"kafka.controller", "KafkaController", "GlobalPartitionCount", null);
-
+ private final static MetricName OFFLINE_PARTITION_COUNT = new MetricName(
+ "kafka.controller", "KafkaController", "OfflinePartitionCount", null);
+ private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = new MetricName(
+ "kafka.controller", "KafkaController", "PreferredReplicaImbalanceCount", null);
+
private volatile boolean active;
private volatile int globalTopicCount;
private volatile int globalPartitionCount;
+ private volatile int offlinePartitionCount;
+ private volatile int preferredReplicaImbalanceCount;
private final Gauge<Integer> activeControllerCount;
private final Gauge<Integer> globalPartitionCountGauge;
private final Gauge<Integer> globalTopicCountGauge;
+ private final Gauge<Integer> offlinePartitionCountGauge;
+ private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
private final Histogram eventQueueTime;
private final Histogram eventQueueProcessingTime;
@@ -48,6 +56,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
this.active = false;
this.globalTopicCount = 0;
this.globalPartitionCount = 0;
+ this.offlinePartitionCount = 0;
+ this.preferredReplicaImbalanceCount = 0;
this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
@Override
public Integer value() {
@@ -68,6 +78,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
return globalPartitionCount;
}
});
+ this.offlinePartitionCountGauge = registry.newGauge(OFFLINE_PARTITION_COUNT, new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return offlinePartitionCount;
+ }
+ });
+ this.preferredReplicaImbalanceCountGauge = registry.newGauge(PREFERRED_REPLICA_IMBALANCE_COUNT, new Gauge<Integer>() {
+ @Override
+ public Integer value() {
+ return preferredReplicaImbalanceCount;
+ }
+ });
}
@Override
@@ -109,4 +131,24 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
public int globalPartitionCount() {
return this.globalPartitionCount;
}
+
+ @Override
+ public void setOfflinePartitionCount(int offlinePartitions) {
+ this.offlinePartitionCount = offlinePartitions;
+ }
+
+ @Override
+ public int offlinePartitionCount() {
+ return this.offlinePartitionCount;
+ }
+
+ @Override
+ public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
+ this.preferredReplicaImbalanceCount = replicaImbalances;
+ }
+
+ @Override
+ public int preferredReplicaImbalanceCount() {
+ return this.preferredReplicaImbalanceCount;
+ }
}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 3820642..e95cbf4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -227,6 +227,10 @@ public class ReplicationControlManager {
return leader != NO_LEADER;
}
+ boolean hasPreferredLeader() {
+ return leader == preferredReplica();
+ }
+
int preferredReplica() {
return replicas.length == 0 ? NO_LEADER : replicas[0];
}
@@ -286,6 +290,11 @@ public class ReplicationControlManager {
private final TimelineInteger globalPartitionCount;
/**
+ * A count of the number of partitions that do not have their first replica as a leader.
+ */
+ private final TimelineInteger preferredReplicaImbalanceCount;
+
+ /**
* A reference to the controller's configuration control manager.
*/
private final ConfigurationControlManager configurationControl;
@@ -330,6 +339,7 @@ public class ReplicationControlManager {
this.controllerMetrics = controllerMetrics;
this.clusterControl = clusterControl;
this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
+ this.preferredReplicaImbalanceCount = new TimelineInteger(snapshotRegistry);
this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
@@ -366,6 +376,11 @@ public class ReplicationControlManager {
brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
}
+ if (newPartInfo.leader != newPartInfo.preferredReplica()) {
+ preferredReplicaImbalanceCount.increment();
+ }
+ controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
+ controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
}
public void replay(PartitionChangeRecord record) {
@@ -387,6 +402,11 @@ public class ReplicationControlManager {
String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +
record.topicId();
newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);
+ if (!newPartitionInfo.hasPreferredLeader() && prevPartitionInfo.hasPreferredLeader()) {
+ preferredReplicaImbalanceCount.increment();
+ }
+ controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
+ controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
}
public void replay(RemoveTopicRecord record) {
@@ -406,11 +426,17 @@ public class ReplicationControlManager {
for (int i = 0; i < partition.isr.length; i++) {
brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
}
+ if (partition.leader != partition.preferredReplica()) {
+ preferredReplicaImbalanceCount.decrement();
+ }
globalPartitionCount.decrement();
}
brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
+
controllerMetrics.setGlobalTopicsCount(topics.size());
controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
+ controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
+ controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 45a69d7..844475a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -21,11 +21,16 @@ public final class MockControllerMetrics implements ControllerMetrics {
private volatile boolean active;
private volatile int topics;
private volatile int partitions;
+ private volatile int offlinePartitions;
+ private volatile int preferredReplicaImbalances;
+
public MockControllerMetrics() {
this.active = false;
this.topics = 0;
this.partitions = 0;
+ this.offlinePartitions = 0;
+ this.preferredReplicaImbalances = 0;
}
@Override
@@ -67,4 +72,24 @@ public final class MockControllerMetrics implements ControllerMetrics {
public int globalPartitionCount() {
return this.partitions;
}
+
+ @Override
+ public void setOfflinePartitionCount(int offlinePartitions) {
+ this.offlinePartitions = offlinePartitions;
+ }
+
+ @Override
+ public int offlinePartitionCount() {
+ return this.offlinePartitions;
+ }
+
+ @Override
+ public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
+ this.preferredReplicaImbalances = replicaImbalances;
+ }
+
+ @Override
+ public int preferredReplicaImbalanceCount() {
+ return this.preferredReplicaImbalances;
+ }
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0e6d3a5..b121835 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -254,6 +254,66 @@ public class ReplicationControlManagerTest {
}
@Test
+ public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception {
+ ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+ ReplicationControlManager replicationControl = ctx.replicationControl;
+
+ for (int i = 0; i < 4; i++) {
+ registerBroker(i, ctx);
+ unfenceBroker(i, ctx);
+ }
+
+ CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][] {
+ new int[] {0, 2}, new int[] {0, 1}});
+
+ CreatableTopicResult zar = ctx.createTestTopic("zar", new int[][] {
+ new int[] {0, 1, 2}, new int[] {1, 2, 3}, new int[] {1, 2, 0}});
+
+ ControllerResult<Void> result = replicationControl.unregisterBroker(0);
+ ctx.replay(result.records());
+
+ // All partitions should still be online after unregistering broker 0
+ assertEquals(0, ctx.metrics.offlinePartitionCount());
+ // The three partitions whose preferred (first) replica is broker 0 should no longer be led by their preferred replica
+ assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount());
+
+ result = replicationControl.unregisterBroker(1);
+ ctx.replay(result.records());
+
+ // After unregistering broker 1, 1 partition for topic foo should go offline
+ assertEquals(1, ctx.metrics.offlinePartitionCount());
+ // None of the five partitions should have its preferred (first) replica as leader at this point
+ assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount());
+
+ result = replicationControl.unregisterBroker(2);
+ ctx.replay(result.records());
+
+ // After unregistering broker 2, the last partition for topic foo should go offline
+ // and 2 partitions for topic zar should go offline
+ assertEquals(4, ctx.metrics.offlinePartitionCount());
+
+ result = replicationControl.unregisterBroker(3);
+ ctx.replay(result.records());
+
+ // After unregistering broker 3 the last partition for topic zar should go offline
+ assertEquals(5, ctx.metrics.offlinePartitionCount());
+
+ // Deleting topic foo should bring the offline partition count down to 3
+ ArrayList<ApiMessageAndVersion> records = new ArrayList<>();
+ replicationControl.deleteTopic(foo.topicId(), records);
+ ctx.replay(records);
+
+ assertEquals(3, ctx.metrics.offlinePartitionCount());
+
+ // Deleting topic zar should bring the offline partition count down to 0
+ records = new ArrayList<>();
+ replicationControl.deleteTopic(zar.topicId(), records);
+ ctx.replay(records);
+
+ assertEquals(0, ctx.metrics.offlinePartitionCount());
+ }
+
+ @Test
public void testValidateNewTopicNames() {
Map<String, ApiError> topicErrors = new HashMap<>();
CreatableTopicCollection topics = new CreatableTopicCollection();