Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/19 18:13:32 UTC

[GitHub] [kafka] rondagostino commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

rondagostino commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052461737


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {

Review Comment:
   Does this need to be `final` for a specific reason?  It makes mocking difficult, so unless there's a good reason for it I would tend to avoid doing it.
   
   At first I wondered why you didn't make this `public` -- but now I see that it is because we continue to pass in an instance of `ControllerMetrics`. I now agree with keeping this package-private, though a comment calling it out explicitly and explaining why might help others down the road.
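
   For illustration only, a minimal sketch of what a non-final, package-private class carrying such a comment could look like. `Metrics`, `MetricsManager`, and `RecordingMetricsManager` are hypothetical, simplified stand-ins that I made up for this example; they are not the classes in this PR:

```java
// Hypothetical stand-in for the collaborator that is passed to the constructor.
interface Metrics {
    void setGlobalTopicCount(int count);
}

/**
 * Deliberately package-private: it is constructed with a Metrics instance
 * that is supplied by code in the same package.
 * Deliberately not final, so that tests can subclass or mock it.
 */
class MetricsManager {
    private final Metrics metrics;
    private int topicCount = 0;

    MetricsManager(Metrics metrics) {
        this.metrics = metrics;
    }

    void topicCreated() {
        topicCount++;
        metrics.setGlobalTopicCount(topicCount);
    }
}

// A hand-written test double; this only compiles because MetricsManager is not final.
class RecordingMetricsManager extends MetricsManager {
    int calls = 0;

    RecordingMetricsManager(Metrics metrics) {
        super(metrics);
    }

    @Override
    void topicCreated() {
        calls++;
        super.topicCreated();
    }
}
```

   With `final` on `MetricsManager`, the subclass above would be rejected by the compiler, and mocking it would typically need extra mock-framework configuration.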



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.controller;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.IntPredicate;

Review Comment:
   I see the same import-ordering difference in the files for the classes below within `metadata` (a few of which are already modified in this PR, but without changing the import order). If we do want `java` packages first, then maybe fix the order in the other places as well?
   
   AclControlManager
   ClientQuotaControlManager
   ClusterControlManager // modified in this PR
   ConfigurationControlManager
   PartitionReassignmentRevert
   QuorumController // modified in this PR
   QuorumFeatures
   ReplicationControlManager // modified in this PR
   ClientQuotaImage
   FeaturesImage
   RaftSnapshotWriter
   RecordListWriter
   ClusterMetadataAuthorizer
   StandardAclRecordIterator
   StandardAuthorizer
   StandardAuthorizerData
   BootstrapDirectory
   BatchFileReader
   SnapshotFileReader
   



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the type covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();
+        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+            fencedBrokers.remove(brokerId);
+            updateBrokerStateMetrics();
+        }
+    }
+
+    private void updateBrokerStateMetrics() {
+        controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+        Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
+        activeBrokers.removeAll(fencedBrokers);
+        controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+    }
+
+    private void replay(TopicRecord record) {
+        topicCount++;
+
+        controllerMetrics.setGlobalTopicCount(topicCount);
+    }
+
+    private void replay(PartitionRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+
+        PartitionState partitionState = new PartitionState(record.leader(), new PartitionAssignment(record.replicas()));
+        topicPartitions.put(tp, partitionState);
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(PartitionChangeRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+        if (!topicPartitions.containsKey(tp)) {
+            throw new IllegalArgumentException(String.format("Unknown topic partitions %s", tp));
+        }
+
+        PartitionState partitionState = topicPartitions.computeIfPresent(
+            tp,
+            (key, oldValue) -> {
+                PartitionState newValue = oldValue;
+                // Update replicas
+                if (record.replicas() != null) {
+                    newValue = new PartitionState(newValue.leader(), new PartitionAssignment(record.replicas()));
+                }
+
+                if (record.leader() != NO_LEADER_CHANGE) {
+                    newValue = new PartitionState(record.leader(), newValue.assignment());
+                }
+
+                return newValue;
+            }
+        );
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(RemoveTopicRecord record) {
+        Uuid topicId = record.topicId();
+        Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId() == topicId;
+
+        topicCount--;
+        topicPartitions.keySet().removeIf(matchesTopic);
+        offlineTopicPartitions.removeIf(matchesTopic);
+        imbalancedTopicPartitions.removeIf(matchesTopic);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {
+        if (partitionState.leader() == NO_LEADER) {
+            offlineTopicPartitions.add(tp);
+        } else {
+            offlineTopicPartitions.remove(tp);
+        }
+
+        if (partitionState.leader() == partitionState.assignment().replicas().get(0)) {
+            imbalancedTopicPartitions.remove(tp);
+        } else {
+            imbalancedTopicPartitions.add(tp);
+        }
+    }
+
+    void updateTopicAndPartitionMetrics() {

Review Comment:
   `private`?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -871,6 +871,9 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                                 i++;
                             }
                         }
+
+                        controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+

Review Comment:
   A comment indicating that we must apply the entire batch or none of it -- and therefore we cannot replay each individual message above as is done for the snapshot loading case below -- might be helpful.
   
   Although, now that I write that, I wonder why there is an asymmetry. Is there a reason why the snapshot case below couldn't -- shouldn't -- just update based on the entire batch as well? Then the `replay()` method could remain package-private for testing purposes instead of being public, since it sort of makes sense for records to always be committed as part of a batch (either a true batch or a "full snapshot" batch), correct?
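
   To make the suggestion concrete, here is a rough sketch of a single batch entry point that both the commit path and the snapshot path could call. It is a simplified, hypothetical stand-in: `BatchOnlyMetricsManager` is an invented name, and records are plain strings rather than `ApiMessageAndVersion`. Note that it does not roll back records already applied when a later record in the batch fails:

```java
import java.util.List;

// Hypothetical, simplified sketch; not the class from this PR.
class BatchOnlyMetricsManager {
    private int topicCount = 0;

    // Single entry point, intended for committed batches and snapshot batches alike.
    void replayBatch(long baseOffset, List<String> records) {
        int i = 1;
        for (String record : records) {
            try {
                replay(record);
            } catch (RuntimeException e) {
                throw new IllegalArgumentException(
                    String.format(
                        "Unable to replay record %d of %d in the batch with baseOffset %d.",
                        i, records.size(), baseOffset),
                    e);
            }
            i++;
        }
    }

    // Package-private only so that tests can exercise individual records.
    void replay(String record) {
        switch (record) {
            case "TOPIC":
                topicCount++;
                break;
            case "REMOVE_TOPIC":
                topicCount--;
                break;
            case "NO_OP":
                // Does not affect the count.
                break;
            default:
                throw new IllegalStateException("Unhandled record type " + record);
        }
    }

    int topicCount() {
        return topicCount;
    }
}
```

   Under that assumption, the snapshot-loading code would hand each snapshot batch to `replayBatch` exactly as `handleCommit` does, and nothing outside of tests would call `replay` directly.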



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.

Review Comment:
   Might be good to indicate that the class is not thread-safe and is expected to be used only from within the controller thread.
   
   Note that `ControllerMetrics` must be thread-safe, which, interestingly enough, is not mentioned in that interface's Javadoc. Maybe add a comment there?
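
   Something along these lines is what I have in mind for the two Javadoc notes. This is only a sketch with made-up names (`TopicMetrics`, `AtomicTopicMetrics`, `TopicCountManager`), and the `AtomicInteger`-backed implementation is just one assumed way of satisfying the thread-safety requirement:

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical stand-in for a metrics interface.
 *
 * Implementations must be thread-safe: setters may be called from one thread
 * while the values are read from another.
 */
interface TopicMetrics {
    void setGlobalTopicCount(int count);

    int globalTopicCount();
}

// One possible thread-safe implementation, backed by an AtomicInteger.
final class AtomicTopicMetrics implements TopicMetrics {
    private final AtomicInteger globalTopicCount = new AtomicInteger(0);

    @Override
    public void setGlobalTopicCount(int count) {
        globalTopicCount.set(count);
    }

    @Override
    public int globalTopicCount() {
        return globalTopicCount.get();
    }
}

/**
 * Hypothetical stand-in for the manager.
 *
 * This class is not thread-safe. It is expected to be used only from a
 * single thread (in the real code, the controller thread).
 */
class TopicCountManager {
    private final TopicMetrics metrics;
    private int topicCount = 0; // plain field, no synchronization

    TopicCountManager(TopicMetrics metrics) {
        this.metrics = metrics;
    }

    void topicCreated() {
        topicCount++;
        metrics.setGlobalTopicCount(topicCount);
    }
}
```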



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the type covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();
+        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+            fencedBrokers.remove(brokerId);
+            updateBrokerStateMetrics();
+        }
+    }
+
+    private void updateBrokerStateMetrics() {
+        controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+        Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
+        activeBrokers.removeAll(fencedBrokers);
+        controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+    }
+
+    private void replay(TopicRecord record) {
+        topicCount++;
+
+        controllerMetrics.setGlobalTopicCount(topicCount);
+    }
+
+    private void replay(PartitionRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+
+        PartitionState partitionState = new PartitionState(record.leader(), new PartitionAssignment(record.replicas()));
+        topicPartitions.put(tp, partitionState);
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(PartitionChangeRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+        if (!topicPartitions.containsKey(tp)) {
+            throw new IllegalArgumentException(String.format("Unknown topic partitions %s", tp));
+        }
+
+        PartitionState partitionState = topicPartitions.computeIfPresent(
+            tp,
+            (key, oldValue) -> {
+                PartitionState newValue = oldValue;
+                // Update replicas
+                if (record.replicas() != null) {
+                    newValue = new PartitionState(newValue.leader(), new PartitionAssignment(record.replicas()));
+                }
+
+                if (record.leader() != NO_LEADER_CHANGE) {
+                    newValue = new PartitionState(record.leader(), newValue.assignment());
+                }
+
+                return newValue;
+            }
+        );
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(RemoveTopicRecord record) {
+        Uuid topicId = record.topicId();
+        Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId() == topicId;
+
+        topicCount--;
+        topicPartitions.keySet().removeIf(matchesTopic);
+        offlineTopicPartitions.removeIf(matchesTopic);
+        imbalancedTopicPartitions.removeIf(matchesTopic);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {

Review Comment:
   `private`, I think, since `PartitionState` is private.



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);

Review Comment:
   Just wondering what our options are here. Ignoring the error from applying the metric seems to be one of them. Given that we applied the record successfully but failed to apply the change in metrics, might we prioritize availability over consistency here? By throwing an exception we seem to have chosen consistency over availability, and I'm not sure that is the right choice. WDYT?
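
   To make the "availability" option concrete, here is a rough sketch of a batch replay that records the failure and keeps going instead of throwing. It is not the PR's code: `LenientMetricsReplayer`, the `Consumer<Object>` callback, and the in-memory `errors` list (standing in for a logger or an error counter) are assumptions made so the example is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in, not the ControllerMetricsManager from the diff.
final class LenientMetricsReplayer {
    // Applies a single record to the metrics state; may throw.
    private final Consumer<Object> replayOne;
    // Stands in for a logger or an error-count metric.
    private final List<String> errors = new ArrayList<>();

    LenientMetricsReplayer(Consumer<Object> replayOne) {
        this.replayOne = replayOne;
    }

    void replayBatch(long baseOffset, List<?> messages) {
        int i = 1;
        for (Object message : messages) {
            try {
                replayOne.accept(message);
            } catch (RuntimeException e) {
                // Record the failure and continue with the next record,
                // so a metrics problem does not stop the rest of the batch.
                errors.add(String.format(
                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
                    "in the batch with baseOffset %d: %s",
                    message.getClass().getSimpleName(),
                    i,
                    messages.size(),
                    baseOffset,
                    e.getMessage()
                ));
            }
            i++;
        }
    }

    List<String> errors() {
        return errors;
    }
}
```

   The trade-off is that the metrics can drift from the committed state after a failure, so this approach would probably need to be paired with something that makes the drift visible, such as the error record kept here.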



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the type covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();

Review Comment:
   nit: move `updateBrokerStateMetrics()` invocation out of the `if` block.
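
   Roughly what I have in mind, as a sketch only. The rest of the method is not visible in this hunk, so the `UNFENCE` branch below is an assumption, and `FencingSketch`, its local `FencingChange` enum, and the `metricsUpdates` counter are hypothetical names used to keep the example self-contained.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the fencing logic in the diff.
final class FencingSketch {
    enum FencingChange { FENCE, UNFENCE, NONE }

    private final Set<Integer> registeredBrokers = new HashSet<>();
    private final Set<Integer> fencedBrokers = new HashSet<>();
    // Counts calls so the effect of the refactoring is observable.
    private int metricsUpdates = 0;

    void register(int brokerId) {
        registeredBrokers.add(brokerId);
    }

    void handleFencingChange(int brokerId, FencingChange fencingChange) {
        if (!registeredBrokers.contains(brokerId)) {
            throw new IllegalArgumentException(
                String.format("Broker with id %s is not registered", brokerId));
        }

        if (fencingChange == FencingChange.FENCE) {
            fencedBrokers.add(brokerId);
        } else if (fencingChange == FencingChange.UNFENCE) {
            fencedBrokers.remove(brokerId);
        }
        // Single call after the branches instead of one call per branch.
        updateBrokerStateMetrics();
    }

    private void updateBrokerStateMetrics() {
        metricsUpdates++;
    }

    int fencedCount() {
        return fencedBrokers.size();
    }

    int metricsUpdates() {
        return metricsUpdates;
    }
}
```

   One side effect of hoisting the call is that the metrics are also refreshed when nothing changed (the `NONE` case here), which should be harmless as long as the update only republishes the current counts.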



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org