You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/12 01:04:52 UTC

[GitHub] [kafka] mumrah commented on a change in pull request #10113: MINOR: Add KIP-500 Broker

mumrah commented on a change in pull request #10113:
URL: https://github.com/apache/kafka/pull/10113#discussion_r574930226



##########
File path: core/src/main/scala/kafka/network/SocketServer.scala
##########
@@ -78,12 +79,17 @@ class SocketServer(val config: KafkaConfig,
                    val metrics: Metrics,
                    val time: Time,
                    val credentialProvider: CredentialProvider,
+                   val configuredNodeId: Option[Int] = None,
+                   val configuredLogContext: Option[LogContext] = None,
                    val allowControllerOnlyApis: Boolean = false)
   extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
-  private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
+  val nodeId = configuredNodeId.getOrElse(config.brokerId)
+
+  val logContext = configuredLogContext.getOrElse(new LogContext(s"[SocketServer brokerId=${nodeId}] "))

Review comment:
       private?

##########
File path: core/src/main/scala/kafka/server/KafkaRaftServer.scala
##########
@@ -47,32 +51,47 @@ class KafkaRaftServer(
   KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
   KafkaYammerMetrics.INSTANCE.configure(config.originals)
 
-  private val (metaProps, _) = KafkaRaftServer.initializeLogDirs(config)
+  private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
 
   private val metrics = Server.initializeMetrics(
     config,
     time,
     metaProps.clusterId.toString
   )
 
-  private val raftManager = new KafkaRaftManager(
+  private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(config.quorumVoters)
+
+  private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
     metaProps,
     config,
-    new StringSerde,
+    new MetadataRecordSerde,
     KafkaRaftServer.MetadataPartition,
     time,
     metrics,
     threadNamePrefix
   )
 
+  private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
+
   private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
-    Some(new BrokerServer())
+    Some(new BrokerServer(config, metaProps, metaLogShim, time, metrics, threadNamePrefix,
+      offlineDirs, controllerQuorumVotersFuture, Server.SUPPORTED_FEATURES))
   } else {
     None
   }
 
   private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
     Some(new ControllerServer())
+//    Some(new ControllerServer(
+//      metaProps,
+//      config,
+//      metaLogShim,
+//      raftManager,
+//      time,
+//      metrics,
+//      threadNamePrefix,
+//      CompletableFuture.completedFuture(config.quorumVoters)
+//    ))

Review comment:
       I'm guessing this is here for easier integration with the controller once that PR lands, but do we really need to check this in?

##########
File path: raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
##########
@@ -975,7 +975,7 @@ public void handleClaim(int epoch) {
         }
 
         @Override
-        public void handleResign() {
+        public void handleResign(int epoch) {

Review comment:
       as mentioned above, should this be a long?

##########
File path: core/src/main/scala/kafka/server/Server.scala
##########
@@ -91,4 +95,12 @@ object Server {
     reporters
   }
 
+  sealed trait ProcessStatus
+  case object SHUTDOWN extends ProcessStatus
+  case object STARTING extends ProcessStatus
+  case object STARTED extends ProcessStatus
+  case object SHUTTING_DOWN extends ProcessStatus

Review comment:
       Seems this is only used in BrokerServer, should we move the trait there, or do we intend to use these states in the other server classes as well?

##########
File path: raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
+ * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
+ * directly.
+ */
+public class MetaLogRaftShim implements MetaLogManager {
+    private final RaftClient<ApiMessageAndVersion> client;
+    private final int nodeId;
+
+    public MetaLogRaftShim(
+            RaftClient<ApiMessageAndVersion> client,
+            int nodeId
+    ) {

Review comment:
       nit: strange indentation and continuation here

##########
File path: core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
##########
@@ -91,11 +91,14 @@ class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
   metadataCache: MetadataCache,
   channelManager: Option[BrokerToControllerChannelManager],
-  adminManager: ZkAdminManager,
-  controller: KafkaController,
+  adminManager: Option[ZkAdminManager],
+  controller: Option[KafkaController],
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator
 ) extends AutoTopicCreationManager with Logging {
+  if (controller.isEmpty && channelManager.isEmpty) {
+    throw new IllegalArgumentException("Must supply a channel manager if not supplying a controller")
+  }

Review comment:
       Are these two arguments mutually exclusive? If so, maybe we can use `Either` instead of two options. 

##########
File path: raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
+ * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
+ * directly.
+ */
+public class MetaLogRaftShim implements MetaLogManager {
+    private final RaftClient<ApiMessageAndVersion> client;
+    private final int nodeId;
+
+    public MetaLogRaftShim(
+            RaftClient<ApiMessageAndVersion> client,
+            int nodeId
+    ) {
+        this.client = client;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void initialize() {
+        // NO-OP - The RaftClient is initialized externally
+    }
+
+    @Override
+    public void register(MetaLogListener listener) {
+        client.register(new ListenerShim(listener));
+    }
+
+    @Override
+    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return client.scheduleAppend((int) epoch, batch);
+    }
+
+    @Override
+    public void renounce(long epoch) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public MetaLogLeader leader() {
+        LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
+        return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
+    }
+
+    @Override
+    public int nodeId() {
+        return nodeId;
+    }
+
+    private class ListenerShim implements RaftClient.Listener<ApiMessageAndVersion> {
+        private final MetaLogListener listener;
+
+        private ListenerShim(MetaLogListener listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: The `BatchReader` might need to read from disk if this is
+                // not a leader. We want to move this IO to the state machine so that
+                // it does not block Raft replication
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                    List<ApiMessage> records = batch.records().stream()
+                            .map(ApiMessageAndVersion::message)
+                            .collect(Collectors.toList());

Review comment:
       nit: indent

##########
File path: raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
##########
@@ -233,6 +235,17 @@ private static Integer parseVoterId(String idString) {
         return voterMap;
     }
 
+    public static List<Node> quorumVoterStringsToNodes(List<String> voters) {
+        return parseVoterConnections(voters).entrySet().stream()
+                .filter(connection -> connection.getValue() instanceof InetAddressSpec)

Review comment:
       nit: indentation seems a bit off




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org