You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/05/20 23:30:12 UTC

[kafka] branch trunk updated: KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 80ec8fb  KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)
80ec8fb is described below

commit 80ec8fbcd560a7068cfa7baea72846f0ffd98b72
Author: Ryan Dielhenn <35...@users.noreply.github.com>
AuthorDate: Thu May 20 16:28:32 2021 -0700

    KAFKA-12697: Add OfflinePartitionCount and PreferredReplicaImbalanceCount metrics to Quorum Controller (#10572)
    
    Reviewers: Colin P. McCabe <cm...@apache.org>
---
 .../org/apache/kafka/controller/BrokersToIsrs.java | 17 ++++++
 .../apache/kafka/controller/ControllerMetrics.java |  8 +++
 .../kafka/controller/QuorumControllerMetrics.java  | 44 +++++++++++++++-
 .../controller/ReplicationControlManager.java      | 26 ++++++++++
 .../kafka/controller/MockControllerMetrics.java    | 25 +++++++++
 .../controller/ReplicationControlManagerTest.java  | 60 ++++++++++++++++++++++
 6 files changed, 179 insertions(+), 1 deletion(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
index d8e0319..42f3c4e 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokersToIsrs.java
@@ -20,6 +20,7 @@ package org.apache.kafka.controller;
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineHashMap;
+import org.apache.kafka.timeline.TimelineInteger;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -139,10 +140,16 @@ public class BrokersToIsrs {
      * Partitions with no isr members appear in this map under id NO_LEADER.
      */
     private final TimelineHashMap<Integer, TimelineHashMap<Uuid, int[]>> isrMembers;
+
+    /**
+     * The number of offline partitions, i.e. partitions whose leader is NO_LEADER.
+     * Maintained incrementally by update() and removeTopicEntryForBroker().
+     */
+    private final TimelineInteger offlinePartitionCount;
 
     BrokersToIsrs(SnapshotRegistry snapshotRegistry) {
         this.snapshotRegistry = snapshotRegistry;
         this.isrMembers = new TimelineHashMap<>(snapshotRegistry, 0);
+        this.offlinePartitionCount = new TimelineInteger(snapshotRegistry);
     }
 
     /**
@@ -163,6 +170,9 @@ public class BrokersToIsrs {
         } else {
             if (prevLeader == NO_LEADER) {
                 prev = Replicas.copyWith(prevIsr, NO_LEADER);
+                if (nextLeader != NO_LEADER) {
+                    offlinePartitionCount.decrement();
+                }
             } else {
                 prev = Replicas.clone(prevIsr);
             }
@@ -174,6 +184,9 @@ public class BrokersToIsrs {
         } else {
             if (nextLeader == NO_LEADER) {
                 next = Replicas.copyWith(nextIsr, NO_LEADER);
+                if (prevLeader != NO_LEADER) {
+                    offlinePartitionCount.increment();
+                }
             } else {
                 next = Replicas.clone(nextIsr);
             }
@@ -217,6 +230,11 @@ public class BrokersToIsrs {
     void removeTopicEntryForBroker(Uuid topicId, int brokerId) {
         Map<Uuid, int[]> topicMap = isrMembers.get(brokerId);
         if (topicMap != null) {
+            // The NO_LEADER map can exist (because other topics have offline partitions)
+            // without holding an entry for this topic; guard so get() never returns null here.
+            if (brokerId == NO_LEADER && topicMap.containsKey(topicId)) {
+                offlinePartitionCount.set(offlinePartitionCount.get() - topicMap.get(topicId).length);
+            }
             topicMap.remove(topicId);
         }
     }
@@ -326,4 +344,11 @@ public class BrokersToIsrs {
     boolean hasLeaderships(int brokerId) {
         return iterator(brokerId, true).hasNext();
     }
+
+    /**
+     * Returns the current number of offline partitions (partitions whose leader is NO_LEADER).
+     */
+    int offlinePartitionCount() {
+        return offlinePartitionCount.get();
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 406a533..7c862bc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -34,4 +34,16 @@ public interface ControllerMetrics {
     void setGlobalPartitionCount(int partitionCount);
 
     int globalPartitionCount();
+
+    /** Records the number of partitions that currently have no leader. */
+    void setOfflinePartitionCount(int offlinePartitions);
+
+    /** Returns the offline partition count most recently passed to setOfflinePartitionCount. */
+    int offlinePartitionCount();
+
+    /** Records the number of partitions not currently led by their preferred (first) replica. */
+    void setPreferredReplicaImbalanceCount(int replicaImbalances);
+
+    /** Returns the imbalance count most recently passed to setPreferredReplicaImbalanceCount. */
+    int preferredReplicaImbalanceCount();
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index a9de1ff..52abc8c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -34,13 +34,23 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
         "kafka.controller", "KafkaController", "GlobalTopicCount", null);
     private final static MetricName GLOBAL_PARTITION_COUNT = new MetricName(
         "kafka.controller", "KafkaController", "GlobalPartitionCount", null);
+    private final static MetricName OFFLINE_PARTITION_COUNT = new MetricName(
+        "kafka.controller", "KafkaController", "OfflinePartitionCount", null);
+    private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = new MetricName(
+        "kafka.controller", "KafkaController", "PreferredReplicaImbalanceCount", null);
 
     private volatile boolean active;
     private volatile int globalTopicCount;
     private volatile int globalPartitionCount;
+    // Latest values passed to the setters below; volatile (like the fields above) so that
+    // gauge reads made from another thread observe the most recent value.
+    private volatile int offlinePartitionCount;
+    private volatile int preferredReplicaImbalanceCount;
     private final Gauge<Integer> activeControllerCount;
     private final Gauge<Integer> globalPartitionCountGauge;
     private final Gauge<Integer> globalTopicCountGauge;
+    private final Gauge<Integer> offlinePartitionCountGauge;
+    private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
     private final Histogram eventQueueTime;
     private final Histogram eventQueueProcessingTime;
 
@@ -48,6 +58,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
         this.active = false;
         this.globalTopicCount = 0;
         this.globalPartitionCount = 0;
+        this.offlinePartitionCount = 0;
+        this.preferredReplicaImbalanceCount = 0;
         this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
             @Override
             public Integer value() {
@@ -68,6 +80,18 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
                 return globalPartitionCount;
             }
         });
+        this.offlinePartitionCountGauge = registry.newGauge(OFFLINE_PARTITION_COUNT, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return offlinePartitionCount;
+            }
+        });
+        this.preferredReplicaImbalanceCountGauge = registry.newGauge(PREFERRED_REPLICA_IMBALANCE_COUNT, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return preferredReplicaImbalanceCount;
+            }
+        });
     }
 
     @Override
@@ -109,4 +133,24 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     public int globalPartitionCount() {
         return this.globalPartitionCount;
     }
+
+    @Override
+    public void setOfflinePartitionCount(int offlinePartitions) {
+        this.offlinePartitionCount = offlinePartitions;
+    }
+
+    @Override
+    public int offlinePartitionCount() {
+        return this.offlinePartitionCount;
+    }
+
+    @Override
+    public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
+        this.preferredReplicaImbalanceCount = replicaImbalances;
+    }
+
+    @Override
+    public int preferredReplicaImbalanceCount() {
+        return this.preferredReplicaImbalanceCount;
+    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index 3820642..e95cbf4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -227,6 +227,11 @@ public class ReplicationControlManager {
             return leader != NO_LEADER;
         }
 
+        /** Returns true if the current leader is {@link #preferredReplica()}. */
+        boolean hasPreferredLeader() {
+            return leader == preferredReplica();
+        }
+
         int preferredReplica() {
             return replicas.length == 0 ? NO_LEADER : replicas[0];
         }
@@ -286,6 +291,11 @@ public class ReplicationControlManager {
     private final TimelineInteger globalPartitionCount;
 
     /**
+     * The number of partitions whose current leader is not their preferred (first) replica.
+     */
+    private final TimelineInteger preferredReplicaImbalanceCount;
+
+    /**
      * A reference to the controller's configuration control manager.
      */
     private final ConfigurationControlManager configurationControl;
@@ -330,6 +340,7 @@ public class ReplicationControlManager {
         this.controllerMetrics = controllerMetrics;
         this.clusterControl = clusterControl;
         this.globalPartitionCount = new TimelineInteger(snapshotRegistry);
+        this.preferredReplicaImbalanceCount = new TimelineInteger(snapshotRegistry);
         this.topicsByName = new TimelineHashMap<>(snapshotRegistry, 0);
         this.topics = new TimelineHashMap<>(snapshotRegistry, 0);
         this.brokersToIsrs = new BrokersToIsrs(snapshotRegistry);
@@ -366,6 +377,17 @@ public class ReplicationControlManager {
             brokersToIsrs.update(record.topicId(), record.partitionId(), prevPartInfo.isr,
                 newPartInfo.isr, prevPartInfo.leader, newPartInfo.leader);
         }
+        // Adjust the count only on a transition, so that replaying a PartitionRecord for an
+        // already-known partition does not count the same partition twice.
+        boolean wasImbalanced = prevPartInfo != null && !prevPartInfo.hasPreferredLeader();
+        boolean isImbalanced = !newPartInfo.hasPreferredLeader();
+        if (isImbalanced && !wasImbalanced) {
+            preferredReplicaImbalanceCount.increment();
+        } else if (wasImbalanced && !isImbalanced) {
+            preferredReplicaImbalanceCount.decrement();
+        }
+        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
+        controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
     }
 
     public void replay(PartitionChangeRecord record) {
@@ -387,6 +409,17 @@ public class ReplicationControlManager {
         String topicPart = topicInfo.name + "-" + record.partitionId() + " with topic ID " +
             record.topicId();
         newPartitionInfo.maybeLogPartitionChange(log, topicPart, prevPartitionInfo);
+        // Track transitions in both directions; otherwise a partition that regains its
+        // preferred leader would remain counted as imbalanced forever.
+        boolean wasImbalanced = !prevPartitionInfo.hasPreferredLeader();
+        boolean isImbalanced = !newPartitionInfo.hasPreferredLeader();
+        if (isImbalanced && !wasImbalanced) {
+            preferredReplicaImbalanceCount.increment();
+        } else if (wasImbalanced && !isImbalanced) {
+            preferredReplicaImbalanceCount.decrement();
+        }
+        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
+        controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
     }
 
     public void replay(RemoveTopicRecord record) {
@@ -406,11 +439,17 @@ public class ReplicationControlManager {
             for (int i = 0; i < partition.isr.length; i++) {
                 brokersToIsrs.removeTopicEntryForBroker(topic.id, partition.isr[i]);
             }
+            if (!partition.hasPreferredLeader()) {
+                preferredReplicaImbalanceCount.decrement();
+            }
             globalPartitionCount.decrement();
         }
         brokersToIsrs.removeTopicEntryForBroker(topic.id, NO_LEADER);
+
         controllerMetrics.setGlobalTopicsCount(topics.size());
         controllerMetrics.setGlobalPartitionCount(globalPartitionCount.get());
+        controllerMetrics.setOfflinePartitionCount(brokersToIsrs.offlinePartitionCount());
+        controllerMetrics.setPreferredReplicaImbalanceCount(preferredReplicaImbalanceCount.get());
         log.info("Removed topic {} with ID {}.", topic.name, record.topicId());
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 45a69d7..844475a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -21,11 +21,15 @@ public final class MockControllerMetrics implements ControllerMetrics {
     private volatile boolean active;
     private volatile int topics;
     private volatile int partitions;
+    private volatile int offlinePartitions;
+    private volatile int preferredReplicaImbalances;
 
     public MockControllerMetrics() {
         this.active = false;
         this.topics = 0;
         this.partitions = 0;
+        this.offlinePartitions = 0;
+        this.preferredReplicaImbalances = 0;
     }
 
     @Override
@@ -67,4 +71,24 @@ public final class MockControllerMetrics implements ControllerMetrics {
     public int globalPartitionCount() {
         return this.partitions;
     }
+
+    @Override
+    public void setOfflinePartitionCount(int offlinePartitions) {
+        this.offlinePartitions = offlinePartitions;
+    }
+
+    @Override
+    public int offlinePartitionCount() {
+        return this.offlinePartitions;
+    }
+
+    @Override
+    public void setPreferredReplicaImbalanceCount(int replicaImbalances) {
+        this.preferredReplicaImbalances = replicaImbalances;
+    }
+
+    @Override
+    public int preferredReplicaImbalanceCount() {
+        return this.preferredReplicaImbalances;
+    }
 }
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0e6d3a5..b121835 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -254,6 +254,76 @@ public class ReplicationControlManagerTest {
     }
 
     @Test
+    public void testOfflinePartitionAndReplicaImbalanceMetrics() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replicationControl = ctx.replicationControl;
+
+        for (int i = 0; i < 4; i++) {
+            registerBroker(i, ctx);
+            unfenceBroker(i, ctx);
+        }
+
+        CreatableTopicResult foo = ctx.createTestTopic("foo", new int[][] {
+                new int[] {0, 2}, new int[] {0, 1}});
+
+        CreatableTopicResult zar = ctx.createTestTopic("zar", new int[][] {
+                new int[] {0, 1, 2}, new int[] {1, 2, 3}, new int[] {1, 2, 0}});
+
+        // Every partition starts out online and led by its preferred (first) replica
+        assertEquals(0, ctx.metrics.offlinePartitionCount());
+        assertEquals(0, ctx.metrics.preferredReplicaImbalanceCount());
+
+        ControllerResult<Void> result = replicationControl.unregisterBroker(0);
+        ctx.replay(result.records());
+
+        // All partitions should still be online after unregistering broker 0
+        assertEquals(0, ctx.metrics.offlinePartitionCount());
+        // The three partitions whose preferred (first) replica is broker 0 are now imbalanced
+        assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount());
+
+        result = replicationControl.unregisterBroker(1);
+        ctx.replay(result.records());
+
+        // After unregistering broker 1, 1 partition for topic foo should go offline
+        assertEquals(1, ctx.metrics.offlinePartitionCount());
+        // No partition is led by its preferred (first) replica at this point
+        assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount());
+
+        result = replicationControl.unregisterBroker(2);
+        ctx.replay(result.records());
+
+        // After unregistering broker 2, the last partition for topic foo should go offline
+        // and 2 partitions for topic zar should go offline
+        assertEquals(4, ctx.metrics.offlinePartitionCount());
+        // Every partition was already imbalanced, so the imbalance count does not change
+        assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount());
+
+        result = replicationControl.unregisterBroker(3);
+        ctx.replay(result.records());
+
+        // After unregistering broker 3 the last partition for topic zar should go offline
+        assertEquals(5, ctx.metrics.offlinePartitionCount());
+        assertEquals(5, ctx.metrics.preferredReplicaImbalanceCount());
+
+        // Deleting topic foo should bring both the offline partition count and the
+        // preferred replica imbalance count down to 3
+        ArrayList<ApiMessageAndVersion> records = new ArrayList<>();
+        replicationControl.deleteTopic(foo.topicId(), records);
+        ctx.replay(records);
+
+        assertEquals(3, ctx.metrics.offlinePartitionCount());
+        assertEquals(3, ctx.metrics.preferredReplicaImbalanceCount());
+
+        // Deleting topic zar should bring both counts down to 0
+        records = new ArrayList<>();
+        replicationControl.deleteTopic(zar.topicId(), records);
+        ctx.replay(records);
+
+        assertEquals(0, ctx.metrics.offlinePartitionCount());
+        assertEquals(0, ctx.metrics.preferredReplicaImbalanceCount());
+    }
+
+    @Test
     public void testValidateNewTopicNames() {
         Map<String, ApiError> topicErrors = new HashMap<>();
         CreatableTopicCollection topics = new CreatableTopicCollection();