You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2021/02/18 06:00:34 UTC

[kafka] branch 2.8 updated: MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.8 by this push:
     new d77759d  MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)
d77759d is described below

commit d77759d0fe64094a26a6aeaecebc045be317af6f
Author: Ron Dagostino <rd...@confluent.io>
AuthorDate: Thu Feb 18 00:35:13 2021 -0500

    MINOR: Add KIP-500 BrokerServer and ControllerServer (#10113)
    
    This PR adds the KIP-500 BrokerServer and ControllerServer classes and
    makes some related changes to get them working.  Note that the ControllerServer
    does not instantiate a QuorumController object yet, since that will be added in
    PR #10070.
    
    * Add BrokerServer and ControllerServer
    
    * Change ApiVersions#computeMaxUsableProduceMagic so that it can handle
    endpoints which do not support PRODUCE (such as KIP-500 controller nodes)
    
    * KafkaAdminClientTest: fix some lingering references to decommissionBroker
    that should be references to unregisterBroker.
    
    * Make some changes to allow SocketServer to be used by ControllerServer as
    well as by the broker.
    
    * We now return a random active Broker ID as the Controller ID in
    MetadataResponse for the Raft-based case as per KIP-590.
    
    * Add the RaftControllerNodeProvider
    
    * Add EnvelopeUtils
    
    * Add MetaLogRaftShim
    
    * In ducktape, in config_property.py: use a KIP-500 compatible cluster ID.
    
    Reviewers: Colin P. McCabe <cm...@apache.org>, David Arthur <mu...@gmail.com>
---
 checkstyle/import-control.xml                      |   1 +
 .../java/org/apache/kafka/clients/ApiVersions.java |  13 +-
 .../org/apache/kafka/clients/ApiVersionsTest.java  |  17 +
 .../kafka/clients/admin/KafkaAdminClientTest.java  |   4 +-
 core/src/main/scala/kafka/cluster/Broker.scala     |   2 +-
 core/src/main/scala/kafka/log/LogConfig.scala      |   2 +-
 .../main/scala/kafka/network/SocketServer.scala    |  37 +-
 .../scala/kafka/raft/KafkaNetworkChannel.scala     |   3 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |   2 +
 .../main/scala/kafka/server/AlterIsrManager.scala  |   5 +-
 .../kafka/server/AutoTopicCreationManager.scala    |  15 +-
 .../src/main/scala/kafka/server/BrokerServer.scala | 468 ++++++++++++++++++++-
 .../server/BrokerToControllerChannelManager.scala  |  36 ++
 .../main/scala/kafka/server/ControllerApis.scala   | 453 ++++++++++++++++++++
 .../main/scala/kafka/server/ControllerServer.scala | 198 ++++++++-
 .../main/scala/kafka/server/EnvelopeUtils.scala    | 137 ++++++
 core/src/main/scala/kafka/server/KafkaApis.scala   | 103 +----
 core/src/main/scala/kafka/server/KafkaBroker.scala |   8 +
 core/src/main/scala/kafka/server/KafkaConfig.scala |  32 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  33 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  11 +-
 .../main/scala/kafka/server/MetadataSupport.scala  |  20 +-
 core/src/main/scala/kafka/server/Server.scala      |  12 +
 .../main/scala/kafka/tools/TestRaftServer.scala    |   2 +-
 .../server/AutoTopicCreationManagerTest.scala      |  12 +-
 .../unit/kafka/server/ControllerApisTest.scala     | 143 +++++++
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  40 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 114 ++++-
 .../unit/kafka/server/KafkaRaftServerTest.scala    |   2 +-
 .../jmh/metadata/MetadataRequestBenchmark.java     |   5 +-
 .../org/apache/kafka/raft/KafkaRaftClient.java     |  21 +-
 .../java/org/apache/kafka/raft/NetworkChannel.java |   5 +
 .../java/org/apache/kafka/raft/RaftClient.java     |  11 +-
 .../java/org/apache/kafka/raft/RaftConfig.java     |  13 +
 .../org/apache/kafka/raft/ReplicatedCounter.java   |   2 +-
 .../kafka/raft/metadata/MetaLogRaftShim.java       | 119 ++++++
 .../org/apache/kafka/raft/MockNetworkChannel.java  |   5 +
 .../apache/kafka/raft/RaftClientTestContext.java   |   2 +-
 tests/kafkatest/services/kafka/config_property.py  |   2 +-
 39 files changed, 1908 insertions(+), 202 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index bc0491e..9ec16b9 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -385,6 +385,7 @@
     <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.common.protocol" />
+    <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.test"/>
     <allow pkg="com.fasterxml.jackson" />
   </subpackage>
diff --git a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
index 8001f1c..a09d581 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ApiVersions.java
@@ -22,6 +22,7 @@ import org.apache.kafka.common.requests.ProduceRequest;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Optional;
 
 /**
  * Maintains node api versions for access outside of NetworkClient (which is where the information is derived).
@@ -51,12 +52,12 @@ public class ApiVersions {
     private byte computeMaxUsableProduceMagic() {
         // use a magic version which is supported by all brokers to reduce the chance that
         // we will need to convert the messages when they are ready to be sent.
-        byte maxUsableMagic = RecordBatch.CURRENT_MAGIC_VALUE;
-        for (NodeApiVersions versions : this.nodeApiVersions.values()) {
-            byte nodeMaxUsableMagic = ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE));
-            maxUsableMagic = (byte) Math.min(nodeMaxUsableMagic, maxUsableMagic);
-        }
-        return maxUsableMagic;
+        Optional<Byte> knownBrokerNodesMinRequiredMagicForProduce = this.nodeApiVersions.values().stream()
+            .filter(versions -> versions.apiVersion(ApiKeys.PRODUCE) != null) // filter out Raft controller nodes
+            .map(versions -> ProduceRequest.requiredMagicForVersion(versions.latestUsableVersion(ApiKeys.PRODUCE)))
+            .min(Byte::compare);
+        return (byte) Math.min(RecordBatch.CURRENT_MAGIC_VALUE,
+            knownBrokerNodesMinRequiredMagicForProduce.orElse(RecordBatch.CURRENT_MAGIC_VALUE));
     }
 
     public synchronized byte maxUsableProduceMagic() {
diff --git a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
index 4a5c98d..206e95e4 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ApiVersionsTest.java
@@ -16,10 +16,13 @@
  */
 package org.apache.kafka.clients;
 
+import org.apache.kafka.common.message.ApiVersionsResponseData;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.record.RecordBatch;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
+
 import static org.junit.jupiter.api.Assertions.assertEquals;
 
 public class ApiVersionsTest {
@@ -38,4 +41,18 @@ public class ApiVersionsTest {
         apiVersions.remove("1");
         assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
     }
+
+    @Test
+    public void testMaxUsableProduceMagicWithRaftController() {
+        ApiVersions apiVersions = new ApiVersions();
+        assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
+
+        // something that doesn't support PRODUCE, which is the case with Raft-based controllers
+        apiVersions.update("2", new NodeApiVersions(Collections.singleton(
+            new ApiVersionsResponseData.ApiVersion()
+                .setApiKey(ApiKeys.FETCH.id)
+                .setMinVersion((short) 0)
+                .setMaxVersion((short) 2))));
+        assertEquals(RecordBatch.CURRENT_MAGIC_VALUE, apiVersions.maxUsableProduceMagic());
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index ddd01be..97b4b66 100644
--- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -5082,7 +5082,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDecommissionBrokerTimeoutMaxRetry() {
+    public void testUnregisterBrokerTimeoutMaxRetry() {
         int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv(Time.SYSTEM, AdminClientConfig.RETRIES_CONFIG, "1")) {
             env.kafkaClient().setNodeApiVersions(
@@ -5099,7 +5099,7 @@ public class KafkaAdminClientTest {
     }
 
     @Test
-    public void testDecommissionBrokerTimeoutMaxWait() {
+    public void testUnregisterBrokerTimeoutMaxWait() {
         int nodeId = 1;
         try (final AdminClientUnitTestEnv env = mockClientEnv()) {
             env.kafkaClient().setNodeApiVersions(
diff --git a/core/src/main/scala/kafka/cluster/Broker.scala b/core/src/main/scala/kafka/cluster/Broker.scala
index 46483d0..657d89b 100755
--- a/core/src/main/scala/kafka/cluster/Broker.scala
+++ b/core/src/main/scala/kafka/cluster/Broker.scala
@@ -32,7 +32,7 @@ import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 object Broker {
-  private[cluster] case class ServerInfo(clusterResource: ClusterResource,
+  private[kafka] case class ServerInfo(clusterResource: ClusterResource,
                                          brokerId: Int,
                                          endpoints: util.List[Endpoint],
                                          interBrokerEndpoint: Endpoint) extends AuthorizerServerInfo
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 4299534..c2ab1d8 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -228,7 +228,7 @@ object LogConfig {
   }
 
   // Package private for testing, return a copy since it's a mutable global variable
-  private[log] def configDefCopy: LogConfigDef = new LogConfigDef(configDef)
+  private[kafka] def configDefCopy: LogConfigDef = new LogConfigDef(configDef)
 
   private val configDef: LogConfigDef = {
     import org.apache.kafka.common.config.ConfigDef.Importance._
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 905c556..72c5141 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -78,12 +78,16 @@ class SocketServer(val config: KafkaConfig,
                    val metrics: Metrics,
                    val time: Time,
                    val credentialProvider: CredentialProvider,
-                   val allowControllerOnlyApis: Boolean = false)
+                   allowControllerOnlyApis: Boolean = false,
+                   controllerSocketServer: Boolean = false)
   extends Logging with KafkaMetricsGroup with BrokerReconfigurable {
 
   private val maxQueuedRequests = config.queuedMaxRequests
 
-  private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
+  private val nodeId = config.brokerId
+
+  private val logContext = new LogContext(s"[SocketServer ${if (controllerSocketServer) "controller" else "broker"}Id=${nodeId}] ")
+
   this.logIdent = logContext.logPrefix
 
   private val memoryPoolSensor = metrics.sensor("MemoryPoolUtilization")
@@ -117,11 +121,15 @@ class SocketServer(val config: KafkaConfig,
    * when processors start up and invoke [[org.apache.kafka.common.network.Selector#poll]].
    *
    * @param startProcessingRequests Flag indicating whether `Processor`s must be started.
+   * @param controlPlaneListener    The control plane listener, or None if there is none.
+   * @param dataPlaneListeners      The data plane listeners.
    */
-  def startup(startProcessingRequests: Boolean = true): Unit = {
+  def startup(startProcessingRequests: Boolean = true,
+              controlPlaneListener: Option[EndPoint] = config.controlPlaneListener,
+              dataPlaneListeners: Seq[EndPoint] = config.dataPlaneListeners): Unit = {
     this.synchronized {
-      createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
-      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
+      createControlPlaneAcceptorAndProcessor(controlPlaneListener)
+      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, dataPlaneListeners)
       if (startProcessingRequests) {
         this.startProcessingRequests()
       }
@@ -224,9 +232,11 @@ class SocketServer(val config: KafkaConfig,
   private def startDataPlaneProcessorsAndAcceptors(authorizerFutures: Map[Endpoint, CompletableFuture[Void]]): Unit = {
     val interBrokerListener = dataPlaneAcceptors.asScala.keySet
       .find(_.listenerName == config.interBrokerListenerName)
-      .getOrElse(throw new IllegalStateException(s"Inter-broker listener ${config.interBrokerListenerName} not found, endpoints=${dataPlaneAcceptors.keySet}"))
-    val orderedAcceptors = List(dataPlaneAcceptors.get(interBrokerListener)) ++
-      dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
+    val orderedAcceptors = interBrokerListener match {
+      case Some(interBrokerListener) => List(dataPlaneAcceptors.get(interBrokerListener)) ++
+        dataPlaneAcceptors.asScala.filter { case (k, _) => k != interBrokerListener }.values
+      case None => dataPlaneAcceptors.asScala.values
+    }
     orderedAcceptors.foreach { acceptor =>
       val endpoint = acceptor.endPoint
       startAcceptorAndProcessors(DataPlaneThreadPrefix, endpoint, acceptor, authorizerFutures)
@@ -276,8 +286,7 @@ class SocketServer(val config: KafkaConfig,
   private def createAcceptor(endPoint: EndPoint, metricPrefix: String) : Acceptor = {
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
-    val brokerId = config.brokerId
-    new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas, metricPrefix, time)
+    new Acceptor(endPoint, sendBufferSize, recvBufferSize, nodeId, connectionQuotas, metricPrefix, time)
   }
 
   private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = {
@@ -540,11 +549,13 @@ private[kafka] abstract class AbstractServerThread(connectionQuotas: ConnectionQ
 private[kafka] class Acceptor(val endPoint: EndPoint,
                               val sendBufferSize: Int,
                               val recvBufferSize: Int,
-                              brokerId: Int,
+                              nodeId: Int,
                               connectionQuotas: ConnectionQuotas,
                               metricPrefix: String,
-                              time: Time) extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
+                              time: Time,
+                              logPrefix: String = "") extends AbstractServerThread(connectionQuotas) with KafkaMetricsGroup {
 
+  this.logIdent = logPrefix
   private val nioSelector = NSelector.open()
   val serverChannel = openServerSocket(endPoint.host, endPoint.port)
   private val processors = new ArrayBuffer[Processor]()
@@ -573,7 +584,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
     processors.foreach { processor =>
       KafkaThread.nonDaemon(
-        s"${processorThreadPrefix}-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+        s"${processorThreadPrefix}-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor
       ).start()
     }
diff --git a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
index f3b7f11..68f7b4a 100644
--- a/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
+++ b/core/src/main/scala/kafka/raft/KafkaNetworkChannel.scala
@@ -165,7 +165,7 @@ class KafkaNetworkChannel(
     RaftUtil.errorResponse(apiKey, error)
   }
 
-  def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = {
+  override def updateEndpoint(id: Int, spec: InetAddressSpec): Unit = {
     val node = new Node(id, spec.address.getHostString, spec.address.getPort)
     endpoints.put(id, node)
   }
@@ -181,5 +181,4 @@ class KafkaNetworkChannel(
   override def close(): Unit = {
     requestThread.shutdown()
   }
-
 }
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index b9a77b7..6a74c27 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -121,6 +121,8 @@ class KafkaRaftManager[T](
   private val raftClient = buildRaftClient()
   private val raftIoThread = new RaftIoThread(raftClient, threadNamePrefix)
 
+  def kafkaRaftClient: KafkaRaftClient[T] = raftClient
+
   def startup(): Unit = {
     // Update the voter endpoints (if valid) with what's in RaftConfig
     val voterAddresses: util.Map[Integer, AddressSpec] = raftConfig.quorumVoterConnections
diff --git a/core/src/main/scala/kafka/server/AlterIsrManager.scala b/core/src/main/scala/kafka/server/AlterIsrManager.scala
index 70c0fc2..b58ca89 100644
--- a/core/src/main/scala/kafka/server/AlterIsrManager.scala
+++ b/core/src/main/scala/kafka/server/AlterIsrManager.scala
@@ -70,7 +70,8 @@ object AlterIsrManager {
     time: Time,
     metrics: Metrics,
     threadNamePrefix: Option[String],
-    brokerEpochSupplier: () => Long
+    brokerEpochSupplier: () => Long,
+    brokerId: Int
   ): AlterIsrManager = {
     val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
 
@@ -87,7 +88,7 @@ object AlterIsrManager {
       controllerChannelManager = channelManager,
       scheduler = scheduler,
       time = time,
-      brokerId = config.brokerId,
+      brokerId = brokerId,
       brokerEpochSupplier = brokerEpochSupplier
     )
   }
diff --git a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
index ec7f2df..01dabed 100644
--- a/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
+++ b/core/src/main/scala/kafka/server/AutoTopicCreationManager.scala
@@ -61,8 +61,8 @@ object AutoTopicCreationManager {
     time: Time,
     metrics: Metrics,
     threadNamePrefix: Option[String],
-    adminManager: ZkAdminManager,
-    controller: KafkaController,
+    adminManager: Option[ZkAdminManager],
+    controller: Option[KafkaController],
     groupCoordinator: GroupCoordinator,
     txnCoordinator: TransactionCoordinator,
     enableForwarding: Boolean
@@ -91,11 +91,14 @@ class DefaultAutoTopicCreationManager(
   config: KafkaConfig,
   metadataCache: MetadataCache,
   channelManager: Option[BrokerToControllerChannelManager],
-  adminManager: ZkAdminManager,
-  controller: KafkaController,
+  adminManager: Option[ZkAdminManager],
+  controller: Option[KafkaController],
   groupCoordinator: GroupCoordinator,
   txnCoordinator: TransactionCoordinator
 ) extends AutoTopicCreationManager with Logging {
+  if (controller.isEmpty && channelManager.isEmpty) {
+    throw new IllegalArgumentException("Must supply a channel manager if not supplying a controller")
+  }
 
   private val inflightTopics = Collections.newSetFromMap(new ConcurrentHashMap[String, java.lang.Boolean]())
 
@@ -116,7 +119,7 @@ class DefaultAutoTopicCreationManager(
 
     val creatableTopicResponses = if (creatableTopics.isEmpty) {
       Seq.empty
-    } else if (!controller.isActive && channelManager.isDefined) {
+    } else if (controller.isEmpty || !controller.get.isActive && channelManager.isDefined) {
       sendCreateTopicRequest(creatableTopics)
     } else {
       createTopicsInZk(creatableTopics, controllerMutationQuota)
@@ -133,7 +136,7 @@ class DefaultAutoTopicCreationManager(
     try {
       // Note that we use timeout = 0 since we do not need to wait for metadata propagation
       // and we want to get the response error immediately.
-      adminManager.createTopics(
+      adminManager.get.createTopics(
         timeout = 0,
         validateOnly = false,
         creatableTopics,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 90f95ed..57ceb46 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -1,10 +1,10 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License.  You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,14 +14,464 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package kafka.server
 
+import java.util
+import java.util.concurrent.{CompletableFuture, TimeUnit, TimeoutException}
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.cluster.Broker.ServerInfo
+import kafka.coordinator.group.GroupCoordinator
+import kafka.coordinator.transaction.{ProducerIdGenerator, TransactionCoordinator}
+import kafka.log.LogManager
+import kafka.metrics.KafkaYammerMetrics
+import kafka.network.SocketServer
+import kafka.security.CredentialProvider
+import kafka.server.KafkaBroker.metricsPrefix
+import kafka.server.metadata.{BrokerMetadataListener, CachedConfigRepository, ClientQuotaCache, ClientQuotaMetadataManager, RaftMetadataCache}
+import kafka.utils.{CoreUtils, KafkaScheduler}
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.message.BrokerRegistrationRequestData.{Listener, ListenerCollection}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
+import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
+import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
+import org.apache.kafka.metadata.{BrokerState, VersionRange}
+import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
 /**
- * Stubbed implementation of the KIP-500 broker which processes state
- * from the `@metadata` topic which is replicated through Raft.
+ * A KIP-500 Kafka broker.
  */
-class BrokerServer {
-  def startup(): Unit = ???
-  def shutdown(): Unit = ???
-  def awaitShutdown(): Unit = ???
+class BrokerServer(
+                    val config: KafkaConfig,
+                    val metaProps: MetaProperties,
+                    val metaLogManager: MetaLogManager,
+                    val time: Time,
+                    val metrics: Metrics,
+                    val threadNamePrefix: Option[String],
+                    val initialOfflineDirs: Seq[String],
+                    val controllerQuorumVotersFuture: CompletableFuture[util.List[String]],
+                    val supportedFeatures: util.Map[String, VersionRange]
+                  ) extends KafkaBroker {
+
+  import kafka.server.Server._
+
+  private val logContext: LogContext = new LogContext(s"[BrokerServer id=${config.nodeId}] ")
+
+  this.logIdent = logContext.logPrefix
+
+  val lifecycleManager: BrokerLifecycleManager =
+    new BrokerLifecycleManager(config, time, threadNamePrefix)
+
+  private val isShuttingDown = new AtomicBoolean(false)
+
+  val lock = new ReentrantLock()
+  val awaitShutdownCond = lock.newCondition()
+  var status: ProcessStatus = SHUTDOWN
+
+  var dataPlaneRequestProcessor: KafkaApis = null
+  var controlPlaneRequestProcessor: KafkaApis = null
+
+  var authorizer: Option[Authorizer] = None
+  var socketServer: SocketServer = null
+  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+
+  var logDirFailureChannel: LogDirFailureChannel = null
+  var logManager: LogManager = null
+
+  var tokenManager: DelegationTokenManager = null
+
+  var replicaManager: RaftReplicaManager = null
+
+  var credentialProvider: CredentialProvider = null
+  var tokenCache: DelegationTokenCache = null
+
+  var groupCoordinator: GroupCoordinator = null
+
+  var transactionCoordinator: TransactionCoordinator = null
+
+  var forwardingManager: ForwardingManager = null
+
+  var alterIsrManager: AlterIsrManager = null
+
+  var autoTopicCreationManager: AutoTopicCreationManager = null
+
+  var kafkaScheduler: KafkaScheduler = null
+
+  var metadataCache: RaftMetadataCache = null
+
+  var quotaManagers: QuotaFactory.QuotaManagers = null
+  var quotaCache: ClientQuotaCache = null
+
+  private var _brokerTopicStats: BrokerTopicStats = null
+
+  val brokerFeatures: BrokerFeatures = BrokerFeatures.createDefault()
+
+  val featureCache: FinalizedFeatureCache = new FinalizedFeatureCache(brokerFeatures)
+
+  val clusterId: String = metaProps.clusterId.toString
+
+  val configRepository = new CachedConfigRepository()
+
+  var brokerMetadataListener: BrokerMetadataListener = null
+
+  def kafkaYammerMetrics: kafka.metrics.KafkaYammerMetrics = KafkaYammerMetrics.INSTANCE
+
+  private[kafka] def brokerTopicStats = _brokerTopicStats
+
+  private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
+    lock.lock()
+    try {
+      if (status != from) return false
+      status = to
+      if (to == SHUTTING_DOWN) {
+        isShuttingDown.set(true)
+      } else if (to == SHUTDOWN) {
+        isShuttingDown.set(false)
+        awaitShutdownCond.signalAll()
+      }
+    } finally {
+      lock.unlock()
+    }
+    true
+  }
+
+  def startup(): Unit = {
+    if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
+    try {
+      info("Starting broker")
+
+      /* start scheduler */
+      kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+      kafkaScheduler.startup()
+
+      /* register broker metrics */
+      _brokerTopicStats = new BrokerTopicStats
+
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
+      quotaCache = new ClientQuotaCache()
+
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
+
+      // Create log manager, but don't start it because we need to delay any potential unclean shutdown log recovery
+      // until we catch up on the metadata log and have up-to-date topic and broker configs.
+      logManager = LogManager(config, initialOfflineDirs, configRepository, kafkaScheduler, time,
+        brokerTopicStats, logDirFailureChannel, true)
+
+      metadataCache = MetadataCache.raftMetadataCache(config.nodeId)
+      // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
+      // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
+      tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
+      credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
+
+      // Create and start the socket server acceptor threads so that the bound port is known.
+      // Delay starting processors until the end of the initialization sequence to ensure
+      // that credentials have been loaded before processing authentications.
+      socketServer = new SocketServer(config, metrics, time, credentialProvider, allowControllerOnlyApis = false)
+      socketServer.startup(startProcessingRequests = false)
+
+      val controllerNodes =
+        RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
+      val controllerNodeProvider = RaftControllerNodeProvider(metaLogManager, config, controllerNodes)
+      val alterIsrChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
+        time, metrics, config, "alterisr", threadNamePrefix, 60000)
+      alterIsrManager = new DefaultAlterIsrManager(
+        controllerChannelManager = alterIsrChannelManager,
+        scheduler = kafkaScheduler,
+        time = time,
+        brokerId = config.nodeId,
+        brokerEpochSupplier = () => lifecycleManager.brokerEpoch()
+      )
+      alterIsrManager.start()
+
+      this.replicaManager = new RaftReplicaManager(config, metrics, time,
+        kafkaScheduler, logManager, isShuttingDown, quotaManagers,
+        brokerTopicStats, metadataCache, logDirFailureChannel, alterIsrManager,
+        configRepository, threadNamePrefix)
+
+      val forwardingChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
+        time, metrics, config, "forwarding", threadNamePrefix, 60000)
+      forwardingManager = new ForwardingManagerImpl(forwardingChannelManager)
+      forwardingManager.start()
+
+      /* start token manager */
+      if (config.tokenAuthEnabled) {
+        throw new UnsupportedOperationException("Delegation tokens are not supported")
+      }
+      tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
+      tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...
+
+      // Create group coordinator, but don't start it until we've started replica manager.
+      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
+      groupCoordinator = GroupCoordinator(config, replicaManager, Time.SYSTEM, metrics)
+
+      // Create transaction coordinator, but don't start it until we've started replica manager.
+      // Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
+      transactionCoordinator = TransactionCoordinator(config, replicaManager,
+        new KafkaScheduler(threads = 1, threadNamePrefix = "transaction-log-manager-"),
+        createTemporaryProducerIdManager, metrics, metadataCache, Time.SYSTEM)
+
+      val autoTopicCreationChannelManager = BrokerToControllerChannelManager(controllerNodeProvider,
+        time, metrics, config, "autocreate", threadNamePrefix, 60000)
+      autoTopicCreationManager = new DefaultAutoTopicCreationManager(
+        config, metadataCache, Some(autoTopicCreationChannelManager), None, None,
+        groupCoordinator, transactionCoordinator)
+      autoTopicCreationManager.start()
+
+      /* Add all reconfigurables for config change notification before starting the metadata listener */
+      config.dynamicConfig.addReconfigurables(this)
+
+      val clientQuotaMetadataManager = new ClientQuotaMetadataManager(
+        quotaManagers, socketServer.connectionQuotas, quotaCache)
+
+      brokerMetadataListener = new BrokerMetadataListener(
+        config.nodeId,
+        time,
+        metadataCache,
+        configRepository,
+        groupCoordinator,
+        replicaManager,
+        transactionCoordinator,
+        logManager,
+        threadNamePrefix,
+        clientQuotaMetadataManager)
+
+      val networkListeners = new ListenerCollection()
+      config.advertisedListeners.foreach { ep =>
+        networkListeners.add(new Listener().
+          setHost(ep.host).
+          setName(ep.listenerName.value()).
+          setPort(socketServer.boundPort(ep.listenerName)).
+          setSecurityProtocol(ep.securityProtocol.id))
+      }
+      lifecycleManager.start(() => brokerMetadataListener.highestMetadataOffset(),
+        BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
+          "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
+        metaProps.clusterId, networkListeners, supportedFeatures)
+
+      // Register a listener with the Raft layer to receive metadata event notifications
+      metaLogManager.register(brokerMetadataListener)
+
+      val endpoints = new util.ArrayList[Endpoint](networkListeners.size())
+      var interBrokerListener: Endpoint = null
+      networkListeners.iterator().forEachRemaining(listener => {
+        val endPoint = new Endpoint(listener.name(),
+          SecurityProtocol.forId(listener.securityProtocol()),
+          listener.host(), listener.port())
+        endpoints.add(endPoint)
+        if (listener.name().equals(config.interBrokerListenerName.value())) {
+          interBrokerListener = endPoint
+        }
+      })
+      if (interBrokerListener == null) {
+        throw new RuntimeException("Unable to find inter-broker listener " +
+          config.interBrokerListenerName.value() + ". Found listener(s): " +
+          endpoints.asScala.map(ep => ep.listenerName().orElse("(none)")).mkString(", "))
+      }
+      val authorizerInfo = ServerInfo(new ClusterResource(clusterId),
+        config.nodeId, endpoints, interBrokerListener)
+
+      /* Get the authorizer and initialize it if one is specified.*/
+      authorizer = config.authorizer
+      authorizer.foreach(_.configure(config.originals))
+      val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
+        case Some(authZ) =>
+          authZ.start(authorizerInfo).asScala.map { case (ep, cs) =>
+            ep -> cs.toCompletableFuture
+          }
+        case None =>
+          authorizerInfo.endpoints.asScala.map { ep =>
+            ep -> CompletableFuture.completedFuture[Void](null)
+          }.toMap
+      }
+
+      val fetchManager = new FetchManager(Time.SYSTEM,
+        new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
+          KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
+
+      // Start processing requests once we've caught up on the metadata log, recovered logs if necessary,
+      // and started all services that we previously delayed starting.
+      val raftSupport = RaftSupport(forwardingManager, metadataCache)
+      dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, raftSupport,
+        replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
+        config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
+        fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+
+      dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
+        config.numIoThreads, s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.DataPlaneThreadPrefix)
+
+      socketServer.controlPlaneRequestChannelOpt.foreach { controlPlaneRequestChannel =>
+        controlPlaneRequestProcessor = new KafkaApis(controlPlaneRequestChannel, raftSupport,
+          replicaManager, groupCoordinator, transactionCoordinator, autoTopicCreationManager,
+          config.nodeId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
+          fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
+
+        controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.nodeId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
+          1, s"${SocketServer.ControlPlaneMetricPrefix}RequestHandlerAvgIdlePercent", SocketServer.ControlPlaneThreadPrefix)
+      }
+
+      // Block until we've caught up on the metadata log
+      lifecycleManager.initialCatchUpFuture.get()
+      // Start log manager, which will perform (potentially lengthy) recovery-from-unclean-shutdown if required.
+      logManager.startup(metadataCache.getAllTopics())
+      // Start other services that we've delayed starting, in the appropriate order.
+      replicaManager.endMetadataChangeDeferral(
+        RequestHandlerHelper.onLeadershipChange(groupCoordinator, transactionCoordinator, _, _))
+      replicaManager.startup()
+      replicaManager.startHighWatermarkCheckPointThread()
+      groupCoordinator.startup(() => metadataCache.numPartitions(Topic.GROUP_METADATA_TOPIC_NAME).
+        getOrElse(config.offsetsTopicPartitions))
+      transactionCoordinator.startup(() => metadataCache.numPartitions(Topic.TRANSACTION_STATE_TOPIC_NAME).
+        getOrElse(config.transactionTopicPartitions))
+
+      socketServer.startProcessingRequests(authorizerFutures)
+
+      // We're now ready to unfence the broker.
+      lifecycleManager.setReadyToUnfence()
+
+      maybeChangeStatus(STARTING, STARTED)
+    } catch {
+      case e: Throwable =>
+        maybeChangeStatus(STARTING, STARTED)
+        fatal("Fatal error during broker startup. Prepare to shutdown", e)
+        shutdown()
+        throw e
+    }
+  }
+
+  /**
+   * Stop-gap ProducerIdGenerator that derives producer IDs from the broker epoch.
+   *
+   * Each call hands out `brokerEpoch * maxProducerIdsPerBrokerEpoch + currentOffset`, where
+   * `currentOffset` counts up from 0. Once `maxProducerIdsPerBrokerEpoch` IDs have been
+   * handed out, every further call logs a fatal error and throws a KafkaException.
+   *
+   * NOTE(review): `currentOffset` is a plain var updated without synchronization; confirm
+   * that callers never invoke generateProducerId() concurrently.
+   */
+  class TemporaryProducerIdManager() extends ProducerIdGenerator {
+    // Number of producer IDs available for a single broker epoch.
+    val maxProducerIdsPerBrokerEpoch = 1000000
+    // Offset of the most recently issued ID within the current block; -1 means none issued yet.
+    var currentOffset = -1
+
+    /**
+     * Returns the next producer ID.
+     *
+     * @throws KafkaException if the block of IDs is exhausted
+     */
+    override def generateProducerId(): Long = {
+      currentOffset = currentOffset + 1
+      if (currentOffset >= maxProducerIdsPerBrokerEpoch) {
+        fatal(s"Exhausted all demo/temporary producerIds as the next one will extend past the block size of $maxProducerIdsPerBrokerEpoch")
+        throw new KafkaException("Have exhausted all demo/temporary producerIds.")
+      }
+      // Block until the initial metadata catch-up has finished before reading the epoch
+      // (presumably the broker epoch is not meaningful before that point -- confirm).
+      lifecycleManager.initialCatchUpFuture.get()
+      lifecycleManager.brokerEpoch() * maxProducerIdsPerBrokerEpoch + currentOffset
+    }
+  }
+
+  /**
+   * Factory for the stop-gap producer ID generator; returns a fresh
+   * TemporaryProducerIdManager on every call.
+   */
+  def createTemporaryProducerIdManager(): ProducerIdGenerator = {
+    new TemporaryProducerIdManager()
+  }
+
+  /**
+   * Shuts the broker down, closing the components held by this server in a fixed order.
+   *
+   * Does nothing unless the status can be moved from STARTED to SHUTTING_DOWN. Most
+   * components are null-checked before being closed, since startup's error path calls this
+   * method and may do so before every component has been created. The status is moved to
+   * SHUTDOWN in the finally block, whether or not an exception escapes.
+   */
+  def shutdown(): Unit = {
+    if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
+    try {
+      info("shutting down")
+
+      // Optionally request a controlled shutdown and wait up to five minutes for it.
+      // A timeout or any other failure is only logged; shutdown continues regardless.
+      if (config.controlledShutdownEnable) {
+        lifecycleManager.beginControlledShutdown()
+        try {
+          lifecycleManager.controlledShutdownFuture.get(5L, TimeUnit.MINUTES)
+        } catch {
+          case _: TimeoutException =>
+            error("Timed out waiting for the controller to approve controlled shutdown")
+          case e: Throwable =>
+            error("Got unexpected exception waiting for controlled shutdown future", e)
+        }
+      }
+      lifecycleManager.beginShutdown()
+
+      // Stop socket server to stop accepting any more connections and requests.
+      // Socket server will be shutdown towards the end of the sequence.
+      if (socketServer != null) {
+        CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
+      }
+      // Request handling goes first: handler pools, the scheduler, then the API processors.
+      if (dataPlaneRequestHandlerPool != null)
+        CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+      if (controlPlaneRequestHandlerPool != null)
+        CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
+      if (kafkaScheduler != null)
+        CoreUtils.swallow(kafkaScheduler.shutdown(), this)
+
+      if (dataPlaneRequestProcessor != null)
+        CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
+      if (controlPlaneRequestProcessor != null)
+        CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
+      CoreUtils.swallow(authorizer.foreach(_.close()), this)
+
+      // Metadata listener and coordinators are closed before the replica manager and log manager.
+      if (brokerMetadataListener !=  null) {
+        CoreUtils.swallow(brokerMetadataListener.close(), this)
+      }
+      if (transactionCoordinator != null)
+        CoreUtils.swallow(transactionCoordinator.shutdown(), this)
+      if (groupCoordinator != null)
+        CoreUtils.swallow(groupCoordinator.shutdown(), this)
+
+      if (tokenManager != null)
+        CoreUtils.swallow(tokenManager.shutdown(), this)
+
+      if (replicaManager != null)
+        CoreUtils.swallow(replicaManager.shutdown(), this)
+
+      if (alterIsrManager != null)
+        CoreUtils.swallow(alterIsrManager.shutdown(), this)
+
+      if (forwardingManager != null)
+        CoreUtils.swallow(forwardingManager.shutdown(), this)
+
+      if (autoTopicCreationManager != null)
+        CoreUtils.swallow(autoTopicCreationManager.shutdown(), this)
+
+      if (logManager != null)
+        CoreUtils.swallow(logManager.shutdown(), this)
+
+      if (quotaManagers != null)
+        CoreUtils.swallow(quotaManagers.shutdown(), this)
+
+      // Network layer and metrics are torn down last.
+      if (socketServer != null)
+        CoreUtils.swallow(socketServer.shutdown(), this)
+      if (metrics != null)
+        CoreUtils.swallow(metrics.close(), this)
+      if (brokerTopicStats != null)
+        CoreUtils.swallow(brokerTopicStats.close(), this)
+
+      // Clear all reconfigurable instances stored in DynamicBrokerConfig
+      config.dynamicConfig.clear()
+
+      isShuttingDown.set(false)
+
+      CoreUtils.swallow(lifecycleManager.close(), this)
+
+      CoreUtils.swallow(AppInfoParser.unregisterAppInfo(metricsPrefix, config.nodeId.toString, metrics), this)
+      info("shut down completed")
+    } catch {
+      case e: Throwable =>
+        fatal("Fatal error during broker shutdown.", e)
+        throw e
+    } finally {
+      maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN)
+    }
+  }
+
+  /**
+   * Blocks the calling thread until the server status becomes SHUTDOWN.
+   *
+   * The status is checked while holding `lock`, and the wait on `awaitShutdownCond`
+   * is uninterruptible.
+   */
+  def awaitShutdown(): Unit = {
+    lock.lock()
+    try {
+      // Re-check the status after every wake-up.
+      while (status != SHUTDOWN) {
+        awaitShutdownCond.awaitUninterruptibly()
+      }
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  /** Returns the port reported by the socket server for the given listener. */
+  def boundPort(listenerName: ListenerName): Int = socketServer.boundPort(listenerName)
+
+  /** Returns the broker state as currently reported by the lifecycle manager. */
+  def currentState(): BrokerState = lifecycleManager.state()
+
 }
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 1e7af76..3b53522 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -31,7 +31,9 @@ import org.apache.kafka.common.requests.AbstractRequest
 import org.apache.kafka.common.security.JaasContext
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.metalog.MetaLogManager
 
+import scala.collection.Seq
 import scala.jdk.CollectionConverters._
 
 trait ControllerNodeProvider {
@@ -71,6 +73,40 @@ class MetadataCacheControllerNodeProvider(
   }
 }
 
+object RaftControllerNodeProvider {
+  /**
+   * Builds a provider that talks to the controller quorum over the first configured
+   * controller listener.
+   *
+   * The security protocol is looked up in the listener-to-protocol map; when the listener
+   * has no entry, the listener name itself is interpreted as a protocol name.
+   */
+  def apply(metaLogManager: MetaLogManager,
+            config: KafkaConfig,
+            controllerQuorumVoterNodes: Seq[Node]): RaftControllerNodeProvider = {
+
+    val controllerListener = new ListenerName(config.controllerListenerNames.head)
+    val protocol = config.listenerSecurityProtocolMap.get(controllerListener) match {
+      case Some(mapped) => mapped
+      case None => SecurityProtocol.forName(controllerListener.value())
+    }
+    new RaftControllerNodeProvider(metaLogManager, controllerQuorumVoterNodes, controllerListener, protocol)
+  }
+}
+
+/**
+ * Controller node lookup backed by the metadata log manager, for use with a
+ * Raft-based metadata quorum.
+ *
+ * The current leader's node ID is resolved against the statically supplied quorum voters.
+ */
+class RaftControllerNodeProvider(val metaLogManager: MetaLogManager,
+                                 controllerQuorumVoterNodes: Seq[Node],
+                                 val listenerName: ListenerName,
+                                 val securityProtocol: SecurityProtocol
+                                ) extends ControllerNodeProvider with Logging {
+  // Quorum voters indexed by node ID.
+  val idToNode = controllerQuorumVoterNodes.map(voter => (voter.id(), voter)).toMap
+
+  /**
+   * Returns the node of the current leader, or None when no leader is reported, the
+   * reported leader ID is negative, or the ID is not one of the known voters.
+   */
+  override def get(): Option[Node] = {
+    Option(metaLogManager.leader())
+      .filter(current => current.nodeId() >= 0)
+      .flatMap(current => idToNode.get(current.nodeId()))
+  }
+}
+
 object BrokerToControllerChannelManager {
   def apply(
     controllerNodeProvider: ControllerNodeProvider,
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala b/core/src/main/scala/kafka/server/ControllerApis.scala
new file mode 100644
index 0000000..2386da5
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -0,0 +1,453 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util
+
+import kafka.network.RequestChannel
+import kafka.raft.RaftManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.Logging
+import org.apache.kafka.clients.admin.AlterConfigOp
+import org.apache.kafka.common.acl.AclOperation.{ALTER, ALTER_CONFIGS, CLUSTER_ACTION, CREATE, DESCRIBE}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.errors.ApiException
+import org.apache.kafka.common.internals.FatalExitError
+import org.apache.kafka.common.message.ApiVersionsResponseData.{ApiVersion, SupportedFeatureKey}
+import org.apache.kafka.common.message.CreateTopicsRequestData.CreatableTopicCollection
+import org.apache.kafka.common.message.CreateTopicsResponseData.CreatableTopicResult
+import org.apache.kafka.common.message.MetadataResponseData.MetadataResponseBroker
+import org.apache.kafka.common.message.{ApiVersionsResponseData, BeginQuorumEpochResponseData, BrokerHeartbeatResponseData, BrokerRegistrationResponseData, CreateTopicsResponseData, DescribeQuorumResponseData, EndQuorumEpochResponseData, FetchResponseData, MetadataResponseData, UnregisterBrokerResponseData, VoteResponseData}
+import org.apache.kafka.common.protocol.{ApiKeys, ApiMessage, Errors}
+import org.apache.kafka.common.record.BaseRecords
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.resource.Resource
+import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
+import org.apache.kafka.common.resource.ResourceType.{CLUSTER, TOPIC}
+import org.apache.kafka.common.utils.Time
+import org.apache.kafka.common.Node
+import org.apache.kafka.controller.Controller
+import org.apache.kafka.metadata.{ApiMessageAndVersion, BrokerHeartbeatReply, BrokerRegistrationReply, FeatureMap, FeatureMapAndEpoch, VersionRange}
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * Request handler for Controller APIs
+ */
+class ControllerApis(val requestChannel: RequestChannel,
+                     val authorizer: Option[Authorizer],
+                     val quotas: QuotaManagers,
+                     val time: Time,
+                     val supportedFeatures: Map[String, VersionRange],
+                     val controller: Controller,
+                     val raftManager: RaftManager[ApiMessageAndVersion],
+                     val config: KafkaConfig,
+                     val metaProperties: MetaProperties,
+                     val controllerNodes: Seq[Node]) extends ApiRequestHandler with Logging {
+
+  // Performs the authorization checks for the request handlers below, using the configured authorizer.
+  val authHelper = new AuthHelper(authorizer)
+  // Sends (optionally throttled) responses and error responses on the request channel.
+  val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time, s"[ControllerApis id=${config.nodeId}] ")
+
+  // The API keys advertised in ApiVersions responses (see handleApiVersionsRequest).
+  // Each active entry has a matching case in handle(); the commented-out keys are
+  // presumably not implemented by the controller yet -- confirm before enabling.
+  var supportedApiKeys = Set(
+    ApiKeys.FETCH,
+    ApiKeys.METADATA,
+    //ApiKeys.SASL_HANDSHAKE
+    ApiKeys.API_VERSIONS,
+    ApiKeys.CREATE_TOPICS,
+    //ApiKeys.DELETE_TOPICS,
+    //ApiKeys.DESCRIBE_ACLS,
+    //ApiKeys.CREATE_ACLS,
+    //ApiKeys.DELETE_ACLS,
+    //ApiKeys.DESCRIBE_CONFIGS,
+    //ApiKeys.ALTER_CONFIGS,
+    //ApiKeys.SASL_AUTHENTICATE,
+    //ApiKeys.CREATE_PARTITIONS,
+    //ApiKeys.CREATE_DELEGATION_TOKEN
+    //ApiKeys.RENEW_DELEGATION_TOKEN
+    //ApiKeys.EXPIRE_DELEGATION_TOKEN
+    //ApiKeys.DESCRIBE_DELEGATION_TOKEN
+    //ApiKeys.ELECT_LEADERS
+    ApiKeys.INCREMENTAL_ALTER_CONFIGS,
+    //ApiKeys.ALTER_PARTITION_REASSIGNMENTS
+    //ApiKeys.LIST_PARTITION_REASSIGNMENTS
+    ApiKeys.ALTER_CLIENT_QUOTAS,
+    //ApiKeys.DESCRIBE_USER_SCRAM_CREDENTIALS
+    //ApiKeys.ALTER_USER_SCRAM_CREDENTIALS
+    //ApiKeys.UPDATE_FEATURES
+    ApiKeys.ENVELOPE,
+    ApiKeys.VOTE,
+    ApiKeys.BEGIN_QUORUM_EPOCH,
+    ApiKeys.END_QUORUM_EPOCH,
+    ApiKeys.DESCRIBE_QUORUM,
+    ApiKeys.ALTER_ISR,
+    ApiKeys.BROKER_REGISTRATION,
+    ApiKeys.BROKER_HEARTBEAT,
+    ApiKeys.UNREGISTER_BROKER,
+  )
+
+  /**
+   * Validates a forwarded (enveloped) request and, if it is invalid, answers the envelope
+   * with the corresponding error.
+   *
+   * @param envelope        the outer envelope request
+   * @param forwardedApiKey the API key of the request carried inside the envelope
+   * @return true if an error response was sent and the request needs no further handling
+   */
+  private def maybeHandleInvalidEnvelope(
+                                          envelope: RequestChannel.Request,
+                                          forwardedApiKey: ApiKeys
+                                        ): Boolean = {
+    val rejection: Option[Errors] =
+      if (!authHelper.authorize(envelope.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
+        // Forwarding request must have CLUSTER_ACTION authorization to reduce the risk of impersonation.
+        Some(Errors.CLUSTER_AUTHORIZATION_FAILED)
+      } else if (!forwardedApiKey.forwardable) {
+        Some(Errors.INVALID_REQUEST)
+      } else {
+        None
+      }
+
+    rejection.foreach(error => requestHelper.sendErrorResponseMaybeThrottle(envelope, error.exception))
+    rejection.isDefined
+  }
+
+  /**
+   * Entry point for every request arriving at the controller.
+   *
+   * A request that arrived inside an envelope is validated first; if that validation
+   * already answered it, nothing else happens. Otherwise the request is dispatched on its
+   * API key. FatalExitError is rethrown; any other throwable is passed to the request
+   * helper's error handling.
+   */
+  override def handle(request: RequestChannel.Request): Unit = {
+    try {
+      val rejectedEnvelope = request.envelope.exists { outer =>
+        maybeHandleInvalidEnvelope(outer, request.header.apiKey)
+      }
+
+      if (!rejectedEnvelope) {
+        request.header.apiKey match {
+          case ApiKeys.FETCH => handleFetch(request)
+          case ApiKeys.METADATA => handleMetadataRequest(request)
+          case ApiKeys.CREATE_TOPICS => handleCreateTopics(request)
+          case ApiKeys.API_VERSIONS => handleApiVersionsRequest(request)
+          case ApiKeys.VOTE => handleVote(request)
+          case ApiKeys.BEGIN_QUORUM_EPOCH => handleBeginQuorumEpoch(request)
+          case ApiKeys.END_QUORUM_EPOCH => handleEndQuorumEpoch(request)
+          case ApiKeys.DESCRIBE_QUORUM => handleDescribeQuorum(request)
+          case ApiKeys.ALTER_ISR => handleAlterIsrRequest(request)
+          case ApiKeys.BROKER_REGISTRATION => handleBrokerRegistration(request)
+          case ApiKeys.BROKER_HEARTBEAT => handleBrokerHeartBeatRequest(request)
+          case ApiKeys.UNREGISTER_BROKER => handleUnregisterBroker(request)
+          case ApiKeys.ALTER_CLIENT_QUOTAS => handleAlterClientQuotas(request)
+          case ApiKeys.INCREMENTAL_ALTER_CONFIGS => handleIncrementalAlterConfigs(request)
+          // An envelope is unwrapped and the inner request is fed back into this method.
+          case ApiKeys.ENVELOPE => EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
+          case _ => throw new ApiException(s"Unsupported ApiKey ${request.context.header.apiKey()}")
+        }
+      }
+    } catch {
+      case e: FatalExitError => throw e
+      case e: Throwable => requestHelper.handleError(request, e)
+    }
+  }
+
+  /** Handles a Fetch request by delegating to the Raft layer after a CLUSTER_ACTION check. */
+  private def handleFetch(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    handleRaftRequest(request, raftReply =>
+      new FetchResponse[BaseRecords](raftReply.asInstanceOf[FetchResponseData]))
+  }
+
+  /**
+   * Answers a Metadata request with the controller nodes as the broker list.
+   *
+   * The controller ID is this node's ID when the controller reports a positive claim
+   * epoch, and NO_CONTROLLER_ID otherwise. Cluster authorized operations are only computed
+   * when the request asks for them.
+   */
+  def handleMetadataRequest(request: RequestChannel.Request): Unit = {
+    val metadataRequest = request.body[MetadataRequest]
+
+    // Int.MinValue when not requested, 0 when requested but DESCRIBE on the cluster is denied.
+    def clusterAuthorizedOperations(): Int = {
+      if (!metadataRequest.data.includeClusterAuthorizedOperations) {
+        Int.MinValue
+      } else if (authHelper.authorize(request.context, DESCRIBE, CLUSTER, CLUSTER_NAME)) {
+        authHelper.authorizedOperations(request, Resource.CLUSTER)
+      } else {
+        0
+      }
+    }
+
+    def buildResponse(throttleTimeMs: Int): MetadataResponse = {
+      val data = new MetadataResponseData()
+      data.setThrottleTimeMs(throttleTimeMs)
+      controllerNodes.foreach { controllerNode =>
+        val broker = new MetadataResponseBroker()
+          .setHost(controllerNode.host)
+          .setNodeId(controllerNode.id)
+          .setPort(controllerNode.port)
+          .setRack(controllerNode.rack)
+        data.brokers().add(broker)
+      }
+      data.setClusterId(metaProperties.clusterId.toString)
+      val controllerId =
+        if (controller.curClaimEpoch() > 0) config.nodeId else MetadataResponse.NO_CONTROLLER_ID
+      data.setControllerId(controllerId)
+      val authorizedOperations = clusterAuthorizedOperations()
+      // TODO: fill in information about the metadata topic
+      data.setClusterAuthorizedOperations(authorizedOperations)
+      new MetadataResponse(data, request.header.apiVersion)
+    }
+
+    requestHelper.sendResponseMaybeThrottle(request,
+      throttleTimeMs => buildResponse(throttleTimeMs))
+  }
+
+  /**
+   * Handles a CreateTopics request.
+   *
+   * A principal with CREATE on the cluster may create every requested topic. Otherwise each
+   * topic is authorized individually: authorized topics are passed to the controller and
+   * unauthorized ones are reported with TOPIC_AUTHORIZATION_FAILED in the response.
+   */
+  def handleCreateTopics(request: RequestChannel.Request): Unit = {
+    val createTopicRequest = request.body[CreateTopicsRequest]
+    val (authorizedCreateRequest, unauthorizedTopics) =
+      if (authHelper.authorize(request.context, CREATE, CLUSTER, CLUSTER_NAME)) {
+        (createTopicRequest.data, Seq.empty)
+      } else {
+        // Filter a copy of the request in place rather than moving the original topic
+        // elements into a second collection: an element that is still linked into one
+        // CreatableTopicCollection is not accepted by add() on another, so authorized
+        // topics would otherwise be dropped silently.
+        val filteredRequest = createTopicRequest.data.duplicate()
+        val unauthorizedTopics = mutable.Buffer.empty[String]
+
+        val topicIterator = filteredRequest.topics.iterator()
+        while (topicIterator.hasNext) {
+          val topicData = topicIterator.next()
+          if (!authHelper.authorize(request.context, CREATE, TOPIC, topicData.name)) {
+            unauthorizedTopics += topicData.name
+            topicIterator.remove()
+          }
+        }
+        (filteredRequest, unauthorizedTopics)
+      }
+
+    // Appends the authorization failures to the given result and sends it.
+    def sendResponse(response: CreateTopicsResponseData): Unit = {
+      unauthorizedTopics.foreach { topic =>
+        val result = new CreatableTopicResult()
+          .setName(topic)
+          .setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+        response.topics.add(result)
+      }
+
+      requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+        response.setThrottleTimeMs(throttleTimeMs)
+        new CreateTopicsResponse(response)
+      })
+    }
+
+    if (authorizedCreateRequest.topics.isEmpty) {
+      // Nothing is left to create, so answer without involving the controller.
+      sendResponse(new CreateTopicsResponseData())
+    } else {
+      val future = controller.createTopics(authorizedCreateRequest)
+      future.whenComplete((responseData, exception) => {
+        val response = if (exception != null) {
+          createTopicRequest.getErrorResponse(exception).asInstanceOf[CreateTopicsResponse].data
+        } else {
+          responseData
+        }
+        sendResponse(response)
+      })
+    }
+  }
+
+  /**
+   * Answers an ApiVersions request with the supported features and the API keys listed in
+   * `supportedApiKeys`. Finalized features are not reported yet: an empty feature map with
+   * epoch 0 is used in their place.
+   */
+  def handleApiVersionsRequest(request: RequestChannel.Request): Unit = {
+    // Note that broker returns its full list of supported ApiKeys and versions regardless of current
+    // authentication state (e.g., before SASL authentication on an SASL listener, do note that no
+    // Kafka protocol requests may take place on an SSL listener before the SSL handshake is finished).
+    // If this is considered to leak information about the broker version a workaround is to use SSL
+    // with client authentication which is performed at an earlier stage of the connection where the
+    // ApiVersionRequest is not available.
+    def createResponseCallback(features: FeatureMapAndEpoch,
+                               requestThrottleMs: Int): ApiVersionsResponse = {
+      val apiVersionRequest = request.body[ApiVersionsRequest]
+      if (apiVersionRequest.hasUnsupportedRequestVersion) {
+        apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.UNSUPPORTED_VERSION.exception)
+      } else if (!apiVersionRequest.isValid) {
+        apiVersionRequest.getErrorResponse(requestThrottleMs, Errors.INVALID_REQUEST.exception)
+      } else {
+        val data = new ApiVersionsResponseData()
+        data.setErrorCode(0.toShort)
+        data.setThrottleTimeMs(requestThrottleMs)
+        data.setFinalizedFeaturesEpoch(features.epoch())
+        supportedFeatures.foreach { case (featureName, range) =>
+          val featureKey = new SupportedFeatureKey().
+            setName(featureName).setMaxVersion(range.max()).setMinVersion(range.min())
+          data.supportedFeatures().add(featureKey)
+        }
+        //        features.finalizedFeatures().asScala.foreach {
+        //          case (k, v) => data.finalizedFeatures().add(new FinalizedFeatureKey().
+        //            setName(k).setMaxVersionLevel(v.max()).setMinVersionLevel(v.min()))
+        //        }
+        ApiKeys.values().filter(key => supportedApiKeys.contains(key)).foreach { key =>
+          val version = new ApiVersion().
+            setApiKey(key.id).
+            setMaxVersion(key.latestVersion()).
+            setMinVersion(key.oldestVersion())
+          data.apiKeys().add(version)
+        }
+        new ApiVersionsResponse(data)
+      }
+    }
+    //    FutureConverters.toScala(controller.finalizedFeatures()).onComplete {
+    //      case Success(features) =>
+    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+      val noFinalizedFeatures = new FeatureMapAndEpoch(new FeatureMap(new util.HashMap()), 0)
+      createResponseCallback(noFinalizedFeatures, requestThrottleMs)
+    })
+    //      case Failure(e) => requestHelper.handleError(request, e)
+    //    }
+  }
+
+  /** Handles a Vote request by delegating to the Raft layer after a CLUSTER_ACTION check. */
+  private def handleVote(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    handleRaftRequest(request, raftReply =>
+      new VoteResponse(raftReply.asInstanceOf[VoteResponseData]))
+  }
+
+  /** Handles a BeginQuorumEpoch request by delegating to the Raft layer after a CLUSTER_ACTION check. */
+  private def handleBeginQuorumEpoch(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    handleRaftRequest(request, raftReply =>
+      new BeginQuorumEpochResponse(raftReply.asInstanceOf[BeginQuorumEpochResponseData]))
+  }
+
+  /** Handles an EndQuorumEpoch request by delegating to the Raft layer after a CLUSTER_ACTION check. */
+  private def handleEndQuorumEpoch(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    handleRaftRequest(request, raftReply =>
+      new EndQuorumEpochResponse(raftReply.asInstanceOf[EndQuorumEpochResponseData]))
+  }
+
+  /** Handles a DescribeQuorum request by delegating to the Raft layer after a DESCRIBE check on the cluster. */
+  private def handleDescribeQuorum(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, DESCRIBE)
+    handleRaftRequest(request, raftReply =>
+      new DescribeQuorumResponse(raftReply.asInstanceOf[DescribeQuorumResponseData]))
+  }
+
+  /**
+   * Handles an AlterIsr request: after a CLUSTER_ACTION check the change is passed to the
+   * controller, and its outcome (or the error response for a failure) is sent back
+   * without throttling.
+   */
+  def handleAlterIsrRequest(request: RequestChannel.Request): Unit = {
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+    val alterIsrRequest = request.body[AlterIsrRequest]
+    controller.alterIsr(alterIsrRequest.data()).whenComplete((result, exception) => {
+      val response =
+        if (exception == null) new AlterIsrResponse(result)
+        else alterIsrRequest.getErrorResponse(exception)
+      requestHelper.sendResponseExemptThrottle(request, response)
+    })
+  }
+
+  /**
+   * Handles a BrokerHeartbeat request: after a CLUSTER_ACTION check the heartbeat is
+   * processed by the controller, and the reply (or the error code for a failure) is sent back.
+   */
+  def handleBrokerHeartBeatRequest(request: RequestChannel.Request): Unit = {
+    val heartbeatRequest = request.body[BrokerHeartbeatRequest]
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    controller.processBrokerHeartbeat(heartbeatRequest.data).handle[Unit]((reply, e) => {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        val throttled = new BrokerHeartbeatResponseData().setThrottleTimeMs(requestThrottleMs)
+        val data =
+          if (e != null) {
+            throttled.setErrorCode(Errors.forException(e).code())
+          } else {
+            throttled.
+              setErrorCode(Errors.NONE.code()).
+              setIsCaughtUp(reply.isCaughtUp()).
+              setIsFenced(reply.isFenced()).
+              setShouldShutDown(reply.shouldShutDown())
+          }
+        new BrokerHeartbeatResponse(data)
+      })
+    })
+  }
+
+  /**
+   * Handles an UnregisterBroker request: after an ALTER check on the cluster the broker ID
+   * is passed to the controller, and a failure is reported through the response's error code.
+   */
+  def handleUnregisterBroker(request: RequestChannel.Request): Unit = {
+    val decommissionRequest = request.body[UnregisterBrokerRequest]
+    authHelper.authorizeClusterOperation(request, ALTER)
+
+    controller.unregisterBroker(decommissionRequest.data().brokerId()).handle[Unit]((_, e) => {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        val throttled = new UnregisterBrokerResponseData().setThrottleTimeMs(requestThrottleMs)
+        // On success the error code is left at the message default.
+        val data =
+          if (e != null) throttled.setErrorCode(Errors.forException(e).code())
+          else throttled
+        new UnregisterBrokerResponse(data)
+      })
+    })
+  }
+
+  /**
+   * Handles a BrokerRegistration request: after a CLUSTER_ACTION check the registration is
+   * passed to the controller, and the assigned broker epoch (or the error code for a
+   * failure) is sent back.
+   */
+  def handleBrokerRegistration(request: RequestChannel.Request): Unit = {
+    val registrationRequest = request.body[BrokerRegistrationRequest]
+    authHelper.authorizeClusterOperation(request, CLUSTER_ACTION)
+
+    controller.registerBroker(registrationRequest.data).handle[Unit]((reply, e) => {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
+        val throttled = new BrokerRegistrationResponseData().setThrottleTimeMs(requestThrottleMs)
+        val data =
+          if (e != null) {
+            throttled.setErrorCode(Errors.forException(e).code())
+          } else {
+            throttled.
+              setErrorCode(Errors.NONE.code()).
+              setBrokerEpoch(reply.epoch)
+          }
+        new BrokerRegistrationResponse(data)
+      })
+    })
+  }
+
+  /**
+   * Passes a request to the Raft manager and sends its reply without throttling.
+   *
+   * @param request       the incoming request
+   * @param buildResponse wraps the Raft layer's response message in the matching response type
+   */
+  private def handleRaftRequest(request: RequestChannel.Request,
+                                buildResponse: ApiMessage => AbstractResponse): Unit = {
+    val requestBody = request.body[AbstractRequest]
+    raftManager
+      .handleRequest(request.header, requestBody.data, time.milliseconds())
+      .whenComplete((responseData, exception) => {
+        val response =
+          if (exception == null) buildResponse(responseData)
+          else requestBody.getErrorResponse(exception)
+        requestHelper.sendResponseExemptThrottle(request, response)
+      })
+  }
+
+  /**
+   * Handles an AlterClientQuotas request: after an ALTER_CONFIGS check on the cluster the
+   * quota changes are passed to the controller and the per-entity results are sent back.
+   */
+  def handleAlterClientQuotas(request: RequestChannel.Request): Unit = {
+    val quotaRequest = request.body[AlterClientQuotasRequest]
+    // Enforce the check the same way the other handlers in this class do. The previous
+    // call to authHelper.authorize(...) returned a Boolean that was discarded, so an
+    // unauthorized principal was never rejected.
+    authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)
+
+    controller.alterClientQuotas(quotaRequest.entries(), quotaRequest.validateOnly())
+      .whenComplete((results, exception) => {
+        if (exception != null) {
+          requestHelper.handleError(request, exception)
+        } else {
+          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+            AlterClientQuotasResponse.fromQuotaEntities(results, requestThrottleMs))
+        }
+      })
+  }
+
+  /**
+   * Handles an IncrementalAlterConfigs request: after an ALTER_CONFIGS check on the cluster
+   * the requested changes are regrouped per config resource, passed to the controller, and
+   * the per-resource results are sent back.
+   */
+  def handleIncrementalAlterConfigs(request: RequestChannel.Request): Unit = {
+    val alterConfigsRequest = request.body[IncrementalAlterConfigsRequest]
+    // Enforce the check the same way the other handlers in this class do. The previous
+    // call to authHelper.authorize(...) returned a Boolean that was discarded, so an
+    // unauthorized principal was never rejected.
+    authHelper.authorizeClusterOperation(request, ALTER_CONFIGS)
+    // resource -> (config name -> (operation, value))
+    val configChanges = new util.HashMap[ConfigResource, util.Map[String, util.Map.Entry[AlterConfigOp.OpType, String]]]()
+    alterConfigsRequest.data.resources.forEach { resource =>
+      val configResource = new ConfigResource(ConfigResource.Type.forId(resource.resourceType()), resource.resourceName())
+      val altersByName = new util.HashMap[String, util.Map.Entry[AlterConfigOp.OpType, String]]()
+      resource.configs.forEach { config =>
+        altersByName.put(config.name(), new util.AbstractMap.SimpleEntry[AlterConfigOp.OpType, String](
+          AlterConfigOp.OpType.forId(config.configOperation()), config.value()))
+      }
+      configChanges.put(configResource, altersByName)
+    }
+    controller.incrementalAlterConfigs(configChanges, alterConfigsRequest.data().validateOnly())
+      .whenComplete((results, exception) => {
+        if (exception != null) {
+          requestHelper.handleError(request, exception)
+        } else {
+          requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+            new IncrementalAlterConfigsResponse(requestThrottleMs, results))
+        }
+      })
+  }
+}
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index b648e77..efcebb4 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -1,10 +1,10 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
+ * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
+ * the License.  You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,14 +14,194 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package kafka.server
 
+import java.util.concurrent.CompletableFuture
+import java.util
+import java.util.concurrent.locks.ReentrantLock
+
+import kafka.cluster.Broker.ServerInfo
+import kafka.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, LinuxIoMetricsCollector}
+import kafka.network.SocketServer
+import kafka.raft.RaftManager
+import kafka.security.CredentialProvider
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.utils.{CoreUtils, Logging}
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.security.scram.internals.ScramMechanism
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.{ClusterResource, Endpoint}
+import org.apache.kafka.controller.Controller
+import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
+import org.apache.kafka.metalog.MetaLogManager
+import org.apache.kafka.raft.RaftConfig
+import org.apache.kafka.server.authorizer.Authorizer
+
+import scala.jdk.CollectionConverters._
+
 /**
- * Stubbed implementation of the KIP-500 controller which is responsible
- * for managing the `@metadata` topic which is replicated through Raft.
+ * A KIP-500 Kafka controller.
  */
-class ControllerServer {
-  def startup(): Unit = ???
-  def shutdown(): Unit = ???
-  def awaitShutdown(): Unit = ???
+class ControllerServer(
+                        val metaProperties: MetaProperties,
+                        val config: KafkaConfig,
+                        val metaLogManager: MetaLogManager,
+                        val raftManager: RaftManager[ApiMessageAndVersion],
+                        val time: Time,
+                        val metrics: Metrics,
+                        val threadNamePrefix: Option[String],
+                        val controllerQuorumVotersFuture: CompletableFuture[util.List[String]]
+                      ) extends Logging with KafkaMetricsGroup {
+  import kafka.server.Server._
+
+  // Guards `status`: maybeChangeStatus() and awaitShutdown() only touch it while holding this lock.
+  val lock = new ReentrantLock()
+  // Signalled by maybeChangeStatus() when the status becomes SHUTDOWN; awaitShutdown() waits on it.
+  val awaitShutdownCond = lock.newCondition()
+  // Lifecycle state; a freshly constructed server starts out as SHUTDOWN.
+  var status: ProcessStatus = SHUTDOWN
+
+  // The components below stay null until startup() assigns them.
+  var linuxIoMetricsCollector: LinuxIoMetricsCollector = null
+  var authorizer: Option[Authorizer] = null
+  var tokenCache: DelegationTokenCache = null
+  var credentialProvider: CredentialProvider = null
+  var socketServer: SocketServer = null
+  // Completed by startup() with the bound port of the first controller listener, or
+  // completed exceptionally by shutdown().
+  val socketServerFirstBoundPortFuture = new CompletableFuture[Integer]()
+  // NOTE: startup() currently assigns null here; no controller implementation is created yet.
+  var controller: Controller = null
+  // Empty for now; passed to ControllerApis as the set of supported features.
+  val supportedFeatures: Map[String, VersionRange] = Map()
+  var quotaManagers: QuotaManagers = null
+  var controllerApis: ControllerApis = null
+  var controllerApisHandlerPool: KafkaRequestHandlerPool = null
+
+  /**
+   * Move the server from one lifecycle state to another while holding `lock`.
+   *
+   * The transition is applied only if the current status equals `from`. When the new
+   * status is SHUTDOWN, every thread waiting on `awaitShutdownCond` is woken up.
+   *
+   * @param from the status the server is expected to be in
+   * @param to   the status to switch to
+   * @return true if the status was changed, false if the current status was not `from`
+   */
+  private def maybeChangeStatus(from: ProcessStatus, to: ProcessStatus): Boolean = {
+    lock.lock()
+    try {
+      val transitionAllowed = status == from
+      if (transitionAllowed) {
+        status = to
+        if (to == SHUTDOWN) awaitShutdownCond.signalAll()
+      }
+      transitionAllowed
+    } finally {
+      lock.unlock()
+    }
+  }
+
+  // The cluster ID held in `metaProperties`, rendered as a string.
+  def clusterId: String = metaProperties.clusterId.toString
+
+  /**
+   * Start the controller: register gauges, configure and start the authorizer (if any),
+   * bring up the socket server on the controller listeners, and create the request
+   * handler pool.
+   *
+   * Does nothing unless the current status is SHUTDOWN. If any step throws, the error is
+   * logged, shutdown() is invoked, and the exception is rethrown to the caller.
+   */
+  def startup(): Unit = {
+    if (!maybeChangeStatus(SHUTDOWN, STARTING)) return
+    try {
+      info("Starting controller")
+
+      // The status is flipped to STARTED before initialization actually runs. shutdown()
+      // only proceeds from STARTED, so this lets the failure path below tear down whatever
+      // has been created so far.
+      maybeChangeStatus(STARTING, STARTED)
+      // TODO: initialize the log dir(s)
+      this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
+
+      newGauge("ClusterId", () => clusterId)
+      newGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
+
+      // The disk I/O gauges are registered only when the collector reports itself usable.
+      linuxIoMetricsCollector = new LinuxIoMetricsCollector("/proc", time, logger.underlying)
+      if (linuxIoMetricsCollector.usable()) {
+        newGauge("linux-disk-read-bytes", () => linuxIoMetricsCollector.readBytes())
+        newGauge("linux-disk-write-bytes", () => linuxIoMetricsCollector.writeBytes())
+      }
+
+      val javaListeners = config.controllerListeners.map(_.toJava).asJava
+      authorizer = config.authorizer
+      authorizer.foreach(_.configure(config.originals))
+
+      // One future per endpoint, handed to socketServer.startProcessingRequests() below.
+      // Without an authorizer, every controller listener gets an already-completed future.
+      val authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = authorizer match {
+        case Some(authZ) =>
+          // It would be nice to remove some of the broker-specific assumptions from
+          // AuthorizerServerInfo, such as the assumption that there is an inter-broker
+          // listener, or that ID is named brokerId.
+          val controllerAuthorizerInfo = ServerInfo(
+            new ClusterResource(clusterId), config.nodeId, javaListeners, javaListeners.get(0))
+          authZ.start(controllerAuthorizerInfo).asScala.map { case (ep, cs) =>
+            ep -> cs.toCompletableFuture
+          }.toMap
+        case None =>
+          javaListeners.asScala.map {
+            ep => ep -> CompletableFuture.completedFuture[Void](null)
+          }.toMap
+      }
+
+      tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
+      credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
+      socketServer = new SocketServer(config,
+        metrics,
+        time,
+        credentialProvider,
+        allowControllerOnlyApis = true,
+        controllerSocketServer = true)
+      // NOTE(review): the `false` presumably defers request processing until
+      // startProcessingRequests() is called at the end of this method -- confirm against
+      // SocketServer.startup.
+      socketServer.startup(false, None, config.controllerListeners)
+      // Publish the port actually bound by the first controller listener.
+      socketServerFirstBoundPortFuture.complete(socketServer.boundPort(
+        config.controllerListeners.head.listenerName))
+
+      // Placeholder: no controller implementation is instantiated here yet, so
+      // ControllerApis receives a null controller.
+      controller = null
+      quotaManagers = QuotaFactory.instantiate(config, metrics, time, threadNamePrefix.getOrElse(""))
+      // Blocks until the quorum voter list has been supplied through the future.
+      val controllerNodes =
+        RaftConfig.quorumVoterStringsToNodes(controllerQuorumVotersFuture.get()).asScala
+      controllerApis = new ControllerApis(socketServer.dataPlaneRequestChannel,
+        authorizer,
+        quotaManagers,
+        time,
+        supportedFeatures,
+        controller,
+        raftManager,
+        config,
+        metaProperties,
+        controllerNodes.toSeq)
+      controllerApisHandlerPool = new KafkaRequestHandlerPool(config.nodeId,
+        socketServer.dataPlaneRequestChannel,
+        controllerApis,
+        time,
+        config.numIoThreads,
+        s"${SocketServer.DataPlaneMetricPrefix}RequestHandlerAvgIdlePercent",
+        SocketServer.DataPlaneThreadPrefix)
+      socketServer.startProcessingRequests(authorizerFutures)
+    } catch {
+      case e: Throwable =>
+        // No-op unless the failure happened before the transition to STARTED above.
+        maybeChangeStatus(STARTING, STARTED)
+        fatal("Fatal error during controller startup. Prepare to shutdown", e)
+        shutdown()
+        throw e
+    }
+  }
+
+  /**
+   * Stop the controller and release the components created by startup().
+   *
+   * Does nothing unless the current status is STARTED. Components that were never
+   * created (still null) are skipped. Failures while stopping the socket server, the
+   * request handler pool or the quota managers are swallowed via CoreUtils.swallow;
+   * any other failure is logged as fatal and rethrown. The status always ends up as
+   * SHUTDOWN once the attempt has finished.
+   */
+  def shutdown(): Unit = {
+    val shutdownStarted = maybeChangeStatus(STARTED, SHUTTING_DOWN)
+    if (shutdownStarted) {
+      try {
+        info("shutting down")
+        // Stop accepting work first, then stop the controller and tear down the rest.
+        Option(socketServer).foreach(server => CoreUtils.swallow(server.stopProcessingRequests(), this))
+        Option(controller).foreach(_.beginShutdown())
+        Option(socketServer).foreach(server => CoreUtils.swallow(server.shutdown(), this))
+        Option(controllerApisHandlerPool).foreach(pool => CoreUtils.swallow(pool.shutdown(), this))
+        Option(quotaManagers).foreach(managers => CoreUtils.swallow(managers.shutdown(), this))
+        Option(controller).foreach(_.close())
+        // Has no effect if startup() already completed the future with the bound port.
+        socketServerFirstBoundPortFuture.completeExceptionally(new RuntimeException("shutting down"))
+      } catch {
+        case e: Throwable =>
+          fatal("Fatal error during controller shutdown.", e)
+          throw e
+      } finally {
+        maybeChangeStatus(SHUTTING_DOWN, SHUTDOWN)
+      }
+    }
+  }
+
+  /**
+   * Block the calling thread until the server status is SHUTDOWN.
+   *
+   * Returns immediately if the status is already SHUTDOWN. The wait is uninterruptible.
+   */
+  def awaitShutdown(): Unit = {
+    lock.lock()
+    try {
+      // Re-check the status after every wake-up; maybeChangeStatus() signals the condition
+      // when it switches the status to SHUTDOWN.
+      while (status != SHUTDOWN) {
+        awaitShutdownCond.awaitUninterruptibly()
+      }
+    } finally {
+      lock.unlock()
+    }
+  }
 }
diff --git a/core/src/main/scala/kafka/server/EnvelopeUtils.scala b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
new file mode 100644
index 0000000..ec8871f
--- /dev/null
+++ b/core/src/main/scala/kafka/server/EnvelopeUtils.scala
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.net.{InetAddress, UnknownHostException}
+import java.nio.ByteBuffer
+
+import kafka.network.RequestChannel
+import org.apache.kafka.common.errors.{InvalidRequestException, PrincipalDeserializationException, UnsupportedVersionException}
+import org.apache.kafka.common.network.ClientInformation
+import org.apache.kafka.common.requests.{EnvelopeRequest, RequestContext, RequestHeader}
+import org.apache.kafka.common.security.auth.KafkaPrincipal
+
+import scala.compat.java8.OptionConverters._
+
+/**
+ * Helpers for unwrapping an `EnvelopeRequest`, i.e. a request that carries another
+ * (forwarded) request together with the original principal and client address.
+ */
+object EnvelopeUtils {
+  /**
+   * Unwrap the request embedded in an envelope and pass it to `handler`.
+   *
+   * The forwarded request keeps the envelope's connection ID, listener name, security
+   * protocol and privileged-listener flag, but uses the principal and client address
+   * carried inside the envelope.
+   *
+   * @param request               the envelope request as received
+   * @param requestChannelMetrics metrics object passed to the forwarded request
+   * @param handler               invoked with the unwrapped request
+   * @throws PrincipalDeserializationException if the forwarded principal cannot be deserialized
+   * @throws InvalidRequestException if the client address is invalid or the embedded API
+   *                                 is not forwardable
+   * @throws UnsupportedVersionException if the embedded header or request cannot be parsed
+   */
+  def handleEnvelopeRequest(
+    request: RequestChannel.Request,
+    requestChannelMetrics: RequestChannel.Metrics,
+    handler: RequestChannel.Request => Unit): Unit = {
+    val envelopeBody = request.body[EnvelopeRequest]
+    val principal = parseForwardedPrincipal(request.context, envelopeBody.requestPrincipal)
+    val clientAddress = parseForwardedClientAddress(envelopeBody.clientAddress)
+
+    // Parse from a duplicate, which has its own position and limit -- presumably so that
+    // reading the embedded request does not disturb the envelope's own view of the data.
+    val embeddedBuffer = envelopeBody.requestData.duplicate()
+    val embeddedHeader = parseForwardedRequestHeader(embeddedBuffer)
+
+    val forwardedApi = embeddedHeader.apiKey
+    if (!forwardedApi.forwardable)
+      throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
+
+    val envelopeContext = request.context
+    val embeddedContext = new RequestContext(
+      embeddedHeader,
+      envelopeContext.connectionId,
+      clientAddress,
+      principal,
+      envelopeContext.listenerName,
+      envelopeContext.securityProtocol,
+      ClientInformation.EMPTY,
+      envelopeContext.fromPrivilegedListener
+    )
+
+    handler(parseForwardedRequest(request, embeddedContext, embeddedBuffer, requestChannelMetrics))
+  }
+
+  /**
+   * Deserialize the forwarded principal with the serde of the envelope's request context.
+   * Fails with PrincipalDeserializationException if no serde is defined or if
+   * deserialization throws.
+   */
+  private def parseForwardedPrincipal(
+    envelopeContext: RequestContext,
+    principalBytes: Array[Byte]
+  ): KafkaPrincipal = {
+    val serde = envelopeContext.principalSerde.asScala.getOrElse {
+      throw new PrincipalDeserializationException("Could not deserialize principal since " +
+        "no `KafkaPrincipalSerde` has been defined")
+    }
+    try serde.deserialize(principalBytes)
+    catch {
+      case e: Exception =>
+        throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e)
+    }
+  }
+
+  /**
+   * Turn the raw address bytes from the envelope into an InetAddress. A malformed
+   * address is a structural error of the envelope and surfaces as InvalidRequestException.
+   */
+  private def parseForwardedClientAddress(
+    address: Array[Byte]
+  ): InetAddress = {
+    try InetAddress.getByAddress(address)
+    catch {
+      case e: UnknownHostException =>
+        throw new InvalidRequestException("Failed to parse client address from envelope", e)
+    }
+  }
+
+  /**
+   * Parse the header of the embedded request from `buffer`.
+   */
+  private def parseForwardedRequestHeader(
+    buffer: ByteBuffer
+  ): RequestHeader = {
+    try RequestHeader.parse(buffer)
+    catch {
+      case e: InvalidRequestException =>
+        // An unparsable embedded request is reported as UNSUPPORTED_VERSION so that it can
+        // be told apart from structural errors in the envelope request itself, such as an
+        // invalid client address.
+        throw new UnsupportedVersionException("Failed to parse request header from envelope", e)
+    }
+  }
+
+  /**
+   * Build the RequestChannel.Request for the embedded request. It reuses the envelope's
+   * processor, start time and memory pool, and keeps a reference to the envelope itself.
+   */
+  private def parseForwardedRequest(
+    envelope: RequestChannel.Request,
+    forwardedContext: RequestContext,
+    buffer: ByteBuffer,
+    requestChannelMetrics: RequestChannel.Metrics
+  ): RequestChannel.Request = {
+    try {
+      new RequestChannel.Request(
+        processor = envelope.processor,
+        context = forwardedContext,
+        startTimeNanos = envelope.startTimeNanos,
+        envelope.memoryPool,
+        buffer,
+        requestChannelMetrics,
+        Some(envelope)
+      )
+    } catch {
+      case e: InvalidRequestException =>
+        // Same convention as for the header: UNSUPPORTED_VERSION marks a problem with the
+        // embedded request rather than with the envelope itself.
+        throw new UnsupportedVersionException(s"Failed to parse forwarded request " +
+          s"with header ${forwardedContext.header}", e)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index c3195be..0545c74 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -18,7 +18,6 @@
 package kafka.server
 
 import java.lang.{Long => JLong}
-import java.net.{InetAddress, UnknownHostException}
 import java.nio.ByteBuffer
 import java.util
 import java.util.concurrent.ConcurrentHashMap
@@ -64,7 +63,7 @@ import org.apache.kafka.common.message.OffsetForLeaderEpochRequestData.OffsetFor
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult, OffsetForLeaderTopicResultCollection}
 import org.apache.kafka.common.message.{AddOffsetsToTxnResponseData, AlterClientQuotasResponseData, AlterConfigsResponseData, AlterPartitionReassignmentsResponseData, AlterReplicaLogDirsResponseData, ApiVersionsResponseData, CreateAclsResponseData, CreatePartitionsResponseData, CreateTopicsResponseData, DeleteAclsResponseData, DeleteGroupsResponseData, DeleteRecordsResponseData, DeleteTopicsResponseData, DescribeAclsResponseData, DescribeClientQuotasResponseData, DescribeClusterResponseD [...]
 import org.apache.kafka.common.metrics.Metrics
-import org.apache.kafka.common.network.{ClientInformation, ListenerName, Send}
+import org.apache.kafka.common.network.{ListenerName, Send}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.replica.ClientMetadata
@@ -1224,7 +1223,7 @@ class KafkaApis(val requestChannel: RequestChannel,
          requestThrottleMs,
          brokers.flatMap(_.endpoints.get(request.context.listenerName.value())).toList.asJava,
          clusterId,
-         metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
+         metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID),
          completeTopicMetadata.asJava,
          clusterAuthorizedOperations
       ))
@@ -3210,7 +3209,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
 
     val brokers = metadataCache.getAliveBrokers
-    val controllerId = metadataCache.getControllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
+    val controllerId = metadataSupport.controllerId.getOrElse(MetadataResponse.NO_CONTROLLER_ID)
 
     requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => {
       val data = new DescribeClusterResponseData()
@@ -3234,7 +3233,6 @@ class KafkaApis(val requestChannel: RequestChannel,
 
   def handleEnvelope(request: RequestChannel.Request): Unit = {
     val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldNeverReceive(request))
-    val envelope = request.body[EnvelopeRequest]
 
     // If forwarding is not yet enabled or this request has been received on an invalid endpoint,
     // then we treat the request as unparsable and close the connection.
@@ -3258,101 +3256,8 @@ class KafkaApis(val requestChannel: RequestChannel,
         s"Broker $brokerId is not the active controller"))
       return
     }
-
-    val forwardedPrincipal = parseForwardedPrincipal(request.context, envelope.requestPrincipal)
-    val forwardedClientAddress = parseForwardedClientAddress(envelope.clientAddress)
-
-    val forwardedRequestBuffer = envelope.requestData.duplicate()
-    val forwardedRequestHeader = parseForwardedRequestHeader(forwardedRequestBuffer)
-
-    val forwardedApi = forwardedRequestHeader.apiKey
-    if (!forwardedApi.forwardable) {
-      throw new InvalidRequestException(s"API $forwardedApi is not enabled or is not eligible for forwarding")
-    }
-
-    val forwardedContext = new RequestContext(
-      forwardedRequestHeader,
-      request.context.connectionId,
-      forwardedClientAddress,
-      forwardedPrincipal,
-      request.context.listenerName,
-      request.context.securityProtocol,
-      ClientInformation.EMPTY,
-      request.context.fromPrivilegedListener
-    )
-
-    val forwardedRequest = parseForwardedRequest(request, forwardedContext, forwardedRequestBuffer)
-    handle(forwardedRequest)
-  }
-
-  private def parseForwardedClientAddress(
-    address: Array[Byte]
-  ): InetAddress = {
-    try {
-      InetAddress.getByAddress(address)
-    } catch {
-      case e: UnknownHostException =>
-        throw new InvalidRequestException("Failed to parse client address from envelope", e)
-    }
-  }
-
-  private def parseForwardedRequest(
-    envelope: RequestChannel.Request,
-    forwardedContext: RequestContext,
-    buffer: ByteBuffer
-  ): RequestChannel.Request = {
-    try {
-      new RequestChannel.Request(
-        processor = envelope.processor,
-        context = forwardedContext,
-        startTimeNanos = envelope.startTimeNanos,
-        envelope.memoryPool,
-        buffer,
-        requestChannel.metrics,
-        Some(envelope)
-      )
-    } catch {
-      case e: InvalidRequestException =>
-        // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
-        // The purpose is to disambiguate structural errors in the envelope request
-        // itself, such as an invalid client address.
-        throw new UnsupportedVersionException(s"Failed to parse forwarded request " +
-          s"with header ${forwardedContext.header}", e)
-    }
-  }
-
-  private def parseForwardedRequestHeader(
-    buffer: ByteBuffer
-  ): RequestHeader = {
-    try {
-      RequestHeader.parse(buffer)
-    } catch {
-      case e: InvalidRequestException =>
-        // We use UNSUPPORTED_VERSION if the embedded request cannot be parsed.
-        // The purpose is to disambiguate structural errors in the envelope request
-        // itself, such as an invalid client address.
-        throw new UnsupportedVersionException("Failed to parse request header from envelope", e)
-    }
-  }
-
-  private def parseForwardedPrincipal(
-    envelopeContext: RequestContext,
-    principalBytes: Array[Byte]
-  ): KafkaPrincipal = {
-    envelopeContext.principalSerde.asScala match {
-      case Some(serde) =>
-        try {
-          serde.deserialize(principalBytes)
-        } catch {
-          case e: Exception =>
-            throw new PrincipalDeserializationException("Failed to deserialize client principal from envelope", e)
-        }
-
-      case None =>
-        throw new PrincipalDeserializationException("Could not deserialize principal since " +
-          "no `KafkaPrincipalSerde` has been defined")
+    EnvelopeUtils.handleEnvelopeRequest(request, requestChannel.metrics, handle)
     }
-  }
 
   def handleDescribeProducersRequest(request: RequestChannel.Request): Unit = {
     val describeProducersRequest = request.body[DescribeProducersRequest]
diff --git a/core/src/main/scala/kafka/server/KafkaBroker.scala b/core/src/main/scala/kafka/server/KafkaBroker.scala
index d47283e..5fed236 100644
--- a/core/src/main/scala/kafka/server/KafkaBroker.scala
+++ b/core/src/main/scala/kafka/server/KafkaBroker.scala
@@ -68,6 +68,14 @@ object KafkaBroker {
       case _ => //do nothing
     }
   }
+
+  /**
+   * The log message that we print when the broker has been successfully started.
+   * The ducktape system tests look for a line matching the regex 'Kafka\s*Server.*started'
+   * to know when the broker is started, so it is best not to change this message -- but if
+   * you do change it, be sure to make it match that regex or the system tests will fail.
+   */
+  val STARTED_MESSAGE = "Kafka Server started"
 }
 
 trait KafkaBroker extends KafkaMetricsGroup {
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 2fd04ae..e01bf60 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1025,7 +1025,7 @@ object KafkaConfig {
   val PasswordEncoderKeyLengthDoc =  "The key length used for encoding dynamically configured passwords."
   val PasswordEncoderIterationsDoc =  "The iteration count used for encoding dynamically configured passwords."
 
-  private val configDef = {
+  private[server] val configDef = {
     import ConfigDef.Importance._
     import ConfigDef.Range._
     import ConfigDef.Type._
@@ -1893,14 +1893,25 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
   validateValues()
 
   private def validateValues(): Unit = {
-    if(brokerIdGenerationEnable) {
-      require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be equal or greater than -1 and not greater than reserved.broker.max.id")
+    if (requiresZookeeper) {
+      if (zkConnect == null) {
+        throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
+      }
+      if (brokerIdGenerationEnable) {
+        require(brokerId >= -1 && brokerId <= maxReservedBrokerId, "broker.id must be greater than or equal to -1 and not greater than reserved.broker.max.id")
+      } else {
+        require(brokerId >= 0, "broker.id must be greater than or equal to 0")
+      }
     } else {
-      require(brokerId >= 0, "broker.id must be equal or greater than 0")
+      // Raft-based metadata quorum
+      if (nodeId < 0) {
+        throw new ConfigException(s"Missing configuration `${KafkaConfig.NodeIdProp}` which is required " +
+          s"when `process.roles` is defined (i.e. when using the self-managed quorum).")
+      }
     }
-    require(logRollTimeMillis >= 1, "log.roll.ms must be equal or greater than 1")
-    require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be equal or greater than 0")
-    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, equal or greater than 1")
+    require(logRollTimeMillis >= 1, "log.roll.ms must be greater than or equal to 1")
+    require(logRollTimeJitterMillis >= 0, "log.roll.jitter.ms must be greater than or equal to 0")
+    require(logRetentionTimeMillis >= 1 || logRetentionTimeMillis == -1, "log.retention.ms must be unlimited (-1) or, greater than or equal to 1")
     require(logDirs.nonEmpty, "At least one log directory must be defined via log.dirs or log.dir.")
     require(logCleanerDedupeBufferSize / logCleanerThreads > 1024 * 1024, "log.cleaner.dedupe.buffer.size must be at least 1MB per cleaner thread.")
     require(replicaFetchWaitMaxMs <= replicaSocketTimeoutMs, "replica.socket.timeout.ms should always be at least replica.fetch.wait.max.ms" +
@@ -1975,12 +1986,5 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
         s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
         s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
         s" authentication responses from timing out")
-
-    if (requiresZookeeper && zkConnect == null) {
-      throw new ConfigException(s"Missing required configuration `${KafkaConfig.ZkConnectProp}` which has no default value.")
-    } else if (usesSelfManagedQuorum && nodeId < 0) {
-      throw new ConfigException(s"Missing required configuration `${KafkaConfig.NodeIdProp}` which is required " +
-        s"when `process.roles` is defined (i.e. when using the self-managed quorum).")
-    }
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 1a072c3..dc3fd16 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -17,6 +17,7 @@
 package kafka.server
 
 import java.io.File
+import java.util.concurrent.CompletableFuture
 
 import kafka.common.{InconsistentNodeIdException, KafkaException}
 import kafka.log.Log
@@ -26,7 +27,10 @@ import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
 import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
-import org.apache.kafka.raft.internals.StringSerde
+import org.apache.kafka.metadata.ApiMessageAndVersion
+import org.apache.kafka.raft.metadata.{MetaLogRaftShim, MetadataRecordSerde}
+
+import scala.collection.Seq
 
 /**
  * This class implements the KIP-500 server which relies on a self-managed
@@ -47,7 +51,7 @@ class KafkaRaftServer(
   KafkaMetricsReporter.startReporters(VerifiableProperties(config.originals))
   KafkaYammerMetrics.INSTANCE.configure(config.originals)
 
-  private val (metaProps, _) = KafkaRaftServer.initializeLogDirs(config)
+  private val (metaProps, offlineDirs) = KafkaRaftServer.initializeLogDirs(config)
 
   private val metrics = Server.initializeMetrics(
     config,
@@ -55,24 +59,38 @@ class KafkaRaftServer(
     metaProps.clusterId.toString
   )
 
-  private val raftManager = new KafkaRaftManager(
+  private val controllerQuorumVotersFuture = CompletableFuture.completedFuture(config.quorumVoters)
+
+  private val raftManager = new KafkaRaftManager[ApiMessageAndVersion](
     metaProps,
     config,
-    new StringSerde,
+    new MetadataRecordSerde,
     KafkaRaftServer.MetadataPartition,
     time,
     metrics,
     threadNamePrefix
   )
 
+  private val metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient, config.nodeId)
+
   private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
-    Some(new BrokerServer())
+    Some(new BrokerServer(config, metaProps, metaLogShim, time, metrics, threadNamePrefix,
+      offlineDirs, controllerQuorumVotersFuture, Server.SUPPORTED_FEATURES))
   } else {
     None
   }
 
   private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
-    Some(new ControllerServer())
+    // Created only when this process has the controller role. The quorum voters are passed
+    // through the same controllerQuorumVotersFuture instance that the BrokerServer receives,
+    // instead of a second, separately created future.
+    Some(new ControllerServer(
+      metaProps,
+      config,
+      metaLogShim,
+      raftManager,
+      time,
+      metrics,
+      threadNamePrefix,
+      controllerQuorumVotersFuture
+    ))
   } else {
     None
   }
@@ -83,6 +101,7 @@ class KafkaRaftServer(
     controller.foreach(_.startup())
     broker.foreach(_.startup())
     AppInfoParser.registerAppInfo(Server.MetricsPrefix, config.brokerId.toString, metrics, time.milliseconds())
+    info(KafkaBroker.STARTED_MESSAGE)
   }
 
   override def shutdown(): Unit = {
@@ -118,7 +137,7 @@ object KafkaRaftServer {
    *         be consistent across all log dirs) and the offline directories
    */
   def initializeLogDirs(config: KafkaConfig): (MetaProperties, Seq[String]) = {
-    val logDirs = config.logDirs :+ config.metadataLogDir
+    val logDirs = (config.logDirs.toSet + config.metadataLogDir).toSeq
     val (rawMetaProperties, offlineDirs) = BrokerMetadataCheckpoint.
       getBrokerMetadataAndOfflineDirs(logDirs, ignoreMissing = false)
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7ec7a29..df37026 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -138,7 +138,7 @@ class KafkaServer(
 
   var kafkaScheduler: KafkaScheduler = null
 
-  var metadataCache: MetadataCache = null
+  var metadataCache: ZkMetadataCache = null
   var quotaManagers: QuotaFactory.QuotaManagers = null
 
   val zkClientConfig: ZKClientConfig = KafkaServer.zkClientConfigFromKafkaConfig(config).getOrElse(new ZKClientConfig())
@@ -275,7 +275,8 @@ class KafkaServer(
             time = time,
             metrics = metrics,
             threadNamePrefix = threadNamePrefix,
-            brokerEpochSupplier = () => kafkaController.brokerEpoch
+            brokerEpochSupplier = () => kafkaController.brokerEpoch,
+            config.brokerId
           )
         } else {
           AlterIsrManager(kafkaScheduler, time, zkClient)
@@ -332,8 +333,8 @@ class KafkaServer(
           time,
           metrics,
           threadNamePrefix,
-          adminManager,
-          kafkaController,
+          Some(adminManager),
+          Some(kafkaController),
           groupCoordinator,
           transactionCoordinator,
           enableForwarding
@@ -359,7 +360,7 @@ class KafkaServer(
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager)
+        val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
         dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, zkSupport, replicaManager, groupCoordinator, transactionCoordinator,
           autoTopicCreationManager, config.brokerId, config, configRepository, metadataCache, metrics, authorizer, quotaManagers,
           fetchManager, brokerTopicStats, clusterId, time, tokenManager, brokerFeatures, featureCache)
diff --git a/core/src/main/scala/kafka/server/MetadataSupport.scala b/core/src/main/scala/kafka/server/MetadataSupport.scala
index 00b029f..86390ea 100644
--- a/core/src/main/scala/kafka/server/MetadataSupport.scala
+++ b/core/src/main/scala/kafka/server/MetadataSupport.scala
@@ -19,6 +19,7 @@ package kafka.server
 
 import kafka.controller.KafkaController
 import kafka.network.RequestChannel
+import kafka.server.metadata.RaftMetadataCache
 import kafka.zk.{AdminZkClient, KafkaZkClient}
 import org.apache.kafka.common.requests.AbstractResponse
 
@@ -58,12 +59,15 @@ sealed trait MetadataSupport {
   def maybeForward(request: RequestChannel.Request,
                    handler: RequestChannel.Request => Unit,
                    responseCallback: Option[AbstractResponse] => Unit): Unit
+
+  /**
+   * The node ID to report to clients as the controller, or None if no ID is available
+   * (callers fall back to MetadataResponse.NO_CONTROLLER_ID in that case).
+   */
+  def controllerId: Option[Int]
 }
 
 case class ZkSupport(adminManager: ZkAdminManager,
                      controller: KafkaController,
                      zkClient: KafkaZkClient,
-                     forwardingManager: Option[ForwardingManager]) extends MetadataSupport {
+                     forwardingManager: Option[ForwardingManager],
+                     metadataCache: ZkMetadataCache) extends MetadataSupport {
   val adminZkClient = new AdminZkClient(zkClient)
 
   override def requireZkOrThrow(createException: => Exception): ZkSupport = this
@@ -83,9 +87,11 @@ case class ZkSupport(adminManager: ZkAdminManager,
       case _ => handler(request)
     }
   }
+
+  override def controllerId: Option[Int] =  metadataCache.getControllerId
 }
 
-case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
+case class RaftSupport(fwdMgr: ForwardingManager, metadataCache: RaftMetadataCache) extends MetadataSupport {
   override val forwardingManager: Option[ForwardingManager] = Some(fwdMgr)
   override def requireZkOrThrow(createException: => Exception): ZkSupport = throw createException
   override def requireRaftOrThrow(createException: => Exception): RaftSupport = this
@@ -105,4 +111,14 @@ case class RaftSupport(fwdMgr: ForwardingManager) extends MetadataSupport {
       handler(request) // will reject
     }
   }
+
+  override def controllerId: Option[Int] = {
+    // We send back a random alive broker ID as the controller ID when running with a Raft-based
+    // metadata quorum. Raft-based controllers are not directly accessible to clients; rather,
+    // clients can send requests destined for the controller to any broker node, and the receiving
+    // broker will automatically forward the request on the client's behalf to the active
+    // Raft-based controller, as per KIP-590.
+    metadataCache.currentImage().brokers.randomAliveBrokerId()
+  }
+
 }
diff --git a/core/src/main/scala/kafka/server/Server.scala b/core/src/main/scala/kafka/server/Server.scala
index 9126114..1b5aa59 100644
--- a/core/src/main/scala/kafka/server/Server.scala
+++ b/core/src/main/scala/kafka/server/Server.scala
@@ -16,11 +16,15 @@
  */
 package kafka.server
 
+import java.util.Collections
 import java.util.concurrent.TimeUnit
 
 import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.common.metrics.{JmxReporter, KafkaMetricsContext, MetricConfig, Metrics, MetricsReporter, Sensor}
 import org.apache.kafka.common.utils.Time
+import org.apache.kafka.metadata.VersionRange
+
+import scala.jdk.CollectionConverters._
 
 trait Server {
   def startup(): Unit
@@ -91,4 +95,12 @@ object Server {
     reporters
   }
 
+  sealed trait ProcessStatus
+  case object SHUTDOWN extends ProcessStatus
+  case object STARTING extends ProcessStatus
+  case object STARTED extends ProcessStatus
+  case object SHUTTING_DOWN extends ProcessStatus
+
+  val SUPPORTED_FEATURES = Collections.
+    unmodifiableMap[String, VersionRange](Map[String, VersionRange]().asJava)
 }
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index 0a83fd9..5db6f84 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -160,7 +160,7 @@ class TestRaftServer(
       eventQueue.offer(HandleClaim(epoch))
     }
 
-    override def handleResign(): Unit = {
+    override def handleResign(epoch: Int): Unit = {
       eventQueue.offer(HandleResign)
     }
 
diff --git a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
index dc4dd06..9f9749b 100644
--- a/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AutoTopicCreationManagerTest.scala
@@ -96,8 +96,8 @@ class AutoTopicCreationManagerTest {
       config,
       metadataCache,
       Some(brokerToController),
-      adminManager,
-      controller,
+      Some(adminManager),
+      Some(controller),
       groupCoordinator,
       transactionCoordinator)
 
@@ -125,8 +125,8 @@ class AutoTopicCreationManagerTest {
       config,
       metadataCache,
       None,
-      adminManager,
-      controller,
+      Some(adminManager),
+      Some(controller),
       groupCoordinator,
       transactionCoordinator)
 
@@ -155,8 +155,8 @@ class AutoTopicCreationManagerTest {
       config,
       metadataCache,
       Some(brokerToController),
-      adminManager,
-      controller,
+      Some(adminManager),
+      Some(controller),
       groupCoordinator,
       transactionCoordinator)
 
diff --git a/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
new file mode 100644
index 0000000..fc0a38b
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ControllerApisTest.scala
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.server
+
+import java.net.InetAddress
+import java.util.Properties
+
+import kafka.network.RequestChannel
+import kafka.raft.RaftManager
+import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.{ClientQuotaManager, ClientRequestQuotaManager, ControllerApis, ControllerMutationQuotaManager, KafkaConfig, MetaProperties, ReplicationQuotaManager}
+import kafka.utils.MockTime
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.ClusterAuthorizationException
+import org.apache.kafka.common.memory.MemoryPool
+import org.apache.kafka.common.message.BrokerRegistrationRequestData
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.{AbstractRequest, BrokerRegistrationRequest, RequestContext, RequestHeader, RequestTestUtils}
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.controller.Controller
+import org.apache.kafka.metadata.{ApiMessageAndVersion, VersionRange}
+import org.apache.kafka.server.authorizer.{AuthorizableRequestContext, AuthorizationResult, Authorizer}
+import org.easymock.{Capture, EasyMock, IAnswer}
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, Test}
+
+class ControllerApisTest {
+  // Mocks
+  private val nodeId = 1
+  private val brokerRack = "Rack1"
+  private val clientID = "Client1"
+  private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+  private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+  private val time = new MockTime
+  private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
+  private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
+  private val clientControllerQuotaManager: ControllerMutationQuotaManager = EasyMock.createNiceMock(classOf[ControllerMutationQuotaManager])
+  private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+  private val raftManager: RaftManager[ApiMessageAndVersion] = EasyMock.createNiceMock(classOf[RaftManager[ApiMessageAndVersion]])
+  private val quotas = QuotaManagers(
+    clientQuotaManager,
+    clientQuotaManager,
+    clientRequestQuotaManager,
+    clientControllerQuotaManager,
+    replicaQuotaManager,
+    replicaQuotaManager,
+    replicaQuotaManager,
+    None)
+  private val controller: Controller = EasyMock.createNiceMock(classOf[Controller])
+
+  private def createControllerApis(authorizer: Option[Authorizer],
+                                   supportedFeatures: Map[String, VersionRange] = Map.empty): ControllerApis = {
+    val props = new Properties()
+    props.put(KafkaConfig.NodeIdProp, nodeId)
+    props.put(KafkaConfig.ProcessRolesProp, "controller")
+    new ControllerApis(
+      requestChannel,
+      authorizer,
+      quotas,
+      time,
+      supportedFeatures,
+      controller,
+      raftManager,
+      new KafkaConfig(props),
+
+      // FIXME: Would make more sense to set controllerId here
+      MetaProperties(Uuid.fromString("JgxuGe9URy-E-ceaL04lEw"), nodeId = nodeId),
+      Seq.empty
+    )
+  }
+
+  /**
+   * Build a RequestChannel.Request from the AbstractRequest
+   *
+   * @param request - AbstractRequest
+   * @param listenerName - Default listener for the RequestChannel
+   * @tparam T - Type of AbstractRequest
+   * @return the RequestChannel.Request wrapping the serialized request
+   */
+  private def buildRequest[T <: AbstractRequest](request: AbstractRequest,
+                                                 listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT)): RequestChannel.Request = {
+    val buffer = RequestTestUtils.serializeRequestWithHeader(
+      new RequestHeader(request.apiKey, request.version, clientID, 0), request)
+
+    // read the header from the buffer first so that the body can be read next from the Request constructor
+    val header = RequestHeader.parse(buffer)
+    val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
+      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, false)
+    new RequestChannel.Request(processor = 1, context = context, startTimeNanos = 0, MemoryPool.NONE, buffer,
+      requestChannelMetrics)
+  }
+
+  @Test
+  def testBrokerRegistration(): Unit = {
+    val brokerRegistrationRequest = new BrokerRegistrationRequest.Builder(
+      new BrokerRegistrationRequestData()
+        .setBrokerId(nodeId)
+        .setRack(brokerRack)
+    ).build()
+
+    val request = buildRequest(brokerRegistrationRequest)
+
+    val capturedResponse: Capture[RequestChannel.Response] = EasyMock.newCapture()
+    EasyMock.expect(requestChannel.sendResponse(EasyMock.capture(capturedResponse)))
+
+    val authorizer = Some[Authorizer](EasyMock.createNiceMock(classOf[Authorizer]))
+    EasyMock.expect(authorizer.get.authorize(EasyMock.anyObject[AuthorizableRequestContext](), EasyMock.anyObject())).andAnswer(
+      new IAnswer[java.util.List[AuthorizationResult]]() {
+        override def answer(): java.util.List[AuthorizationResult] = {
+          new java.util.ArrayList[AuthorizationResult](){
+            add(AuthorizationResult.DENIED)
+          }
+        }
+      }
+    )
+    EasyMock.replay(requestChannel, authorizer.get)
+
+    val assertion = assertThrows(classOf[ClusterAuthorizationException],
+      () => createControllerApis(authorizer = authorizer).handleBrokerRegistration(request))
+    assert(Errors.forException(assertion) == Errors.CLUSTER_AUTHORIZATION_FAILED)
+  }
+
+  @AfterEach
+  def tearDown(): Unit = {
+    quotas.shutdown()
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index 88bf8ebc..5138bf6 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -34,7 +34,7 @@ import kafka.log.AppendOrigin
 import kafka.network.RequestChannel
 import kafka.network.RequestChannel.{CloseConnectionResponse, SendResponse}
 import kafka.server.QuotaFactory.QuotaManagers
-import kafka.server.metadata.{ConfigRepository, CachedConfigRepository}
+import kafka.server.metadata.{CachedConfigRepository, ConfigRepository, RaftMetadataCache}
 import kafka.utils.{MockTime, TestUtils}
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.clients.NodeApiVersions
@@ -148,8 +148,23 @@ class KafkaApisTest {
     else
       None
 
+    val metadataSupport = if (raftSupport) {
+      // it will be up to the test to replace the default ZkMetadataCache implementation
+      // with a RaftMetadataCache instance
+      metadataCache match {
+        case raftMetadataCache: RaftMetadataCache =>
+          RaftSupport(forwardingManager, raftMetadataCache)
+        case _ => throw new IllegalStateException("Test must set an instance of RaftMetadataCache")
+      }
+    } else {
+      metadataCache match {
+        case zkMetadataCache: ZkMetadataCache =>
+          ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt, zkMetadataCache)
+        case _ => throw new IllegalStateException("Test must set an instance of ZkMetadataCache")
+      }
+    }
     new KafkaApis(requestChannel,
-      if (raftSupport) RaftSupport(forwardingManager) else ZkSupport(adminManager, controller, zkClient, forwardingManagerOpt),
+      metadataSupport,
       replicaManager,
       groupCoordinator,
       txnCoordinator,
@@ -321,6 +336,7 @@ class KafkaApisTest {
 
     EasyMock.expect(controller.isActive).andReturn(true)
 
+    EasyMock.expect(requestChannel.metrics).andReturn(EasyMock.niceMock(classOf[RequestChannel.Metrics]))
     EasyMock.expect(requestChannel.updateErrorMetrics(ApiKeys.ENVELOPE, Map(Errors.INVALID_REQUEST -> 1)))
     val capturedResponse = expectNoThrottling()
 
@@ -3460,101 +3476,121 @@ class KafkaApisTest {
 
   @Test
   def testRaftShouldNeverHandleLeaderAndIsrRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleLeaderAndIsrRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleStopReplicaRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleStopReplicaRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleUpdateMetadataRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleUpdateMetadataRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleControlledShutdownRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleControlledShutdownRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleAlterIsrRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleAlterIsrRequest)
   }
 
   @Test
   def testRaftShouldNeverHandleEnvelope(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldNeverHandle(createKafkaApis(raftSupport = true).handleEnvelope)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateTopicsRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTopicsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreatePartitionsRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreatePartitionsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardDeleteTopicsRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteTopicsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateAcls(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateAcls)
   }
 
   @Test
   def testRaftShouldAlwaysForwardDeleteAcls(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleDeleteAcls)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterConfigsRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterConfigsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterPartitionReassignmentsRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterPartitionReassignmentsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardIncrementalAlterConfigsRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleIncrementalAlterConfigsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardRenewTokenRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleRenewTokenRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardExpireTokenRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleExpireTokenRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterClientQuotasRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterClientQuotasRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardAlterUserScramCredentialsRequest(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleAlterUserScramCredentialsRequest)
   }
 
   @Test
   def testRaftShouldAlwaysForwardUpdateFeatures(): Unit = {
+    metadataCache = MetadataCache.raftMetadataCache(brokerId)
     verifyShouldAlwaysForward(createKafkaApis(raftSupport = true).handleUpdateFeatures)
   }
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index d6c456b..6271105 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -33,10 +33,13 @@ import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import java.net.InetSocketAddress
 import java.util
-import java.util.Properties
+import java.util.{Collections, Properties}
 
+import org.apache.kafka.common.Node
 import org.junit.jupiter.api.function.Executable
 
+import scala.jdk.CollectionConverters._
+
 class KafkaConfigTest {
 
   @Test
@@ -1034,7 +1037,17 @@ class KafkaConfigTest {
   }
 
   @Test
-  def testInvalidQuorumVotersConfig(): Unit = {
+  def testControllerQuorumVoterStringsToNodes(): Unit = {
+    assertThrows(classOf[ConfigException], () => RaftConfig.quorumVoterStringsToNodes(Collections.singletonList("")))
+    assertEquals(Seq(new Node(3000, "example.com", 9093)),
+      RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093")).asScala.toSeq)
+    assertEquals(Seq(new Node(3000, "example.com", 9093),
+      new Node(3001, "example.com", 9094)),
+      RaftConfig.quorumVoterStringsToNodes(util.Arrays.asList("3000@example.com:9093","3001@example.com:9094")).asScala.toSeq)
+  }
+
+  @Test
+  def testInvalidQuorumVoterConfig(): Unit = {
     assertInvalidQuorumVoters("1")
     assertInvalidQuorumVoters("1@")
     assertInvalidQuorumVoters("1:")
@@ -1046,6 +1059,7 @@ class KafkaConfigTest {
     assertInvalidQuorumVoters("1@kafka1:9092,2@")
     assertInvalidQuorumVoters("1@kafka1:9092,2@blah")
     assertInvalidQuorumVoters("1@kafka1:9092,2@blah,")
+    assertInvalidQuorumVoters("1@kafka1:9092:1@kafka2:9092")
   }
 
   private def assertInvalidQuorumVoters(value: String): Unit = {
@@ -1081,6 +1095,102 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testAcceptsLargeNodeIdForRaftBasedCase(): Unit = {
+    // Generation of Broker IDs is not supported when using Raft-based controller quorums,
+    // so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000,
+    // and make sure it is allowed despite broker.id.generation.enable=true (true is the default)
+    val largeBrokerId = 2000
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.NodeIdProp, largeBrokerId.toString)
+    assertTrue(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testRejectsNegativeNodeIdForRaftBasedBrokerCaseWithAutoGenEnabled(): Unit = {
+    // -1 is the default for both node.id and broker.id
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testRejectsNegativeNodeIdForRaftBasedControllerCaseWithAutoGenEnabled(): Unit = {
+    // -1 is the default for both node.id and broker.id
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "controller")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testRejectsNegativeNodeIdForRaftBasedCaseWithAutoGenDisabled(): Unit = {
+    // -1 is the default for both node.id and broker.id
+    val props = new Properties()
+    props.put(KafkaConfig.ProcessRolesProp, "broker")
+    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testRejectsLargeNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
+    // Generation of Broker IDs is supported when using ZooKeeper-based controllers,
+    // so pick a broker ID greater than reserved.broker.max.id, which defaults to 1000,
+    // and make sure it is not allowed with broker.id.generation.enable=true (true is the default)
+    val largeBrokerId = 2000
+    val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testAcceptsNegativeOneNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
+    // -1 is the default for both node.id and broker.id; it implies "auto-generate" and should succeed
+    val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    assertTrue(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testRejectsNegativeTwoNodeIdForZkBasedCaseWithAutoGenEnabled(): Unit = {
+    // -1 implies "auto-generate" and should succeed, but -2 does not and should fail
+    val negativeTwoNodeId = -2
+    val props = TestUtils.createBrokerConfig(negativeTwoNodeId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.put(KafkaConfig.NodeIdProp, negativeTwoNodeId.toString)
+    props.put(KafkaConfig.BrokerIdProp, negativeTwoNodeId.toString)
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testAcceptsLargeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
+    // Ensure a broker ID greater than reserved.broker.max.id, which defaults to 1000,
+    // is allowed with broker.id.generation.enable=false
+    val largeBrokerId = 2000
+    val props = TestUtils.createBrokerConfig(largeBrokerId, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.AdvertisedListenersProp, listeners)
+    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    assertTrue(isValidKafkaConfig(props))
+  }
+
+  @Test
+  def testRejectsNegativeNodeIdForZkBasedCaseWithAutoGenDisabled(): Unit = {
+    // -1 is the default for both node.id and broker.id
+    val props = TestUtils.createBrokerConfig(-1, TestUtils.MockZkConnect, port = TestUtils.MockZkPort)
+    val listeners = "PLAINTEXT://A:9092,SSL://B:9093,SASL_SSL://C:9094"
+    props.put(KafkaConfig.ListenersProp, listeners)
+    props.put(KafkaConfig.BrokerIdGenerationEnableProp, "false")
+    assertFalse(isValidKafkaConfig(props))
+  }
+
+  @Test
   def testZookeeperConnectRequiredIfEmptyProcessRoles(): Unit = {
     val props = new Properties()
     props.put(KafkaConfig.ProcessRolesProp, "")
diff --git a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
index 4cf7d1e..6166d73 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaRaftServerTest.scala
@@ -66,7 +66,7 @@ class KafkaRaftServerTest {
   private def invokeLoadMetaProperties(
     metaProperties: MetaProperties,
     configProperties: Properties
-  ): (MetaProperties, Seq[String]) = {
+  ): (MetaProperties, collection.Seq[String]) = {
     val tempLogDir = TestUtils.tempDirectory()
     try {
       writeMetaProperties(tempLogDir, metaProperties)
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
index 887d53da..c71c058 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/metadata/MetadataRequestBenchmark.java
@@ -34,6 +34,7 @@ import kafka.server.KafkaApis;
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.MetadataCache;
+import kafka.server.ZkMetadataCache;
 import kafka.server.QuotaFactory;
 import kafka.server.ReplicaManager;
 import kafka.server.ReplicationQuotaManager;
@@ -105,7 +106,7 @@ public class MetadataRequestBenchmark {
     private KafkaZkClient kafkaZkClient = Mockito.mock(KafkaZkClient.class);
     private Metrics metrics = new Metrics();
     private int brokerId = 1;
-    private MetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId);
+    private ZkMetadataCache metadataCache = MetadataCache.zkMetadataCache(brokerId);
     private ClientQuotaManager clientQuotaManager = Mockito.mock(ClientQuotaManager.class);
     private ClientRequestQuotaManager clientRequestQuotaManager = Mockito.mock(ClientRequestQuotaManager.class);
     private ControllerMutationQuotaManager controllerMutationQuotaManager = Mockito.mock(ControllerMutationQuotaManager.class);
@@ -173,7 +174,7 @@ public class MetadataRequestBenchmark {
         kafkaProps.put(KafkaConfig$.MODULE$.BrokerIdProp(), brokerId + "");
         BrokerFeatures brokerFeatures = BrokerFeatures.createDefault();
         return new KafkaApis(requestChannel,
-            new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty()),
+            new ZkSupport(adminManager, kafkaController, kafkaZkClient, Option.empty(), metadataCache),
             replicaManager,
             groupCoordinator,
             transactionCoordinator,
diff --git a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
index 2823186..d0d93c8 100644
--- a/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
@@ -243,6 +243,12 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             random);
         this.kafkaRaftMetrics = new KafkaRaftMetrics(metrics, "raft", quorum);
         kafkaRaftMetrics.updateNumUnknownVoterConnections(quorum.remoteVoters().size());
+
+        // Update the voter endpoints with what's in RaftConfig
+        Map<Integer, RaftConfig.AddressSpec> voterAddresses = raftConfig.quorumVoterConnections();
+        voterAddresses.entrySet().stream()
+            .filter(e -> e.getValue() instanceof RaftConfig.InetAddressSpec)
+            .forEach(e -> this.channel.updateEndpoint(e.getKey(), (RaftConfig.InetAddressSpec) e.getValue()));
     }
 
     private void updateFollowerHighWatermark(
@@ -336,9 +342,9 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         }
     }
 
-    private void fireHandleResign() {
+    private void fireHandleResign(int epoch) {
         for (ListenerContext listenerContext : listenerContexts) {
-            listenerContext.fireHandleResign();
+            listenerContext.fireHandleResign(epoch);
         }
     }
 
@@ -370,6 +376,11 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
         wakeup();
     }
 
+    @Override
+    public LeaderAndEpoch leaderAndEpoch() {
+        return quorum.leaderAndEpoch();
+    }
+
     private OffsetAndEpoch endOffset() {
         return new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch());
     }
@@ -457,7 +468,7 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
 
     private void maybeResignLeadership() {
         if (quorum.isLeader()) {
-            fireHandleResign();
+            fireHandleResign(quorum.epoch());
         }
 
         if (accumulator != null) {
@@ -2357,8 +2368,8 @@ public class KafkaRaftClient<T> implements RaftClient<T> {
             }
         }
 
-        void fireHandleResign() {
-            listener.handleResign();
+        void fireHandleResign(int epoch) {
+            listener.handleResign(epoch);
         }
 
         public synchronized void onClose(BatchReader<T> reader) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java b/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java
index b88241b..ee74b5b 100644
--- a/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java
+++ b/raft/src/main/java/org/apache/kafka/raft/NetworkChannel.java
@@ -37,6 +37,11 @@ public interface NetworkChannel extends Closeable {
      */
     void send(RaftRequest.Outbound request);
 
+    /**
+     * Update connection information for the given id.
+     */
+    void updateEndpoint(int id, RaftConfig.InetAddressSpec address);
+
     default void close() {}
 
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
index 554ce61..e2bec0e 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftClient.java
@@ -57,15 +57,16 @@ public interface RaftClient<T> extends Closeable {
         /**
          * Invoked after a leader has stepped down. This callback may or may not
          * fire before the next leader has been elected.
+         *
+         * @param epoch the epoch that the leader is resigning from
          */
-        default void handleResign() {}
+        default void handleResign(int epoch) {}
     }
 
     /**
      * Initialize the client.
      * This should only be called once on startup.
      *
-     * @param raftConfig the Raft quorum configuration
      * @throws IOException For any IO errors during initialization
      */
     void initialize() throws IOException;
@@ -78,6 +79,12 @@ public interface RaftClient<T> extends Closeable {
     void register(Listener<T> listener);
 
     /**
+     * Return the current {@link LeaderAndEpoch}.
+     * @return the current {@link LeaderAndEpoch}
+     */
+    LeaderAndEpoch leaderAndEpoch();
+
+    /**
      * Append a list of records to the log. The write will be scheduled for some time
      * in the future. There is no guarantee that appended records will be written to
      * the log and eventually committed. However, it is guaranteed that if any of the
diff --git a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
index de40b35..13dd879 100644
--- a/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
+++ b/raft/src/main/java/org/apache/kafka/raft/RaftConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.raft;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -28,6 +29,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * RaftConfig encapsulates configuration specific to the Raft quorum voter nodes.
@@ -233,6 +235,17 @@ public class RaftConfig {
         return voterMap;
     }
 
+    public static List<Node> quorumVoterStringsToNodes(List<String> voters) {
+        return parseVoterConnections(voters).entrySet().stream()
+            .filter(connection -> connection.getValue() instanceof InetAddressSpec)
+            .map(connection -> {
+                InetAddressSpec inetAddressSpec = InetAddressSpec.class.cast(connection.getValue());
+                return new Node(connection.getKey(), inetAddressSpec.address.getHostName(),
+                    inetAddressSpec.address.getPort());
+            })
+            .collect(Collectors.toList());
+    }
+
     public static class ControllerQuorumVotersValidator implements ConfigDef.Validator {
         @Override
         public void ensureValid(String name, Object value) {
diff --git a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
index 47dae5d..3db4d73 100644
--- a/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
+++ b/raft/src/main/java/org/apache/kafka/raft/ReplicatedCounter.java
@@ -96,7 +96,7 @@ public class ReplicatedCounter implements RaftClient.Listener<Integer> {
     }
 
     @Override
-    public synchronized void handleResign() {
+    public synchronized void handleResign(int epoch) {
         log.debug("Counter uncommitted value reset after resigning leadership");
         this.uncommitted = -1;
         this.claimedEpoch = Optional.empty();
diff --git a/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
new file mode 100644
index 0000000..bf88e7d
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/metadata/MetaLogRaftShim.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft.metadata;
+
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogLeader;
+import org.apache.kafka.metalog.MetaLogListener;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * For now, we rely on a shim to translate from `RaftClient` to `MetaLogManager`.
+ * Once we check in to trunk, we can drop `RaftClient` and implement `MetaLogManager`
+ * directly.
+ */
+public class MetaLogRaftShim implements MetaLogManager {
+    private final RaftClient<ApiMessageAndVersion> client;
+    private final int nodeId;
+
+    public MetaLogRaftShim(RaftClient<ApiMessageAndVersion> client, int nodeId) {
+        this.client = client;
+        this.nodeId = nodeId;
+    }
+
+    @Override
+    public void initialize() {
+        // NO-OP - The RaftClient is initialized externally
+    }
+
+    @Override
+    public void register(MetaLogListener listener) {
+        client.register(new ListenerShim(listener));
+    }
+
+    @Override
+    public long scheduleWrite(long epoch, List<ApiMessageAndVersion> batch) {
+        return client.scheduleAppend((int) epoch, batch);
+    }
+
+    @Override
+    public void renounce(long epoch) {
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public MetaLogLeader leader() {
+        LeaderAndEpoch leaderAndEpoch = client.leaderAndEpoch();
+        return new MetaLogLeader(leaderAndEpoch.leaderId.orElse(-1), leaderAndEpoch.epoch);
+    }
+
+    @Override
+    public int nodeId() {
+        return nodeId;
+    }
+
+    private class ListenerShim implements RaftClient.Listener<ApiMessageAndVersion> {
+        private final MetaLogListener listener;
+
+        private ListenerShim(MetaLogListener listener) {
+            this.listener = listener;
+        }
+
+        @Override
+        public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+            try {
+                // TODO: The `BatchReader` might need to read from disk if this is
+                // not a leader. We want to move this IO to the state machine so that
+                // it does not block Raft replication
+                while (reader.hasNext()) {
+                    BatchReader.Batch<ApiMessageAndVersion> batch = reader.next();
+                    List<ApiMessage> records = batch.records().stream()
+                        .map(ApiMessageAndVersion::message)
+                        .collect(Collectors.toList());
+                    listener.handleCommits(batch.lastOffset(), records);
+                }
+            } finally {
+                reader.close();
+            }
+        }
+
+        @Override
+        public void handleClaim(int epoch) {
+            listener.handleNewLeader(new MetaLogLeader(nodeId, epoch));
+        }
+
+        @Override
+        public void handleResign(int epoch) {
+            listener.handleRenounce(epoch);
+        }
+
+        @Override
+        public String toString() {
+            return "ListenerShim(" +
+                    "listener=" + listener +
+                    ')';
+        }
+    }
+
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
index 7a5b385..2a97931 100644
--- a/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
+++ b/raft/src/test/java/org/apache/kafka/raft/MockNetworkChannel.java
@@ -56,6 +56,11 @@ public class MockNetworkChannel implements NetworkChannel {
         sendQueue.add(request);
     }
 
+    @Override
+    public void updateEndpoint(int id, RaftConfig.InetAddressSpec address) {
+        // empty
+    }
+
     public List<RaftRequest.Outbound> drainSendQueue() {
         return drainSentRequests(Optional.empty());
     }
diff --git a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
index efe7c95..9d19b86 100644
--- a/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
+++ b/raft/src/test/java/org/apache/kafka/raft/RaftClientTestContext.java
@@ -975,7 +975,7 @@ public final class RaftClientTestContext {
         }
 
         @Override
-        public void handleResign() {
+        public void handleResign(int epoch) {
             this.currentClaimedEpoch = OptionalInt.empty();
         }
 
diff --git a/tests/kafkatest/services/kafka/config_property.py b/tests/kafkatest/services/kafka/config_property.py
index 2222c16..42243cf 100644
--- a/tests/kafkatest/services/kafka/config_property.py
+++ b/tests/kafkatest/services/kafka/config_property.py
@@ -22,7 +22,7 @@ NODE_ID = "node.id"
 FIRST_BROKER_PORT = 9092
 FIRST_CONTROLLER_PORT = FIRST_BROKER_PORT + 500
 FIRST_CONTROLLER_ID = 3001
-CLUSTER_ID = "6bd37820-6745-4790-ae98-620300e1f61b"
+CLUSTER_ID = "I2eXt9rvSnyhct8BYmW6-w"
 PORT = "port"
 ADVERTISED_HOSTNAME = "advertised.host.name"