You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/11 16:44:02 UTC
[kafka] branch trunk updated: MINOR: add the MetaLogListener,
LocalLogManager, and Controller interface. (#10106)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new bf5e1f1 MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106)
bf5e1f1 is described below
commit bf5e1f1cc0d7f7b1f54c879fbb5f415d30fa9c06
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Thu Feb 11 08:42:59 2021 -0800
MINOR: add the MetaLogListener, LocalLogManager, and Controller interface. (#10106)
Add MetaLogListener, LocalLogManager, and related classes. These
classes are used by the KIP-500 controller and broker to interface with the
Raft log.
Also add the Controller interface. The implementation will be added in a separate PR.
Reviewers: Ron Dagostino <rd...@confluent.io>, David Arthur <mu...@gmail.com>
---
checkstyle/import-control.xml | 30 ++
.../org/apache/kafka/controller/Controller.java | 180 ++++++++++
.../org/apache/kafka/controller/ResultOrError.java | 84 +++++
.../kafka/metadata/BrokerHeartbeatReply.java | 80 +++++
.../apache/kafka/metadata/BrokerRegistration.java | 153 +++++++++
.../kafka/metadata/BrokerRegistrationReply.java | 50 +++
.../java/org/apache/kafka/metadata/FeatureMap.java | 67 ++++
.../apache/kafka/metadata/FeatureMapAndEpoch.java | 64 ++++
.../kafka/metalog/metalog/MetaLogLeader.java | 58 ++++
.../kafka/metalog/metalog/MetaLogListener.java | 55 +++
.../kafka/metalog/metalog/MetaLogManager.java | 79 +++++
.../kafka/metadata/BrokerRegistrationTest.java | 78 +++++
.../org/apache/kafka/metalog/LocalLogManager.java | 378 +++++++++++++++++++++
.../kafka/metalog/metalog/LocalLogManagerTest.java | 153 +++++++++
.../metalog/metalog/LocalLogManagerTestEnv.java | 143 ++++++++
.../metalog/MockMetaLogManagerListener.java | 77 +++++
16 files changed, 1729 insertions(+)
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9cc432e..b658370 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -192,6 +192,27 @@
</subpackage>
</subpackage>
+ <subpackage name="controller">
+ <allow pkg="org.apache.kafka.clients" />
+ <allow pkg="org.apache.kafka.clients.admin" />
+ <allow pkg="org.apache.kafka.common.config" />
+ <allow pkg="org.apache.kafka.common.feature" />
+ <allow pkg="org.apache.kafka.common.internals" />
+ <allow pkg="org.apache.kafka.common.message" />
+ <allow pkg="org.apache.kafka.common.metadata" />
+ <allow pkg="org.apache.kafka.common.metrics" />
+ <allow pkg="org.apache.kafka.common.network" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.common.quota" />
+ <allow pkg="org.apache.kafka.common.requests" />
+ <allow pkg="org.apache.kafka.controller" />
+ <allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.metalog" />
+ <allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.test" />
+ <allow pkg="org.apache.kafka.timeline" />
+ </subpackage>
+
<subpackage name="metadata">
<allow pkg="org.apache.kafka.clients" />
<allow pkg="org.apache.kafka.common.annotation" />
@@ -201,6 +222,15 @@
<allow pkg="org.apache.kafka.test" />
</subpackage>
+ <subpackage name="metalog">
+ <allow pkg="org.apache.kafka.common.metadata" />
+ <allow pkg="org.apache.kafka.common.protocol" />
+ <allow pkg="org.apache.kafka.metadata" />
+ <allow pkg="org.apache.kafka.metalog" />
+ <allow pkg="org.apache.kafka.queue" />
+ <allow pkg="org.apache.kafka.test" />
+ </subpackage>
+
<subpackage name="clients">
<allow pkg="org.slf4j" />
<allow pkg="org.apache.kafka.common" />
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
new file mode 100644
index 0000000..0f6a54b
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public interface Controller extends AutoCloseable {
+ /**
+ * Change the in-sync replica sets for some partitions.
+ *
+ * @param request The AlterIsrRequest data.
+ *
+ * @return A future yielding the response.
+ */
+ CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request);
+
+ /**
+ * Create a batch of topics.
+ *
+ * @param request The CreateTopicsRequest data.
+ *
+ * @return A future yielding the response.
+ */
+ CompletableFuture<CreateTopicsResponseData>
+ createTopics(CreateTopicsRequestData request);
+
+ /**
+ * Unregister a broker.
+ *
+ * @param brokerId The broker id to unregister.
+ *
+ * @return A future that is completed successfully when the broker is
+ * unregistered.
+ */
+ CompletableFuture<Void> unregisterBroker(int brokerId);
+
+ /**
+ * Describe the current configuration of various resources.
+ *
+ * @param resources A map from resources to the collection of config keys that we
+ * want to describe for each. If the collection is empty, then
+ * all configuration keys will be described.
+ *
+ * @return A future yielding a map from each config resource to either
+ * a map of configuration strings for that resource, or an
+ * error.
+ */
+ CompletableFuture<Map<ConfigResource, ResultOrError<Map<String, String>>>>
+ describeConfigs(Map<ConfigResource, Collection<String>> resources);
+
+ /**
+ * Elect new partition leaders.
+ *
+ * @param request The request.
+ *
+ * @return A future yielding the elect leaders response.
+ */
+ CompletableFuture<ElectLeadersResponseData> electLeaders(ElectLeadersRequestData request);
+
+ /**
+ * Get the current finalized feature ranges for each feature.
+ *
+ * @return A future yielding the feature ranges.
+ */
+ CompletableFuture<FeatureMapAndEpoch> finalizedFeatures();
+
+ /**
+ * Perform some incremental configuration changes.
+ *
+ * @param configChanges The changes: for each config resource, a map from a
+ * string key to an (operation type, string value) entry.
+ * @param validateOnly True if we should validate the changes but not apply them.
+ *
+ * @return A future yielding a map from config resources to error results.
+ */
+ CompletableFuture<Map<ConfigResource, ApiError>> incrementalAlterConfigs(
+ Map<ConfigResource, Map<String, Map.Entry<AlterConfigOp.OpType, String>>> configChanges,
+ boolean validateOnly);
+
+ /**
+ * Perform some configuration changes using the legacy API.
+ *
+ * @param newConfigs The new configuration maps to apply, one per config resource.
+ * @param validateOnly True if we should validate the changes but not apply them.
+ *
+ * @return A future yielding a map from config resources to error results.
+ */
+ CompletableFuture<Map<ConfigResource, ApiError>> legacyAlterConfigs(
+ Map<ConfigResource, Map<String, String>> newConfigs, boolean validateOnly);
+
+ /**
+ * Process a heartbeat from a broker.
+ *
+ * @param request The broker heartbeat request.
+ *
+ * @return A future yielding a heartbeat reply.
+ */
+ CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
+ BrokerHeartbeatRequestData request);
+
+ /**
+ * Attempt to register the given broker.
+ *
+ * @param request The registration request.
+ *
+ * @return A future yielding a registration reply.
+ */
+ CompletableFuture<BrokerRegistrationReply> registerBroker(
+ BrokerRegistrationRequestData request);
+
+ /**
+ * Wait for the given number of brokers to be registered and unfenced.
+ * This is for testing.
+ *
+ * @param minBrokers The minimum number of brokers to wait for.
+ * @return A future which is completed when the given number of brokers
+ * is reached.
+ */
+ CompletableFuture<Void> waitForReadyBrokers(int minBrokers);
+
+ /**
+ * Perform some client quota changes.
+ *
+ * @param quotaAlterations The collection of quota alterations to apply.
+ * @param validateOnly True if we should validate the changes but not apply them.
+ * @return A future yielding a map of quota entities to error results.
+ */
+ CompletableFuture<Map<ClientQuotaEntity, ApiError>> alterClientQuotas(
+ Collection<ClientQuotaAlteration> quotaAlterations, boolean validateOnly
+ );
+
+ /**
+ * Begin shutting down, but don't block. You must still call close to clean up all
+ * resources.
+ */
+ void beginShutdown();
+
+ /**
+ * If this controller is active, this is the non-negative controller epoch.
+ * Otherwise, this is -1.
+ */
+ long curClaimEpoch();
+
+ /**
+ * Blocks until we have shut down and freed all resources.
+ */
+ void close() throws InterruptedException;
+}
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
new file mode 100644
index 0000000..82e2b49
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+
+import java.util.Objects;
+
+
+class ResultOrError<T> {
+ private final ApiError error;
+ private final T result;
+
+ public ResultOrError(Errors error, String message) {
+ this(new ApiError(error, message));
+ }
+
+ public ResultOrError(ApiError error) {
+ Objects.requireNonNull(error);
+ this.error = error;
+ this.result = null;
+ }
+
+ public ResultOrError(T result) {
+ this.error = null;
+ this.result = result;
+ }
+
+ public boolean isError() {
+ return error != null;
+ }
+
+ public boolean isResult() {
+ return error == null;
+ }
+
+ public ApiError error() {
+ return error;
+ }
+
+ public T result() {
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (o == null || (!o.getClass().equals(getClass()))) {
+ return false;
+ }
+ ResultOrError other = (ResultOrError) o;
+ return error.equals(other.error) &&
+ Objects.equals(result, other.result);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(error, result);
+ }
+
+ @Override
+ public String toString() {
+ if (error.isSuccess()) {
+ return "ResultOrError(" + result + ")";
+ } else {
+ return "ResultOrError(" + error + ")";
+ }
+ }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java
new file mode 100644
index 0000000..5ab2a52
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerHeartbeatReply.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Objects;
+
+
+public class BrokerHeartbeatReply {
+    /**
+     * Whether the reply should tell the broker that it has caught up.
+     */
+    private final boolean isCaughtUp;
+
+    /**
+     * Whether the reply should tell the broker that it is fenced.
+     */
+    private final boolean isFenced;
+
+    /**
+     * Whether the reply should tell the broker that it should shut down.
+     */
+    private final boolean shouldShutDown;
+
+    /**
+     * Create a heartbeat reply carrying the three flags.
+     *
+     * @param isCaughtUp        True if the broker has caught up.
+     * @param isFenced          True if the broker is fenced.
+     * @param shouldShutDown    True if the broker should shut down.
+     */
+    public BrokerHeartbeatReply(boolean isCaughtUp,
+                                boolean isFenced,
+                                boolean shouldShutDown) {
+        this.isCaughtUp = isCaughtUp;
+        this.isFenced = isFenced;
+        this.shouldShutDown = shouldShutDown;
+    }
+
+    public boolean isCaughtUp() {
+        return this.isCaughtUp;
+    }
+
+    public boolean isFenced() {
+        return this.isFenced;
+    }
+
+    public boolean shouldShutDown() {
+        return this.shouldShutDown;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(isCaughtUp, isFenced, shouldShutDown);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof BrokerHeartbeatReply) {
+            BrokerHeartbeatReply that = (BrokerHeartbeatReply) o;
+            return isCaughtUp == that.isCaughtUp &&
+                isFenced == that.isFenced &&
+                shouldShutDown == that.shouldShutDown;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("BrokerHeartbeatReply(isCaughtUp=").append(isCaughtUp);
+        bld.append(", isFenced=").append(isFenced);
+        bld.append(", shouldShutDown = ").append(shouldShutDown);
+        bld.append(")");
+        return bld.toString();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
new file mode 100644
index 0000000..c2be061
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ *
+ * The listener and supported-feature maps are copied on construction and exposed
+ * only as unmodifiable views.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    /**
+     * Create a registration from a list of endpoints. Each endpoint is keyed by
+     * its listener name.
+     *
+     * @param id                    The broker id.
+     * @param epoch                 The broker epoch.
+     * @param incarnationId         The incarnation id of the broker.
+     * @param listeners             The endpoints of the broker.
+     * @param supportedFeatures     The supported features; must not be null.
+     * @param rack                  The rack, if any; must not be null.
+     * @param fenced                True if the broker is fenced.
+     */
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this(id, epoch, incarnationId, listenersToMap(listeners), supportedFeatures,
+            rack, fenced);
+    }
+
+    /**
+     * Create a registration from a map of listener names to endpoints.
+     *
+     * @param id                    The broker id.
+     * @param epoch                 The broker epoch.
+     * @param incarnationId         The incarnation id of the broker.
+     * @param listeners             The endpoints of the broker, by listener name.
+     * @param supportedFeatures     The supported features; must not be null.
+     * @param rack                  The rack, if any; must not be null.
+     * @param fenced                True if the broker is fenced.
+     */
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              Map<String, Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        // Copy and wrap both maps so that neither the caller nor users of the
+        // accessors can modify this object after construction.
+        this.listeners = Collections.unmodifiableMap(new HashMap<>(listeners));
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures =
+            Collections.unmodifiableMap(new HashMap<>(supportedFeatures));
+        Objects.requireNonNull(rack);
+        this.rack = rack;
+        this.fenced = fenced;
+    }
+
+    /**
+     * Index the given endpoints by their listener name.
+     */
+    private static Map<String, Endpoint> listenersToMap(List<Endpoint> listeners) {
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        return listenersMap;
+    }
+
+    public int id() {
+        return id;
+    }
+
+    public long epoch() {
+        return epoch;
+    }
+
+    public Uuid incarnationId() {
+        return incarnationId;
+    }
+
+    /**
+     * @return An unmodifiable map from listener name to endpoint.
+     */
+    public Map<String, Endpoint> listeners() {
+        return listeners;
+    }
+
+    /**
+     * @return An unmodifiable map from feature name to supported version range.
+     */
+    public Map<String, VersionRange> supportedFeatures() {
+        return supportedFeatures;
+    }
+
+    public Optional<String> rack() {
+        return rack;
+    }
+
+    public boolean fenced() {
+        return fenced;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id, epoch, incarnationId, listeners, supportedFeatures,
+            rack, fenced);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof BrokerRegistration)) return false;
+        BrokerRegistration other = (BrokerRegistration) o;
+        // incarnationId is not validated by the constructors, so compare it
+        // null-safely, matching the null-tolerant hashCode.
+        return other.id == id &&
+            other.epoch == epoch &&
+            Objects.equals(other.incarnationId, incarnationId) &&
+            other.listeners.equals(listeners) &&
+            other.supportedFeatures.equals(supportedFeatures) &&
+            other.rack.equals(rack) &&
+            other.fenced == fenced;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("BrokerRegistration(id=").append(id);
+        bld.append(", epoch=").append(epoch);
+        bld.append(", incarnationId=").append(incarnationId);
+        bld.append(", listeners=[").append(
+            listeners.keySet().stream().sorted().
+                map(n -> listeners.get(n).toString()).
+                collect(Collectors.joining(", ")));
+        // Sort by feature name. Map.Entry is not Comparable, so sorting the entry
+        // set directly fails once there is more than one feature.
+        bld.append("], supportedFeatures={").append(
+            supportedFeatures.keySet().stream().sorted().
+                map(k -> k + ": " + supportedFeatures.get(k)).
+                collect(Collectors.joining(", ")));
+        bld.append("}");
+        bld.append(", rack=").append(rack);
+        bld.append(", fenced=").append(fenced);
+        bld.append(")");
+        return bld.toString();
+    }
+
+    /**
+     * Create a copy of this registration with a different fencing state.
+     *
+     * @param fencing   The fencing state of the copy.
+     * @return          The new registration.
+     */
+    public BrokerRegistration cloneWithFencing(boolean fencing) {
+        return new BrokerRegistration(id, epoch, incarnationId, listeners,
+            supportedFeatures, rack, fencing);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java
new file mode 100644
index 0000000..40678ed
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistrationReply.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Objects;
+
+
+/**
+ * The reply to a broker registration, carrying an epoch.
+ */
+public class BrokerRegistrationReply {
+    private final long epoch;
+
+    /**
+     * @param epoch     The epoch to carry in this reply.
+     */
+    public BrokerRegistrationReply(long epoch) {
+        this.epoch = epoch;
+    }
+
+    /**
+     * @return The epoch carried in this reply.
+     */
+    public long epoch() {
+        return this.epoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(epoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof BrokerRegistrationReply) {
+            BrokerRegistrationReply that = (BrokerRegistrationReply) o;
+            return epoch == that.epoch;
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("BrokerRegistrationReply(epoch=").append(epoch).append(")");
+        return bld.toString();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java
new file mode 100644
index 0000000..272c87d
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMap.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+
+/**
+ * An unmodifiable mapping from feature names to version ranges.
+ */
+public class FeatureMap {
+    private final Map<String, VersionRange> features;
+
+    /**
+     * Create a feature map holding a copy of the given entries.
+     *
+     * @param features  The feature names and their version ranges.
+     */
+    public FeatureMap(Map<String, VersionRange> features) {
+        Map<String, VersionRange> copy = new HashMap<>(features);
+        this.features = Collections.unmodifiableMap(copy);
+    }
+
+    /**
+     * Look up a feature by name.
+     *
+     * @param name      The feature name.
+     * @return          The version range, or empty if the name is not in the map.
+     */
+    public Optional<VersionRange> get(String name) {
+        VersionRange range = features.get(name);
+        return Optional.ofNullable(range);
+    }
+
+    /**
+     * @return The unmodifiable map of all features.
+     */
+    public Map<String, VersionRange> features() {
+        return this.features;
+    }
+
+    @Override
+    public int hashCode() {
+        return features.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof FeatureMap) {
+            FeatureMap that = (FeatureMap) o;
+            return features.equals(that.features);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        // Entries are printed in sorted key order, wrapped in braces.
+        return features.keySet().stream().sorted().
+            map(k -> k + ": " + features.get(k)).
+            collect(Collectors.joining(", ", "{", "}"));
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java
new file mode 100644
index 0000000..26096ea
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/FeatureMapAndEpoch.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import java.util.Objects;
+
+
+/**
+ * A {@link FeatureMap} paired with an epoch.
+ */
+public class FeatureMapAndEpoch {
+    private final FeatureMap map;
+    private final long epoch;
+
+    /**
+     * @param map       The feature map.
+     * @param epoch     The epoch paired with the map.
+     */
+    public FeatureMapAndEpoch(FeatureMap map, long epoch) {
+        this.map = map;
+        this.epoch = epoch;
+    }
+
+    /**
+     * @return The feature map.
+     */
+    public FeatureMap map() {
+        return map;
+    }
+
+    /**
+     * @return The epoch paired with the map.
+     */
+    public long epoch() {
+        return epoch;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(map, epoch);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof FeatureMapAndEpoch)) return false;
+        FeatureMapAndEpoch other = (FeatureMapAndEpoch) o;
+        // The constructor does not reject a null map, and hashCode tolerates one,
+        // so compare the maps null-safely as well.
+        return Objects.equals(map, other.map) && epoch == other.epoch;
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("{");
+        // append(Object) prints "null" for a null map instead of throwing.
+        bld.append("map=").append(map);
+        bld.append(", epoch=").append(epoch);
+        bld.append("}");
+        return bld.toString();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java
new file mode 100644
index 0000000..2bf4f7c
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogLeader.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import java.util.Objects;
+
+/**
+ * Identifies the current MetaLog leader by node id and epoch.
+ */
+public class MetaLogLeader {
+    private final int nodeId;
+    private final long epoch;
+
+    /**
+     * @param nodeId    The node id of the leader.
+     * @param epoch     The epoch of the leader.
+     */
+    public MetaLogLeader(int nodeId, long epoch) {
+        this.nodeId = nodeId;
+        this.epoch = epoch;
+    }
+
+    public int nodeId() {
+        return this.nodeId;
+    }
+
+    public long epoch() {
+        return this.epoch;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o instanceof MetaLogLeader) {
+            MetaLogLeader that = (MetaLogLeader) o;
+            return nodeId == that.nodeId && epoch == that.epoch;
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(nodeId, epoch);
+    }
+
+    @Override
+    public String toString() {
+        StringBuilder bld = new StringBuilder();
+        bld.append("MetaLogLeader(nodeId=").append(nodeId);
+        bld.append(", epoch=").append(epoch);
+        bld.append(")");
+        return bld.toString();
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogListener.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogListener.java
new file mode 100644
index 0000000..9374420
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogListener.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+
+import java.util.List;
+
+/**
+ * Listeners receive notifications from the MetaLogManager.
+ *
+ * Only handleCommits must be implemented; the other callbacks have default
+ * implementations that do nothing.
+ */
+public interface MetaLogListener {
+ /**
+ * Called when the MetaLogManager commits some messages.
+ *
+ * @param lastOffset The last offset found in all the given messages.
+ * @param messages The messages.
+ */
+ void handleCommits(long lastOffset, List<ApiMessage> messages);
+
+ /**
+ * Called when a new leader is elected.
+ * The default implementation does nothing.
+ *
+ * @param leader The new leader id and epoch.
+ */
+ default void handleNewLeader(MetaLogLeader leader) {}
+
+ /**
+ * Called when the MetaLogManager has renounced the leadership.
+ * The default implementation does nothing.
+ *
+ * @param epoch The controller epoch that has ended.
+ */
+ default void handleRenounce(long epoch) {}
+
+ /**
+ * Called when the MetaLogManager has finished shutting down, and wants to tell its
+ * listener that it is safe to shut down as well.
+ * The default implementation does nothing.
+ */
+ default void beginShutdown() {}
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java
new file mode 100644
index 0000000..67a6ca5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+
+import java.util.List;
+
+/**
+ * The MetaLogManager handles storing metadata and electing leaders.
+ *
+ * Notifications about the log are delivered to the {@link MetaLogListener} objects
+ * that were passed to {@link #register(MetaLogListener)}.
+ */
+public interface MetaLogManager {
+
+ /**
+ * Start this meta log manager.
+ * The manager must be ready to accept incoming calls after this function returns.
+ * It is an error to initialize a MetaLogManager more than once.
+ *
+ * @throws Exception If the manager could not be started.
+ */
+ void initialize() throws Exception;
+
+ /**
+ * Register the listener. The manager must be initialized already.
+ * The listener must be ready to accept incoming calls immediately.
+ *
+ * @param listener The listener to register.
+ *
+ * @throws Exception If the listener could not be registered.
+ */
+ void register(MetaLogListener listener) throws Exception;
+
+ /**
+ * Schedule a write to the log.
+ *
+ * The write will be scheduled to happen at some time in the future. There is no
+ * error return or exception thrown if the write fails. Instead, the listener may
+ * regard the write as successful if and only if the MetaLogManager reaches the given
+ * offset before renouncing its leadership. The listener should determine this by
+ * monitoring the committed offsets.
+ *
+ * @param epoch The controller epoch.
+ * @param batch The batch of messages to write.
+ *
+ * @return The offset to wait for, as described above.
+ * NOTE(review): presumably this is the offset of the last
+ * message in the batch, which is what the in-memory
+ * LocalLogManager returns -- confirm for other implementations.
+ */
+ long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch);
+
+ /**
+ * Renounce the leadership.
+ *
+ * @param epoch The epoch. If this does not match the current epoch, this
+ * call will be ignored.
+ */
+ void renounce(long epoch);
+
+ /**
+ * Returns the current leader. The active node may change immediately after this
+ * function is called, of course.
+ *
+ * @return The leader id and epoch as currently known to this manager.
+ */
+ MetaLogLeader leader();
+
+ /**
+ * Returns the node id.
+ *
+ * @return The id of the node which this manager represents.
+ */
+ int nodeId();
+
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
new file mode 100644
index 0000000..7a01c37
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 40)
+public class BrokerRegistrationTest {
+ private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
+ new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
+ Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
+ Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+ Optional.empty(), false),
+ new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
+ Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
+ Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+ Optional.empty(), false),
+ new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
+ Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
+ Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
+ Optional.empty(), false));
+
+ @Test
+ public void testValues() {
+ assertEquals(0, REGISTRATIONS.get(0).id());
+ assertEquals(1, REGISTRATIONS.get(1).id());
+ assertEquals(2, REGISTRATIONS.get(2).id());
+ }
+
+ @Test
+ public void testEquals() {
+ assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(1)));
+ assertFalse(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(0)));
+ assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(2)));
+ assertFalse(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(0)));
+ assertTrue(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(0)));
+ assertTrue(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(1)));
+ assertTrue(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(2)));
+ }
+
+ @Test
+ public void testToString() {
+ assertEquals("BrokerRegistration(id=1, epoch=0, " +
+ "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
+ "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
+ "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
+ "rack=Optional.empty, fenced=false)",
+ REGISTRATIONS.get(1).toString());
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
new file mode 100644
index 0000000..ef85314
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.
+ *
+ * Several LocalLogManager objects can be attached to one {@link SharedLogData}, which
+ * holds the log contents and the current leader. Each manager delivers the shared
+ * batches to its own listeners from events that it appends to its event queue.
+ */
+public final class LocalLogManager implements MetaLogManager, AutoCloseable {
+    /**
+     * An entry in the in-memory log.
+     */
+    interface LocalBatch {
+        /**
+         * Returns the number of offsets which this batch occupies in the log.
+         */
+        int size();
+    }
+
+    /**
+     * A log entry which records a change of leadership.
+     */
+    static class LeaderChangeBatch implements LocalBatch {
+        private final MetaLogLeader newLeader;
+
+        LeaderChangeBatch(MetaLogLeader newLeader) {
+            this.newLeader = newLeader;
+        }
+
+        @Override
+        public int size() {
+            // A leader change always occupies exactly one offset.
+            return 1;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LeaderChangeBatch)) return false;
+            LeaderChangeBatch other = (LeaderChangeBatch) o;
+            return other.newLeader.equals(newLeader);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(newLeader);
+        }
+
+        @Override
+        public String toString() {
+            return "LeaderChangeBatch(newLeader=" + newLeader + ")";
+        }
+    }
+
+    /**
+     * A log entry which holds a list of metadata records.
+     */
+    static class LocalRecordBatch implements LocalBatch {
+        private final List<ApiMessage> records;
+
+        LocalRecordBatch(List<ApiMessage> records) {
+            this.records = records;
+        }
+
+        @Override
+        public int size() {
+            // Every record occupies one offset.
+            return records.size();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (!(o instanceof LocalRecordBatch)) return false;
+            LocalRecordBatch other = (LocalRecordBatch) o;
+            return other.records.equals(records);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(records);
+        }
+
+        @Override
+        public String toString() {
+            return "LocalRecordBatch(records=" + records + ")";
+        }
+    }
+
+    /**
+     * The state which is shared by all the LocalLogManager objects of one test: the
+     * registered managers, the log itself, and the current leader. All methods are
+     * synchronized on this object.
+     */
+    public static class SharedLogData {
+        private final Logger log = LoggerFactory.getLogger(SharedLogData.class);
+
+        /**
+         * The registered log managers, keyed by node id.
+         */
+        private final HashMap<Integer, LocalLogManager> logManagers = new HashMap<>();
+
+        /**
+         * The log. Each batch is stored under the offset of its last entry.
+         */
+        private final TreeMap<Long, LocalBatch> batches = new TreeMap<>();
+
+        /**
+         * The current leader. A node id of -1 means that there is no leader.
+         */
+        private MetaLogLeader leader = new MetaLogLeader(-1, -1);
+
+        /**
+         * The offset of the last entry in the log, or -1 if the log is empty.
+         */
+        private long prevOffset = -1;
+
+        /**
+         * Add a log manager, and elect a leader if there is none yet.
+         *
+         * @param logManager    The log manager to add.
+         *
+         * @throws RuntimeException If a manager with the same node id is registered.
+         */
+        synchronized void registerLogManager(LocalLogManager logManager) {
+            if (logManagers.put(logManager.nodeId(), logManager) != null) {
+                throw new RuntimeException("Can't have multiple LocalLogManagers " +
+                    "with id " + logManager.nodeId());
+            }
+            electLeaderIfNeeded();
+        }
+
+        /**
+         * Remove a log manager.
+         *
+         * @param logManager    The log manager to remove.
+         *
+         * @throws RuntimeException If this exact manager is not registered.
+         */
+        synchronized void unregisterLogManager(LocalLogManager logManager) {
+            if (!logManagers.remove(logManager.nodeId(), logManager)) {
+                throw new RuntimeException("Log manager " + logManager.nodeId() +
+                    " was not found.");
+            }
+        }
+
+        /**
+         * Append a batch on behalf of a node, provided that the node is the current
+         * leader and the epoch is the current leader epoch. Afterwards a new leader is
+         * elected if the batch left the log without one.
+         *
+         * @param nodeId    The id of the node which wants to append.
+         * @param epoch     The leader epoch which the node believes to be current.
+         * @param batch     The batch to append.
+         *
+         * @return          The offset of the last entry of the appended batch, or
+         *                  Long.MAX_VALUE if the append was rejected.
+         */
+        synchronized long tryAppend(int nodeId, long epoch, LocalBatch batch) {
+            if (epoch != leader.epoch()) {
+                log.trace("tryAppend(nodeId={}, epoch={}): the provided epoch does not " +
+                    "match the current leader epoch of {}.", nodeId, epoch, leader.epoch());
+                return Long.MAX_VALUE;
+            }
+            if (nodeId != leader.nodeId()) {
+                // The message has three placeholders, so the epoch must be passed too.
+                log.trace("tryAppend(nodeId={}, epoch={}): the given node id does not " +
+                    "match the current leader id of {}.", nodeId, epoch, leader.nodeId());
+                return Long.MAX_VALUE;
+            }
+            log.trace("tryAppend(nodeId={}): appending {}.", nodeId, batch);
+            long offset = append(batch);
+            electLeaderIfNeeded();
+            return offset;
+        }
+
+        /**
+         * Append a batch unconditionally, update the leader if the batch is a leader
+         * change, and ask every registered manager to check the log.
+         *
+         * @param batch     The batch to append.
+         *
+         * @return          The offset of the last entry of the appended batch.
+         */
+        synchronized long append(LocalBatch batch) {
+            prevOffset += batch.size();
+            log.debug("append(batch={}, prevOffset={})", batch, prevOffset);
+            batches.put(prevOffset, batch);
+            if (batch instanceof LeaderChangeBatch) {
+                LeaderChangeBatch leaderChangeBatch = (LeaderChangeBatch) batch;
+                leader = leaderChangeBatch.newLeader;
+            }
+            for (LocalLogManager logManager : logManagers.values()) {
+                logManager.scheduleLogCheck();
+            }
+            return prevOffset;
+        }
+
+        /**
+         * If there is no leader and at least one manager is registered, pick one of the
+         * registered nodes at random and append a leader change which makes it the
+         * leader for the next epoch.
+         */
+        synchronized void electLeaderIfNeeded() {
+            if (leader.nodeId() != -1 || logManagers.isEmpty()) {
+                return;
+            }
+            int nextLeaderIndex = ThreadLocalRandom.current().nextInt(logManagers.size());
+            Iterator<Integer> iter = logManagers.keySet().iterator();
+            Integer nextLeaderNode = null;
+            // Step through the key set until we reach the randomly chosen position.
+            for (int i = 0; i <= nextLeaderIndex; i++) {
+                nextLeaderNode = iter.next();
+            }
+            MetaLogLeader newLeader = new MetaLogLeader(nextLeaderNode, leader.epoch() + 1);
+            log.info("Elected new leader: {}.", newLeader);
+            append(new LeaderChangeBatch(newLeader));
+        }
+
+        /**
+         * Find the first batch stored under an offset which is strictly greater than
+         * the given one.
+         *
+         * @param offset    The offset which the reader has already handled.
+         *
+         * @return          A copy of the matching map entry, or null if there is none.
+         */
+        synchronized Entry<Long, LocalBatch> nextBatch(long offset) {
+            Entry<Long, LocalBatch> entry = batches.higherEntry(offset);
+            if (entry == null) {
+                return null;
+            }
+            return new SimpleImmutableEntry<>(entry.getKey(), entry.getValue());
+        }
+    }
+
+    /**
+     * A registered listener, together with the last offset that was delivered to it.
+     */
+    private static class MetaLogListenerData {
+        private long offset = -1;
+        private final MetaLogListener listener;
+
+        MetaLogListenerData(MetaLogListener listener) {
+            this.listener = listener;
+        }
+    }
+
+    private final Logger log;
+
+    private final int nodeId;
+
+    private final SharedLogData shared;
+
+    private final EventQueue eventQueue;
+
+    /**
+     * True once the initialize event has run. Accessed only from event queue events.
+     */
+    private boolean initialized = false;
+
+    /**
+     * True once the shutdown event has run. Accessed only from event queue events.
+     */
+    private boolean shutdown = false;
+
+    /**
+     * Batches stored under an offset greater than this are not delivered to listeners.
+     * Accessed only from event queue events.
+     */
+    private long maxReadOffset = Long.MAX_VALUE;
+
+    /**
+     * The registered listeners. Accessed only from event queue events.
+     */
+    private final List<MetaLogListenerData> listeners = new ArrayList<>();
+
+    /**
+     * The latest leader which this manager has read from the log. It is written by the
+     * log check event and read directly by callers, hence volatile.
+     */
+    private volatile MetaLogLeader leader = new MetaLogLeader(-1, -1);
+
+    /**
+     * Create the manager and register it with the shared log data.
+     *
+     * @param logContext        The log context used for this manager and its queue.
+     * @param nodeId            The id of the node which this manager represents.
+     * @param shared            The shared log data to attach to.
+     * @param threadNamePrefix  The thread name prefix passed to the event queue.
+     */
+    public LocalLogManager(LogContext logContext,
+                           int nodeId,
+                           SharedLogData shared,
+                           String threadNamePrefix) {
+        this.log = logContext.logger(LocalLogManager.class);
+        this.nodeId = nodeId;
+        this.shared = shared;
+        this.eventQueue = new KafkaEventQueue(Time.SYSTEM, logContext, threadNamePrefix);
+        // Registration can trigger an election, which calls scheduleLogCheck() on this
+        // object, so it has to come after the event queue has been assigned.
+        shared.registerLogManager(this);
+    }
+
+    /**
+     * Append an event which delivers to every listener the batches that it has not
+     * seen yet, up to maxReadOffset. Exceptions thrown during delivery are logged.
+     */
+    private void scheduleLogCheck() {
+        eventQueue.append(() -> {
+            try {
+                log.debug("Node {}: running log check.", nodeId);
+                // Counts the batches delivered to all listeners; used for logging only.
+                int numEntriesFound = 0;
+                for (MetaLogListenerData listenerData : listeners) {
+                    while (true) {
+                        Entry<Long, LocalBatch> entry = shared.nextBatch(listenerData.offset);
+                        if (entry == null) {
+                            log.trace("Node {}: reached the end of the log after finding " +
+                                "{} entries.", nodeId, numEntriesFound);
+                            break;
+                        }
+                        long entryOffset = entry.getKey();
+                        if (entryOffset > maxReadOffset) {
+                            log.trace("Node {}: after {} entries, not reading the next " +
+                                "entry because its offset is {}, and maxReadOffset is {}.",
+                                nodeId, numEntriesFound, entryOffset, maxReadOffset);
+                            break;
+                        }
+                        if (entry.getValue() instanceof LeaderChangeBatch) {
+                            LeaderChangeBatch batch = (LeaderChangeBatch) entry.getValue();
+                            log.trace("Node {}: handling LeaderChange to {}.",
+                                nodeId, batch.newLeader);
+                            listenerData.listener.handleNewLeader(batch.newLeader);
+                            // Several listeners replay the same batches, so only ever
+                            // move this manager's view of the leader forwards.
+                            if (batch.newLeader.epoch() > leader.epoch()) {
+                                leader = batch.newLeader;
+                            }
+                        } else if (entry.getValue() instanceof LocalRecordBatch) {
+                            LocalRecordBatch batch = (LocalRecordBatch) entry.getValue();
+                            log.trace("Node {}: handling LocalRecordBatch with offset {}.",
+                                nodeId, entryOffset);
+                            listenerData.listener.handleCommits(entryOffset, batch.records);
+                        }
+                        numEntriesFound++;
+                        listenerData.offset = entryOffset;
+                    }
+                }
+                log.trace("Completed log check for node " + nodeId);
+            } catch (Exception e) {
+                log.error("Exception while handling log check", e);
+            }
+        });
+    }
+
+    /**
+     * Start shutting down the event queue. If the manager was initialized and has not
+     * been shut down yet, the shutdown event renounces the leadership, calls
+     * beginShutdown on every listener, and unregisters from the shared log data.
+     */
+    public void beginShutdown() {
+        eventQueue.beginShutdown("beginShutdown", () -> {
+            try {
+                if (initialized && !shutdown) {
+                    log.debug("Node {}: beginning shutdown.", nodeId);
+                    renounce(leader.epoch());
+                    for (MetaLogListenerData listenerData : listeners) {
+                        listenerData.listener.beginShutdown();
+                    }
+                    shared.unregisterLogManager(this);
+                }
+            } catch (Exception e) {
+                log.error("Unexpected exception while sending beginShutdown callbacks", e);
+            }
+            shutdown = true;
+        });
+    }
+
+    /**
+     * Begin shutting down, and then close the event queue.
+     *
+     * @throws InterruptedException If closing the event queue is interrupted.
+     */
+    @Override
+    public void close() throws InterruptedException {
+        log.debug("Node {}: closing.", nodeId);
+        beginShutdown();
+        eventQueue.close();
+    }
+
+    /**
+     * Append an event which marks this manager as initialized. This method does not
+     * wait for the event to run.
+     */
+    @Override
+    public void initialize() throws Exception {
+        eventQueue.append(() -> {
+            log.debug("initialized local log manager for node " + nodeId);
+            initialized = true;
+        });
+    }
+
+    /**
+     * Register a listener and wait until the registration event has run. Nothing is
+     * registered, and no error is reported, if the manager has already been shut down.
+     *
+     * @param listener  The listener to register.
+     *
+     * @throws Exception If the manager had not been initialized when the event ran, or
+     *                   if waiting for the event failed.
+     */
+    @Override
+    public void register(MetaLogListener listener) throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            if (shutdown) {
+                log.info("Node {}: can't register because local log manager has " +
+                    "already been shut down.", nodeId);
+                future.complete(null);
+            } else if (initialized) {
+                log.info("Node {}: registered MetaLogListener.", nodeId);
+                listeners.add(new MetaLogListenerData(listener));
+                shared.electLeaderIfNeeded();
+                // Replay the log so far to the new listener.
+                scheduleLogCheck();
+                future.complete(null);
+            } else {
+                log.info("Node {}: can't register because local log manager has not " +
+                    "been initialized.", nodeId);
+                future.completeExceptionally(new RuntimeException(
+                    "LocalLogManager was not initialized."));
+            }
+        });
+        future.get();
+    }
+
+    /**
+     * Try to append the messages of the given batch to the shared log.
+     *
+     * NOTE(review): the epoch argument is not consulted. The leader epoch that this
+     * manager has read from the log is passed to the shared log instead. Confirm that
+     * this is intended.
+     *
+     * @param epoch     The controller epoch (currently unused, see above).
+     * @param batch     The batch of messages to write.
+     *
+     * @return          The offset of the last message of the batch, or Long.MAX_VALUE
+     *                  if the shared log rejected the write.
+     */
+    @Override
+    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return shared.tryAppend(nodeId, leader.epoch(), new LocalRecordBatch(
+            batch.stream().map(ApiMessageAndVersion::message).collect(Collectors.toList())));
+    }
+
+    /**
+     * Try to append a leader change which leaves the log without a leader (node id -1)
+     * in the epoch after the one which this manager currently knows about.
+     *
+     * NOTE(review): the epoch argument is not consulted. The leader epoch that this
+     * manager has read from the log is used instead. Confirm that this is intended.
+     *
+     * @param epoch     The epoch to renounce (currently unused, see above).
+     */
+    @Override
+    public void renounce(long epoch) {
+        // Read the volatile field once so that both uses below see the same leader.
+        MetaLogLeader curLeader = leader;
+        MetaLogLeader nextLeader = new MetaLogLeader(-1, curLeader.epoch() + 1);
+        shared.tryAppend(nodeId, curLeader.epoch(), new LeaderChangeBatch(nextLeader));
+    }
+
+    /**
+     * Returns the latest leader which this manager has read from the log.
+     */
+    @Override
+    public MetaLogLeader leader() {
+        return leader;
+    }
+
+    /**
+     * Returns the node id which was passed to the constructor.
+     */
+    @Override
+    public int nodeId() {
+        return nodeId;
+    }
+
+    /**
+     * Returns a snapshot of the registered listeners. The snapshot is taken by an
+     * event on the event queue, and this method waits for that event.
+     *
+     * @throws RuntimeException If waiting for the event fails or is interrupted.
+     */
+    public List<MetaLogListener> listeners() {
+        final CompletableFuture<List<MetaLogListener>> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            future.complete(listeners.stream().map(l -> l.listener).collect(Collectors.toList()));
+        });
+        try {
+            return future.get();
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller before wrapping the exception.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Change the highest offset which may be delivered to listeners, schedule a log
+     * check, and wait until the change has been applied.
+     *
+     * @param maxReadOffset     The new maximum offset to read.
+     *
+     * @throws RuntimeException If waiting for the event fails or is interrupted.
+     */
+    public void setMaxReadOffset(long maxReadOffset) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            log.trace("Node {}: set maxReadOffset to {}.", nodeId, maxReadOffset);
+            this.maxReadOffset = maxReadOffset;
+            scheduleLogCheck();
+            future.complete(null);
+        });
+        try {
+            future.get();
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the caller before wrapping the exception.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTest.java b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTest.java
new file mode 100644
index 0000000..9d4eb8b
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.metadata.RegisterBrokerRecord;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.COMMIT;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.LAST_COMMITTED_OFFSET;
+import static org.apache.kafka.metalog.MockMetaLogManagerListener.SHUTDOWN;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+/**
+ * Tests for LocalLogManager, driven through LocalLogManagerTestEnv.
+ */
+@Timeout(value = 40)
+public class LocalLogManagerTest {
+    private static final Logger log = LoggerFactory.getLogger(LocalLogManagerTest.class);
+
+    /**
+     * Test creating a LocalLogManager and closing it.
+     */
+    @Test
+    public void testCreateAndClose() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Test that the local log manager will claim leadership.
+     */
+    @Test
+    public void testClaimsLeadership() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(1)) {
+            MetaLogLeader expectedLeader = new MetaLogLeader(0, 0);
+            assertEquals(expectedLeader, env.waitForLeader());
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Keep asking the environment for the leader until its epoch differs from the
+     * epoch of the given previous leader.
+     */
+    private static MetaLogLeader waitForEpochChange(LocalLogManagerTestEnv env,
+                                                    MetaLogLeader previous) throws InterruptedException {
+        MetaLogLeader candidate = env.waitForLeader();
+        while (candidate.epoch() == previous.epoch()) {
+            Thread.sleep(1);
+            candidate = env.waitForLeader();
+        }
+        return candidate;
+    }
+
+    /**
+     * Test that we can pass leadership back and forth between log managers.
+     */
+    @Test
+    public void testPassLeadership() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
+            MetaLogLeader initial = env.waitForLeader();
+            MetaLogLeader current = initial;
+            // Repeat until the leadership has moved to a different node.
+            do {
+                LocalLogManager currentManager = env.logManagers().get(current.nodeId());
+                currentManager.renounce(current.epoch());
+                MetaLogLeader successor = waitForEpochChange(env, current);
+                // Renouncing uses up one epoch, and the following election another.
+                long expectedNextEpoch = current.epoch() + 2;
+                assertEquals(expectedNextEpoch, successor.epoch(), "Expected next epoch to be " + expectedNextEpoch +
+                    ", but found " + successor);
+                current = successor;
+            } while (current.nodeId() == initial.nodeId());
+            env.close();
+            assertEquals(null, env.firstError.get());
+        }
+    }
+
+    /**
+     * Scan the events recorded by the listener and return the offset of the last
+     * LAST_COMMITTED_OFFSET event, or -1 if there is none.
+     *
+     * @throws RuntimeException If the recorded offsets ever decrease.
+     */
+    private static long highestCommittedOffset(MockMetaLogManagerListener listener) {
+        long highest = -1;
+        for (String event : listener.serializedEvents()) {
+            if (!event.startsWith(LAST_COMMITTED_OFFSET)) {
+                continue;
+            }
+            // Skip the event name and the space which separates it from the offset.
+            String offsetText = event.substring(LAST_COMMITTED_OFFSET.length() + 1);
+            long offset = Long.parseLong(offsetText);
+            if (offset < highest) {
+                throw new RuntimeException("Invalid offset: " + offset +
+                    " is less than the previous offset of " + highest);
+            }
+            highest = offset;
+        }
+        return highest;
+    }
+
+    /**
+     * Retry until the first listener of the given log manager has recorded a committed
+     * offset which is at least the target offset.
+     */
+    private static void waitForLastCommittedOffset(long targetOffset,
+                LocalLogManager logManager) throws InterruptedException {
+        TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+            MockMetaLogManagerListener listener =
+                (MockMetaLogManagerListener) logManager.listeners().get(0);
+            long highestOffset = highestCommittedOffset(listener);
+            if (highestOffset < targetOffset) {
+                throw new RuntimeException("Offset for log manager " +
+                    logManager.nodeId() + " only reached " + highestOffset);
+            }
+        });
+    }
+
+    /**
+     * Test that all the log managers see all the commits.
+     */
+    @Test
+    public void testCommits() throws Exception {
+        try (LocalLogManagerTestEnv env =
+                 LocalLogManagerTestEnv.createWithMockListeners(3)) {
+            MetaLogLeader leaderInfo = env.waitForLeader();
+            LocalLogManager activeLogManager = env.logManagers().get(leaderInfo.nodeId());
+            long epoch = activeLogManager.leader().epoch();
+            List<ApiMessageAndVersion> messages = Arrays.asList(
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(0), (short) 0),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(1), (short) 0),
+                new ApiMessageAndVersion(new RegisterBrokerRecord().setBrokerId(2), (short) 0));
+            // Offset 0 holds the initial leader change, so the records end at offset 3.
+            long lastWrittenOffset = activeLogManager.scheduleWrite(epoch, messages);
+            assertEquals(3, lastWrittenOffset);
+            for (LocalLogManager logManager : env.logManagers()) {
+                waitForLastCommittedOffset(3, logManager);
+            }
+            // Collect the listeners before closing the environment.
+            List<MockMetaLogManagerListener> listeners = env.logManagers().stream().
+                map(m -> (MockMetaLogManagerListener) m.listeners().get(0)).
+                collect(Collectors.toList());
+            env.close();
+            for (MockMetaLogManagerListener listener : listeners) {
+                List<String> events = listener.serializedEvents();
+                String finalEvent = events.get(events.size() - 1);
+                assertEquals(SHUTDOWN, finalEvent);
+                int commitsSeen = 0;
+                for (String event : events) {
+                    if (!event.startsWith(COMMIT)) {
+                        continue;
+                    }
+                    String expectedPayload = messages.get(commitsSeen).message().toString();
+                    assertEquals(expectedPayload, event.substring(COMMIT.length() + 1));
+                    commitsSeen++;
+                }
+                assertEquals(messages.size(), commitsSeen);
+            }
+        }
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTestEnv.java
new file mode 100644
index 0000000..52aeea0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/metalog/LocalLogManagerTestEnv.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class LocalLogManagerTestEnv implements AutoCloseable {
+ private static final Logger log =
+ LoggerFactory.getLogger(LocalLogManagerTestEnv.class);
+
+ /**
+ * The first error we encountered during this test, or the empty string if we have
+ * not encountered any.
+ */
+ final AtomicReference<String> firstError = new AtomicReference<>(null);
+
+ /**
+ * The test directory, which we will delete once the test is over.
+ */
+ private final File dir;
+
+ /**
+ * The shared data for our LocalLogManager instances.
+ */
+ private final SharedLogData shared;
+
+ /**
+ * A list of log managers.
+ */
+ private final List<LocalLogManager> logManagers;
+
+ public static LocalLogManagerTestEnv createWithMockListeners(int numManagers) throws Exception {
+ LocalLogManagerTestEnv testEnv = new LocalLogManagerTestEnv(numManagers);
+ try {
+ for (LocalLogManager logManager : testEnv.logManagers) {
+ logManager.register(new MockMetaLogManagerListener());
+ }
+ } catch (Exception e) {
+ testEnv.close();
+ throw e;
+ }
+ return testEnv;
+ }
+
+ public LocalLogManagerTestEnv(int numManagers) throws Exception {
+ dir = TestUtils.tempDirectory();
+ shared = new SharedLogData();
+ List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
+ try {
+ for (int nodeId = 0; nodeId < numManagers; nodeId++) {
+ newLogManagers.add(new LocalLogManager(
+ new LogContext(String.format("[LocalLogManager %d] ", nodeId)),
+ nodeId,
+ shared,
+ String.format("LocalLogManager-%d_", nodeId)));
+ }
+ for (LocalLogManager logManager : newLogManagers) {
+ logManager.initialize();
+ }
+ } catch (Throwable t) {
+ for (LocalLogManager logManager : newLogManagers) {
+ logManager.close();
+ }
+ throw t;
+ }
+ this.logManagers = newLogManagers;
+ }
+
+ AtomicReference<String> firstError() {
+ return firstError;
+ }
+
+ File dir() {
+ return dir;
+ }
+
+ MetaLogLeader waitForLeader() throws InterruptedException {
+ AtomicReference<MetaLogLeader> value = new AtomicReference<>(null);
+ TestUtils.retryOnExceptionWithTimeout(3, 20000, () -> {
+ MetaLogLeader result = null;
+ for (LocalLogManager logManager : logManagers) {
+ MetaLogLeader leader = logManager.leader();
+ if (leader.nodeId() == logManager.nodeId()) {
+ if (result != null) {
+ throw new RuntimeException("node " + leader.nodeId() +
+ " thinks it's the leader, but so does " + result.nodeId());
+ }
+ result = leader;
+ }
+ }
+ if (result == null) {
+ throw new RuntimeException("No leader found.");
+ }
+ value.set(result);
+ });
+ return value.get();
+ }
+
+ public List<LocalLogManager> logManagers() {
+ return logManagers;
+ }
+
+ @Override
+ public void close() throws InterruptedException {
+ try {
+ for (LocalLogManager logManager : logManagers) {
+ logManager.beginShutdown();
+ }
+ for (LocalLogManager logManager : logManagers) {
+ logManager.close();
+ }
+ Utils.delete(dir);
+ } catch (IOException e) {
+ log.error("Error deleting {}", dir.getAbsolutePath(), e);
+ }
+ }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/metalog/MockMetaLogManagerListener.java b/metadata/src/test/java/org/apache/kafka/metalog/metalog/MockMetaLogManagerListener.java
new file mode 100644
index 0000000..fe61ec0
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/metalog/metalog/MockMetaLogManagerListener.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A MetaLogListener which records every callback it receives as a line of text, in the
+ * form of the event name followed by its space-separated arguments.
+ */
+public class MockMetaLogManagerListener implements MetaLogListener {
+    public static final String COMMIT = "COMMIT";
+    public static final String LAST_COMMITTED_OFFSET = "LAST_COMMITTED_OFFSET";
+    public static final String NEW_LEADER = "NEW_LEADER";
+    public static final String RENOUNCE = "RENOUNCE";
+    public static final String SHUTDOWN = "SHUTDOWN";
+
+    /**
+     * The recorded events, in the order of arrival. Guarded by the lock of this object.
+     */
+    private final List<String> serializedEvents = new ArrayList<>();
+
+    /**
+     * Appends one event while holding the lock of this object.
+     */
+    private synchronized void record(String event) {
+        serializedEvents.add(event);
+    }
+
+    /**
+     * Records one COMMIT event per message, followed by one LAST_COMMITTED_OFFSET
+     * event. The lock is held throughout, so the events of one call stay together.
+     */
+    @Override
+    public synchronized void handleCommits(long lastCommittedOffset, List<ApiMessage> messages) {
+        for (ApiMessage message : messages) {
+            serializedEvents.add(COMMIT + " " + message.toString());
+        }
+        serializedEvents.add(LAST_COMMITTED_OFFSET + " " + lastCommittedOffset);
+    }
+
+    /**
+     * Records a NEW_LEADER event with the node id and the epoch of the leader.
+     */
+    @Override
+    public void handleNewLeader(MetaLogLeader leader) {
+        String event = NEW_LEADER + " " + leader.nodeId() + " " + leader.epoch();
+        record(event);
+    }
+
+    /**
+     * Records a RENOUNCE event with the given epoch.
+     */
+    @Override
+    public void handleRenounce(long epoch) {
+        String event = RENOUNCE + " " + epoch;
+        record(event);
+    }
+
+    /**
+     * Records a SHUTDOWN event.
+     */
+    @Override
+    public void beginShutdown() {
+        record(SHUTDOWN);
+    }
+
+    /**
+     * Returns a copy of the events which have been recorded so far.
+     */
+    public synchronized List<String> serializedEvents() {
+        return new ArrayList<>(serializedEvents);
+    }
+}