Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/12/14 23:57:25 UTC

[GitHub] [kafka] jsancio opened a new pull request, #12994: KAFKA-14457; Controller metrics should only expose committed data

jsancio opened a new pull request, #12994:
URL: https://github.com/apache/kafka/pull/12994

   TODO: Write description
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052681560


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.

Review Comment:
   > Might be good to indicate that the class is not thread-safe and is expected to be used only from within the controller thread.
   
   Hmm. I can do that if you think it is important for this type, but I don't think it should be required. In both Java and Kafka, types are not thread-safe by default; the types that are thread-safe are the ones documented as such.
   
   > Noting that ControllerMetrics must be thread-safe, which interestingly enough is not mentioned in that interface's Javadoc. Maybe add a comment there?
   
   The interface `ControllerMetrics` doesn't need to be thread-safe. For example, `MockControllerMetrics` is not thread-safe. `QuorumControllerMetrics` is thread-safe, but only because it internally uses methods like `metric.newGauge` that require it.
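To illustrate the distinction drawn in this thread, here is a minimal sketch that is not code from the PR. A tracker that is *not* thread-safe is confined to a single "controller" thread and writes into a metrics holder that other threads may read. The class names `FencingTracker`, `AtomicFencedBrokerMetrics` and `ThreadConfinementDemo` are hypothetical, and a single-threaded `ExecutorService` standing in for the controller's event thread is an assumption.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical metrics holder: written by the controller thread and read by a
// metrics reporter thread, so it must be thread-safe. AtomicInteger provides that.
final class AtomicFencedBrokerMetrics {
    private final AtomicInteger fencedBrokerCount = new AtomicInteger();

    void setFencedBrokerCount(int count) {
        fencedBrokerCount.set(count);
    }

    int fencedBrokerCount() {
        return fencedBrokerCount.get();
    }
}

// Hypothetical tracker: NOT thread-safe. Its HashSet is only touched from one
// thread (thread confinement), so it needs no locking of its own.
final class FencingTracker {
    private final Set<Integer> fencedBrokers = new HashSet<>();
    private final AtomicFencedBrokerMetrics metrics;

    FencingTracker(AtomicFencedBrokerMetrics metrics) {
        this.metrics = metrics;
    }

    void fence(int brokerId) {
        fencedBrokers.add(brokerId);
        metrics.setFencedBrokerCount(fencedBrokers.size());
    }

    void unfence(int brokerId) {
        fencedBrokers.remove(brokerId);
        metrics.setFencedBrokerCount(fencedBrokers.size());
    }
}

final class ThreadConfinementDemo {
    static int run() {
        AtomicFencedBrokerMetrics metrics = new AtomicFencedBrokerMetrics();
        FencingTracker tracker = new FencingTracker(metrics);
        // A single-threaded executor stands in for the controller's event thread,
        // so every call on the tracker runs on the same thread, in submission order.
        ExecutorService controllerThread = Executors.newSingleThreadExecutor();
        controllerThread.submit(() -> tracker.fence(1));
        controllerThread.submit(() -> tracker.fence(2));
        controllerThread.submit(() -> tracker.unfence(1));
        controllerThread.shutdown();
        try {
            controllerThread.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        // The calling thread reads the gauge only through the thread-safe holder.
        return metrics.fencedBrokerCount();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints 1: broker 2 is still fenced
    }
}
```

Only the object that is shared across threads (the metrics holder) pays for synchronization; the tracker stays a plain, unsynchronized class, which matches the "not thread-safe unless documented" default described above.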
   





[GitHub] [kafka] rondagostino commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
rondagostino commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052714786


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);

Review Comment:
   > For metadata, we have agreed that consistency is more important than availability. Inconsistent metadata in the controllers has an impact on the entire cluster.
   
   Agreed, but if the records apply and the metrics update does not, we simply have inconsistent metrics with consistent metadata. We already have inconsistent metrics today, without this patch. So am I missing something? If not, I still wonder whether we should really fail the batch here.
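For comparison, the lenient alternative raised in this comment could look roughly like the following hypothetical sketch (not code from the PR). A record that fails to update the metrics is logged and counted, and the rest of the batch continues. `LenientMetricsReplayer` is an invented name, and the generic `Consumer<T>` standing in for the per-record `replay` method is an assumption made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the lenient policy: a record the metrics code cannot
// apply is counted and skipped, and the rest of the batch is still processed.
final class LenientMetricsReplayer<T> {
    private final Consumer<T> replayOne; // stand-in for a per-record replay method
    private int skippedRecords = 0;

    LenientMetricsReplayer(Consumer<T> replayOne) {
        this.replayOne = replayOne;
    }

    void replayBatch(long baseOffset, List<T> records) {
        for (int i = 0; i < records.size(); i++) {
            try {
                replayOne.accept(records.get(i));
            } catch (RuntimeException e) {
                skippedRecords++;
                System.err.printf(
                    "Skipping metrics update for record %d of %d in the batch with baseOffset %d: %s%n",
                    i + 1, records.size(), baseOffset, e.getMessage());
            }
        }
    }

    int skippedRecords() {
        return skippedRecords;
    }

    public static void main(String[] args) {
        List<Integer> applied = new ArrayList<>();
        LenientMetricsReplayer<Integer> replayer = new LenientMetricsReplayer<>(record -> {
            if (record < 0) {
                throw new IllegalArgumentException("negative record " + record);
            }
            applied.add(record);
        });
        replayer.replayBatch(100L, List.of(1, -1, 2));
        System.out.println(applied + " skipped=" + replayer.skippedRecords()); // [1, 2] skipped=1
    }
}
```

The trade-off is that, once a record is skipped, the exported metrics can drift from the committed metadata, and only the skip counter hints that anything went wrong.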





[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052722296


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);

Review Comment:
   That is not true in general. In theory, `ControllerManagerTest` can throw while the other manager is also in an inconsistent state. In those cases, we want the controller's error metric to be non-zero.
   
   For cluster monitoring, it is important that the controller report consistent metrics and not silently continue running with incorrect ones.
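The fail-fast behavior argued for here can be sketched as follows. This is a hypothetical illustration, not the PR's code. `StrictMetricsReplayer` is an invented name, a generic `Consumer<T>` stands in for the per-record `replay` method, and a plain `AtomicInteger` stands in for the controller's error metric.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch of the strict policy: on the first failure, bump an error
// counter (standing in for the controller's error metric) and rethrow, so that
// monitoring sees a non-zero error count instead of silently wrong metrics.
final class StrictMetricsReplayer<T> {
    private final Consumer<T> replayOne;    // stand-in for a per-record replay method
    private final AtomicInteger errorCount; // hypothetical error metric

    StrictMetricsReplayer(Consumer<T> replayOne, AtomicInteger errorCount) {
        this.replayOne = replayOne;
        this.errorCount = errorCount;
    }

    void replayBatch(long baseOffset, List<T> records) {
        for (int i = 0; i < records.size(); i++) {
            try {
                replayOne.accept(records.get(i));
            } catch (RuntimeException e) {
                errorCount.incrementAndGet();
                throw new IllegalArgumentException(String.format(
                    "Unable to update metrics for record %d of %d in the batch with baseOffset %d",
                    i + 1, records.size(), baseOffset), e);
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger errorCount = new AtomicInteger();
        List<Integer> applied = new ArrayList<>();
        StrictMetricsReplayer<Integer> replayer = new StrictMetricsReplayer<>(record -> {
            if (record < 0) {
                throw new IllegalStateException("negative record " + record);
            }
            applied.add(record);
        }, errorCount);
        try {
            replayer.replayBatch(100L, List.of(1, -1, 2));
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
        // Record 2 was never applied, and the error counter is now 1.
        System.out.println(applied + " errors=" + errorCount.get());
    }
}
```

Unlike the lenient variant, nothing after the failing record is applied. The failure is surfaced both as an exception and as a non-zero counter that an operator can alert on.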





[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052656028


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the types covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();
+        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+            fencedBrokers.remove(brokerId);
+            updateBrokerStateMetrics();
+        }
+    }
+
+    private void updateBrokerStateMetrics() {
+        controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+        Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
+        activeBrokers.removeAll(fencedBrokers);
+        controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+    }
+
+    private void replay(TopicRecord record) {
+        topicCount++;
+
+        controllerMetrics.setGlobalTopicCount(topicCount);
+    }
+
+    private void replay(PartitionRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+
+        PartitionState partitionState = new PartitionState(record.leader(), new PartitionAssignment(record.replicas()));
+        topicPartitions.put(tp, partitionState);
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(PartitionChangeRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+        if (!topicPartitions.containsKey(tp)) {
+            throw new IllegalArgumentException(String.format("Unknown topic partition %s", tp));
+        }
+
+        PartitionState partitionState = topicPartitions.computeIfPresent(
+            tp,
+            (key, oldValue) -> {
+                PartitionState newValue = oldValue;
+                // Update replicas
+                if (record.replicas() != null) {
+                    newValue = new PartitionState(newValue.leader(), new PartitionAssignment(record.replicas()));
+                }
+
+                if (record.leader() != NO_LEADER_CHANGE) {
+                    newValue = new PartitionState(record.leader(), newValue.assignment());
+                }
+
+                return newValue;
+            }
+        );
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(RemoveTopicRecord record) {
+        Uuid topicId = record.topicId();
+        Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId().equals(topicId);
+
+        topicCount--;
+        topicPartitions.keySet().removeIf(matchesTopic);
+        offlineTopicPartitions.removeIf(matchesTopic);
+        imbalancedTopicPartitions.removeIf(matchesTopic);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {

Review Comment:
   Done.
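The body of `updateBasedOnPartitionState` is not part of the quoted diff. As a hedged illustration only, here is a self-contained sketch of how offline and imbalanced partitions could be tracked. It assumes that a partition is offline when it has no leader and imbalanced when its leader is not the first replica in its assignment. `TopicPartitionKey` and `PartitionHealthTracker` are hypothetical, and `java.util.UUID` stands in for Kafka's `Uuid`. The `removeTopic` method also shows why topic ids must be compared with `equals()` rather than `==`.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

// Hypothetical stand-in for TopicIdPartition, using java.util.UUID in place of
// Kafka's Uuid. Both define value equality through equals()/hashCode().
final class TopicPartitionKey {
    final UUID topicId;
    final int partitionId;

    TopicPartitionKey(UUID topicId, int partitionId) {
        this.topicId = topicId;
        this.partitionId = partitionId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partitionId == other.partitionId && topicId.equals(other.topicId);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicId, partitionId);
    }
}

final class PartitionHealthTracker {
    // Assumed to mirror LeaderConstants.NO_LEADER.
    static final int NO_LEADER = -1;

    private final Set<TopicPartitionKey> offline = new HashSet<>();
    private final Set<TopicPartitionKey> imbalanced = new HashSet<>();

    // Assumed rules: offline means no leader; imbalanced means the leader is not
    // the first (preferred) replica in the assignment.
    void update(TopicPartitionKey tp, int leader, List<Integer> replicas) {
        if (leader == NO_LEADER) {
            offline.add(tp);
        } else {
            offline.remove(tp);
        }
        if (replicas.isEmpty() || replicas.get(0) != leader) {
            imbalanced.add(tp);
        } else {
            imbalanced.remove(tp);
        }
    }

    void removeTopic(UUID topicId) {
        // equals(), not ==: the id carried by a later record is usually a
        // different object, so reference comparison would remove nothing.
        offline.removeIf(tp -> tp.topicId.equals(topicId));
        imbalanced.removeIf(tp -> tp.topicId.equals(topicId));
    }

    int offlineCount() {
        return offline.size();
    }

    int imbalancedCount() {
        return imbalanced.size();
    }
}
```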



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the types covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();
+        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+            fencedBrokers.remove(brokerId);
+            updateBrokerStateMetrics();
+        }
+    }
+
+    private void updateBrokerStateMetrics() {
+        controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+        Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
+        activeBrokers.removeAll(fencedBrokers);
+        controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+    }
+
+    private void replay(TopicRecord record) {
+        topicCount++;
+
+        controllerMetrics.setGlobalTopicCount(topicCount);
+    }
+
+    private void replay(PartitionRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+
+        PartitionState partitionState = new PartitionState(record.leader(), new PartitionAssignment(record.replicas()));
+        topicPartitions.put(tp, partitionState);
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(PartitionChangeRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+        if (!topicPartitions.containsKey(tp)) {
+            throw new IllegalArgumentException(String.format("Unknown topic partition %s", tp));
+        }
+
+        PartitionState partitionState = topicPartitions.computeIfPresent(
+            tp,
+            (key, oldValue) -> {
+                PartitionState newValue = oldValue;
+                // Update replicas
+                if (record.replicas() != null) {
+                    newValue = new PartitionState(newValue.leader(), new PartitionAssignment(record.replicas()));
+                }
+
+                if (record.leader() != NO_LEADER_CHANGE) {
+                    newValue = new PartitionState(record.leader(), newValue.assignment());
+                }
+
+                return newValue;
+            }
+        );
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(RemoveTopicRecord record) {
+        Uuid topicId = record.topicId();
+        Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId().equals(topicId);
+
+        topicCount--;
+        topicPartitions.keySet().removeIf(matchesTopic);
+        offlineTopicPartitions.removeIf(matchesTopic);
+        imbalancedTopicPartitions.removeIf(matchesTopic);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {
+        if (partitionState.leader() == NO_LEADER) {
+            offlineTopicPartitions.add(tp);
+        } else {
+            offlineTopicPartitions.remove(tp);
+        }
+
+        if (partitionState.leader() == partitionState.assignment().replicas().get(0)) {
+            imbalancedTopicPartitions.remove(tp);
+        } else {
+            imbalancedTopicPartitions.add(tp);
+        }
+    }
+
+    void updateTopicAndPartitionMetrics() {

Review Comment:
   Done.





[GitHub] [kafka] jsancio merged pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio merged PR #12994:
URL: https://github.com/apache/kafka/pull/12994




[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052707902


##########
metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.junit.jupiter.api.Test;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+final class ControllerMetricsManagerTest {

Review Comment:
   Done.





[GitHub] [kafka] rondagostino commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
rondagostino commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052472398


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -871,6 +871,9 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                                 i++;
                             }
                         }
+
+                        controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+

Review Comment:
   A comment indicating that we must apply the entire batch or none of it -- and therefore we cannot replay each individual message above as is done for the snapshot loading case below -- might be helpful.
   
   Although, now that I write that, I wonder why there is an asymmetry.  Is there a reason why the snapshot case below couldn't -- shouldn't -- just update based on the entire batch as well?
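The all-or-nothing requirement could also be made explicit inside the manager itself. Below is a minimal sketch, assuming a hypothetical, heavily simplified `StagedBrokerMetrics` class (string records such as `"register:1"` instead of `ApiMessage`, and only broker registration tracked): it stages a batch's effects on a copy and publishes only after every record in the batch has been applied. By contrast, the `replayBatch` in this diff applies records to its live state one at a time, so an exception part-way through leaves the earlier records of that batch applied.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-in for ControllerMetricsManager. It only tracks
// registered broker ids, and records are plain strings such as "register:1" or
// "unregister:1" instead of ApiMessage instances.
final class StagedBrokerMetrics {
    private Set<Integer> registeredBrokers = new HashSet<>();
    // The value an external metrics reader would observe.
    private int publishedBrokerCount = 0;

    /** Applies every record of a committed batch, or none of them if any record fails. */
    void replayBatch(long baseOffset, List<String> records) {
        // Stage the changes on a copy so that a failure part-way through the batch
        // leaves both the live state and the published metric untouched.
        Set<Integer> staged = new HashSet<>(registeredBrokers);
        int i = 1;
        for (String record : records) {
            try {
                apply(staged, record);
            } catch (RuntimeException e) {
                throw new IllegalArgumentException(String.format(
                    "Record %d of %d in the batch with baseOffset %d failed; batch not applied",
                    i, records.size(), baseOffset), e);
            }
            i++;
        }
        // Every record succeeded: swap in the staged state and publish once per batch.
        registeredBrokers = staged;
        publishedBrokerCount = registeredBrokers.size();
    }

    private static void apply(Set<Integer> state, String record) {
        String[] parts = record.split(":");
        int brokerId = Integer.parseInt(parts[1]);
        switch (parts[0]) {
            case "register":
                state.add(brokerId);
                break;
            case "unregister":
                if (!state.remove(brokerId)) {
                    throw new IllegalStateException("Broker " + brokerId + " is not registered");
                }
                break;
            default:
                throw new IllegalArgumentException("Unknown record type in " + record);
        }
    }

    int publishedBrokerCount() {
        return publishedBrokerCount;
    }
}
```

Copying the state for every batch costs time proportional to the tracked state, so this only illustrates the property being discussed; it is not a proposal for the real manager.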
   





[GitHub] [kafka] rondagostino commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
rondagostino commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052461737


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {

Review Comment:
   Does this need to be `final` for a specific reason?  It makes mocking difficult, so unless there's a good reason for it I would tend to avoid doing it.
   
   At first I wondered why you didn't make this `public` -- but now I see that it is because we continue to pass in an instance of `ControllerMetrics`.  I now agree with keeping this package-private, though a comment calling it out explicitly and explaining why might help others down the road.
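For illustration only, a sketch of what the suggested class comment and a mock-friendly, non-final class might look like. `BrokerCountMetrics`, `BrokerMetricsUpdater`, and `RecordingMetrics` are hypothetical names: `BrokerCountMetrics` narrows `ControllerMetrics` down to the two setters used in this diff, and a hand-written fake stands in for a mocking library.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, narrowed stand-in for ControllerMetrics: only the two setters this sketch uses.
interface BrokerCountMetrics {
    void setFencedBrokerCount(int count);

    void setActiveBrokerCount(int count);
}

/**
 * Updates broker-count metrics from metadata records.
 *
 * <p>Package-private on purpose: the constructor takes a controller-internal metrics
 * type, so the class is not meant to be used outside this package. It is deliberately
 * not final, so tests can subclass or mock it.
 */
class BrokerMetricsUpdater {
    private final Set<Integer> registeredBrokers = new HashSet<>();
    private final Set<Integer> fencedBrokers = new HashSet<>();
    private final BrokerCountMetrics metrics;

    BrokerMetricsUpdater(BrokerCountMetrics metrics) {
        this.metrics = metrics;
    }

    void registerBroker(int brokerId, boolean fenced) {
        registeredBrokers.add(brokerId);
        if (fenced) {
            fencedBrokers.add(brokerId);
        } else {
            fencedBrokers.remove(brokerId);
        }
        publish();
    }

    // Same computation as updateBrokerStateMetrics() in the diff: active = registered - fenced.
    private void publish() {
        metrics.setFencedBrokerCount(fencedBrokers.size());
        Set<Integer> active = new HashSet<>(registeredBrokers);
        active.removeAll(fencedBrokers);
        metrics.setActiveBrokerCount(active.size());
    }
}

// Hand-written fake that remembers the last values it was given.
class RecordingMetrics implements BrokerCountMetrics {
    int fencedBrokerCount = -1;
    int activeBrokerCount = -1;

    @Override
    public void setFencedBrokerCount(int count) {
        fencedBrokerCount = count;
    }

    @Override
    public void setActiveBrokerCount(int count) {
        activeBrokerCount = count;
    }
}
```

Because the updater depends only on an interface, a test can pass a fake like `RecordingMetrics` without any mocking framework; dropping `final` additionally lets code that depends on the updater itself substitute a subclass or a mock.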



##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.controller;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.IntPredicate;

Review Comment:
   I see the same import-ordering difference in the files for the classes listed below, within `metadata` (a few of which are already modified in this PR, but without changing the import order).  If we indeed want `java` packages first, then maybe fix the order in those places as well?
   
   AclControlManager
   ClientQuotaControlManager
   ClusterControlManager // modified in this PR
   ConfigurationControlManager
   PartitionReassignmentRevert
   QuorumController // modified in this PR
   QuorumFeatures
   ReplicationControlManager // modified in this PR
   ClientQuotaImage
   FeaturesImage
   RaftSnapshotWriter
   RecordListWriter
   ClusterMetadataAuthorizer
   StandardAclRecordIterator
   StandardAuthorizer
   StandardAuthorizerData
   BootstrapDirectory
   BatchFileReader
   SnapshotFileReader
   



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the types covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();
+        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+            fencedBrokers.remove(brokerId);
+            updateBrokerStateMetrics();
+        }
+    }
+
+    private void updateBrokerStateMetrics() {
+        controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+        Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
+        activeBrokers.removeAll(fencedBrokers);
+        controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+    }
+
+    private void replay(TopicRecord record) {
+        topicCount++;
+
+        controllerMetrics.setGlobalTopicCount(topicCount);
+    }
+
+    private void replay(PartitionRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+
+        PartitionState partitionState = new PartitionState(record.leader(), new PartitionAssignment(record.replicas()));
+        topicPartitions.put(tp, partitionState);
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(PartitionChangeRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+        if (!topicPartitions.containsKey(tp)) {
+            throw new IllegalArgumentException(String.format("Unknown topic partition %s", tp));
+        }
+
+        PartitionState partitionState = topicPartitions.computeIfPresent(
+            tp,
+            (key, oldValue) -> {
+                PartitionState newValue = oldValue;
+                // Update replicas
+                if (record.replicas() != null) {
+                    newValue = new PartitionState(newValue.leader(), new PartitionAssignment(record.replicas()));
+                }
+
+                if (record.leader() != NO_LEADER_CHANGE) {
+                    newValue = new PartitionState(record.leader(), newValue.assignment());
+                }
+
+                return newValue;
+            }
+        );
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(RemoveTopicRecord record) {
+        Uuid topicId = record.topicId();
+        Predicate<TopicIdPartition> matchesTopic = tp -> tp.topicId().equals(topicId);
+
+        topicCount--;
+        topicPartitions.keySet().removeIf(matchesTopic);
+        offlineTopicPartitions.removeIf(matchesTopic);
+        imbalancedTopicPartitions.removeIf(matchesTopic);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {
+        if (partitionState.leader() == NO_LEADER) {
+            offlineTopicPartitions.add(tp);
+        } else {
+            offlineTopicPartitions.remove(tp);
+        }
+
+        if (partitionState.leader() == partitionState.assignment().replicas().get(0)) {
+            imbalancedTopicPartitions.remove(tp);
+        } else {
+            imbalancedTopicPartitions.add(tp);
+        }
+    }
+
+    void updateTopicAndPartitionMetrics() {

Review Comment:
   `private`?



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -871,6 +871,9 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                                 i++;
                             }
                         }
+
+                        controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+

Review Comment:
   A comment indicating that we must apply the entire batch or none of it -- and therefore we cannot replay each individual message above as is done for the snapshot loading case below -- might be helpful.
   
   Although, now that I write that, I wonder why there is an asymmetry.  Is there a reason why the snapshot case below couldn't -- shouldn't -- just update based on the entire batch as well?  Then the `replay()` method could remain package-private for testing purposes instead of being public, as it sort of makes sense for records to always be committed as part of a batch (either a true batch or a "full snapshot" batch), correct?
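One way to picture the suggested symmetry, as a sketch under simplifying assumptions: `TopicCountManager`, its `reset()` method, and `loadSnapshot` are hypothetical, records are stand-in strings naming the record type, and a snapshot is modeled as a list of batches. The snapshot path flattens every snapshot batch into a single "full snapshot" batch, so `replayBatch` is the only entry point and the per-record `replay` can stay private.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical simplified manager: records are strings naming the record type.
final class TopicCountManager {
    private int topicCount = 0;

    // Assumption of this sketch: state is cleared before a snapshot is loaded.
    void reset() {
        topicCount = 0;
    }

    // Single entry point, used for both committed batches and snapshots.
    // baseOffset is unused here; in the diff it is used only for error messages.
    void replayBatch(long baseOffset, List<String> records) {
        for (String record : records) {
            replay(record);
        }
    }

    // Per-record replay stays private.
    private void replay(String recordType) {
        switch (recordType) {
            case "TopicRecord":
                topicCount++;
                break;
            case "RemoveTopicRecord":
                topicCount--;
                break;
            default:
                // Other record types do not affect this metric.
                break;
        }
    }

    int topicCount() {
        return topicCount;
    }

    // Treats the whole snapshot as one "full snapshot" batch.
    static void loadSnapshot(TopicCountManager manager, long snapshotOffset,
                             List<List<String>> snapshotBatches) {
        List<String> allRecords = new ArrayList<>();
        for (List<String> batch : snapshotBatches) {
            allRecords.addAll(batch);
        }
        manager.reset();
        manager.replayBatch(snapshotOffset, allRecords);
    }
}
```

Flattening keeps the two call sites identical at the cost of buffering the whole snapshot; replaying each snapshot batch through `replayBatch` in turn would also keep `replay` private.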



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.

Review Comment:
   Might be good to indicate that the class is not thread-safe and is expected to be used only from within the controller thread.
   
   Note also that `ControllerMetrics` must be thread-safe, which, interestingly enough, is not mentioned in that interface's Javadoc.  Maybe add a comment there?
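A minimal sketch of the suggested documentation, assuming hypothetical classes `AtomicBrokerCountMetrics` and `ThreadConfinedBrokerCounter`: the metrics holder is thread-safe because reporter threads may read it, while the counter keeps plain collections confined to the thread that created it. The owner-thread check is an extra of this sketch, not something the review asks for.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical metrics holder. Implementations must be thread-safe: values are written
 * by the controller thread and may be read concurrently by metrics-reporter threads.
 */
final class AtomicBrokerCountMetrics {
    private final AtomicInteger registeredBrokerCount = new AtomicInteger();

    void setRegisteredBrokerCount(int count) {
        registeredBrokerCount.set(count);
    }

    int registeredBrokerCount() {
        return registeredBrokerCount.get();
    }
}

/**
 * Updates metrics based on metadata records.
 *
 * <p>This class is not thread-safe. It must only be used from the thread that created
 * it (for the controller, its event-handling thread).
 */
final class ThreadConfinedBrokerCounter {
    private final Thread owner = Thread.currentThread();
    private final Set<Integer> registeredBrokers = new HashSet<>(); // confined to owner
    private final AtomicBrokerCountMetrics metrics;

    ThreadConfinedBrokerCounter(AtomicBrokerCountMetrics metrics) {
        this.metrics = metrics;
    }

    void registerBroker(int brokerId) {
        checkOwnerThread();
        registeredBrokers.add(brokerId);
        metrics.setRegisteredBrokerCount(registeredBrokers.size());
    }

    // Cheap guard that turns accidental cross-thread use into an immediate error.
    private void checkOwnerThread() {
        if (Thread.currentThread() != owner) {
            throw new IllegalStateException(
                "Called from " + Thread.currentThread().getName() + " but owned by " + owner.getName());
        }
    }
}
```

Reading the published counts from another thread is fine; mutating the counter from another thread fails fast instead of silently corrupting the unsynchronized `HashSet`.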



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {
+    private final static class PartitionState {
+        final int leader;
+        final PartitionAssignment assignment;
+
+        PartitionState(int leader, PartitionAssignment assignment) {
+            this.leader = leader;
+            this.assignment = assignment;
+        }
+
+        int leader() {
+            return leader;
+        }
+
+        PartitionAssignment assignment() {
+            return assignment;
+        }
+    }
+
+    private final Set<Integer> registeredBrokers = new HashSet<>();
+
+    private final Set<Integer> fencedBrokers = new HashSet<>();
+
+    private int topicCount = 0;
+
+    private final Map<TopicIdPartition, PartitionState> topicPartitions = new HashMap<>();
+
+    private final Set<TopicIdPartition> offlineTopicPartitions = new HashSet<>();
+
+    private final Set<TopicIdPartition> imbalancedTopicPartitions = new HashSet<>();
+
+    private final ControllerMetrics controllerMetrics;
+
+    ControllerMetricsManager(ControllerMetrics controllerMetrics) {
+        this.controllerMetrics = controllerMetrics;
+    }
+
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the types covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();
+        } else if (fencingChange == BrokerRegistrationFencingChange.UNFENCE) {
+            fencedBrokers.remove(brokerId);
+            updateBrokerStateMetrics();
+        }
+    }
+
+    private void updateBrokerStateMetrics() {
+        controllerMetrics.setFencedBrokerCount(fencedBrokers.size());
+
+        Set<Integer> activeBrokers = new HashSet<>(registeredBrokers);
+        activeBrokers.removeAll(fencedBrokers);
+        controllerMetrics.setActiveBrokerCount(activeBrokers.size());
+    }
+
+    private void replay(TopicRecord record) {
+        topicCount++;
+
+        controllerMetrics.setGlobalTopicCount(topicCount);
+    }
+
+    private void replay(PartitionRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+
+        PartitionState partitionState = new PartitionState(record.leader(), new PartitionAssignment(record.replicas()));
+        topicPartitions.put(tp, partitionState);
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(PartitionChangeRecord record) {
+        TopicIdPartition tp = new TopicIdPartition(record.topicId(), record.partitionId());
+        if (!topicPartitions.containsKey(tp)) {
+            throw new IllegalArgumentException(String.format("Unknown topic partition %s", tp));
+        }
+
+        PartitionState partitionState = topicPartitions.computeIfPresent(
+            tp,
+            (key, oldValue) -> {
+                PartitionState newValue = oldValue;
+                // Update replicas
+                if (record.replicas() != null) {
+                    newValue = new PartitionState(newValue.leader(), new PartitionAssignment(record.replicas()));
+                }
+
+                if (record.leader() != NO_LEADER_CHANGE) {
+                    newValue = new PartitionState(record.leader(), newValue.assignment());
+                }
+
+                return newValue;
+            }
+        );
+
+        updateBasedOnPartitionState(tp, partitionState);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    private void replay(RemoveTopicRecord record) {
+        Uuid topicId = record.topicId();
+        Predicate<TopicIdPartition> matchesTopic = tp -> topicId.equals(tp.topicId());
+
+        topicCount--;
+        topicPartitions.keySet().removeIf(matchesTopic);
+        offlineTopicPartitions.removeIf(matchesTopic);
+        imbalancedTopicPartitions.removeIf(matchesTopic);
+
+        updateTopicAndPartitionMetrics();
+    }
+
+    void updateBasedOnPartitionState(TopicIdPartition tp, PartitionState partitionState) {

Review Comment:
   This should be `private`, I think, since `PartitionState` is private.
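   To illustrate the visibility point, here is a minimal, self-contained sketch. It is not the PR's code. `PartitionMetricsSketch`, the `onPartition` entry point, and the `String` partition key are hypothetical simplifications. Because the method body is not shown above, the body of `updateBasedOnPartitionState` is also an assumption: a partition is offline when it has no leader, and imbalanced when its leader is not the first (preferred) replica.

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical, simplified stand-in for ControllerMetricsManager: partitions are
   // keyed by a String instead of TopicIdPartition, and replicas are a List<Integer>
   // instead of a PartitionAssignment.
   final class PartitionMetricsSketch {
       static final int NO_LEADER = -1; // assumed to mirror LeaderConstants.NO_LEADER

       // Private nested type, as in the PR.
       private static final class PartitionState {
           final int leader;
           final List<Integer> replicas;

           PartitionState(int leader, List<Integer> replicas) {
               this.leader = leader;
               this.replicas = replicas;
           }
       }

       private final Set<String> offline = new HashSet<>();
       private final Set<String> imbalanced = new HashSet<>();

       // Hypothetical entry point that builds the private type internally.
       void onPartition(String tp, int leader, List<Integer> replicas) {
           updateBasedOnPartitionState(tp, new PartitionState(leader, replicas));
       }

       // Declared private: its parameter type is private, so no other class
       // could name or construct a PartitionState to call it anyway.
       private void updateBasedOnPartitionState(String tp, PartitionState state) {
           // Assumed semantics: no leader means the partition is offline.
           if (state.leader == NO_LEADER) {
               offline.add(tp);
           } else {
               offline.remove(tp);
           }
           // Assumed semantics: imbalanced when the leader is not the preferred
           // (first) replica. Assumes a non-empty replica list.
           if (state.leader != state.replicas.get(0)) {
               imbalanced.add(tp);
           } else {
               imbalanced.remove(tp);
           }
       }

       int offlineCount() { return offline.size(); }

       int imbalancedCount() { return imbalanced.size(); }
   }
   ```

   Java would also compile the package-private version. However, no other class could name or build a `PartitionState` to call it, so `private` states the real contract.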



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);

Review Comment:
   I'm wondering what our options are here. It seems to me that ignoring the error when applying the metric is one of them. Given that we applied the record successfully but failed to apply the change to the metrics, might we prioritize availability over consistency here? By throwing an exception we seem to have chosen consistency over availability, and I'm not sure that is the right choice. WDYT?
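   For comparison, here is a minimal sketch of the availability option. It rests on several assumptions: records are plain strings, `replayOne` stands in for the per-record `replay`, and `LenientMetricsReplayer` is a hypothetical name. A record whose metrics update fails is remembered and skipped instead of aborting the batch.

   ```java
   import java.util.ArrayList;
   import java.util.List;
   import java.util.function.Consumer;

   // Hypothetical sketch of the "availability" option: a failure to update metrics
   // for one record is recorded and skipped instead of being rethrown.
   // String records and the Consumer-based replay are stand-ins, not Kafka APIs.
   final class LenientMetricsReplayer {
       private final Consumer<String> replayOne;
       private final List<String> errors = new ArrayList<>();

       LenientMetricsReplayer(Consumer<String> replayOne) {
           this.replayOne = replayOne;
       }

       void replayBatch(long baseOffset, List<String> records) {
           for (int i = 0; i < records.size(); i++) {
               try {
                   replayOne.accept(records.get(i));
               } catch (RuntimeException e) {
                   // Instead of throwing (the PR's choice, favoring consistency),
                   // keep a description of the failure and move on to the next record.
                   errors.add(String.format("record %d of %d in batch with baseOffset %d: %s",
                       i + 1, records.size(), baseOffset, e.getMessage()));
               }
           }
       }

       List<String> errors() { return errors; }
   }
   ```

   The cost is that the exported metrics can silently diverge from the committed metadata. A real version would presumably at least log each failure and expose an error count.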



##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();

Review Comment:
   nit: move the `updateBrokerStateMetrics()` invocation out of the `if`/`else if` branches so that it is called once.
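   Here is a sketch of the suggested refactor. `FencingChange` and `FencingSketch` are hypothetical stand-ins for `BrokerRegistrationFencingChange` and the manager. The `NONE` constant and the `metricUpdates` counter are assumptions, added only to make the effect observable.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical stand-in for BrokerRegistrationFencingChange.
   enum FencingChange { FENCE, UNFENCE, NONE }

   final class FencingSketch {
       private final Set<Integer> registeredBrokers = new HashSet<>();
       private final Set<Integer> fencedBrokers = new HashSet<>();
       int metricUpdates = 0; // counts metric refreshes, only to make the refactor observable

       void register(int brokerId) {
           registeredBrokers.add(brokerId);
       }

       void handleFencingChange(int brokerId, FencingChange change) {
           if (!registeredBrokers.contains(brokerId)) {
               throw new IllegalArgumentException(
                   String.format("Broker with id %s is not registered", brokerId));
           }
           if (change == FencingChange.FENCE) {
               fencedBrokers.add(brokerId);
           } else if (change == FencingChange.UNFENCE) {
               fencedBrokers.remove(brokerId);
           }
           // Hoisted out of the branches, as the nit suggests: the metrics are
           // refreshed exactly once per call.
           updateBrokerStateMetrics();
       }

       private void updateBrokerStateMetrics() {
           metricUpdates++;
       }

       int fencedCount() { return fencedBrokers.size(); }
   }
   ```

   With the call hoisted, a change that is neither `FENCE` nor `UNFENCE` also refreshes the gauges. That looks harmless, because `updateBrokerStateMetrics()` in the PR recomputes the counts from the sets rather than applying a delta.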





[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052688881


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);

Review Comment:
   For metadata, we have agreed that consistency is more important than availability. Inconsistent metadata in the controllers has an impact on the entire cluster.





[GitHub] [kafka] rondagostino commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
rondagostino commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052472398


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -871,6 +871,9 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                                 i++;
                             }
                         }
+
+                        controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+

Review Comment:
   A comment indicating that we must apply the entire batch or none of it -- and therefore we cannot replay each individual message above as is done for the snapshot loading case below -- might be helpful.
   
   Although, now that I write that, I wonder why there is an asymmetry. Is there a reason why the snapshot case below couldn't -- or shouldn't -- also update based on the entire batch?
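   Here is one way to get all-or-nothing semantics for a batch. It is sketched under assumptions and is not how the PR or `QuorumController` works: `AtomicBatchReplayer` is hypothetical, and records are simplified to `+id`/`-id` strings. The records are applied to a scratch copy of the state, and the copy is published only after the last record succeeds.

   ```java
   import java.util.HashSet;
   import java.util.List;
   import java.util.Set;

   // Hypothetical sketch of all-or-nothing batch replay. Records are simplified
   // to "+id" (register broker) and "-id" (unregister broker) strings.
   final class AtomicBatchReplayer {
       private Set<Integer> registered = new HashSet<>();

       void replayBatch(List<String> batch) {
           // Work on a copy so a failure part-way through leaves the live state untouched.
           Set<Integer> scratch = new HashSet<>(registered);
           for (String rec : batch) {
               int id = Integer.parseInt(rec.substring(1));
               char op = rec.charAt(0);
               if (op == '+') {
                   scratch.add(id);
               } else if (op == '-') {
                   if (!scratch.remove(id)) {
                       throw new IllegalArgumentException("Broker " + id + " is not registered");
                   }
               } else {
                   throw new IllegalArgumentException("Unknown record " + rec);
               }
           }
           // Reached only if every record applied cleanly: publish the whole batch at once.
           registered = scratch;
       }

       int registeredCount() { return registered.size(); }
   }
   ```

   Copying the state for every batch costs time and memory proportional to the state size, so this trades efficiency for atomicity. It is meant only to make the all-or-nothing requirement concrete.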
   





[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052646518


##########
metadata/src/main/java/org/apache/kafka/controller/PartitionChangeBuilder.java:
##########
@@ -17,6 +17,11 @@
 
 package org.apache.kafka.controller;
 
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.function.IntPredicate;

Review Comment:
   I think that is beyond the scope of this PR. We should do that in a minor follow-up PR.





[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052694125


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+    void replayBatch(long baseOffset, List<ApiMessageAndVersion> messages) {
+        int i = 1;
+        for (ApiMessageAndVersion message : messages) {
+            try {
+                replay(message.message());
+            } catch (Exception e) {
+                String failureMessage = String.format(
+                    "Unable to update controller metrics for %s record, it was %d of %d record(s) " +
+                    "in the batch with baseOffset %d.",
+                    message.message().getClass().getSimpleName(),
+                    i,
+                    messages.size(),
+                    baseOffset
+                );
+                throw new IllegalArgumentException(failureMessage, e);
+            }
+            i++;
+        }
+    }
+
+    /**
+     * Update controller metrics by replaying a metadata record.
+     *
+     * This method assumes that the provided ApiMessage is one of the types covered by MetadataRecordType.
+     *
+     * @param message a metadata record
+     */
+    @SuppressWarnings("checkstyle:cyclomaticComplexity")
+    void replay(ApiMessage message) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                replay((UnregisterBrokerRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                replay((UnfenceBrokerRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replay((PartitionRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replay((PartitionChangeRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replay((RemoveTopicRecord) message);
+                break;
+            case CONFIG_RECORD:
+            case FEATURE_LEVEL_RECORD:
+            case CLIENT_QUOTA_RECORD:
+            case PRODUCER_IDS_RECORD:
+            case ACCESS_CONTROL_ENTRY_RECORD:
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+            case NO_OP_RECORD:
+                // These record types do not affect metrics
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
+        }
+    }
+
+    private void replay(RegisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.add(brokerId);
+        if (record.fenced()) {
+            fencedBrokers.add(brokerId);
+        } else {
+            fencedBrokers.remove(brokerId);
+        }
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(UnregisterBrokerRecord record) {
+        Integer brokerId = record.brokerId();
+        registeredBrokers.remove(brokerId);
+        fencedBrokers.remove(brokerId);
+
+        updateBrokerStateMetrics();
+    }
+
+    private void replay(FenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.FENCE);
+    }
+
+    private void replay(UnfenceBrokerRecord record) {
+        handleFencingChange(record.id(), BrokerRegistrationFencingChange.UNFENCE);
+    }
+
+    private void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistrationFencingChange fencingChange = BrokerRegistrationFencingChange
+            .fromValue(record.fenced())
+            .orElseThrow(() -> {
+                return new IllegalArgumentException(
+                    String.format(
+                        "Registration change record for %d has unknown value for fenced field: %x",
+                        record.brokerId(),
+                        record.fenced()
+                    )
+                );
+            });
+
+        handleFencingChange(record.brokerId(), fencingChange);
+    }
+
+    private void handleFencingChange(Integer brokerId, BrokerRegistrationFencingChange fencingChange) {
+        if (!registeredBrokers.contains(brokerId)) {
+            throw new IllegalArgumentException(String.format("Broker with id %s is not registered", brokerId));
+        }
+
+        if (fencingChange == BrokerRegistrationFencingChange.FENCE) {
+            fencedBrokers.add(brokerId);
+            updateBrokerStateMetrics();

Review Comment:
   `BrokerRegistrationFencingChange` has three possible values: `FENCE`, `UNFENCE`, and `NONE`. With `NONE`, the broker's fencing state does not change, so the broker state metrics do not need to be updated. I added an `else` branch with a comment to make that case explicit.
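A minimal sketch of the three-way branch described above, written against a simplified stand-in rather than the PR's actual class. `FencingChange` is a hypothetical enum mirroring the three values of `BrokerRegistrationFencingChange`, and `FencingMetricsSketch`, `register`, and the `metricUpdates` counter are hypothetical names used only to show that `NONE` leaves the metrics untouched.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for the three values of BrokerRegistrationFencingChange.
enum FencingChange { FENCE, UNFENCE, NONE }

// Hypothetical, simplified model of the fencing logic in ControllerMetricsManager.
final class FencingMetricsSketch {
    private final Set<Integer> registeredBrokers = new HashSet<>();
    private final Set<Integer> fencedBrokers = new HashSet<>();
    // Counts how many times the broker state metrics were recomputed.
    int metricUpdates = 0;

    void register(int brokerId, boolean fenced) {
        registeredBrokers.add(brokerId);
        if (fenced) {
            fencedBrokers.add(brokerId);
        } else {
            fencedBrokers.remove(brokerId);
        }
        updateBrokerStateMetrics();
    }

    void handleFencingChange(int brokerId, FencingChange change) {
        if (!registeredBrokers.contains(brokerId)) {
            throw new IllegalArgumentException(
                String.format("Broker with id %s is not registered", brokerId));
        }
        if (change == FencingChange.FENCE) {
            fencedBrokers.add(brokerId);
            updateBrokerStateMetrics();
        } else if (change == FencingChange.UNFENCE) {
            fencedBrokers.remove(brokerId);
            updateBrokerStateMetrics();
        } else {
            // NONE: the fencing state is unchanged, so the metrics are left as they are.
        }
    }

    int fencedBrokerCount() {
        return fencedBrokers.size();
    }

    int activeBrokerCount() {
        return registeredBrokers.size() - fencedBrokers.size();
    }

    private void updateBrokerStateMetrics() {
        // A real implementation would publish the counts; here we only count calls.
        metricUpdates++;
    }
}
```

Keeping the explicit `else` (rather than silently falling through) documents that `NONE` was considered, and it gives a natural place to add handling if the enum ever grows a fourth value.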




[GitHub] [kafka] rondagostino commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
rondagostino commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052658922


##########
metadata/src/main/java/org/apache/kafka/controller/ControllerMetricsManager.java:
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.MetadataRecordType;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.apache.kafka.metadata.placement.PartitionAssignment;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER_CHANGE;
+
+/**
+ * Type for updating controller metrics based on metadata records.
+ */
+final class ControllerMetricsManager {

Review Comment:
   Yeah, you are right: mocking with Mockito will work fine. Sounds good to leave it final.




[GitHub] [kafka] rondagostino commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
rondagostino commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052506712


##########
metadata/src/test/java/org/apache/kafka/controller/ControllerMetricsManagerTest.java:
##########
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalInt;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.PartitionChangeRecord;
+import org.apache.kafka.common.metadata.PartitionRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
+import org.junit.jupiter.api.Test;
+import static org.apache.kafka.metadata.LeaderConstants.NO_LEADER;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+final class ControllerMetricsManagerTest {

Review Comment:
   You might want at least one test method to invoke `replayBatch()` with a batch of two or more records, to ensure that it applies every record in the batch.
   
   What do you think about testing `FENCE_BROKER_RECORD` and `UNFENCE_BROKER_RECORD`? They have been superseded by `BROKER_REGISTRATION_CHANGE_RECORD`, but maybe it's best to add a quick test for them anyway?
   
   How about also testing `UNREGISTER_BROKER_RECORD`?
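The three suggestions above could be exercised roughly as follows. This is a self-contained sketch against a simplified stand-in, not against the real Kafka record classes: `BrokerRec`, `RegisterRec`, `UnregisterRec`, `FenceRec`, `UnfenceRec`, and `BatchReplaySketch` are hypothetical names, and the stand-in assumes that a newly registered broker starts out fenced. A real test would presumably build `RegisterBrokerRecord` and friends, wrap them in `ApiMessageAndVersion`, and call `ControllerMetricsManager.replayBatch` directly.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified stand-ins for RegisterBrokerRecord, UnregisterBrokerRecord,
// FenceBrokerRecord and UnfenceBrokerRecord.
abstract class BrokerRec {
    final int brokerId;
    BrokerRec(int brokerId) { this.brokerId = brokerId; }
}
final class RegisterRec extends BrokerRec { RegisterRec(int id) { super(id); } }
final class UnregisterRec extends BrokerRec { UnregisterRec(int id) { super(id); } }
final class FenceRec extends BrokerRec { FenceRec(int id) { super(id); } }
final class UnfenceRec extends BrokerRec { UnfenceRec(int id) { super(id); } }

// Hypothetical stand-in for ControllerMetricsManager that tracks broker state only.
final class BatchReplaySketch {
    final Set<Integer> registered = new HashSet<>();
    final Set<Integer> fenced = new HashSet<>();

    // Applies every record in order; reports the failing record's position, like the PR's replayBatch.
    void replayBatch(long baseOffset, List<BrokerRec> records) {
        int i = 1;
        for (BrokerRec rec : records) {
            try {
                replay(rec);
            } catch (Exception e) {
                throw new IllegalArgumentException(String.format(
                    "Failed on record %d of %d in batch with baseOffset %d",
                    i, records.size(), baseOffset), e);
            }
            i++;
        }
    }

    void replay(BrokerRec rec) {
        if (rec instanceof RegisterRec) {
            registered.add(rec.brokerId);
            fenced.add(rec.brokerId); // assumption: new registrations start fenced
        } else if (rec instanceof UnregisterRec) {
            registered.remove(rec.brokerId);
            fenced.remove(rec.brokerId);
        } else if (!registered.contains(rec.brokerId)) {
            throw new IllegalArgumentException("Broker " + rec.brokerId + " is not registered");
        } else if (rec instanceof FenceRec) {
            fenced.add(rec.brokerId);
        } else if (rec instanceof UnfenceRec) {
            fenced.remove(rec.brokerId);
        }
    }

    private static void check(boolean condition, String what) {
        if (!condition) throw new AssertionError(what);
    }

    // The three scenarios suggested in the review, expressed as plain checks.
    static void checkReviewScenarios() {
        // 1. A multi-record batch must apply every record, not just the first.
        BatchReplaySketch m = new BatchReplaySketch();
        m.replayBatch(0L, Arrays.asList(new RegisterRec(1), new RegisterRec(2), new UnfenceRec(1)));
        check(m.registered.size() == 2, "both registrations applied");
        check(m.fenced.size() == 1 && m.fenced.contains(2), "unfence of broker 1 applied");

        // 2. The legacy fence/unfence records still change the fenced set.
        m.replay(new FenceRec(1));
        check(m.fenced.contains(1), "fence applied");
        m.replay(new UnfenceRec(1));
        check(!m.fenced.contains(1), "unfence applied");

        // 3. Unregistering removes the broker from both sets.
        m.replay(new UnregisterRec(2));
        check(!m.registered.contains(2) && !m.fenced.contains(2), "unregister applied");
    }
}
```

Checking state after a three-record batch (rather than a single record) is what catches a loop that stops early or only applies the first element.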
   




[GitHub] [kafka] jsancio commented on a diff in pull request #12994: KAFKA-14457; Controller metrics should only expose committed data

Posted by GitBox <gi...@apache.org>.
jsancio commented on code in PR #12994:
URL: https://github.com/apache/kafka/pull/12994#discussion_r1052687763


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -871,6 +871,9 @@ public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
                                 i++;
                             }
                         }
+
+                        controllerMetricsManager.replayBatch(batch.baseOffset(), messages);
+

Review Comment:
   I added the two methods `replay` and `replayBatch` to avoid code duplication when replaying committed batches from the log segment.
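A minimal sketch of the pattern the hunk above suggests: the branch-specific handling of each committed batch stays separate, and a single `replayBatch` call placed after it feeds the metrics. All names here (`CommittedBatch`, `MetricsReplayer`, `CommitHandlerSketch`) are hypothetical, records are plain strings, and the split between an active branch (state already applied when the records were written) and a standby branch (state applied on commit) is an assumption about the surrounding `handleCommit` code, which the hunk does not show.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical committed batch: a base offset plus its records (plain strings here).
final class CommittedBatch {
    final long baseOffset;
    final List<String> records;

    CommittedBatch(long baseOffset, List<String> records) {
        this.baseOffset = baseOffset;
        this.records = records;
    }
}

// Hypothetical metrics sink that records which offsets it has replayed.
final class MetricsReplayer {
    final List<Long> replayedOffsets = new ArrayList<>();

    void replayBatch(long baseOffset, List<String> records) {
        for (int i = 0; i < records.size(); i++) {
            replayedOffsets.add(baseOffset + i);
        }
    }
}

// Hypothetical commit handler: two branch-specific paths, one shared metrics call.
final class CommitHandlerSketch {
    final MetricsReplayer metrics = new MetricsReplayer();
    final List<String> appliedState = new ArrayList<>();
    final boolean active;

    CommitHandlerSketch(boolean active) {
        this.active = active;
    }

    void handleCommit(List<CommittedBatch> batches) {
        for (CommittedBatch batch : batches) {
            if (active) {
                // Assumed: the active controller already applied these records at write time.
            } else {
                // Assumed: a standby controller applies each committed record here.
                appliedState.addAll(batch.records);
            }
            // Shared by both branches, so the metrics only ever see committed records.
            metrics.replayBatch(batch.baseOffset, batch.records);
        }
    }
}
```

Because the metrics call sits outside the branch, neither path needs its own copy of the metrics logic, and uncommitted records never reach the metrics.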


