You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/11 16:44:02 UTC

[kafka] branch trunk updated: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new bf5e1f1  MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106)
bf5e1f1 is described below

commit bf5e1f1cc0d7f7b1f54c879fbb5f415d30fa9c06
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Thu Feb 11 08:42:59 2021 -0800

    MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106)
    
    Add MetaLogListener, LocalLogManager, and related classes. These
    classes are used by the KIP-500 controller and broker to interface with the
    Raft log.
    
    Also add the Controller interface. The implementation will be added in a separate PR.
    
    Reviewers: Ron Dagostino <rd...@confluent.io>, David Arthur <mu...@gmail.com>
---
 checkstyle/import-control.xml                      |  30 ++
 .../org/apache/kafka/controller/Controller.java    | 180 ++++++++++
 .../org/apache/kafka/controller/ResultOrError.java |  84 +++++
 .../kafka/metadata/BrokerHeartbeatReply.java       |  80 +++++
 .../apache/kafka/metadata/BrokerRegistration.java  | 153 +++++++++
 .../kafka/metadata/BrokerRegistrationReply.java    |  50 +++
 .../java/org/apache/kafka/metadata/FeatureMap.java |  67 ++++
 .../apache/kafka/metadata/FeatureMapAndEpoch.java  |  64 ++++
 .../kafka/metalog/metalog/MetaLogLeader.java       |  58 ++++
 .../kafka/metalog/metalog/MetaLogListener.java     |  55 +++
 .../kafka/metalog/metalog/MetaLogManager.java      |  79 +++++
 .../kafka/metadata/BrokerRegistrationTest.java     |  78 +++++
 .../org/apache/kafka/metalog/LocalLogManager.java  | 378 +++++++++++++++++++++
 .../kafka/metalog/metalog/LocalLogManagerTest.java | 153 +++++++++
 .../metalog/metalog/LocalLogManagerTestEnv.java    | 143 ++++++++
 .../metalog/MockMetaLogManagerListener.java        |  77 +++++
 16 files changed, 1729 insertions(+)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9cc432e..b658370 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -192,6 +192,27 @@
     </subpackage>
   </subpackage>
 
+  <subpackage name="controller">
+    <allow pkg="org.apache.kafka.clients" />
+    <allow pkg="org.apache.kafka.clients.admin" />
+    <allow pkg="org.apache.kafka.common.config" />
+    <allow pkg="org.apache.kafka.common.feature" />
+    <allow pkg="org.apache.kafka.common.internals" />
+    <allow pkg="org.apache.kafka.common.message" />
+    <allow pkg="org.apache.kafka.common.metadata" />
+    <allow pkg="org.apache.kafka.common.metrics" />
+    <allow pkg="org.apache.kafka.common.network" />
+    <allow pkg="org.apache.kafka.common.protocol" />
+    <allow pkg="org.apache.kafka.common.quota" />
+    <allow pkg="org.apache.kafka.common.requests" />
+    <allow pkg="org.apache.kafka.controller" />
+    <allow pkg="org.apache.kafka.metadata" />
+    <allow pkg="org.apache.kafka.metalog" />
+    <allow pkg="org.apache.kafka.queue" />
+    <allow pkg="org.apache.kafka.test" />
+    <allow pkg="org.apache.kafka.timeline" />
+  </subpackage>
+
   <subpackage name="metadata">
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.common.annotation" />
@@ -201,6 +222,15 @@
     <allow pkg="org.apache.kafka.test" />
   </subpackage>
 
+  <subpackage name="metalog">
+    <allow pkg="org.apache.kafka.common.metadata" />
+    <allow pkg="org.apache.kafka.common.protocol" />
+    <allow pkg="org.apache.kafka.metadata" />
+    <allow pkg="org.apache.kafka.metalog" />
+    <allow pkg="org.apache.kafka.queue" />
+    <allow pkg="org.apache.kafka.test" />
+  </subpackage>
+
   <subpackage name="clients">
     <allow pkg="org.slf4j" />
     <allow pkg="org.apache.kafka.common" />
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
new file mode 100644
index 0000000..0f6a54b
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public interface Controller extends AutoCloseable {
+    /**
+     * Change the in-sync replica sets for some partitions.
+     *
+     * @param request       The AlterIsrRequest data.
+     *
+     * @return              A future yielding the response.
+     */
+    CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request);
+
+    /**
+     * Create a batch of topics.
+     *
+     * @param request       The CreateTopicsRequest data.
+     *
+     * @return              A future yielding the response.
+     */
+    CompletableFuture<CreateTopicsResponseData>
+        createTopics(CreateTopicsRequestData request);
+
+    /**
+     * Unregister a broker.
+     *
+     * @param brokerId      The broker id to unregister.
+     *
+     * @return              A future that is completed successfully when the broker is
+     *                      unregistered.
+     */
+    CompletableFuture<Void> unregisterBroker(int brokerId);
+
+    /**
+     * Describe the current configuration of various resources.
+     *
+     * @param resources     A map from resources to the collection of config keys that we
+     *                      want to describe for each.  If the collection is empty, then
+     *                      all configuration keys will be described.
+     *
+     * @return
+     */
+    CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
+        describeConfigs(Map<ConfigResource, Collection<String>> resources);
+
+    /**
+     * Elect new partition leaders.
+     *
+     * @param request       The request.
+     *
+     * @return              A future yielding the elect leaders response.
+     */
+    CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request);
+
+    /**
+     * Get the current finalized feature ranges for each feature.
+     *
+     * @return              A future yielding the feature ranges.
+     */
+    CompletableFuture<FeatureMapAndEpoch> finalizedFeatures();
+
+    /**
+     * Perform some incremental configuration changes.
+     *
+     * @param configChanges The changes.
+     * @param validateOnly  True if we should validate the changes but not apply them.
+     *
+     * @return              A future yielding a map from partitions to error results.
+     */
+    CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+        Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
+        boolean validateOnly);
+
+    /**
+     * Perform some configuration changes using the legacy API.
+     *
+     * @param newConfigs    The new configuration maps to apply.
+     * @param validateOnly  True if we should validate the changes but not apply them.
+     *
+     * @return              A future yielding a map from partitions to error results.
+     */
+    CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+        Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly);
+
+    /**
+     * Process a heartbeat from a broker.
+     *
+     * @param request      The broker heartbeat request.
+     *
+     * @return              A future yielding a heartbeat reply.
+     */
+    CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
+        BrokerHeartbeatRequestData request);
+
+    /**
+     * Attempt to register the given broker.
+     *
+     * @param request      The registration request.
+     *
+     * @return              A future yielding a registration reply.
+     */
+    CompletableFuture<BrokerRegistrationReply> registerBroker(
+        BrokerRegistrationRequestData request);
+
+    /**
+     * Wait for the given number of brokers to be registered and unfenced.
+     * This is for testing.
+     *
+     * @param minBrokers    The minimum number of brokers to wait for.
+     * @return              A future which is completed when the given number of brokers
+     *                      is reached.
+     */
+    CompletableFuture<Void> waitForReadyBrokers(int minBrokers);
+
+    /**
+     * Perform some client quota changes
+     *
+     * @param quotaAlterations The list of quotas to alter
+     * @param validateOnly     True if we should validate the changes but not apply them.
+     * @return                 A future yielding a map of quota entities to error results.
+     */
+    CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
+        Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
+    );
+
+    /**
+     * Begin shutting down, but don't block.  You must still call close to clean up all
+     * resources.
+     */
+    void beginShutdown();
+
+    /**
+     * If this controller is active, this is the non-negative controller epoch.
+     * Otherwise, this is -1.
+     */
+    long curClaimEpoch();
+
+    /**
+     * Blocks until we have shut down and freed all resources.
+     */
+    void close() throws InterruptedException;
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
new file mode 100644
index 0000000..82e2b49
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+
+import java.util.Objects;
+
+
+class ResultOrError<T> {
+    private final ApiError error;
+    private final T result;
+
+    public ResultOrError(Errors error, String message) {
+        this(new ApiError(error, message));
+    }
+
+    public ResultOrError(ApiError error) {
+        Objects.requireNonNull(error);
+        this.error = error;
+        this.result = null;
+    }
+
+    public ResultOrError(T result) {
+        this.error = null;
+        this.result = result;
+    }
+
+    public boolean isError() {
+        return error != null;
+    }
+
+    public boolean isResult() {
+        return error == null;
+    }
+
+    public ApiError error() {
+        return error;
+    }
+
+    public T result() {
+        return result;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || (!o.getClass().equals(getClass()))) {
+            return false;
+        }
+        ResultOrError other = (ResultOrError) o;
+        return error.equals(other.error) &&
+            Objects.equals(result, other.result);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(error, result);
+    }
+
+    @Override
+    public String toString() {
+        if (error.isSuccess()) {
+            return "ResultOrError(" + result + ")";
+        } else {
+            return "ResultOrError(" + error + ")";
+        }
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java
new file mode 100644
index 0000000..5ab2a52
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Objects;
+
+
+public class BrokerHeartbeatReply {
+    /**
+     * True if the heartbeat reply should tell the broker that it has caught up.
+     */
+    private final boolean isCaughtUp;
+
+    /**
+     * True if the heartbeat reply should tell the broker that it is fenced.
+     */
+    private final boolean isFenced;
+
+    /**
+     * True if the heartbeat reply should tell the broker that it should shut down.
+     */
+    private final boolean shouldShutDown;
+
+    public BrokerHeartbeatReply(boolean isCaughtUp,
+                                boolean isFenced,
+                                boolean shouldShutDown) {
+        this.isCaughtUp = isCaughtUp;
+        this.isFenced = isFenced;
+        this.shouldShutDown = shouldShutDown;
+    }
+
+    public boolean isCaughtUp() {
+        return isCaughtUp;
+    }
+
+    public boolean isFenced() {
+        return isFenced;
+    }
+
+    public boolean shouldShutDown() {
+        return shouldShutDown;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isCaughtUp, isFenced, shouldShutDown);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BrokerHeartbeatReply)) return false;
+        BrokerHeartbeatReply other = (BrokerHeartbeatReply) o;
+        return other.isCaughtUp == isCaughtUp &&
+            other.isFenced == isFenced &&
+            other.shouldShutDown == shouldShutDown;
+    }
+
+    @Override
+    public String toString() {
+        return "BrokerHeartbeatReply(isCaughtUp=" + isCaughtUp +
+            ", isFenced=" + isFenced +
+            ", shouldShutDown = " + shouldShutDown +
+            ")";
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
new file mode 100644
index 0000000..c2be061
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        this.listeners = Collections.unmodifiableMap(listenersMap);
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures = new HashMap<>(supportedFeatures);
+        Objects.requireNonNull(rack);
+        this.rack = rack;
+        this.fenced = fenced;
+    }
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              Map<String, Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        this.listeners = new HashMap<>(listeners);
+        this.supportedFeatures = new HashMap<>(supportedFeatures);
+        this.rack = rack;
+        this.fenced = fenced;
+    }
+
+    public int id() {
+        return id;
+    }
+
+    public long epoch() {
+        return epoch;
+    }
+
+    public Uuid incarnationId() {
+        return incarnationId;
+    }
+
+    public Map<String, Endpoint> listeners() {
+        return listeners;
+    }
+
+    public Map<String, VersionRange> supportedFeatures() {
+        return supportedFeatures;
+    }
+
+    public Optional<String> rack() {
+        return rack;
+    }
+
+    public boolean fenced() {
+        return fenced;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
+            rack, fenced);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BrokerRegistration)) return false;
+        BrokerRegistration other = (BrokerRegistration) o;
+        return other.id == id &&
+            other.epoch == epoch &&
+            other.incarnationId.equals(incarnationId) &&
+            other.listeners.equals(listeners) &&
+            other.supportedFeatures.equals(supportedFeatures) &&
+            other.rack.equals(rack) &&
+            other.fenced == fenced;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("BrokerRegistration(id=").append(id);
+        bld.append(", epoch=").append(epoch);
+        bld.append(", incarnationId=").append(incarnationId);
+        bld.append(", listeners=[").append(
+            listeners.keySet().stream().sorted().
+                map(n -> listeners.get(n).toString()).
+                collect(Collectors.joining(", ")));
+        bld.append("], supportedFeatures={").append(
+            supportedFeatures.entrySet().stream().sorted().
+                map(e -> e.getKey() + ": " + e.getValue()).
+                collect(Collectors.joining(", ")));
+        bld.append("}");
+        bld.append(", rack=").append(rack);
+        bld.append(", fenced=").append(fenced);
+        bld.append(")");
+        return bld.toString();
+    }
+
+    public BrokerRegistration cloneWithFencing(boolean fencing) {
+        return new BrokerRegistration(id, epoch, incarnationId, listeners,
+            supportedFeatures, rack, fencing);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java
new file mode 100644
index 0000000..40678ed
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Objects;
+
+
+public class BrokerRegistrationReply {
+    private final long epoch;
+
+    public BrokerRegistrationReply(long epoch) {
+        this.epoch = epoch;
+    }
+
+    public long epoch() {
+        return epoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(epoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BrokerRegistrationReply)) return false;
+        BrokerRegistrationReply other = (BrokerRegistrationReply) o;
+        return other.epoch == epoch;
+    }
+
+    @Override
+    public String toString() {
+        return "BrokerRegistrationReply(epoch=" + epoch + ")";
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java
new file mode 100644
index 0000000..272c87d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+/**
+ * A map of feature names to their supported versions.
+ */
+public class FeatureMap {
+    private final Map<String, VersionRange> features;
+
+    public FeatureMap(Map<String, VersionRange> features) {
+        this.features = Collections.unmodifiableMap(new HashMap<>(features));
+    }
+
+    public Optional<VersionRange> get(String name) {
+        return Optional.ofNullable(features.get(name));
+    }
+
+    public Map<String, VersionRange> features() {
+        return features;
+    }
+
+    @Override
+    public int hashCode() {
+        return features.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof FeatureMap)) return false;
+        FeatureMap other = (FeatureMap) o;
+        return features.equals(other.features);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("{");
+        bld.append(features.keySet().stream().sorted().
+            map(k -> k + ": " + features.get(k)).
+            collect(Collectors.joining(", ")));
+        bld.append("}");
+        return bld.toString();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java
new file mode 100644
index 0000000..26096ea
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Objects;
+
+
+/**
+ * A map of feature names to their supported versions.
+ */
+public class FeatureMapAndEpoch {
+    private final FeatureMap map;
+    private final long epoch;
+
+    public FeatureMapAndEpoch(FeatureMap map, long epoch) {
+        this.map = map;
+        this.epoch = epoch;
+    }
+
+    public FeatureMap map() {
+        return map;
+    }
+
+    public long epoch() {
+        return epoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(map, epoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof FeatureMapAndEpoch)) return false;
+        FeatureMapAndEpoch other = (FeatureMapAndEpoch) o;
+        return map.equals(other.map) && epoch == other.epoch;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("{");
+        bld.append("map=").append(map.toString());
+        bld.append(", epoch=").append(epoch);
+        bld.append("}");
+        return bld.toString();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java
new file mode 100644
index 0000000..2bf4f7c
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import java.util.Objects;
+
+/**
+ * The current leader of the MetaLog.
+ */
+public class MetaLogLeader {
+    private final int nodeId;
+    private final long epoch;
+
+    public MetaLogLeader(int nodeId, long epoch) {
+        this.nodeId = nodeId;
+        this.epoch = epoch;
+    }
+
+    public int nodeId() {
+        return nodeId;
+    }
+
+    public long epoch() {
+        return epoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof MetaLogLeader)) return false;
+        MetaLogLeader other = (MetaLogLeader) o;
+        return other.nodeId == nodeId && other.epoch == epoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodeId, epoch);
+    }
+
+    @Override
+    public String toString() {
+        return "MetaLogLeader(nodeId=" + nodeId + ", epoch=" + epoch + ")";
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogListener.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogListener.java
new file mode 100644
index 0000000..9374420
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+
+import java.util.List;
+
+/**
+ * Listeners receive notifications from the MetaLogManager.
+ */
+public interface MetaLogListener {
+    /**
+     * Called when the MetaLogManager commits some messages.
+     *
+     * @param lastOffset    The last offset found in all the given messages.
+     * @param messages      The messages.
+     */
+    void handleCommits(long lastOffset, List<ApiMessage> messages);
+
+    /**
+     * Called when a new leader is elected.
+     *
+     * @param leader        The new leader id and epoch.
+     */
+    default void handleNewLeader(MetaLogLeader leader) {}
+
+    /**
+     * Called when the MetaLogManager has renounced the leadership.
+     *
+     * @param epoch         The controller epoch that has ended.
+     */
+    default void handleRenounce(long epoch) {}
+
+    /**
+     * Called when the MetaLogManager has finished shutting down, and wants to tell its
+     * listener that it is safe to shut down as well.
+     */
+    default void beginShutdown() {}
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java
new file mode 100644
index 0000000..67a6ca5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+
+import java.util.List;
+
+/**
+ * The MetaLogManager handles storing metadata and electing leaders.
+ */
+public interface MetaLogManager {
+
+    /**
+     * Start this meta log manager.
+     * The manager must be ready to accept incoming calls after this function returns.
+     * It is an error to initialize a MetaLogManager more than once.
+     */
+    void initialize() throws Exception;
+
+    /**
+     * Register the listener.  The manager must be initialized already.
+     * The listener must be ready to accept incoming calls immediately.
+     *
+     * @param listener      The listener to register.
+     */
+    void register(MetaLogListener listener) throws Exception;
+
+    /**
+     * Schedule a write to the log.
+     *
+     * The write will be scheduled to happen at some time in the future.  There is no
+     * error return or exception thrown if the write fails.  Instead, the listener may
+     * regard the write as successful if and only if the MetaLogManager reaches the given
+     * offset before renouncing its leadership.  The listener should determine this by
+     * monitoring the committed offsets.
+     *
+     * @param epoch         The controller epoch.
+     * @param batch         The batch of messages to write.
+     *
+     * @return              The offset of the message.
+     */
+    long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
+
+    /**
+     * Renounce the leadership.
+     *
+     * @param epoch         The epoch.  If this does not match the current epoch, this
+     *                      call will be ignored.
+     */
+    void renounce(long epoch);
+
+    /**
+     * Returns the current leader.  The active node may change immediately after this
+     * function is called, of course.
+     */
+    MetaLogLeader leader();
+
+    /**
+     * Returns the node id.
+     */
+    int nodeId();
+
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
new file mode 100644
index 0000000..7a01c37
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 40)
+public class BrokerRegistrationTest {
+    private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
+        new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
+            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Optional.empty(), false),
+        new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
+            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Optional.empty(), false),
+        new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
+            Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
+            Optional.empty(), false));
+
+    @Test
+    public void testValues() {
+        assertEquals(0, REGISTRATIONS.get(0).id());
+        assertEquals(1, REGISTRATIONS.get(1).id());
+        assertEquals(2, REGISTRATIONS.get(2).id());
+    }
+
+    @Test
+    public void testEquals() {
+        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(1)));
+        assertFalse(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(0)));
+        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(2)));
+        assertFalse(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(0)));
+        assertTrue(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(0)));
+        assertTrue(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(1)));
+        assertTrue(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(2)));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("BrokerRegistration(id=1, epoch=0, " +
+            "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
+            "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
+            "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
+            "rack=Optional.empty, fenced=false)",
+            REGISTRATIONS.get(1).toString());
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
new file mode 100644
index 0000000..ef85314
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {
+    interface LocalBatch {
+        int size();
+    }
+
+    static class LeaderChangeBatch implements LocalBatch {
+        private final MetaLogLeader newLeader;
+
+        LeaderChangeBatch(MetaLogLeader newLeader) {
+            this.newLeader = newLeader;
+        }
+
+        @Override
+        public int size() {
+            return 1;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LeaderChangeBatch)) return false;
+            LeaderChangeBatch other = (LeaderChangeBatch) o;
+            if (!other.newLeader.equals(newLeader)) return false;
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(newLeader);
+        }
+
+        @Override
+        public String toString() {
+            return "LeaderChangeBatch(newLeader=" + newLeader + ")";
+        }
+    }
+
+    static class LocalRecordBatch implements LocalBatch {
+        private final List<ApiMessage> records;
+
+        LocalRecordBatch(List<ApiMessage> records) {
+            this.records = records;
+        }
+
+        @Override
+        public int size() {
+            return records.size();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LocalRecordBatch)) return false;
+            LocalRecordBatch other = (LocalRecordBatch) o;
+            if (!other.records.equals(records)) return false;
+            return true;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(records);
+        }
+
+        @Override
+        public String toString() {
+            return "LocalRecordBatch(records=" + records + ")";
+        }
+    }
+
+    public static class SharedLogData {
+        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
+        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
+        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
+        private MetaLogLeader leader = new MetaLogLeader(-1, -1);
+        private long prevOffset = -1;
+
+        synchronized void registerLogManager(LocalLogManager logManager) {
+            if (logManagers.put(logManager.nodeId(), logManager) != null) {
+                throw new RuntimeException("Can't have multiple LocalLogManagers " +
+                    "with id " + logManager.nodeId());
+            }
+            electLeaderIfNeeded();
+        }
+
+        synchronized void unregisterLogManager(LocalLogManager logManager) {
+            if (!logManagers.remove(logManager.nodeId(), logManager)) {
+                throw new RuntimeException("Log manager " + logManager.nodeId() +
+                    " was not found.");
+            }
+        }
+
+        synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
+            if (epoch != leader.epoch()) {
+                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
+                    "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
+                return Long.MAX_VALUE;
+            }
+            if (nodeId != leader.nodeId()) {
+                log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
+                    "match the current leader id of {}.", nodeId, leader.nodeId());
+                return Long.MAX_VALUE;
+            }
+            log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
+            long offset = append(batch);
+            electLeaderIfNeeded();
+            return offset;
+        }
+
+        synchronized long append(LocalBatch batch) {
+            prevOffset += batch.size();
+            log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
+            batches.put(prevOffset, batch);
+            if (batch instanceof LeaderChangeBatch) {
+                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
+                leader = leaderChangeBatch.newLeader;
+            }
+            for (LocalLogManager logManager : logManagers.values()) {
+                logManager.scheduleLogCheck();
+            }
+            return prevOffset;
+        }
+
+        synchronized void electLeaderIfNeeded() {
+            if (leader.nodeId() != -1 || logManagers.isEmpty()) {
+                return;
+            }
+            int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
+            Iterator<Integer> iter = logManagers.keySet().iterator();
+            Integer nextLeaderNode = null;
+            for (int i = 0; i <= nextLeaderIndex; i++) {
+                nextLeaderNode = iter.next();
+            }
+            MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
+            log.info("Elected new leader: {}.", newLeader);
+            append(new LeaderChangeBatch(newLeader));
+        }
+
+        synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
+            Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
+            if (entry == null) {
+                return null;
+            }
+            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
+        }
+    }
+
+    private static class MetaLogListenerData {
+        private long offset = -1;
+        private final MetaLogListener listener;
+
+        MetaLogListenerData(MetaLogListener listener) {
+            this.listener = listener;
+        }
+    }
+
+    private final Logger log;
+
+    private final int nodeId;
+
+    private final SharedLogData shared;
+
+    private final EventQueue eventQueue;
+
+    private boolean initialized = false;
+
+    private boolean shutdown = false;
+
+    private long maxReadOffset = Long.MAX_VALUE;
+
+    private final List<MetaLogListenerData> listeners = new ArrayList<>();
+
+    private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
+
+    public LocalLogManager(LogContext logContext,
+                           int nodeId,
+                           SharedLogData shared,
+                           String threadNamePrefix) {
+        this.log = logContext.logger(LocalLogManager.class);
+        this.nodeId = nodeId;
+        this.shared = shared;
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
+        shared.registerLogManager(this);
+    }
+
+    private void scheduleLogCheck() {
+        eventQueue.append(() -> {
+            try {
+                log.debug("Node {}: running log check.", nodeId);
+                int numEntriesFound = 0;
+                for (MetaLogListenerData listenerData : listeners) {
+                    while (true) {
+                        Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
+                        if (entry == null) {
+                            log.trace("Node {}: reached the end of the log after finding " +
+                                "{} entries.", nodeId, numEntriesFound);
+                            break;
+                        }
+                        long entryOffset = entry.getKey();
+                        if (entryOffset > maxReadOffset) {
+                            log.trace("Node {}: after {} entries, not reading the next " +
+                                "entry because its offset is {}, and maxReadOffset is {}.",
+                                nodeId, numEntriesFound, entryOffset, maxReadOffset);
+                            break;
+                        }
+                        if (entry.getValue() instanceof LeaderChangeBatch) {
+                            LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
+                            log.trace("Node {}: handling LeaderChange to {}.",
+                                nodeId, batch.newLeader);
+                            listenerData.listener.handleNewLeader(batch.newLeader);
+                            if (batch.newLeader.epoch() > leader.epoch()) {
+                                leader = batch.newLeader;
+                            }
+                        } else if (entry.getValue() instanceof LocalRecordBatch) {
+                            LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
+                            log.trace("Node {}: handling LocalRecordBatch with offset {}.",
+                                nodeId, entryOffset);
+                            listenerData.listener.handleCommits(entryOffset, batch.records);
+                        }
+                        numEntriesFound++;
+                        listenerData.offset = entryOffset;
+                    }
+                }
+                log.trace("Completed log check for node " + nodeId);
+            } catch (Exception e) {
+                log.error("Exception while handling log check", e);
+            }
+        });
+    }
+
+    public void beginShutdown() {
+        eventQueue.beginShutdown("beginShutdown", () -> {
+            try {
+                if (initialized && !shutdown) {
+                    log.debug("Node {}: beginning shutdown.", nodeId);
+                    renounce(leader.epoch());
+                    for (MetaLogListenerData listenerData : listeners) {
+                        listenerData.listener.beginShutdown();
+                    }
+                    shared.unregisterLogManager(this);
+                }
+            } catch (Exception e) {
+                log.error("Unexpected exception while sending beginShutdown callbacks", e);
+            }
+            shutdown = true;
+        });
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        log.debug("Node {}: closing.", nodeId);
+        beginShutdown();
+        eventQueue.close();
+    }
+
+    @Override
+    public void initialize() throws Exception {
+        eventQueue.append(() -> {
+            log.debug("initialized local log manager for node " + nodeId);
+            initialized = true;
+        });
+    }
+
+    @Override
+    public void register(MetaLogListener listener) throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            if (shutdown) {
+                log.info("Node {}: can't register because local log manager has " +
+                    "already been shut down.", nodeId);
+                future.complete(null);
+            } else if (initialized) {
+                log.info("Node {}: registered MetaLogListener.", nodeId);
+                listeners.add(new MetaLogListenerData(listener));
+                shared.electLeaderIfNeeded();
+                scheduleLogCheck();
+                future.complete(null);
+            } else {
+                log.info("Node {}: can't register because local log manager has not " +
+                    "been initialized.", nodeId);
+                future.completeExceptionally(new RuntimeException(
+                    "LocalLogManager was not initialized."));
+            }
+        });
+        future.get();
+    }
+
+    @Override
+    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
+            batch.stream().map(r -> r.message()).collect(Collectors.toList())));
+    }
+
+    @Override
+    public void renounce(long epoch) {
+        MetaLogLeader curLeader = leader;
+        MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
+        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+    }
+
+    @Override
+    public MetaLogLeader leader() {
+        return leader;
+    }
+
+    @Override
+    public int nodeId() {
+        return nodeId;
+    }
+
+    public List<MetaLogListener> listeners() {
+        final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
+        });
+        try {
+            return future.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void setMaxReadOffset(long maxReadOffset) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
+            this.maxReadOffset = maxReadOffset;
+            scheduleLogCheck();
+            future.complete(null);
+        });
+        try {
+            future.get();
+        } catch (ExecutionException | InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTest.java
new file mode 100644
index 0000000..9d4eb8b
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+@Timeout(value = 40)
+public class LocalLogManagerTest {
+    private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);
+
+    /**
+     * Test creating a LocalLogManager and closing it.
+     */
+    @Test
+    public void testCreateAndClose() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Test that the local log maanger will claim leadership.
+     */
+    @Test
+    public void testClaimsLeadership() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
+            assertEquals(new MetaLogLeader(0, 0), env.waitForLeader());
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Test that we can pass leadership back and forth between log managers.
+     */
+    @Test
+    public void testPassLeadership() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
+            MetaLogLeader first = env.waitForLeader();
+            MetaLogLeader cur = first;
+            do {
+                env.logManagers().get(cur.nodeId()).renounce(cur.epoch());
+                MetaLogLeader next = env.waitForLeader();
+                while (next.epoch() == cur.epoch()) {
+                    Thread.sleep(1);
+                    next = env.waitForLeader();
+                }
+                long expectedNextEpoch = cur.epoch() + 2;
+                assertEquals(expectedNextEpoch, next.epoch(), "Expected next epoch to be " + expectedNextEpoch +
+                    ", but found  " + next);
+                cur = next;
+            } while (cur.nodeId() == first.nodeId());
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    private static void waitForLastCommittedOffset(long targetOffset,
+                LocalLogManager logManager) throws InterruptedException {
+        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+            MockMetaLogManagerListener listener =
+                (MockMetaLogManagerListener) logManager.listeners().get(0);
+            long highestOffset = -1;
+            for (String event : listener.serializedEvents()) {
+                if (event.startsWith(LAST_COMMITTED_OFFSET)) {
+                    long offset = Long.valueOf(
+                        event.substring(LAST_COMMITTED_OFFSET.length() + 1));
+                    if (offset < highestOffset) {
+                        throw new RuntimeException("Invalid offset: " + offset +
+                            " is less than the previous offset of " + highestOffset);
+                    }
+                    highestOffset = offset;
+                }
+            }
+            if (highestOffset < targetOffset) {
+                throw new RuntimeException("Offset for log manager " +
+                    logManager.nodeId() + " only reached " + highestOffset);
+            }
+        });
+    }
+
+    /**
+     * Test that all the log managers see all the commits.
+     */
+    @Test
+    public void testCommits() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
+            MetaLogLeader leaderInfo = env.waitForLeader();
+            LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
+            long epoch = activeLogManager.leader().epoch();
+            List<ApiMessageAndVersion> messages = Arrays.asList(
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
+            assertEquals(3, activeLogManager.scheduleWrite(epoch, messages));
+            for (LocalLogManager logManager : env.logManagers()) {
+                waitForLastCommittedOffset(3, logManager);
+            }
+            List<MockMetaLogManagerListener> listeners = env.logManagers().stream().
+                map(m -> (MockMetaLogManagerListener) m.listeners().get(0)).
+                collect(Collectors.toList());
+            env.close();
+            for (MockMetaLogManagerListener listener : listeners) {
+                List<String> events = listener.serializedEvents();
+                assertEquals(SHUTDOWN, events.get(events.size() - 1));
+                int foundIndex = 0;
+                for (String event : events) {
+                    if (event.startsWith(COMMIT)) {
+                        assertEquals(messages.get(foundIndex).message().toString(),
+                            event.substring(COMMIT.length() + 1));
+                        foundIndex++;
+                    }
+                }
+                assertEquals(messages.size(), foundIndex);
+            }
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTestEnv.java
new file mode 100644
index 0000000..52aeea0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTestEnv.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LocalLogManagerTestEnv implements AutoCloseable {
+    private static final Logger log =
+        LoggerFactory.getLogger(LocalLogManagerTestEnv.class);
+
+    /**
+     * The first error we encountered during this test, or the empty string if we have
+     * not encountered any.
+     */
+    final AtomicReference<String> firstError = new AtomicReference<>(null);
+
+    /**
+     * The test directory, which we will delete once the test is over.
+     */
+    private final File dir;
+
+    /**
+     * The shared data for our LocalLogManager instances.
+     */
+    private final SharedLogData shared;
+
+    /**
+     * A list of log managers.
+     */
+    private final List<LocalLogManager> logManagers;
+
+    public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) throws Exception {
+        LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
+        try {
+            for (LocalLogManager logManager : testEnv.logManagers) {
+                logManager.register(new MockMetaLogManagerListener());
+            }
+        } catch (Exception e) {
+            testEnv.close();
+            throw e;
+        }
+        return testEnv;
+    }
+
+    public LocalLogManagerTestEnv(int numManagers) throws Exception {
+        dir = TestUtils.tempDirectory();
+        shared = new SharedLogData();
+        List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
+        try {
+            for (int nodeId = 0; nodeId < numManagers; nodeId++) {
+                newLogManagers.add(new LocalLogManager(
+                    new LogContext(String.format("[LocalLogManager %d] ", nodeId)),
+                    nodeId,
+                    shared,
+                    String.format("LocalLogManager-%d_", nodeId)));
+            }
+            for (LocalLogManager logManager : newLogManagers) {
+                logManager.initialize();
+            }
+        } catch (Throwable t) {
+            for (LocalLogManager logManager : newLogManagers) {
+                logManager.close();
+            }
+            throw t;
+        }
+        this.logManagers = newLogManagers;
+    }
+
+    AtomicReference<String> firstError() {
+        return firstError;
+    }
+
+    File dir() {
+        return dir;
+    }
+
+    MetaLogLeader waitForLeader() throws InterruptedException {
+        AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
+        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+            MetaLogLeader result = null;
+            for (LocalLogManager logManager : logManagers) {
+                MetaLogLeader leader = logManager.leader();
+                if (leader.nodeId() == logManager.nodeId()) {
+                    if (result != null) {
+                        throw new RuntimeException("node " + leader.nodeId() +
+                            " thinks it's the leader, but so does " + result.nodeId());
+                    }
+                    result = leader;
+                }
+            }
+            if (result == null) {
+                throw new RuntimeException("No leader found.");
+            }
+            value.set(result);
+        });
+        return value.get();
+    }
+
+    public List<LocalLogManager> logManagers() {
+        return logManagers;
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        try {
+            for (LocalLogManager logManager : logManagers) {
+                logManager.beginShutdown();
+            }
+            for (LocalLogManager logManager : logManagers) {
+                logManager.close();
+            }
+            Utils.delete(dir);
+        } catch (IOException e) {
+            log.error("Error deleting {}", dir.getAbsolutePath(), e);
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/metalog/MockMetaLogManagerListener.java
new file mode 100644
index 0000000..fe61ec0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/metalog/MockMetaLogManagerListener.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class MockMetaLogManagerListener implements MetaLogListener {
+    public static final String COMMIT = "COMMIT";
+    public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
+    public static final String NEW_LEADER = "NEW_LEADER";
+    public static final String RENOUNCE = "RENOUNCE";
+    public static final String SHUTDOWN = "SHUTDOWN";
+
+    private final List<String> serializedEvents = new ArrayList<>();
+
+    @Override
+    public synchronized void handleCommits(long lastCommittedOffset, List<ApiMessage> messages) {
+        for (ApiMessage message : messages) {
+            StringBuilder bld = new StringBuilder();
+            bld.append(COMMIT).append(" ").append(message.toString());
+            serializedEvents.add(bld.toString());
+        }
+        StringBuilder bld = new StringBuilder();
+        bld.append(LAST_COMMITTED_OFFSET).append(" ").append(lastCommittedOffset);
+        serializedEvents.add(bld.toString());
+    }
+
+    @Override
+    public void handleNewLeader(MetaLogLeader leader) {
+        StringBuilder bld = new StringBuilder();
+        bld.append(NEW_LEADER).append(" ").
+            append(leader.nodeId()).append(" ").append(leader.epoch());
+        synchronized (this) {
+            serializedEvents.add(bld.toString());
+        }
+    }
+
+    @Override
+    public void handleRenounce(long epoch) {
+        StringBuilder bld = new StringBuilder();
+        bld.append(RENOUNCE).append(" ").append(epoch);
+        synchronized (this) {
+            serializedEvents.add(bld.toString());
+        }
+    }
+
+    @Override
+    public void beginShutdown() {
+        StringBuilder bld = new StringBuilder();
+        bld.append(SHUTDOWN);
+        synchronized (this) {
+            serializedEvents.add(bld.toString());
+        }
+    }
+
+    public synchronized List<String> serializedEvents() {
+        return new ArrayList<>(serializedEvents);
+    }
+}