You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/30 22:11:23 UTC

[GitHub] [kafka] hachikuji commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

hachikuji commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661735739



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();

Review comment:
       You might consider using `OptionalDouble`.

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {
+        DescribeClientQuotasResponseData response = new DescribeClientQuotasResponseData();
+        Map<String, String> exactMatch = new HashMap<>();
+        Set<String> typeMatch = new HashSet<>();
+        for (DescribeClientQuotasRequestData.ComponentData component : request.components()) {
+            if (component.entityType().isEmpty()) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Invalid empty entity type.");
+                return response;
+            } else if (exactMatch.containsKey(component.entityType()) ||
+                    typeMatch.contains(component.entityType())) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Entity type " + component.entityType() +
+                    " cannot appear more than once in the filter.");
+                return response;
+            }
+            switch (component.matchType()) {
+                case MATCH_TYPE_EXACT:
+                    if (component.match() == null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_EXACT, " +
+                            "but set match string to null.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), component.match());
+                    break;
+                case MATCH_TYPE_DEFAULT:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_DEFAULT, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), null);
+                    break;
+                case MATCH_TYPE_SPECIFIED:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_SPECIFIED, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    typeMatch.add(component.entityType());
+                    break;
+                default:
+                    response.setErrorCode(UNSUPPORTED_VERSION.code());
+                    response.setErrorMessage("Unknown match type " + component.matchType());
+                    return response;
+            }
+        }
+        // TODO: this is O(N). We should do some indexing here to speed it up.

Review comment:
       Should we file a JIRA for this? 

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {
+        DescribeClientQuotasResponseData response = new DescribeClientQuotasResponseData();
+        Map<String, String> exactMatch = new HashMap<>();
+        Set<String> typeMatch = new HashSet<>();
+        for (DescribeClientQuotasRequestData.ComponentData component : request.components()) {
+            if (component.entityType().isEmpty()) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Invalid empty entity type.");
+                return response;
+            } else if (exactMatch.containsKey(component.entityType()) ||
+                    typeMatch.contains(component.entityType())) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Entity type " + component.entityType() +
+                    " cannot appear more than once in the filter.");
+                return response;
+            }
+            switch (component.matchType()) {
+                case MATCH_TYPE_EXACT:
+                    if (component.match() == null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_EXACT, " +
+                            "but set match string to null.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), component.match());

Review comment:
       There may be a couple of validations we have in `ZkAdminManager.describeClientQuotas` that do not appear here. For example, we detect invalid mixes of IP and user searches:
   ```
       if ((userComponent.isDefined || clientIdComponent.isDefined) && ipComponent.isDefined)
         throw new InvalidRequestException(s"Invalid entity filter component combination, IP filter component should not be used with " +
           s"user or clientId filter component.")
   ```

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
+
+
+/**
+ * Represents a quota for a client entity in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotaImage {
+    public final static ClientQuotaImage EMPTY = new ClientQuotaImage(Collections.emptyMap());
+
+    private final Map<String, Double> quotas;
+
+    public ClientQuotaImage(Map<String, Double> quotas) {
+        this.quotas = quotas;
+    }
+
+    Map<String, Double> quotas() {
+        return quotas;
+    }
+
+    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            records.add(new ApiMessageAndVersion(new ClientQuotaRecord().
+                setEntity(entityToData(entity)).
+                setKey(entry.getKey()).
+                setValue(entry.getValue()).
+                setRemove(false),
+                CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+        }
+        out.accept(records);
+    }
+
+    public static List<EntityData> entityToData(ClientQuotaEntity entity) {
+        List<EntityData> entityData = new ArrayList<>(entity.entries().size());
+        for (Entry<String, String> entry : entity.entries().entrySet()) {
+            entityData.add(new EntityData().
+                setEntityType(entry.getKey()).
+                setEntityName(entry.getValue()));
+        }
+        return entityData;
+    }
+
+    public static ClientQuotaEntity dataToEntity(List<EntityData> entityData) {
+        Map<String, String> entries = new HashMap<>();
+        for (EntityData data : entityData) {
+            entries.put(data.entityType(), data.entityName());
+        }
+        return new ClientQuotaEntity(Collections.unmodifiableMap(entries));
+    }
+
+    public List<ValueData> toDescribeValues() {
+        List<ValueData> values = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            values.add(new ValueData().setKey(entry.getKey()).setValue(entry.getValue()));
+        }
+        return values;
+    }
+
+    public boolean isEmpty() {
+        return quotas.isEmpty();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClientQuotaImage)) return false;

Review comment:
       nit: maybe worth adding `this == o` to these `equals` implementations. This applies to all of the similar classes in this PR.

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();
+
+    public ClientQuotaDelta(ClientQuotaImage image) {
+        this.image = image;
+    }
+
+    public Map<String, Optional<Double>> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (String key : image.quotas().keySet()) {
+            if (!changes.containsKey(key)) {
+                changes.put(key, Optional.empty());

Review comment:
       A little documentation about the purpose of these methods would help. It seems we use `Optional.empty` in `changes` to represent removal, but it is a tad obscure as to why we use that to fill in missing entries here. I am probably just not understanding what the point of `finishSnapshot` is.

##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicImage.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
+
+
+/**
+ * Represents a topic in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class TopicImage {
+    private final String name;
+
+    private final Uuid id;
+
+    private final Map<Integer, PartitionRegistration> partitions;
+
+    public TopicImage(String name,
+                      Uuid id,
+                      Map<Integer, PartitionRegistration> partitions) {
+        this.name = name;
+        this.id = id;
+        this.partitions = partitions;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Uuid id() {
+        return id;
+    }
+
+    public Map<Integer, PartitionRegistration> partitions() {
+        return partitions;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {

Review comment:
       nit: this doesn't throw `IOException`. You can remove the `throws IOException` clauses all the way up to `MetadataImage.write`.

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to unfence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistration broker = broker(record.brokerId());
+        if (broker == null) {
+            throw new RuntimeException("Tried to change broker " + record.brokerId() +
+                ", but that broker was not registered.");
+        }
+        if (record.fenced() < 0) {

Review comment:
       It would be helpful to understand which validations we are free to skip. For example, `BrokerRegistrationChangeRecord` also indicates an epoch. Do we need to check that?

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -18,20 +18,39 @@
 package org.apache.kafka.metadata;
 
 import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
+
+
 /**
  * An immutable class which represents broker registrations.
  */
 public class BrokerRegistration {
+    private static Map<String, Endpoint> listenersToMap(Collection<Endpoint> listeners) {
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);

Review comment:
       I think we are guaranteed to have the listener present, but perhaps it's worth checking explicitly and throwing if it is not the case.

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
+
+
+/**
+ * Represents a quota for a client entity in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotaImage {
+    public final static ClientQuotaImage EMPTY = new ClientQuotaImage(Collections.emptyMap());
+
+    private final Map<String, Double> quotas;
+
+    public ClientQuotaImage(Map<String, Double> quotas) {
+        this.quotas = quotas;
+    }
+
+    Map<String, Double> quotas() {
+        return quotas;
+    }
+
+    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        List<ApiMessageAndVersion> records = new ArrayList<>();

Review comment:
       nit: add an initial size to the `ArrayList`? There are a few cases in here where we could do this.

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {

Review comment:
       nit: might be helpful to add a little helper since we do this a few times in here

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+public final class ClientQuotasDelta {
+    private final ClientQuotasImage image;
+    private final Map<ClientQuotaEntity, ClientQuotaDelta> changes = new HashMap<>();
+
+    public ClientQuotasDelta(ClientQuotasImage image) {
+        this.image = image;
+    }
+
+    public Map<ClientQuotaEntity, ClientQuotaDelta> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : image.entities().entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage quotaImage = entry.getValue();
+            ClientQuotaDelta quotaDelta = changes.computeIfAbsent(entity,
+                __ -> new ClientQuotaDelta(quotaImage));
+            quotaDelta.finishSnapshot();
+        }
+    }
+
+    public void replay(ClientQuotaRecord record) {
+        ClientQuotaEntity entity = ClientQuotaImage.dataToEntity(record.entity());
+        ClientQuotaDelta change = changes.get(entity);

Review comment:
       nit: I guess you could use `computeIfAbsent` here as well




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org