Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/30 18:30:16 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10949: Add MetadataImage

cmccabe opened a new pull request #10949:
URL: https://github.com/apache/kafka/pull/10949


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661849874



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
+
+
+/**
+ * Represents a quota for a client entity in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotaImage {
+    public final static ClientQuotaImage EMPTY = new ClientQuotaImage(Collections.emptyMap());
+
+    private final Map<String, Double> quotas;
+
+    public ClientQuotaImage(Map<String, Double> quotas) {
+        this.quotas = quotas;
+    }
+
+    Map<String, Double> quotas() {
+        return quotas;
+    }
+
+    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            records.add(new ApiMessageAndVersion(new ClientQuotaRecord().
+                setEntity(entityToData(entity)).
+                setKey(entry.getKey()).
+                setValue(entry.getValue()).
+                setRemove(false),
+                CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+        }
+        out.accept(records);
+    }
+
+    public static List<EntityData> entityToData(ClientQuotaEntity entity) {
+        List<EntityData> entityData = new ArrayList<>(entity.entries().size());
+        for (Entry<String, String> entry : entity.entries().entrySet()) {
+            entityData.add(new EntityData().
+                setEntityType(entry.getKey()).
+                setEntityName(entry.getValue()));
+        }
+        return entityData;
+    }
+
+    public static ClientQuotaEntity dataToEntity(List<EntityData> entityData) {
+        Map<String, String> entries = new HashMap<>();
+        for (EntityData data : entityData) {
+            entries.put(data.entityType(), data.entityName());
+        }
+        return new ClientQuotaEntity(Collections.unmodifiableMap(entries));
+    }
+
+    public List<ValueData> toDescribeValues() {
+        List<ValueData> values = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            values.add(new ValueData().setKey(entry.getKey()).setValue(entry.getValue()));
+        }
+        return values;
+    }
+
+    public boolean isEmpty() {
+        return quotas.isEmpty();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClientQuotaImage)) return false;

Review comment:
       Do we know how significant this optimization is? I can't think offhand of a situation where we'd compare one of these objects to itself. Equals is mainly (only?) here for unit tests.
   
   hashCode and toString are present because of the rule of three (if you have any one of equals, hashCode, toString, you must have all of them).
   
   Can't wait for Java 14 record classes.
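
For readers following along, here is a minimal sketch of the three methods being discussed. It assumes the "optimization" in question is the identity short-circuit (`o == this`) at the top of `equals`; the thread does not show the reviewer's original suggestion. `QuotaImageSketch` is a hypothetical stand-in for `ClientQuotaImage` that models only the quotas map, not the PR's actual code.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for ClientQuotaImage; only the quotas map is modeled.
final class QuotaImageSketch {
    private final Map<String, Double> quotas;

    QuotaImageSketch(Map<String, Double> quotas) {
        // Defensive copy so the sketch is immutable after construction.
        this.quotas = Collections.unmodifiableMap(new HashMap<>(quotas));
    }

    @Override
    public boolean equals(Object o) {
        // Assumed optimization: skip the map comparison when comparing to self.
        // Removing this line does not change the result, only the cost.
        if (o == this) return true;
        if (!(o instanceof QuotaImageSketch)) return false;
        return quotas.equals(((QuotaImageSketch) o).quotas);
    }

    @Override
    public int hashCode() {
        // Must agree with equals: equal maps produce equal hashes.
        return Objects.hash(quotas);
    }

    @Override
    public String toString() {
        return "QuotaImageSketch(quotas=" + quotas + ")";
    }
}
```

Record classes (a preview feature in Java 14, finalized in Java 16) generate `equals`, `hashCode`, and `toString` from the declared components, which is why they would remove this hand-written code.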







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661857990



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();
+
+    public ClientQuotaDelta(ClientQuotaImage image) {
+        this.image = image;
+    }
+
+    public Map<String, Optional<Double>> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (String key : image.quotas().keySet()) {
+            if (!changes.containsKey(key)) {
+                changes.put(key, Optional.empty());

Review comment:
       I added this comment:
   ```
                   // If a quota from the image did not appear in the snapshot, mark it as removed.
   ```
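
The behavior that comment describes can be sketched in isolation. This is a simplified model, not the PR's code: `QuotaDeltaSketch`, its `replay(String, double)` signature, and `apply()` are hypothetical names, and the image is reduced to a plain map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical, simplified model of a quota delta built while loading a snapshot.
final class QuotaDeltaSketch {
    private final Map<String, Double> imageQuotas;
    // Optional.empty() means "removed"; a present value means "set to this value".
    private final Map<String, Optional<Double>> changes = new HashMap<>();

    QuotaDeltaSketch(Map<String, Double> imageQuotas) {
        this.imageQuotas = imageQuotas;
    }

    // Hypothetical replay method: records a quota seen in the snapshot.
    void replay(String key, double value) {
        changes.put(key, Optional.of(Double.valueOf(value)));
    }

    void finishSnapshot() {
        for (String key : imageQuotas.keySet()) {
            if (!changes.containsKey(key)) {
                // If a quota from the image did not appear in the snapshot, mark it as removed.
                changes.put(key, Optional.empty());
            }
        }
    }

    Map<String, Optional<Double>> changes() {
        return changes;
    }

    // Hypothetical helper: applies the changes to the old image to get the new quotas.
    Map<String, Double> apply() {
        Map<String, Double> result = new HashMap<>(imageQuotas);
        for (Map.Entry<String, Optional<Double>> e : changes.entrySet()) {
            if (e.getValue().isPresent()) {
                result.put(e.getKey(), e.getValue().get());
            } else {
                result.remove(e.getKey());
            }
        }
        return result;
    }
}
```

A snapshot is a full statement of state, so anything the old image had that the snapshot did not mention has to be treated as deleted; without `finishSnapshot`, stale keys would survive into the new image.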







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661869382



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to unfence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistration broker = broker(record.brokerId());
+        if (broker == null) {
+            throw new RuntimeException("Tried to change broker " + record.brokerId() +
+                ", but that broker was not registered.");
+        }
+        if (record.fenced() < 0) {

Review comment:
       Let's validate the epoch for each







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661849874



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
+
+
+/**
+ * Represents a quota for a client entity in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotaImage {
+    public final static ClientQuotaImage EMPTY = new ClientQuotaImage(Collections.emptyMap());
+
+    private final Map<String, Double> quotas;
+
+    public ClientQuotaImage(Map<String, Double> quotas) {
+        this.quotas = quotas;
+    }
+
+    Map<String, Double> quotas() {
+        return quotas;
+    }
+
+    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            records.add(new ApiMessageAndVersion(new ClientQuotaRecord().
+                setEntity(entityToData(entity)).
+                setKey(entry.getKey()).
+                setValue(entry.getValue()).
+                setRemove(false),
+                CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+        }
+        out.accept(records);
+    }
+
+    public static List<EntityData> entityToData(ClientQuotaEntity entity) {
+        List<EntityData> entityData = new ArrayList<>(entity.entries().size());
+        for (Entry<String, String> entry : entity.entries().entrySet()) {
+            entityData.add(new EntityData().
+                setEntityType(entry.getKey()).
+                setEntityName(entry.getValue()));
+        }
+        return entityData;
+    }
+
+    public static ClientQuotaEntity dataToEntity(List<EntityData> entityData) {
+        Map<String, String> entries = new HashMap<>();
+        for (EntityData data : entityData) {
+            entries.put(data.entityType(), data.entityName());
+        }
+        return new ClientQuotaEntity(Collections.unmodifiableMap(entries));
+    }
+
+    public List<ValueData> toDescribeValues() {
+        List<ValueData> values = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            values.add(new ValueData().setKey(entry.getKey()).setValue(entry.getValue()));
+        }
+        return values;
+    }
+
+    public boolean isEmpty() {
+        return quotas.isEmpty();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClientQuotaImage)) return false;

Review comment:
       The `equals` implementations are only used in unit tests currently, so maybe best to keep it simple for now?
   
   I really wish we had those Java 14 record classes.







[GitHub] [kafka] cmccabe merged pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10949:
URL: https://github.com/apache/kafka/pull/10949


   





[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661846974



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +

Review comment:
       Yes, it should be impossible to get into this state unless records are replayed in the wrong order, some records are missing, or something similar. I can use `IllegalStateException` if that makes it clearer.
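
As an illustration of the `IllegalStateException` variant, here is a minimal sketch. It is not the PR's code: `FenceReplaySketch` is hypothetical, the broker registration is reduced to a boolean fenced flag, and registering a broker as unfenced is an arbitrary choice made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model: broker id -> fenced flag, standing in for BrokerRegistration.
final class FenceReplaySketch {
    private final Map<Integer, Boolean> fencedByBroker = new HashMap<>();

    // Assumption for this sketch only: a newly registered broker starts unfenced.
    void replayRegister(int brokerId) {
        fencedByBroker.put(brokerId, false);
    }

    void replayFence(int brokerId) {
        if (!fencedByBroker.containsKey(brokerId)) {
            // Reaching this branch means the record stream itself is inconsistent
            // (wrong order or missing records), not that the caller passed bad input.
            throw new IllegalStateException("Tried to fence broker " + brokerId +
                ", but that broker was not registered.");
        }
        fencedByBroker.put(brokerId, true);
    }

    boolean isFenced(int brokerId) {
        Boolean fenced = fencedByBroker.get(brokerId);
        return fenced != null && fenced;
    }
}
```

`IllegalStateException` is a subclass of `RuntimeException`, so callers that catch the broader type keep working; the narrower type only documents that the object was asked to do something its current state does not allow.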







[GitHub] [kafka] hachikuji commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661735739



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();

Review comment:
       You might consider using `OptionalDouble`.
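
A minimal sketch of what that suggestion could look like, assuming the same "empty means removed" convention as the `Optional<Double>` map. `OptionalDoubleChangesSketch` and its `set`/`remove` methods are hypothetical names, not part of the PR.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

// Hypothetical variant of the changes map that uses OptionalDouble for the values.
final class OptionalDoubleChangesSketch {
    // OptionalDouble.empty() means "removed"; a present value means "set to this value".
    private final Map<String, OptionalDouble> changes = new HashMap<>();

    void set(String key, double value) {
        // OptionalDouble stores the primitive directly, so no Double is boxed.
        changes.put(key, OptionalDouble.of(value));
    }

    void remove(String key) {
        changes.put(key, OptionalDouble.empty());
    }

    Map<String, OptionalDouble> changes() {
        return changes;
    }
}
```

The trade-off is small: `OptionalDouble` avoids one boxed `Double` per entry and exposes `getAsDouble()`, but it lacks some of the combinators of `Optional<T>` such as `map`.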

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {
+        DescribeClientQuotasResponseData response = new DescribeClientQuotasResponseData();
+        Map<String, String> exactMatch = new HashMap<>();
+        Set<String> typeMatch = new HashSet<>();
+        for (DescribeClientQuotasRequestData.ComponentData component : request.components()) {
+            if (component.entityType().isEmpty()) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Invalid empty entity type.");
+                return response;
+            } else if (exactMatch.containsKey(component.entityType()) ||
+                    typeMatch.contains(component.entityType())) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Entity type " + component.entityType() +
+                    " cannot appear more than once in the filter.");
+                return response;
+            }
+            switch (component.matchType()) {
+                case MATCH_TYPE_EXACT:
+                    if (component.match() == null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_EXACT, " +
+                            "but set match string to null.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), component.match());
+                    break;
+                case MATCH_TYPE_DEFAULT:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_DEFAULT, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), null);
+                    break;
+                case MATCH_TYPE_SPECIFIED:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_SPECIFIED, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    typeMatch.add(component.entityType());
+                    break;
+                default:
+                    response.setErrorCode(UNSUPPORTED_VERSION.code());
+                    response.setErrorMessage("Unknown match type " + component.matchType());
+                    return response;
+            }
+        }
+        // TODO: this is O(N). We should do some indexing here to speed it up.

Review comment:
       Should we file a JIRA for this? 
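
One possible shape for the indexing that TODO mentions is an inverted index from (entity type, entity name) to the entities that contain that pair. The sketch below is only an illustration of that idea under simplifying assumptions: entities are modeled as plain `Map<String, String>` values rather than `ClientQuotaEntity`, and `QuotaEntityIndexSketch` with its `add` and `exact` methods is hypothetical.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical inverted index: entity type -> entity name -> entities containing that pair.
final class QuotaEntityIndexSketch {
    private final Map<String, Map<String, Set<Map<String, String>>>> byTypeAndName = new HashMap<>();

    // Each entity is a type -> name map and should not be mutated after being added.
    void add(Map<String, String> entity) {
        for (Map.Entry<String, String> e : entity.entrySet()) {
            byTypeAndName
                .computeIfAbsent(e.getKey(), k -> new HashMap<>())
                .computeIfAbsent(e.getValue(), k -> new HashSet<>())
                .add(entity);
        }
    }

    // Looks up the entities with the given (type, name) pair without scanning all entities.
    Set<Map<String, String>> exact(String type, String name) {
        Map<String, Set<Map<String, String>>> byName = byTypeAndName.get(type);
        if (byName == null) {
            return Collections.<Map<String, String>>emptySet();
        }
        Set<Map<String, String>> hits = byName.get(name);
        if (hits == null) {
            return Collections.<Map<String, String>>emptySet();
        }
        return hits;
    }
}
```

A filter with several exact-match components could intersect the sets returned for each component, which replaces the full scan with work proportional to the smallest candidate set.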

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {
+        DescribeClientQuotasResponseData response = new DescribeClientQuotasResponseData();
+        Map<String, String> exactMatch = new HashMap<>();
+        Set<String> typeMatch = new HashSet<>();
+        for (DescribeClientQuotasRequestData.ComponentData component : request.components()) {
+            if (component.entityType().isEmpty()) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Invalid empty entity type.");
+                return response;
+            } else if (exactMatch.containsKey(component.entityType()) ||
+                    typeMatch.contains(component.entityType())) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Entity type " + component.entityType() +
+                    " cannot appear more than once in the filter.");
+                return response;
+            }
+            switch (component.matchType()) {
+                case MATCH_TYPE_EXACT:
+                    if (component.match() == null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_EXACT, " +
+                            "but set match string to null.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), component.match());

Review comment:
       There may be a couple of validations in `ZkAdminManager.describeClientQuotas` that do not appear here. For example, we detect invalid mixes of IP and user searches:
   ```
       if ((userComponent.isDefined || clientIdComponent.isDefined) && ipComponent.isDefined)
         throw new InvalidRequestException(s"Invalid entity filter component combination, IP filter component should not be used with " +
           s"user or clientId filter component.")
   ```
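
   A rough Java equivalent of that check, as a sketch only. The class and method names are hypothetical, and the entity type strings are assumed to match the `USER`, `CLIENT_ID` and `IP` constants in `ClientQuotaEntity`. It returns an error message rather than throwing, to fit the way `describe` fills in the response:

```java
import java.util.Optional;
import java.util.Set;

final class QuotaFilterValidation {
    // Assumed to mirror ClientQuotaEntity.USER / CLIENT_ID / IP.
    static final String USER = "user";
    static final String CLIENT_ID = "client-id";
    static final String IP = "ip";

    /**
     * Returns an error message if the filter mixes an IP component with a
     * user or client-id component, or Optional.empty() if the mix is fine.
     */
    static Optional<String> validateEntityTypes(Set<String> entityTypes) {
        boolean hasUserOrClientId =
            entityTypes.contains(USER) || entityTypes.contains(CLIENT_ID);
        if (hasUserOrClientId && entityTypes.contains(IP)) {
            return Optional.of("Invalid entity filter component combination. IP filter " +
                "component should not be used with user or clientId filter component.");
        }
        return Optional.empty();
    }
}
```

   In `describe`, the key set of `exactMatch` plus `typeMatch` could be passed in once the component loop has finished.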

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
+
+
+/**
+ * Represents a quota for a client entity in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotaImage {
+    public final static ClientQuotaImage EMPTY = new ClientQuotaImage(Collections.emptyMap());
+
+    private final Map<String, Double> quotas;
+
+    public ClientQuotaImage(Map<String, Double> quotas) {
+        this.quotas = quotas;
+    }
+
+    Map<String, Double> quotas() {
+        return quotas;
+    }
+
+    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            records.add(new ApiMessageAndVersion(new ClientQuotaRecord().
+                setEntity(entityToData(entity)).
+                setKey(entry.getKey()).
+                setValue(entry.getValue()).
+                setRemove(false),
+                CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+        }
+        out.accept(records);
+    }
+
+    public static List<EntityData> entityToData(ClientQuotaEntity entity) {
+        List<EntityData> entityData = new ArrayList<>(entity.entries().size());
+        for (Entry<String, String> entry : entity.entries().entrySet()) {
+            entityData.add(new EntityData().
+                setEntityType(entry.getKey()).
+                setEntityName(entry.getValue()));
+        }
+        return entityData;
+    }
+
+    public static ClientQuotaEntity dataToEntity(List<EntityData> entityData) {
+        Map<String, String> entries = new HashMap<>();
+        for (EntityData data : entityData) {
+            entries.put(data.entityType(), data.entityName());
+        }
+        return new ClientQuotaEntity(Collections.unmodifiableMap(entries));
+    }
+
+    public List<ValueData> toDescribeValues() {
+        List<ValueData> values = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            values.add(new ValueData().setKey(entry.getKey()).setValue(entry.getValue()));
+        }
+        return values;
+    }
+
+    public boolean isEmpty() {
+        return quotas.isEmpty();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClientQuotaImage)) return false;

Review comment:
       nit: maybe worth adding `this == o` to these `equals` implementations. This applies to all of the similar classes in this PR.
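
   Something along these lines, as a sketch. `QuotaImageSketch` is a hypothetical stand-in for the image classes, not code from this PR:

```java
import java.util.Map;
import java.util.Objects;

final class QuotaImageSketch {
    private final Map<String, Double> quotas;

    QuotaImageSketch(Map<String, Double> quotas) {
        this.quotas = quotas;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;                      // cheap identity check first
        if (!(o instanceof QuotaImageSketch)) return false;
        QuotaImageSketch other = (QuotaImageSketch) o;
        return quotas.equals(other.quotas);              // only then compare contents
    }

    @Override
    public int hashCode() {
        return Objects.hash(quotas);
    }
}
```

   The identity check skips the map comparison when an image is compared with itself, which seems likely to be common if unchanged sub-images are reused between metadata images.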

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();
+
+    public ClientQuotaDelta(ClientQuotaImage image) {
+        this.image = image;
+    }
+
+    public Map<String, Optional<Double>> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (String key : image.quotas().keySet()) {
+            if (!changes.containsKey(key)) {
+                changes.put(key, Optional.empty());

Review comment:
       A little documentation about the purpose of these methods would help. It seems we use `Optional.empty` in `changes` to represent removal, but it is a tad obscure as to why we use that to fill in missing entries here. I am probably just not understanding what the point of `finishSnapshot` is.
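
   For what it's worth, here is my reading of the behavior as a self-contained sketch. It assumes the snapshot is meant to describe the complete state, so any key in the old image that the snapshot never mentioned should end up removed. `SnapshotDeltaSketch` and its `apply` method are hypothetical and not part of this PR:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class SnapshotDeltaSketch {
    private final Map<String, Double> image;   // state before the snapshot
    private final Map<String, Optional<Double>> changes = new HashMap<>();

    SnapshotDeltaSketch(Map<String, Double> image) {
        this.image = image;
    }

    // Records a value that the snapshot (or a later record) sets.
    void replay(String key, double value) {
        changes.put(key, Optional.of(value));
    }

    // Marks every key of the old image that was not replayed as removed.
    void finishSnapshot() {
        for (String key : image.keySet()) {
            changes.putIfAbsent(key, Optional.empty());
        }
    }

    // Builds the new state: present values overwrite, empty values delete.
    Map<String, Double> apply() {
        Map<String, Double> result = new HashMap<>(image);
        for (Map.Entry<String, Optional<Double>> entry : changes.entrySet()) {
            if (entry.getValue().isPresent()) {
                result.put(entry.getKey(), entry.getValue().get());
            } else {
                result.remove(entry.getKey());
            }
        }
        return result;
    }
}
```

   If that reading is right, a sentence of Javadoc on `finishSnapshot` saying so would be enough.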

##########
File path: metadata/src/main/java/org/apache/kafka/image/TopicImage.java
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.metadata.PartitionRegistration;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.TOPIC_RECORD;
+
+
+/**
+ * Represents a topic in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class TopicImage {
+    private final String name;
+
+    private final Uuid id;
+
+    private final Map<Integer, PartitionRegistration> partitions;
+
+    public TopicImage(String name,
+                      Uuid id,
+                      Map<Integer, PartitionRegistration> partitions) {
+        this.name = name;
+        this.id = id;
+        this.partitions = partitions;
+    }
+
+    public String name() {
+        return name;
+    }
+
+    public Uuid id() {
+        return id;
+    }
+
+    public Map<Integer, PartitionRegistration> partitions() {
+        return partitions;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {

Review comment:
       nit: this doesn't throw `IOException`. You can remove the `throws` clauses all the way up to `MetadataImage.write`.

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to unfence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistration broker = broker(record.brokerId());
+        if (broker == null) {
+            throw new RuntimeException("Tried to change broker " + record.brokerId() +
+                ", but that broker was not registered.");
+        }
+        if (record.fenced() < 0) {

Review comment:
       It would be helpful to understand which validations we are free to skip. For example, `BrokerRegistrationChangeRecord` also indicates an epoch. Do we need to check that?

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -18,20 +18,39 @@
 package org.apache.kafka.metadata;
 
 import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
+
+
 /**
  * An immutable class which represents broker registrations.
  */
 public class BrokerRegistration {
+    private static Map<String, Endpoint> listenersToMap(Collection<Endpoint> listeners) {
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);

Review comment:
       I think we are guaranteed to have the listener present, but perhaps it's worth checking explicitly and throwing if it is not the case.
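
   A sketch of what the explicit check could look like. The nested `Endpoint` here is a minimal hypothetical stand-in for `org.apache.kafka.common.Endpoint`, and the exception type and message are only suggestions:

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ListenerMapSketch {
    /** Minimal stand-in for org.apache.kafka.common.Endpoint (hypothetical). */
    static final class Endpoint {
        private final String listenerName;   // may be null
        private final String host;
        private final int port;

        Endpoint(String listenerName, String host, int port) {
            this.listenerName = listenerName;
            this.host = host;
            this.port = port;
        }

        Optional<String> listenerName() {
            return Optional.ofNullable(listenerName);
        }
    }

    static Map<String, Endpoint> listenersToMap(Collection<Endpoint> listeners) {
        Map<String, Endpoint> listenersMap = new HashMap<>();
        for (Endpoint endpoint : listeners) {
            // Fail with a descriptive message instead of a bare NoSuchElementException.
            String name = endpoint.listenerName().orElseThrow(() ->
                new IllegalArgumentException("Endpoint " + endpoint.host + ":" +
                    endpoint.port + " has no listener name."));
            listenersMap.put(name, endpoint);
        }
        return listenersMap;
    }
}
```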

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
+
+
+/**
+ * Represents a quota for a client entity in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotaImage {
+    public final static ClientQuotaImage EMPTY = new ClientQuotaImage(Collections.emptyMap());
+
+    private final Map<String, Double> quotas;
+
+    public ClientQuotaImage(Map<String, Double> quotas) {
+        this.quotas = quotas;
+    }
+
+    Map<String, Double> quotas() {
+        return quotas;
+    }
+
+    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        List<ApiMessageAndVersion> records = new ArrayList<>();

Review comment:
       nit: give the `ArrayList` an initial capacity (e.g. `quotas.size()`)? There are a few cases in here where we could do this.

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {

Review comment:
       nit: it might be helpful to add a little helper, since we do this a few times in here.
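
   Roughly what I have in mind, as a sketch. The class, the `getBrokerOrThrow` name, and the `String` registration type are hypothetical placeholders for `ClusterDelta` and `BrokerRegistration`:

```java
import java.util.HashMap;
import java.util.Map;

final class BrokerLookupSketch {
    // Stand-in for the combined image + delta lookup done by broker(int).
    private final Map<Integer, String> brokers = new HashMap<>();

    void register(int brokerId, String registration) {
        brokers.put(brokerId, registration);
    }

    /** Looks up a broker, or throws with a message naming the failed action. */
    String getBrokerOrThrow(int brokerId, String action) {
        String broker = brokers.get(brokerId);
        if (broker == null) {
            throw new IllegalStateException("Tried to " + action + " broker " +
                brokerId + ", but that broker was not registered.");
        }
        return broker;
    }
}
```

   Each `replay` overload would then start with a single call such as `getBrokerOrThrow(record.id(), "fence")`.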

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+public final class ClientQuotasDelta {
+    private final ClientQuotasImage image;
+    private final Map<ClientQuotaEntity, ClientQuotaDelta> changes = new HashMap<>();
+
+    public ClientQuotasDelta(ClientQuotasImage image) {
+        this.image = image;
+    }
+
+    public Map<ClientQuotaEntity, ClientQuotaDelta> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : image.entities().entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage quotaImage = entry.getValue();
+            ClientQuotaDelta quotaDelta = changes.computeIfAbsent(entity,
+                __ -> new ClientQuotaDelta(quotaImage));
+            quotaDelta.finishSnapshot();
+        }
+    }
+
+    public void replay(ClientQuotaRecord record) {
+        ClientQuotaEntity entity = ClientQuotaImage.dataToEntity(record.entity());
+        ClientQuotaDelta change = changes.get(entity);

Review comment:
       nit: I guess you could use `computeIfAbsent` here as well
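
   For example (a sketch with simplified, hypothetical types; the real code would key on `ClientQuotaEntity` and build the `ClientQuotaDelta` from the matching image entry):

```java
import java.util.HashMap;
import java.util.Map;

final class ComputeIfAbsentSketch {
    /** Stand-in for ClientQuotaDelta (hypothetical). */
    static final class Delta {
        final Map<String, Double> changes = new HashMap<>();
    }

    private final Map<String, Delta> changes = new HashMap<>();

    void replay(String entity, String key, double value) {
        // One call replaces the get / null check / put sequence.
        Delta delta = changes.computeIfAbsent(entity, __ -> new Delta());
        delta.changes.put(key, value);
    }

    Map<String, Delta> changes() {
        return changes;
    }
}
```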







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661845115



##########
File path: metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the configurations in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ConfigurationsImage {
+    public static final ConfigurationsImage EMPTY =
+        new ConfigurationsImage(Collections.emptyMap());
+
+    private final Map<ConfigResource, ConfigurationImage> data;
+
+    public ConfigurationsImage(Map<ConfigResource, ConfigurationImage> data) {
+        this.data = data;
+    }
+
+    public boolean isEmpty() {
+        return data.isEmpty();
+    }
+
+    Map<ConfigResource, ConfigurationImage> resourceData() {
+        return data;
+    }
+
+    public Properties configProperties(ConfigResource configResource) {

Review comment:
       Eventually it would be good to transition away from using the `Properties` class, I think. It's a weird old Java 1.0 (or whatever) thing with a lot of odd behaviors. One example is that its default encoding for writing out to a file is ISO-8859-1 (Latin-1), which I guess made sense in the 1990s, but not today. Because it inherits from `Hashtable` (itself a legacy class now), you can also insert non-string values, which behave in ways that can only be described as "incorrect" (like appearing as null when accessed by `getProperty`).
   
   But, a bunch of other parts of the code use `Properties` for entity configurations, so we have to support it, for now.
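
   A small demonstration of the non-string value problem, using only `java.util.Properties`. The class name and the `retention.ms` key are just for illustration:

```java
import java.util.Properties;

final class PropertiesQuirkSketch {
    /** Stores a value through the inherited put() and reads it back via getProperty(). */
    static String readBack(Object value) {
        Properties props = new Properties();
        props.put("retention.ms", value);           // accepts any Object
        return props.getProperty("retention.ms");   // only returns String values
    }
}
```

   Here `readBack("1000")` returns `"1000"`, while `readBack(1000)` returns null even though the key is present in the table.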







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661858804



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasImage.java
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasRequestData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntityData;
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.EntryData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.protocol.Errors.INVALID_REQUEST;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_EXACT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_DEFAULT;
+import static org.apache.kafka.common.requests.DescribeClientQuotasRequest.MATCH_TYPE_SPECIFIED;
+
+
+/**
+ * Represents the client quotas in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotasImage {
+    public final static ClientQuotasImage EMPTY = new ClientQuotasImage(Collections.emptyMap());
+
+    private final Map<ClientQuotaEntity, ClientQuotaImage> entities;
+
+    public ClientQuotasImage(Map<ClientQuotaEntity, ClientQuotaImage> entities) {
+        this.entities = entities;
+    }
+
+    public boolean isEmpty() {
+        return entities.isEmpty();
+    }
+
+    Map<ClientQuotaEntity, ClientQuotaImage> entities() {
+        return entities;
+    }
+
+    public void write(Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : entities.entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage clientQuotaImage = entry.getValue();
+            clientQuotaImage.write(entity, out);
+        }
+    }
+
+    public DescribeClientQuotasResponseData describe(DescribeClientQuotasRequestData request) {
+        DescribeClientQuotasResponseData response = new DescribeClientQuotasResponseData();
+        Map<String, String> exactMatch = new HashMap<>();
+        Set<String> typeMatch = new HashSet<>();
+        for (DescribeClientQuotasRequestData.ComponentData component : request.components()) {
+            if (component.entityType().isEmpty()) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Invalid empty entity type.");
+                return response;
+            } else if (exactMatch.containsKey(component.entityType()) ||
+                    typeMatch.contains(component.entityType())) {
+                response.setErrorCode(INVALID_REQUEST.code());
+                response.setErrorMessage("Entity type " + component.entityType() +
+                    " cannot appear more than once in the filter.");
+                return response;
+            }
+            switch (component.matchType()) {
+                case MATCH_TYPE_EXACT:
+                    if (component.match() == null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_EXACT, " +
+                            "but set match string to null.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), component.match());
+                    break;
+                case MATCH_TYPE_DEFAULT:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_DEFAULT, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    exactMatch.put(component.entityType(), null);
+                    break;
+                case MATCH_TYPE_SPECIFIED:
+                    if (component.match() != null) {
+                        response.setErrorCode(INVALID_REQUEST.code());
+                        response.setErrorMessage("Request specified MATCH_TYPE_SPECIFIED, " +
+                            "but also specified a match string.");
+                        return response;
+                    }
+                    typeMatch.add(component.entityType());
+                    break;
+                default:
+                    response.setErrorCode(UNSUPPORTED_VERSION.code());
+                    response.setErrorMessage("Unknown match type " + component.matchType());
+                    return response;
+            }
+        }
+        // TODO: this is O(N). We should do some indexing here to speed it up.

Review comment:
       Filed KAFKA-13022







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661848799



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();

Review comment:
       Good idea. I had forgotten about that. I wish SpotBugs would warn about this kind of stuff. Maybe in a later version.







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661861527



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {

Review comment:
       I wanted to have a different exception message for each, though.







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661846319



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();
+
+    public ClientQuotaDelta(ClientQuotaImage image) {
+        this.image = image;
+    }
+
+    public Map<String, Optional<Double>> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (String key : image.quotas().keySet()) {
+            if (!changes.containsKey(key)) {
+                changes.put(key, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(ClientQuotaRecord record) {
+        if (record.remove()) {
+            changes.put(record.key(), Optional.empty());
+        } else {
+            changes.put(record.key(), Optional.of(record.value()));
+        }
+    }
+
+    public ClientQuotaImage apply() {
+        Map<String, Double> newQuotas = new HashMap<>(image.quotas().size());
+        for (Entry<String, Double> entry : image.quotas().entrySet()) {
+            Optional<Double> change = changes.get(entry.getKey());
+            if (change == null) {
+                newQuotas.put(entry.getKey(), entry.getValue());
+            } else if (change.isPresent()) {
+                newQuotas.put(entry.getKey(), change.get());
+            }

Review comment:
       If the value is `Optional.empty`, nothing will be inserted into the new map. So that should handle removal.
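
The removal behaviour described above can be sketched in isolation. This is a minimal illustration, not the PR's code: `QuotaApplySketch` and its `apply` helper are hypothetical names, and the second loop (adding keys that appear only in the delta) is an assumption about how additions would be handled, since the quoted diff stops before that point.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative stand-in for the delta-apply loop; the class and method names are hypothetical.
final class QuotaApplySketch {
    static Map<String, Double> apply(Map<String, Double> image,
                                     Map<String, Optional<Double>> changes) {
        Map<String, Double> result = new HashMap<>(image.size());
        for (Map.Entry<String, Double> entry : image.entrySet()) {
            Optional<Double> change = changes.get(entry.getKey());
            if (change == null) {
                // No entry in the delta: carry the old value over unchanged.
                result.put(entry.getKey(), entry.getValue());
            } else if (change.isPresent()) {
                // The delta holds a new value, which overrides the old one.
                result.put(entry.getKey(), change.get());
            }
            // Optional.empty(): neither branch runs, so the key is dropped.
        }
        // Assumed second pass: keys that exist only in the delta are additions.
        for (Map.Entry<String, Optional<Double>> entry : changes.entrySet()) {
            if (!image.containsKey(entry.getKey()) && entry.getValue().isPresent()) {
                result.put(entry.getKey(), entry.getValue().get());
            }
        }
        return result;
    }
}
```

Because removal is expressed by omission, the loop needs no explicit `remove` call, and the base image map is never mutated.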







[GitHub] [kafka] cmccabe commented on pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#issuecomment-871756196


   > For the Image classes where the underlying cache is just a Map, it seems unsafe to expose that map through the accessor. Should we make a defensive copy? Or perhaps we could have some other methods for accessing the map without allowing mutation, e.g.
   > 
   > ```java
   > public final class ClusterImage {
   >     // ...
   >     public void brokers(BiConsumer<Integer, BrokerRegistration> brokerConsumer) {
   >         // ...
   >     }
   > }
   > ```
   
   I did try to hide the maps, initially, but it just gets too cumbersome to try to do everything through accessors.
   
   Copying the map would be a huge performance hit. Something like using `Collections#unmodifiableMap` is probably OK. Although it still adds some overhead, it's probably not too bad... I will put that in place for now, at least until performance testing tells us otherwise.
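
For illustration, here is a minimal sketch of the trade-off discussed above, using a hypothetical `ReadOnlyImageSketch` class with `String` standing in for `BrokerRegistration`. `Collections.unmodifiableMap` wraps the existing map rather than copying it, so construction cost does not grow with the map size, while mutation through the accessor fails.

```java
import java.util.Collections;
import java.util.Map;

// Hypothetical image class; String stands in for BrokerRegistration.
final class ReadOnlyImageSketch {
    private final Map<Integer, String> brokers;

    ReadOnlyImageSketch(Map<Integer, String> brokers) {
        // Wraps the map in a read-only view; no entries are copied.
        this.brokers = Collections.unmodifiableMap(brokers);
    }

    Map<Integer, String> brokers() {
        return brokers;
    }

    // Returns true if writing through the given map is rejected.
    static boolean rejectsMutation(Map<Integer, String> map) {
        try {
            map.put(-1, "intruder");
            return false;
        } catch (UnsupportedOperationException e) {
            return true;
        }
    }
}
```

One caveat: the wrapper is only a view. If the caller keeps a reference to the original map and mutates it later, the change is visible through the image, so this approach relies on the map not being modified after it is handed over.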





[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661847782



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to unfence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistration broker = broker(record.brokerId());
+        if (broker == null) {
+            throw new RuntimeException("Tried to change broker " + record.brokerId() +
+                ", but that broker was not registered.");
+        }
+        if (record.fenced() < 0) {
+            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(false)));
+        } else if (record.fenced() > 0) {
+            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(true)));
+        }
+    }
+
+    public ClusterImage apply() {
+        Map<Integer, BrokerRegistration> newBrokers = new HashMap<>(image.brokers().size());
+        for (Entry<Integer, BrokerRegistration> entry : image.brokers().entrySet()) {
+            int nodeId = entry.getKey();
+            Optional<BrokerRegistration> change = changedBrokers.get(nodeId);
+            if (change == null) {
+                newBrokers.put(nodeId, entry.getValue());
+            } else if (change.isPresent()) {
+                newBrokers.put(nodeId, change.get());
+            }

Review comment:
       `changedBrokers` only contains the brokers that have changed. If a broker is absent from this map, it means that nothing has changed versus the image, not that the broker has been removed.
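
The three states of the change map (absent, present with a value, present but empty) can be shown with a small lookup sketch. `DeltaLookupSketch` and its methods are hypothetical, and `String` stands in for `BrokerRegistration`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical delta over a base image; String stands in for BrokerRegistration.
final class DeltaLookupSketch {
    private final Map<Integer, String> image;
    private final Map<Integer, Optional<String>> changed = new HashMap<>();

    DeltaLookupSketch(Map<Integer, String> image) {
        this.image = image;
    }

    void register(int id, String registration) {
        changed.put(id, Optional.of(registration));
    }

    void unregister(int id) {
        changed.put(id, Optional.empty());
    }

    String broker(int id) {
        Optional<String> change = changed.get(id);
        if (change != null) {
            // Present in the delta: either a new value or an explicit removal (null).
            return change.orElse(null);
        }
        // Absent from the delta: nothing changed, so fall back to the image.
        return image.get(id);
    }
}
```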







[GitHub] [kafka] cmccabe commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661871442



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -18,20 +18,39 @@
 package org.apache.kafka.metadata;
 
 import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
 
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static org.apache.kafka.common.metadata.MetadataRecordType.REGISTER_BROKER_RECORD;
+
+
 /**
  * An immutable class which represents broker registrations.
  */
 public class BrokerRegistration {
+    private static Map<String, Endpoint> listenersToMap(Collection<Endpoint> listeners) {
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);

Review comment:
       Supposedly it's mandatory for brokers, but optional for clients (which really means they should have used a different type for the optional scenarios.)
   
   I will add a check to the constructor that all listeners have names.
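
A check of this kind might look roughly like the following. This is only a sketch: `FakeEndpoint` is a hypothetical stand-in for `org.apache.kafka.common.Endpoint`, reduced to the one accessor used here, and the exception type and message are assumptions rather than what the PR ended up using.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class ListenerNameCheckSketch {
    // Hypothetical stand-in for Endpoint; only the optional listener name is modelled.
    static final class FakeEndpoint {
        private final String listenerName; // may be null

        FakeEndpoint(String listenerName) {
            this.listenerName = listenerName;
        }

        Optional<String> listenerName() {
            return Optional.ofNullable(listenerName);
        }
    }

    static Map<String, FakeEndpoint> listenersToMap(Collection<FakeEndpoint> listeners) {
        Map<String, FakeEndpoint> result = new HashMap<>();
        for (FakeEndpoint endpoint : listeners) {
            // Fail with a descriptive message instead of the bare
            // NoSuchElementException that Optional.get() would throw.
            String name = endpoint.listenerName().orElseThrow(() ->
                new IllegalArgumentException("Broker listeners must have names."));
            result.put(name, endpoint);
        }
        return result;
    }
}
```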







[GitHub] [kafka] hachikuji commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661901885



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaImage.java
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.message.DescribeClientQuotasResponseData.ValueData;
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.metadata.ClientQuotaRecord.EntityData;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.common.metadata.MetadataRecordType.CLIENT_QUOTA_RECORD;
+
+
+/**
+ * Represents a quota for a client entity in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ClientQuotaImage {
+    public final static ClientQuotaImage EMPTY = new ClientQuotaImage(Collections.emptyMap());
+
+    private final Map<String, Double> quotas;
+
+    public ClientQuotaImage(Map<String, Double> quotas) {
+        this.quotas = quotas;
+    }
+
+    Map<String, Double> quotas() {
+        return quotas;
+    }
+
+    public void write(ClientQuotaEntity entity, Consumer<List<ApiMessageAndVersion>> out) throws IOException {
+        List<ApiMessageAndVersion> records = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            records.add(new ApiMessageAndVersion(new ClientQuotaRecord().
+                setEntity(entityToData(entity)).
+                setKey(entry.getKey()).
+                setValue(entry.getValue()).
+                setRemove(false),
+                CLIENT_QUOTA_RECORD.highestSupportedVersion()));
+        }
+        out.accept(records);
+    }
+
+    public static List<EntityData> entityToData(ClientQuotaEntity entity) {
+        List<EntityData> entityData = new ArrayList<>(entity.entries().size());
+        for (Entry<String, String> entry : entity.entries().entrySet()) {
+            entityData.add(new EntityData().
+                setEntityType(entry.getKey()).
+                setEntityName(entry.getValue()));
+        }
+        return entityData;
+    }
+
+    public static ClientQuotaEntity dataToEntity(List<EntityData> entityData) {
+        Map<String, String> entries = new HashMap<>();
+        for (EntityData data : entityData) {
+            entries.put(data.entityType(), data.entityName());
+        }
+        return new ClientQuotaEntity(Collections.unmodifiableMap(entries));
+    }
+
+    public List<ValueData> toDescribeValues() {
+        List<ValueData> values = new ArrayList<>();
+        for (Entry<String, Double> entry : quotas.entrySet()) {
+            values.add(new ValueData().setKey(entry.getKey()).setValue(entry.getValue()));
+        }
+        return values;
+    }
+
+    public boolean isEmpty() {
+        return quotas.isEmpty();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ClientQuotaImage)) return false;

Review comment:
       Yeah, I am not sure. I was thinking we might run into situations where we are trying to detect when a cached image has changed. It is a conventional thing to do, but I don't feel too strongly about it. 
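
As a rough illustration of the use case mentioned above, value-based `equals` and `hashCode` let a caller compare a cached image with a newer one. `ComparableImageSketch` and `changed` are hypothetical names, not part of the PR.

```java
import java.util.Map;

// Hypothetical image class whose equality is defined by its contents.
final class ComparableImageSketch {
    private final Map<String, Double> quotas;

    ComparableImageSketch(Map<String, Double> quotas) {
        this.quotas = quotas;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ComparableImageSketch)) return false;
        return quotas.equals(((ComparableImageSketch) o).quotas);
    }

    @Override
    public int hashCode() {
        return quotas.hashCode();
    }

    // True when the latest image differs in content from the cached one.
    static boolean changed(ComparableImageSketch cached, ComparableImageSketch latest) {
        return !cached.equals(latest);
    }
}
```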







[GitHub] [kafka] mumrah commented on a change in pull request #10949: KAFKA-13019: Add MetadataImage and MetadataDelta classes for KRaft Snapshots

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10949:
URL: https://github.com/apache/kafka/pull/10949#discussion_r661800484



##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +

Review comment:
       Maybe `IllegalStateException` for these? Shouldn't it be impossible to get into this state (aside from missing records or controller bugs)?
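
A minimal sketch of what that suggestion could look like, with the registration check factored into one helper that still produces a distinct message per operation. `FenceCheckSketch`, `setFenced`, and the boolean map are hypothetical simplifications; the real class stores `BrokerRegistration` objects.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified broker state: broker id -> fenced flag.
final class FenceCheckSketch {
    private final Map<Integer, Boolean> fencedById = new HashMap<>();

    void register(int id) {
        fencedById.put(id, true);
    }

    void fence(int id) {
        setFenced(id, true, "fence");
    }

    void unfence(int id) {
        setFenced(id, false, "unfence");
    }

    boolean isFenced(int id) {
        return Boolean.TRUE.equals(fencedById.get(id));
    }

    private void setFenced(int id, boolean fenced, String action) {
        if (!fencedById.containsKey(id)) {
            // An unregistered broker here indicates a broken invariant, not bad input.
            throw new IllegalStateException("Tried to " + action + " broker " + id +
                ", but that broker was not registered.");
        }
        fencedById.put(id, fenced);
    }
}
```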

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotasDelta.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+public final class ClientQuotasDelta {
+    private final ClientQuotasImage image;
+    private final Map<ClientQuotaEntity, ClientQuotaDelta> changes = new HashMap<>();
+
+    public ClientQuotasDelta(ClientQuotasImage image) {
+        this.image = image;
+    }
+
+    public Map<ClientQuotaEntity, ClientQuotaDelta> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (Entry<ClientQuotaEntity, ClientQuotaImage> entry : image.entities().entrySet()) {
+            ClientQuotaEntity entity = entry.getKey();
+            ClientQuotaImage quotaImage = entry.getValue();
+            ClientQuotaDelta quotaDelta = changes.computeIfAbsent(entity,
+                __ -> new ClientQuotaDelta(quotaImage));
+            quotaDelta.finishSnapshot();
+        }
+    }
+
+    public void replay(ClientQuotaRecord record) {
+        ClientQuotaEntity entity = ClientQuotaImage.dataToEntity(record.entity());
+        ClientQuotaDelta change = changes.get(entity);
+        if (change == null) {

Review comment:
       I think this could be done with `computeIfAbsent`, as in `finishSnapshot` above.
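
   For illustration, the get-then-null-check pattern collapses to a single `computeIfAbsent` call. This is only a sketch: `ComputeIfAbsentSketch` and `Change` are hypothetical names, and how the real code picks the base image for a new entity is not shown in the quoted hunk.

```java
import java.util.HashMap;
import java.util.Map;

final class ComputeIfAbsentSketch {
    // Hypothetical stand-in for the per-entity delta object.
    static final class Change {
        int replayed = 0;
    }

    private final Map<String, Change> changes = new HashMap<>();

    // Looks up the change for this entity, creating it on first use,
    // then applies the record to it.
    void replay(String entity) {
        Change change = changes.computeIfAbsent(entity, __ -> new Change());
        change.replayed++;
    }

    int replayCount(String entity) {
        Change change = changes.get(entity);
        return change == null ? 0 : change.replayed;
    }

    int size() {
        return changes.size();
    }
}
```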

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClusterDelta.java
##########
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
+import org.apache.kafka.common.metadata.FenceBrokerRecord;
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
+import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
+import org.apache.kafka.metadata.BrokerRegistration;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to the cluster in the metadata image.
+ */
+public final class ClusterDelta {
+    private final ClusterImage image;
+    private final HashMap<Integer, Optional<BrokerRegistration>> changedBrokers = new HashMap<>();
+
+    public ClusterDelta(ClusterImage image) {
+        this.image = image;
+    }
+
+    public HashMap<Integer, Optional<BrokerRegistration>> changedBrokers() {
+        return changedBrokers;
+    }
+
+    public BrokerRegistration broker(int nodeId) {
+        Optional<BrokerRegistration> result = changedBrokers.get(nodeId);
+        if (result != null) {
+            return result.orElse(null);
+        }
+        return image.broker(nodeId);
+    }
+
+    public void finishSnapshot() {
+        for (Integer brokerId : image.brokers().keySet()) {
+            if (!changedBrokers.containsKey(brokerId)) {
+                changedBrokers.put(brokerId, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(RegisterBrokerRecord record) {
+        BrokerRegistration broker = BrokerRegistration.fromRecord(record);
+        changedBrokers.put(broker.id(), Optional.of(broker));
+    }
+
+    public void replay(UnregisterBrokerRecord record) {
+        changedBrokers.put(record.brokerId(), Optional.empty());
+    }
+
+    public void replay(FenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to fence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(true)));
+    }
+
+    public void replay(UnfenceBrokerRecord record) {
+        BrokerRegistration broker = broker(record.id());
+        if (broker == null) {
+            throw new RuntimeException("Tried to unfence broker " + record.id() +
+                ", but that broker was not registered.");
+        }
+        changedBrokers.put(record.id(), Optional.of(broker.cloneWithFencing(false)));
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) {
+        BrokerRegistration broker = broker(record.brokerId());
+        if (broker == null) {
+            throw new RuntimeException("Tried to change broker " + record.brokerId() +
+                ", but that broker was not registered.");
+        }
+        if (record.fenced() < 0) {
+            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(false)));
+        } else if (record.fenced() > 0) {
+            changedBrokers.put(record.brokerId(), Optional.of(broker.cloneWithFencing(true)));
+        }
+    }
+
+    public ClusterImage apply() {
+        Map<Integer, BrokerRegistration> newBrokers = new HashMap<>(image.brokers().size());
+        for (Entry<Integer, BrokerRegistration> entry : image.brokers().entrySet()) {
+            int nodeId = entry.getKey();
+            Optional<BrokerRegistration> change = changedBrokers.get(nodeId);
+            if (change == null) {
+                newBrokers.put(nodeId, entry.getValue());
+            } else if (change.isPresent()) {
+                newBrokers.put(nodeId, change.get());
+            }

Review comment:
       It's a little unclear how we are removing things from the cache here. If I understand correctly, we stick an empty Optional in the cache to represent removal. Then, by _not_ handling the empty Optional case here, we exclude the broker from the new image. But if that's the case, why do we need the Optional in the first place? Can we directly remove the broker from `changedBrokers` when handling UnregisterBrokerRecord?
   
   This question also applies to some other Delta classes that use Optional.
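
   One possible reading, sketched with hypothetical names (`TombstoneSketch`, `String` values in place of `BrokerRegistration`): with the loop as quoted, a key that is absent from the change map hits the `change == null` branch and is copied from the base image. Dropping the entry on unregister would therefore seem to leave the broker in place, whereas an empty Optional acts as a tombstone. The second loop, which handles brokers that exist only in the delta, is an assumption, since the quoted hunk is cut off before that point.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class TombstoneSketch {
    // base: the current image; changes: the delta, where Optional.empty()
    // marks a removal and a present value marks an add or update.
    static Map<Integer, String> apply(Map<Integer, String> base,
                                      Map<Integer, Optional<String>> changes) {
        Map<Integer, String> result = new HashMap<>();
        for (Map.Entry<Integer, String> entry : base.entrySet()) {
            Optional<String> change = changes.get(entry.getKey());
            if (change == null) {
                // No entry in the delta: the broker is unchanged.
                result.put(entry.getKey(), entry.getValue());
            } else if (change.isPresent()) {
                // Present value: the broker was updated.
                result.put(entry.getKey(), change.get());
            }
            // Empty Optional: the broker was removed, so it is skipped.
        }
        // Assumed handling of brokers that appear only in the delta.
        for (Map.Entry<Integer, Optional<String>> entry : changes.entrySet()) {
            if (!base.containsKey(entry.getKey()) && entry.getValue().isPresent()) {
                result.put(entry.getKey(), entry.getValue().get());
            }
        }
        return result;
    }
}
```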

##########
File path: metadata/src/main/java/org/apache/kafka/image/ConfigurationsDelta.java
##########
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.config.ConfigResource.Type;
+import org.apache.kafka.common.metadata.ConfigRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+
+/**
+ * Represents changes to the configurations in the metadata image.
+ */
+public final class ConfigurationsDelta {
+    private final ConfigurationsImage image;
+    private final Map<ConfigResource, ConfigurationDelta> changes = new HashMap<>();
+
+    public ConfigurationsDelta(ConfigurationsImage image) {
+        this.image = image;
+    }
+
+    public Map<ConfigResource, ConfigurationDelta> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (Entry<ConfigResource, ConfigurationImage> entry : image.resourceData().entrySet()) {
+            ConfigResource resource = entry.getKey();
+            ConfigurationImage configurationImage = entry.getValue();
+            ConfigurationDelta configurationDelta = changes.computeIfAbsent(resource,

Review comment:
       (very minor) nit: inconsistent naming of these in this class

##########
File path: metadata/src/main/java/org/apache/kafka/image/ClientQuotaDelta.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.ClientQuotaRecord;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import static java.util.Map.Entry;
+
+
+public final class ClientQuotaDelta {
+    private final ClientQuotaImage image;
+    private final Map<String, Optional<Double>> changes = new HashMap<>();
+
+    public ClientQuotaDelta(ClientQuotaImage image) {
+        this.image = image;
+    }
+
+    public Map<String, Optional<Double>> changes() {
+        return changes;
+    }
+
+    public void finishSnapshot() {
+        for (String key : image.quotas().keySet()) {
+            if (!changes.containsKey(key)) {
+                changes.put(key, Optional.empty());
+            }
+        }
+    }
+
+    public void replay(ClientQuotaRecord record) {
+        if (record.remove()) {
+            changes.put(record.key(), Optional.empty());
+        } else {
+            changes.put(record.key(), Optional.of(record.value()));
+        }
+    }
+
+    public ClientQuotaImage apply() {
+        Map<String, Double> newQuotas = new HashMap<>(image.quotas().size());
+        for (Entry<String, Double> entry : image.quotas().entrySet()) {
+            Optional<Double> change = changes.get(entry.getKey());
+            if (change == null) {
+                newQuotas.put(entry.getKey(), entry.getValue());
+            } else if (change.isPresent()) {
+                newQuotas.put(entry.getKey(), change.get());
+            }

Review comment:
       Are we missing the removal case here? 

##########
File path: metadata/src/main/java/org/apache/kafka/image/ConfigurationsImage.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+/**
+ * Represents the configurations in the metadata image.
+ *
+ * This class is thread-safe.
+ */
+public final class ConfigurationsImage {
+    public static final ConfigurationsImage EMPTY =
+        new ConfigurationsImage(Collections.emptyMap());
+
+    private final Map<ConfigResource, ConfigurationImage> data;
+
+    public ConfigurationsImage(Map<ConfigResource, ConfigurationImage> data) {
+        this.data = data;
+    }
+
+    public boolean isEmpty() {
+        return data.isEmpty();
+    }
+
+    Map<ConfigResource, ConfigurationImage> resourceData() {
+        return data;
+    }
+
+    public Properties configProperties(ConfigResource configResource) {

Review comment:
       Hm, kind of annoying that we have to return `Properties` here, but (as far as I know) there is no way to make an immutable `Properties`.
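
   One common workaround, sketched here with hypothetical names (`ConfigCopySketch`, `toProperties`) and not necessarily what this PR does, is to keep the data in an unmodifiable map and hand out a fresh `Properties` copy on each call, so caller mutations cannot leak back into the image:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ConfigCopySketch {
    private final Map<String, String> data;

    ConfigCopySketch(Map<String, String> data) {
        // Defensive copy, wrapped so the internal state cannot be modified.
        this.data = Collections.unmodifiableMap(new HashMap<>(data));
    }

    // Returns a new Properties object on every call; mutating it does not
    // affect this object or any previously returned copy.
    Properties toProperties() {
        Properties props = new Properties();
        props.putAll(data);
        return props;
    }
}
```

   The trade-off is an allocation and copy per call, which may or may not matter depending on how often the method is invoked.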



