Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/10 23:11:31 UTC

[GitHub] [kafka] cmccabe opened a new pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, etc.

cmccabe opened a new pull request #10106:
URL: https://github.com/apache/kafka/pull/10106


   Add MetaLogListener, LocalLogManager, and related classes.  These
   classes are used by the KIP-500 controller and broker to interface with the
   Raft log.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] cmccabe merged pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

Posted by GitBox <gi...@apache.org>.
cmccabe merged pull request #10106:
URL: https://github.com/apache/kafka/pull/10106


   





[GitHub] [kafka] cmccabe commented on a change in pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10106:
URL: https://github.com/apache/kafka/pull/10106#discussion_r574638178



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        this.listeners = Collections.unmodifiableMap(listenersMap);
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures = supportedFeatures;
+        Objects.requireNonNull(rack);
+        this.rack = rack;
+        this.fenced = fenced;
+    }
+
+    public BrokerRegistration(int id,

Review comment:
       There aren't that many broker registrations, so it's fine to just do defensive copies in both places. I will add that.







[GitHub] [kafka] mumrah commented on a change in pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10106:
URL: https://github.com/apache/kafka/pull/10106#discussion_r574602989



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        this.listeners = Collections.unmodifiableMap(listenersMap);
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures = supportedFeatures;

Review comment:
       Defensive copy of supportedFeatures?
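
A minimal sketch of what such a defensive copy could look like. `FeatureHolder` is a hypothetical stand-in for `BrokerRegistration`, and a plain `String` replaces `VersionRange` so that the example compiles on its own; it is not the code that was merged.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for BrokerRegistration; only the feature map is modeled.
final class FeatureHolder {
    private final Map<String, String> supportedFeatures;

    FeatureHolder(Map<String, String> supportedFeatures) {
        Objects.requireNonNull(supportedFeatures);
        // Copy first, then wrap: later changes to the caller's map cannot leak in,
        // and callers of supportedFeatures() cannot modify our state.
        this.supportedFeatures =
            Collections.unmodifiableMap(new HashMap<>(supportedFeatures));
    }

    Map<String, String> supportedFeatures() {
        return supportedFeatures;
    }

    public static void main(String[] args) {
        Map<String, String> features = new HashMap<>();
        features.put("foo", "1-2");
        FeatureHolder holder = new FeatureHolder(features);
        features.put("bar", "2-3"); // mutate the caller's map after construction
        // prints "false": the holder kept its own copy
        System.out.println(holder.supportedFeatures().containsKey("bar"));
    }
}
```

Without the copy, the field would alias the caller's map, and the class would only be immutable as long as every caller left that map alone.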

##########
File path: metadata/src/main/java/org/apache/kafka/controller/Controller.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public interface Controller extends AutoCloseable {
+    /**
+     * Change partition ISRs.

Review comment:
       Should we spell out ISR? Though I suppose anyone working with the controller code should know what it stands for :) 

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+
+import java.util.Objects;
+
+
+class ResultOrError<T> {

Review comment:
       Should we have methods like `isError` and `isResult` on this class? 
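
One possible shape for those accessors, as a sketch only. The class name `ResultOrErrorSketch`, the factory methods, and the use of a `String` message in place of Kafka's `ApiError` are all assumptions made so the example is self-contained; the quoted diff does not show the class body.

```java
import java.util.Objects;

// Simplified sketch: a String message stands in for Kafka's ApiError.
final class ResultOrErrorSketch<T> {
    private final String error; // non-null only when this instance holds an error
    private final T result;     // meaningful only when error is null

    private ResultOrErrorSketch(String error, T result) {
        this.error = error;
        this.result = result;
    }

    static <T> ResultOrErrorSketch<T> ofResult(T result) {
        return new ResultOrErrorSketch<>(null, result);
    }

    static <T> ResultOrErrorSketch<T> ofError(String error) {
        return new ResultOrErrorSketch<>(Objects.requireNonNull(error), null);
    }

    boolean isError() {
        return error != null;
    }

    boolean isResult() {
        return error == null;
    }

    String error() {
        return error;
    }

    T result() {
        if (isError()) {
            throw new IllegalStateException("no result, error was: " + error);
        }
        return result;
    }

    public static void main(String[] args) {
        ResultOrErrorSketch<Integer> ok = ResultOrErrorSketch.ofResult(42);
        ResultOrErrorSketch<Integer> bad = ResultOrErrorSketch.ofError("unknown topic");
        System.out.println(ok.isResult() + " " + bad.isError());
    }
}
```

With predicates like these, callers can branch on `isError()` instead of comparing `error()` against `null` themselves.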

##########
File path: metadata/src/main/java/org/apache/kafka/metalog/metalog/LocalLogManager.java
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.

Review comment:
       The Javadoc says this is a test implementation, but it's in the main sources. Is this class used by any production code? Maybe the metadata shell? If so, we should probably not call it a "test" implementation.

##########
File path: metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+
+import java.util.List;
+
+/**
+ * The MetaLogManager handles storing metadata and electing leaders.
+ */
+public interface MetaLogManager {
+
+    /**
+     * Start this meta log manager.
+     * The manager must be ready to accept incoming calls after this function returns.
+     * It is an error to initialize a MetaLogManager more than once.
+     */
+    void initialize() throws Exception;
+
+    /**
+     * Register the listener.  The manager must be initialized already.
+     * The listener must be ready to accept incoming calls immediately.
+     *
+     * @param listener      The listener to register.
+     */
+    void register(MetaLogListener listener) throws Exception;
+
+    /**
+     * Schedule a write to the log.
+     *
+     * The write will be scheduled to happen at some time in the future.  There is no
+     * error return or exception thrown if the write fails.  Instead, the listener may
+     * regard the write as successful if and only if the MetaLogManager reaches the given
+     * index before renouncing its leadership.  The listener should determine this by
+     * monitoring the committed indexes.
+     *
+     * @param epoch         The controller epoch.
+     * @param batch         The batch of messages to write.
+     *
+     * @return              The index of the message.

Review comment:
       Is this index the same as the metadata log offset?

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        this.listeners = Collections.unmodifiableMap(listenersMap);
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures = supportedFeatures;
+        Objects.requireNonNull(rack);
+        this.rack = rack;
+        this.fenced = fenced;
+    }
+
+    public BrokerRegistration(int id,

Review comment:
       The other constructor has some nice preconditions and defensive copies; should we do the same here? Or maybe this constructor is meant to be private, for use by `cloneWithFencing` below.
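
One way to keep the checks in a single place is to have the list-based constructor delegate to the map-based one. The sketch below is hypothetical: the second constructor's signature is cut off in the quoted diff, so the parameter types here are assumptions, and `Registration` and `Ep` are simplified stand-ins for `BrokerRegistration` and `Endpoint`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

final class Registration {
    // Hypothetical stand-in for org.apache.kafka.common.Endpoint.
    static final class Ep {
        final String name;
        final int port;

        Ep(String name, int port) {
            this.name = name;
            this.port = port;
        }
    }

    private final int id;
    private final Map<String, Ep> listeners;
    private final boolean fenced;

    // List-based constructor: builds the map, then delegates.
    Registration(int id, List<Ep> listeners, boolean fenced) {
        this(id, toMap(listeners), fenced);
    }

    // Map-based constructor: the single place where checks and copies happen.
    Registration(int id, Map<String, Ep> listeners, boolean fenced) {
        Objects.requireNonNull(listeners);
        this.id = id;
        this.listeners = Collections.unmodifiableMap(new HashMap<>(listeners));
        this.fenced = fenced;
    }

    private static Map<String, Ep> toMap(List<Ep> endpoints) {
        Objects.requireNonNull(endpoints);
        Map<String, Ep> map = new HashMap<>();
        for (Ep endpoint : endpoints) {
            map.put(endpoint.name, endpoint);
        }
        return map;
    }

    // Goes through the checked constructor, so the clone gets the same guarantees.
    Registration cloneWithFencing(boolean fencing) {
        return new Registration(id, listeners, fencing);
    }

    int id() {
        return id;
    }

    Map<String, Ep> listeners() {
        return listeners;
    }

    boolean fenced() {
        return fenced;
    }

    public static void main(String[] args) {
        Registration reg = new Registration(
            1, Collections.singletonList(new Ep("INTERNAL", 9091)), false);
        System.out.println(reg.cloneWithFencing(true).fenced());
    }
}
```

The extra map copy on each clone is the cost of this approach; the alternative raised in the comment, a private unchecked constructor used only by `cloneWithFencing`, avoids the copy but relies on the class never handing that constructor unvalidated input.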

##########
File path: metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 40)
+public class BrokerRegistrationTest {
+    private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
+        new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
+            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Optional.empty(), false),
+        new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
+            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Optional.empty(), false),
+        new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
+            Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
+            Optional.empty(), false));
+
+    @Test
+    public void testValues() {
+        assertEquals(0, REGISTRATIONS.get(0).id());
+        assertEquals(1, REGISTRATIONS.get(1).id());
+        assertEquals(2, REGISTRATIONS.get(2).id());
+    }
+
+    @Test
+    public void testEquals() {
+        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(1)));
+        assertFalse(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(0)));
+        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(2)));
+        assertFalse(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(0)));
+        assertTrue(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(0)));
+        assertTrue(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(1)));
+        assertTrue(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(2)));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("BrokerRegistration(id=1, epoch=0, " +
+            "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
+            "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
+            "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
+            "rack=Optional.empty, fenced=false)",
+            REGISTRATIONS.get(1).toString());
+    }
+}

Review comment:
       nit: newline







[GitHub] [kafka] cmccabe commented on a change in pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10106:
URL: https://github.com/apache/kafka/pull/10106#discussion_r574637676



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/metalog/LocalLogManager.java
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.

Review comment:
       Good question.  It was here because this code existed before the Raft integration.  Let's just move it into the test directory.







[GitHub] [kafka] cmccabe commented on a change in pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10106:
URL: https://github.com/apache/kafka/pull/10106#discussion_r574639391



##########
File path: metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+
+import java.util.List;
+
+/**
+ * The MetaLogManager handles storing metadata and electing leaders.
+ */
+public interface MetaLogManager {
+
+    /**
+     * Start this meta log manager.
+     * The manager must be ready to accept incoming calls after this function returns.
+     * It is an error to initialize a MetaLogManager more than once.
+     */
+    void initialize() throws Exception;
+
+    /**
+     * Register the listener.  The manager must be initialized already.
+     * The listener must be ready to accept incoming calls immediately.
+     *
+     * @param listener      The listener to register.
+     */
+    void register(MetaLogListener listener) throws Exception;
+
+    /**
+     * Schedule a write to the log.
+     *
+     * The write will be scheduled to happen at some time in the future.  There is no
+     * error return or exception thrown if the write fails.  Instead, the listener may
+     * regard the write as successful if and only if the MetaLogManager reaches the given
+     * index before renouncing its leadership.  The listener should determine this by
+     * monitoring the committed indexes.
+     *
+     * @param epoch         The controller epoch.
+     * @param batch         The batch of messages to write.
+     *
+     * @return              The index of the message.

Review comment:
       Oh, this is another historical artifact from back when "index" was separate from "offset".
   
   It should now just be replaced with "offset". I will do that.
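
To illustrate the contract in the Javadoc quoted above (a write counts as successful only if the committed offset reaches the returned offset before leadership is renounced), here is a rough sketch of how a caller might track its writes. `PendingWrites` and its method names are hypothetical helpers, not part of this PR or of the `MetaLogListener` interface.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper: tracks writes by the offset returned when each write
// was scheduled, and resolves them from commit and renounce notifications.
final class PendingWrites {
    private final TreeMap<Long, CompletableFuture<Void>> pending = new TreeMap<>();

    // Call with the offset returned when the write was scheduled.
    CompletableFuture<Void> track(long offset) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(offset, future);
        return future;
    }

    // Call when everything up to and including committedOffset is known to be committed.
    void onCommit(long committedOffset) {
        Map<Long, CompletableFuture<Void>> done = pending.headMap(committedOffset, true);
        for (CompletableFuture<Void> future : done.values()) {
            future.complete(null);
        }
        done.clear(); // clearing the view also removes the entries from 'pending'
    }

    // Call when leadership is renounced: writes still pending cannot be regarded as successful.
    void onRenounce() {
        for (CompletableFuture<Void> future : pending.values()) {
            future.completeExceptionally(
                new IllegalStateException("leadership renounced before commit"));
        }
        pending.clear();
    }

    public static void main(String[] args) {
        PendingWrites writes = new PendingWrites();
        CompletableFuture<Void> first = writes.track(5L);
        CompletableFuture<Void> second = writes.track(9L);
        writes.onCommit(5L);
        writes.onRenounce();
        System.out.println(first.isCompletedExceptionally() + " "
            + second.isCompletedExceptionally());
    }
}
```

The sorted map makes it cheap to resolve every write at or below the committed offset in one pass, which matches the "monitor the committed offsets" approach the Javadoc describes.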







[GitHub] [kafka] rondagostino commented on a change in pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

Posted by GitBox <gi...@apache.org>.
rondagostino commented on a change in pull request #10106:
URL: https://github.com/apache/kafka/pull/10106#discussion_r574402728



##########
File path: metadata/src/main/java/org/apache/kafka/controller/Controller.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public interface Controller extends AutoCloseable {
+    /**
+     * Change partition ISRs.
+     *
+     * @param request       The AlterIsrRequest data.
+     *
+     * @return              A future yielding the response.
+     */
+    CompletableFuture<AlterIsrResponseData> alterIsr(AlterIsrRequestData request);
+
+    /**
+     * Create a batch of topics.
+     *
+     * @param request       The CreateTopicsRequest data.
+     *
+     * @return              A future yielding the response.
+     */
+    CompletableFuture<CreateTopicsResponseData>
+        createTopics(CreateTopicsRequestData request);
+
+    /**
+     * Decommission a broker.
+     *
+     * @param brokerId      The broker id to decommission.
+     *
+     * @return              A future that is completed successfully when the broker is
+     *                      decommissioned, or if it is not registered in the first place.
+     */
+    CompletableFuture<Void> decommissionBroker(int brokerId);

Review comment:
       `unregister` instead of `decommission` given the recent rename?







[GitHub] [kafka] cmccabe commented on a change in pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

Posted by GitBox <gi...@apache.org>.
cmccabe commented on a change in pull request #10106:
URL: https://github.com/apache/kafka/pull/10106#discussion_r574638269



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        this.listeners = Collections.unmodifiableMap(listenersMap);
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures = supportedFeatures;

Review comment:
       ok



