You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/11 15:48:30 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #10106: MINOR: add the MetaLogListener, LocalLogManager, and Controller interface.

mumrah commented on a change in pull request #10106:
URL: https://github.com/apache/kafka/pull/10106#discussion_r574602989



##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        this.listeners = Collections.unmodifiableMap(listenersMap);
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures = supportedFeatures;

Review comment:
       Defensive copy of supportedFeatures?

##########
File path: metadata/src/main/java/org/apache/kafka/controller/Controller.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.message.AlterIsrRequestData;
+import org.apache.kafka.common.message.AlterIsrResponseData;
+import org.apache.kafka.common.message.BrokerHeartbeatRequestData;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
+import org.apache.kafka.common.message.CreateTopicsRequestData;
+import org.apache.kafka.common.message.CreateTopicsResponseData;
+import org.apache.kafka.common.message.ElectLeadersRequestData;
+import org.apache.kafka.common.message.ElectLeadersResponseData;
+import org.apache.kafka.common.quota.ClientQuotaAlteration;
+import org.apache.kafka.common.quota.ClientQuotaEntity;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationReply;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+
+
+public interface Controller extends AutoCloseable {
+    /**
+     * Change partition ISRs.

Review comment:
       Should we spell out ISR? Though I suppose anyone working with the controller code should know what it stands for :) 

##########
File path: metadata/src/main/java/org/apache/kafka/controller/ResultOrError.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ApiError;
+
+import java.util.Objects;
+
+
+class ResultOrError<T> {

Review comment:
       Should we have methods like `isError` and `isResult` on this class? 

##########
File path: metadata/src/main/java/org/apache/kafka/metalog/metalog/LocalLogManager.java
##########
@@ -0,0 +1,378 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+
+/**
+ * The LocalLogManager is a test implementation that relies on the contents of memory.

Review comment:
       Javadoc says this is a test implementation, but it's in the main sources. Is this class used by any production code? Maybe the metadata shell? If so, we should probably not say it's a "test" thing

##########
File path: metadata/src/main/java/org/apache/kafka/metalog/metalog/MetaLogManager.java
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metalog;
+
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+
+import java.util.List;
+
+/**
+ * The MetaLogManager handles storing metadata and electing leaders.
+ */
+public interface MetaLogManager {
+
+    /**
+     * Start this meta log manager.
+     * The manager must be ready to accept incoming calls after this function returns.
+     * It is an error to initialize a MetaLogManager more than once.
+     */
+    void initialize() throws Exception;
+
+    /**
+     * Register the listener.  The manager must be initialized already.
+     * The listener must be ready to accept incoming calls immediately.
+     *
+     * @param listener      The listener to register.
+     */
+    void register(MetaLogListener listener) throws Exception;
+
+    /**
+     * Schedule a write to the log.
+     *
+     * The write will be scheduled to happen at some time in the future.  There is no
+     * error return or exception thrown if the write fails.  Instead, the listener may
+     * regard the write as successful if and only if the MetaLogManager reaches the given
+     * index before renouncing its leadership.  The listener should determine this by
+     * monitoring the committed indexes.
+     *
+     * @param epoch         The controller epoch.
+     * @param batch         The batch of messages to write.
+     *
+     * @return              The index of the message.

Review comment:
       Is this index the same as the metadata log offset?

##########
File path: metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+/**
+ * An immutable class which represents broker registrations.
+ */
+public class BrokerRegistration {
+    private final int id;
+    private final long epoch;
+    private final Uuid incarnationId;
+    private final Map<String, Endpoint> listeners;
+    private final Map<String, VersionRange> supportedFeatures;
+    private final Optional<String> rack;
+    private final boolean fenced;
+
+    public BrokerRegistration(int id,
+                              long epoch,
+                              Uuid incarnationId,
+                              List<Endpoint> listeners,
+                              Map<String, VersionRange> supportedFeatures,
+                              Optional<String> rack,
+                              boolean fenced) {
+        this.id = id;
+        this.epoch = epoch;
+        this.incarnationId = incarnationId;
+        Map<String, Endpoint> listenersMap = new HashMap<>();
+        for (Endpoint endpoint : listeners) {
+            listenersMap.put(endpoint.listenerName().get(), endpoint);
+        }
+        this.listeners = Collections.unmodifiableMap(listenersMap);
+        Objects.requireNonNull(supportedFeatures);
+        this.supportedFeatures = supportedFeatures;
+        Objects.requireNonNull(rack);
+        this.rack = rack;
+        this.fenced = fenced;
+    }
+
+    public BrokerRegistration(int id,

Review comment:
       In the other constructor we have some nice preconditions and defensive copies, should we do the same here? Or maybe this constructor is meant to be private to `cloneWithFencing` below

##########
File path: metadata/src/test/java/org/apache/kafka/metadata/BrokerRegistrationTest.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata;
+
+import org.apache.kafka.common.Endpoint;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.security.auth.SecurityProtocol;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@Timeout(value = 40)
+public class BrokerRegistrationTest {
+    private static final List<BrokerRegistration> REGISTRATIONS = Arrays.asList(
+        new BrokerRegistration(0, 0, Uuid.fromString("pc1GhUlBS92cGGaKXl6ipw"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9090)),
+            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Optional.empty(), false),
+        new BrokerRegistration(1, 0, Uuid.fromString("3MfdxWlNSn2UDYsmDP1pYg"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9091)),
+            Collections.singletonMap("foo", new VersionRange((short) 1, (short) 2)),
+            Optional.empty(), false),
+        new BrokerRegistration(2, 0, Uuid.fromString("eY7oaG1RREie5Kk9uy1l6g"),
+            Arrays.asList(new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9092)),
+            Collections.singletonMap("foo", new VersionRange((short) 2, (short) 3)),
+            Optional.empty(), false));
+
+    @Test
+    public void testValues() {
+        assertEquals(0, REGISTRATIONS.get(0).id());
+        assertEquals(1, REGISTRATIONS.get(1).id());
+        assertEquals(2, REGISTRATIONS.get(2).id());
+    }
+
+    @Test
+    public void testEquals() {
+        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(1)));
+        assertFalse(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(0)));
+        assertFalse(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(2)));
+        assertFalse(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(0)));
+        assertTrue(REGISTRATIONS.get(0).equals(REGISTRATIONS.get(0)));
+        assertTrue(REGISTRATIONS.get(1).equals(REGISTRATIONS.get(1)));
+        assertTrue(REGISTRATIONS.get(2).equals(REGISTRATIONS.get(2)));
+    }
+
+    @Test
+    public void testToString() {
+        assertEquals("BrokerRegistration(id=1, epoch=0, " +
+            "incarnationId=3MfdxWlNSn2UDYsmDP1pYg, listeners=[Endpoint(" +
+            "listenerName='INTERNAL', securityProtocol=PLAINTEXT, " +
+            "host='localhost', port=9091)], supportedFeatures={foo: 1-2}, " +
+            "rack=Optional.empty, fenced=false)",
+            REGISTRATIONS.get(1).toString());
+    }
+}

Review comment:
       nit: newline




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org