You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/02/26 16:12:03 UTC

[GitHub] [kafka] mumrah opened a new pull request #10220: Minor: TestKit for Raft controller integration tests

mumrah opened a new pull request #10220:
URL: https://github.com/apache/kafka/pull/10220


   TODO


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r597214017



##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {

Review comment:
       I took a stab at this and it didn't really end up much cleaner




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah merged pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

Posted by GitBox <gi...@apache.org>.
mumrah merged pull request #10220:
URL: https://github.com/apache/kafka/pull/10220


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] mumrah commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

Posted by GitBox <gi...@apache.org>.
mumrah commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r597202938



##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick.  Set it to a dummy string \
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                        metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    ControllerServer controller = new ControllerServer(
+                        nodes.controllerProperties(node.id()),
+                        config,
+                        metaLogShim,
+                        raftManager,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        connectFutureManager.future
+                    );
+                    controllers.put(node.id(), controller);
+                    controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
+                        if (e != null) {
+                            connectFutureManager.fail(e);
+                        } else {
+                            connectFutureManager.registerPort(node.id(), port);
+                        }
+                    });
+                    raftManagers.put(node.id(), raftManager);
+                }
+                for (Kip500BrokerNode node : nodes.brokerNodes().values()) {

Review comment:
       https://issues.apache.org/jira/browse/KAFKA-12501




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r588522951



##########
File path: core/src/test/java/kafka/test/junit/ClusterTestExtensions.java
##########
@@ -199,12 +209,25 @@ private void processClusterTest(ClusterTest annot, ClusterTestDefaults defaults,
         }
 
         switch (type) {
-            case ZK:
-            case BOTH:
+            case ZK: {
                 ClusterConfig config = builder.build();
                 config.serverProperties().putAll(properties);
                 testInvocations.accept(new ZkClusterInvocationContext(config));
                 break;
+            } case RAFT: {
+                ClusterConfig config = builder.build();
+                config.serverProperties().putAll(properties);
+                testInvocations.accept(new RaftClusterInvocationContext(config));
+                break;
+            } case BOTH: {
+                ClusterConfig zkConfig = builder.build();

Review comment:
       nit: not a big deal, but I suspect there's a way to remove some of the duplication here in this switch. For example, perhaps we could add a method to `Type` to build the invocation context:
   ```java
     List<ClusterInvocationContext> invocationContexts(ClusterConfig config);
   ```

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.network.SocketServer;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.metadata.BrokerState;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * <ul>
+ *     <li>ClusterConfig (the same instance passed to the constructor)</li>
+ *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
+ *     <li>IntegrationTestHelper (helper methods)</li>
+ * </ul>
+ */
+public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
+
+    private final ClusterConfig clusterConfig;
+    private final AtomicReference<KafkaClusterTestKit> clusterReference;
+
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+        this.clusterConfig = clusterConfig;
+        this.clusterReference = new AtomicReference<>();
+    }
+
+    @Override
+    public String getDisplayName(int invocationIndex) {
+        String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[Quorum %d] %s", invocationIndex, clusterDesc);
+    }
+
+    @Override
+    public List<Extension> getAdditionalExtensions() {
+        return Arrays.asList(
+            (BeforeTestExecutionCallback) context -> {
+                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder().
+                        setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+                        setNumControllerNodes(clusterConfig.numControllers()).build());
+
+                // Copy properties into the TestKit builder
+                clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
+                // TODO how to pass down security protocol and listener name?
+                KafkaClusterTestKit cluster = builder.build();
+                clusterReference.set(cluster);
+                cluster.format();
+                cluster.startup();
+                kafka.utils.TestUtils.waitUntilTrue(
+                    () -> cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+                    () -> "Broker never made it to RUNNING state.",
+                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+                    100L);
+            },
+            (AfterTestExecutionCallback) context -> clusterReference.get().close(),
+            new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)),
+            new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
+        );
+    }
+
+    public static class RaftClusterInstance implements ClusterInstance {
+
+        private final AtomicReference<KafkaClusterTestKit> clusterReference;
+        private final ClusterConfig clusterConfig;
+        final AtomicBoolean started = new AtomicBoolean(false);
+        final AtomicBoolean stopped = new AtomicBoolean(false);
+
+        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
+            this.clusterReference = clusterReference;
+            this.clusterConfig = clusterConfig;
+        }
+
+        @Override
+        public String bootstrapServers() {
+            return clusterReference.get().clientProperties().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        }
+
+        @Override
+        public Collection<SocketServer> brokerSocketServers() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public ListenerName clientListener() {
+            return ListenerName.normalised("EXTERNAL");
+        }
+
+        @Override
+        public Collection<SocketServer> controllerSocketServers() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public SocketServer anyBrokerSocketServer() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+        }
+
+        @Override
+        public SocketServer anyControllerSocketServer() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
+        }
+
+        @Override
+        public ClusterType clusterType() {
+            return ClusterType.RAFT;
+        }
+
+        @Override
+        public ClusterConfig config() {
+            return clusterConfig;
+        }
+
+        @Override
+        public Object getUnderlying() {

Review comment:
       Is there any benefit letting this return the more specific `KafkaClusterTestKit` type?

##########
File path: core/src/test/java/kafka/testkit/Kip500BrokerNode.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Kip500BrokerNode implements TestKitNode {
+    public static class Builder {
+        private int id = -1;
+        private Uuid incarnationId = null;
+        private String metadataDirectory = null;
+        private List<String> logDataDirectories = null;
+
+        public Builder setId(int id) {
+            this.id = id;
+            return this;
+        }
+
+        public Builder setLogDirectories(List<String> logDataDirectories) {
+            this.logDataDirectories = logDataDirectories;
+            return this;
+        }
+
+        public Kip500BrokerNode build() {
+            if (id == -1) {
+                throw new RuntimeException("You must set the node id");
+            }
+            if (incarnationId == null) {
+                incarnationId = Uuid.randomUuid();
+            }
+            if (metadataDirectory == null) {
+                metadataDirectory = String.format("kip500broker_%d_meta", id);

Review comment:
       nit: do we need the `kip500` prefix? We don't use it in `ControllerNode`. Similarly, maybe we can call this `BrokerNode` instead of `Kip500BrokerNode`?

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.network.SocketServer;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.metadata.BrokerState;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * <ul>
+ *     <li>ClusterConfig (the same instance passed to the constructor)</li>
+ *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
+ *     <li>IntegrationTestHelper (helper methods)</li>
+ * </ul>
+ */
+public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
+
+    private final ClusterConfig clusterConfig;
+    private final AtomicReference<KafkaClusterTestKit> clusterReference;
+
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+        this.clusterConfig = clusterConfig;
+        this.clusterReference = new AtomicReference<>();
+    }
+
+    @Override
+    public String getDisplayName(int invocationIndex) {
+        String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[Quorum %d] %s", invocationIndex, clusterDesc);
+    }
+
+    @Override
+    public List<Extension> getAdditionalExtensions() {
+        return Arrays.asList(
+            (BeforeTestExecutionCallback) context -> {
+                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder().
+                        setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+                        setNumControllerNodes(clusterConfig.numControllers()).build());
+
+                // Copy properties into the TestKit builder
+                clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
+                // TODO how to pass down security protocol and listener name?
+                KafkaClusterTestKit cluster = builder.build();
+                clusterReference.set(cluster);
+                cluster.format();
+                cluster.startup();
+                kafka.utils.TestUtils.waitUntilTrue(
+                    () -> cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+                    () -> "Broker never made it to RUNNING state.",
+                    org.apache.kafka.test.TestUtils.DEFAULT_MAX_WAIT_MS,
+                    100L);
+            },
+            (AfterTestExecutionCallback) context -> clusterReference.get().close(),
+            new ClusterInstanceParameterResolver(new RaftClusterInstance(clusterReference, clusterConfig)),
+            new GenericParameterResolver<>(clusterConfig, ClusterConfig.class)
+        );
+    }
+
+    public static class RaftClusterInstance implements ClusterInstance {
+
+        private final AtomicReference<KafkaClusterTestKit> clusterReference;
+        private final ClusterConfig clusterConfig;
+        final AtomicBoolean started = new AtomicBoolean(false);
+        final AtomicBoolean stopped = new AtomicBoolean(false);
+
+        RaftClusterInstance(AtomicReference<KafkaClusterTestKit> clusterReference, ClusterConfig clusterConfig) {
+            this.clusterReference = clusterReference;
+            this.clusterConfig = clusterConfig;
+        }
+
+        @Override
+        public String bootstrapServers() {
+            return clusterReference.get().clientProperties().getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+        }
+
+        @Override
+        public Collection<SocketServer> brokerSocketServers() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public ListenerName clientListener() {
+            return ListenerName.normalised("EXTERNAL");
+        }
+
+        @Override
+        public Collection<SocketServer> controllerSocketServers() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .collect(Collectors.toList());
+        }
+
+        @Override
+        public SocketServer anyBrokerSocketServer() {
+            return clusterReference.get().kip500Brokers().values().stream()
+                .map(BrokerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No broker SocketServers found"));
+        }
+
+        @Override
+        public SocketServer anyControllerSocketServer() {
+            return clusterReference.get().controllers().values().stream()
+                .map(ControllerServer::socketServer)
+                .findFirst()
+                .orElseThrow(() -> new RuntimeException("No controller SocketServers found"));
+        }
+
+        @Override
+        public ClusterType clusterType() {
+            return ClusterType.RAFT;
+        }
+
+        @Override
+        public ClusterConfig config() {
+            return clusterConfig;
+        }
+
+        @Override
+        public Object getUnderlying() {
+            return clusterReference.get();
+        }
+
+        @Override
+        public Admin createAdminClient(Properties configOverrides) {
+            return Admin.create(clusterReference.get().clientProperties());
+        }
+
+        @Override
+        public void start() {
+            if (started.compareAndSet(false, true)) {
+                try {
+                    clusterReference.get().startup();
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to start up Raft server", e);
+                }
+            }
+        }
+
+        @Override
+        public void stop() {
+            if (stopped.compareAndSet(false, true)) {
+                try {
+                    clusterReference.get().close();
+                } catch (Exception e) {
+                    throw new RuntimeException("Failed to stop up Raft server", e);

Review comment:
       nit: drop "up"

##########
File path: core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
##########
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.test.junit;
+
+import kafka.network.SocketServer;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.test.ClusterConfig;
+import kafka.test.ClusterInstance;
+import kafka.testkit.KafkaClusterTestKit;
+import kafka.testkit.TestKitNodes;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.metadata.BrokerState;
+import org.junit.jupiter.api.extension.AfterTestExecutionCallback;
+import org.junit.jupiter.api.extension.BeforeTestExecutionCallback;
+import org.junit.jupiter.api.extension.Extension;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Wraps a {@link KafkaClusterTestKit} inside lifecycle methods for a test invocation. Each instance of this
+ * class is provided with a configuration for the cluster.
+ *
+ * This context also provides parameter resolvers for:
+ *
+ * <ul>
+ *     <li>ClusterConfig (the same instance passed to the constructor)</li>
+ *     <li>ClusterInstance (includes methods to expose underlying SocketServer-s)</li>
+ *     <li>IntegrationTestHelper (helper methods)</li>
+ * </ul>
+ */
+public class RaftClusterInvocationContext implements TestTemplateInvocationContext {
+
+    private final ClusterConfig clusterConfig;
+    private final AtomicReference<KafkaClusterTestKit> clusterReference;
+
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+        this.clusterConfig = clusterConfig;
+        this.clusterReference = new AtomicReference<>();
+    }
+
+    @Override
+    public String getDisplayName(int invocationIndex) {
+        String clusterDesc = clusterConfig.nameTags().entrySet().stream()
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[Quorum %d] %s", invocationIndex, clusterDesc);
+    }
+
+    @Override
+    public List<Extension> getAdditionalExtensions() {
+        return Arrays.asList(
+            (BeforeTestExecutionCallback) context -> {
+                KafkaClusterTestKit.Builder builder = new KafkaClusterTestKit.Builder(
+                    new TestKitNodes.Builder().
+                        setNumKip500BrokerNodes(clusterConfig.numBrokers()).
+                        setNumControllerNodes(clusterConfig.numControllers()).build());
+
+                // Copy properties into the TestKit builder
+                clusterConfig.serverProperties().forEach((key, value) -> builder.setConfigProp(key.toString(), value.toString()));
+                // TODO how to pass down security protocol and listener name?

Review comment:
       Maybe we can turn this into a JIRA?

##########
File path: core/src/test/java/kafka/testkit/Kip500BrokerNode.java
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import org.apache.kafka.common.Uuid;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+public class Kip500BrokerNode implements TestKitNode {
+    public static class Builder {
+        private int id = -1;
+        private Uuid incarnationId = null;
+        private String metadataDirectory = null;

Review comment:
       It looks like the builder doesn't provide a way to set this. On a side note, I wonder if we should mimic the configuration behavior more closely. If `metadata.log.dir` is not set explicitly, then we use the first listed data directory.

##########
File path: core/src/test/java/kafka/testkit/TestKitNodes.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.server.MetaProperties;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class TestKitNodes {
+    public static class Builder {
+        private Uuid clusterId = null;
+        private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
+        private final NavigableMap<Integer, Kip500BrokerNode> kip500BrokerNodes = new TreeMap<>();
+
+        public Builder setClusterId(Uuid clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+
+        public Builder addNodes(TestKitNode[] nodes) {
+            for (TestKitNode node : nodes) {
+                addNode(node);
+            }
+            return this;
+        }
+
+        public Builder addNode(TestKitNode node) {
+            if (node instanceof ControllerNode) {
+                ControllerNode controllerNode = (ControllerNode) node;
+                controllerNodes.put(node.id(), controllerNode);
+            } else if (node instanceof Kip500BrokerNode) {
+                Kip500BrokerNode brokerNode = (Kip500BrokerNode) node;
+                kip500BrokerNodes.put(node.id(), brokerNode);
+            } else {
+                throw new RuntimeException("Can't handle TestKitNode subclass " +
+                        node.getClass().getSimpleName());
+            }
+            return this;
+        }
+
+        public Builder setNumControllerNodes(int numControllerNodes) {
+            if (numControllerNodes < 0) {
+                throw new RuntimeException("Invalid negative value for numControllerNodes");
+            }
+            while (controllerNodes.size() > numControllerNodes) {

Review comment:
       nit: this loop is a little unconventional. Maybe we could use `pollFirstEntry` instead of the iterator? Similarly in `setNumKip500BrokerNodes`. 

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.api.Assertions._
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+import scala.jdk.CollectionConverters._
+
+@Timeout(120000)
+class RaftClusterTest {
+
+  @Test
+  def testCreateClusterAndClose(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(3).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
+      "RaftManager was not initialized.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        assertEquals(cluster.nodes().clusterId().toString,
+          admin.describeCluster().clusterId().get())
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndCreateAndListTopic(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(3).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      TestUtils.waitUntilTrue(() => cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
+        "RaftManager was not initialized.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        // Create a test topic
+        val newTopic = Collections.singletonList(new NewTopic("test-topic", 1, 3.toShort))
+        val createTopicResult = admin.createTopics(newTopic)
+        createTopicResult.all().get(60, TimeUnit.SECONDS)
+
+        // List created topic
+        TestUtils.waitUntilTrue(() => {
+          val listTopicsResult = admin.listTopics()
+          val result = listTopicsResult.names().get(5, TimeUnit.SECONDS).size() == newTopic.size()
+          if (result) {
+            newTopic forEach(topic => {
+              assertTrue(listTopicsResult.names().get().contains(topic.name()))
+            })
+          }
+          result
+        }, "Topics created were not listed.")
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndCreateAndManyTopics(): Unit = {

Review comment:
       Not sure there's much benefit having the single topic case covered separately. If it is, we can probably factor out a generalized helper. We can probably cover `testCreateClusterAndCreateAndManyTopicsWithManyPartitions` with the helper as well.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();

Review comment:
       nit: I think we could use a more convenient type, such as `Map<Integer, InetAddressSpec>`. Ultimately this just needs to make it down to `KafkaNetworkChannel.updateEndpoint` so the conversion to the config value is unnecessary.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;

Review comment:
       nit: this seems unnecessary. We're already using the constant below anyway.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {

Review comment:
       It might be nice to factor out a helper to build the controller and broker nodes. It would make it a little easier to process this method visually.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick.  Set it to a dummy string \
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);

Review comment:
       nit: Instead of calling it `dummy` which makes it sound hacky, maybe we could call it `uninitializedQuorumVotersString` or something like that. We have tried to make configuring with the `0.0.0.0:0` endpoint an explicitly supported feature.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick.  Set it to a dummy string \
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                        metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    ControllerServer controller = new ControllerServer(
+                        nodes.controllerProperties(node.id()),
+                        config,
+                        metaLogShim,
+                        raftManager,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        connectFutureManager.future
+                    );
+                    controllers.put(node.id(), controller);
+                    controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
+                        if (e != null) {
+                            connectFutureManager.fail(e);
+                        } else {
+                            connectFutureManager.registerPort(node.id(), port);
+                        }
+                    });
+                    raftManagers.put(node.id(), raftManager);
+                }
+                for (Kip500BrokerNode node : nodes.brokerNodes().values()) {

Review comment:
       By the way, I sort of feel it would make our lives easier if we used `KafkaRaftServer` directly instead of building the controller, broker, and raft managers ourselves. For one thing, that would make it trivial to support mixed mode. We don't have to do that here, but I'm kind of curious if there is a reason that we don't.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick.  Set it to a dummy string \
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                        metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    ControllerServer controller = new ControllerServer(
+                        nodes.controllerProperties(node.id()),
+                        config,
+                        metaLogShim,
+                        raftManager,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        connectFutureManager.future
+                    );
+                    controllers.put(node.id(), controller);
+                    controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
+                        if (e != null) {
+                            connectFutureManager.fail(e);
+                        } else {
+                            connectFutureManager.registerPort(node.id(), port);
+                        }
+                    });
+                    raftManagers.put(node.id(), raftManager);
+                }
+                for (Kip500BrokerNode node : nodes.brokerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+                    props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.LogDirsProp(),
+                        String.join(",", node.logDataDirectories()));
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "EXTERNAL://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
+                        nodes.interBrokerListenerName().value());
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(),
+                        node.logDataDirectories());
+
+                    // Just like above, we set a placeholder voter list here until we
+                    // find out what ports the controllers picked.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("broker%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                            metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                            Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    BrokerServer broker = new BrokerServer(
+                        config,
+                        nodes.brokerProperties(node.id()),
+                        metaLogShim,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
+                        connectFutureManager.future,
+                        Server.SUPPORTED_FEATURES()
+                    );
+                    kip500Brokers.put(node.id(), broker);
+                    raftManagers.put(node.id(), raftManager);
+                }
+            } catch (Exception e) {
+                if (executorService != null) {
+                    executorService.shutdownNow();
+                    executorService.awaitTermination(1, TimeUnit.DAYS);

Review comment:
       Would we really wait one day for tests to shutdown? Maybe one minute is enough?

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick.  Set it to a dummy string \
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                        metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    ControllerServer controller = new ControllerServer(
+                        nodes.controllerProperties(node.id()),
+                        config,
+                        metaLogShim,
+                        raftManager,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        connectFutureManager.future
+                    );
+                    controllers.put(node.id(), controller);
+                    controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
+                        if (e != null) {
+                            connectFutureManager.fail(e);
+                        } else {
+                            connectFutureManager.registerPort(node.id(), port);
+                        }
+                    });
+                    raftManagers.put(node.id(), raftManager);
+                }
+                for (Kip500BrokerNode node : nodes.brokerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+                    props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.LogDirsProp(),
+                        String.join(",", node.logDataDirectories()));
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "EXTERNAL://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
+                        nodes.interBrokerListenerName().value());
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(),
+                        node.logDataDirectories());
+
+                    // Just like above, we set a placeholder voter list here until we
+                    // find out what ports the controllers picked.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("broker%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                            metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                            Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    BrokerServer broker = new BrokerServer(
+                        config,
+                        nodes.brokerProperties(node.id()),
+                        metaLogShim,
+                        time,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
+                        connectFutureManager.future,
+                        Server.SUPPORTED_FEATURES()
+                    );
+                    kip500Brokers.put(node.id(), broker);
+                    raftManagers.put(node.id(), raftManager);
+                }
+            } catch (Exception e) {
+                if (executorService != null) {
+                    executorService.shutdownNow();
+                    executorService.awaitTermination(1, TimeUnit.DAYS);
+                }
+                for (ControllerServer controller : controllers.values()) {
+                    controller.shutdown();
+                }
+                for (BrokerServer brokerServer : kip500Brokers.values()) {
+                    brokerServer.shutdown();
+                }
+                for (KafkaRaftManager raftManager : raftManagers.values()) {
+                    raftManager.shutdown();
+                }
+                connectFutureManager.close();
+                if (baseDirectory != null) {
+                    Utils.delete(baseDirectory);
+                }
+                throw e;
+            }
+            return new KafkaClusterTestKit(executorService, nodes, controllers,
+                kip500Brokers, raftManagers, connectFutureManager, baseDirectory);
+        }
+
+        static private void setupNodeDirectories(File baseDirectory,
+                                                 String metadataDirectory,
+                                                 Collection<String> logDataDirectories) throws Exception {
+            Files.createDirectories(new File(baseDirectory, "local").toPath());
+            Files.createDirectories(Paths.get(metadataDirectory));
+            for (String logDataDirectory : logDataDirectories) {
+                Files.createDirectories(Paths.get(logDataDirectory));
+            }
+        }
+    }
+
+    private final ExecutorService executorService;
+    private final TestKitNodes nodes;
+    private final Map<Integer, ControllerServer> controllers;
+    private final Map<Integer, BrokerServer> kip500Brokers;
+    private final Map<Integer, KafkaRaftManager> raftManagers;
+    private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
+    private final File baseDirectory;
+
+    private KafkaClusterTestKit(ExecutorService executorService,
+                                TestKitNodes nodes,
+                                Map<Integer, ControllerServer> controllers,
+                                Map<Integer, BrokerServer> kip500Brokers,
+                                Map<Integer, KafkaRaftManager> raftManagers,
+                                ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
+                                File baseDirectory) {
+        this.executorService = executorService;
+        this.nodes = nodes;
+        this.controllers = controllers;
+        this.kip500Brokers = kip500Brokers;
+        this.raftManagers = raftManagers;
+        this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
+        this.baseDirectory = baseDirectory;
+    }
+
+    public void format() throws Exception {
+        List<Future<?>> futures = new ArrayList<>();
+        try {
+            for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
+                int nodeId = entry.getKey();

Review comment:
       We really ought to be able to factor out a helper here. The code looks identical for both the controller and broker cases.

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                Time time = Time.SYSTEM;
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick.  Set it to a dummy string \
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, dummyQuorumVotersString);
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition("@metadata", 0);

Review comment:
       I don't suppose we could reference the constant defined in `KafkaRaftServer`?

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.api.Assertions._
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+import scala.jdk.CollectionConverters._
+
+@Timeout(120000)
+class RaftClusterTest {
+
+  @Test
+  def testCreateClusterAndClose(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(3).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
+      "RaftManager was not initialized.")

Review comment:
       nit: alignment looks off

##########
File path: core/src/test/resources/log4j.properties
##########
@@ -18,8 +18,8 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
-log4j.logger.kafka=ERROR
-log4j.logger.org.apache.kafka=ERROR
+log4j.logger.kafka=INFO

Review comment:
       Guessing this was just added for debugging.

##########
File path: core/src/test/java/kafka/testkit/TestKitNodes.java
##########
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.server.MetaProperties;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class TestKitNodes {
+    public static class Builder {
+        private Uuid clusterId = null;
+        private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();

Review comment:
       It looks like we don't support mixed mode testing. That seems worth a follow-up JIRA. It is definitely an interesting case from the perspective of the raft implementation since it involves two listeners.

##########
File path: core/src/test/scala/integration/kafka/server/RaftClusterTest.scala
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.admin.{Admin, NewTopic}
+import org.apache.kafka.common.quota.{ClientQuotaAlteration, ClientQuotaEntity, ClientQuotaFilter, ClientQuotaFilterComponent}
+import org.apache.kafka.metadata.BrokerState
+import org.junit.jupiter.api.{Test, Timeout}
+import org.junit.jupiter.api.Assertions._
+
+import java.util
+import java.util.Collections
+import java.util.concurrent.TimeUnit
+import scala.jdk.CollectionConverters._
+
+@Timeout(120000)
+class RaftClusterTest {
+
+  @Test
+  def testCreateClusterAndClose(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(1).
+        setNumControllerNodes(1).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndWaitForBrokerInRunningState(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumKip500BrokerNodes(3).
+        setNumControllerNodes(3).build()).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      TestUtils.waitUntilTrue(() => cluster.kip500Brokers().get(0).currentState() == BrokerState.RUNNING,
+        "Broker never made it to RUNNING state.")
+      TestUtils.waitUntilTrue(() => cluster.raftManagers().get(0).kafkaRaftClient.leaderAndEpoch().leaderId.isPresent,
+      "RaftManager was not initialized.")
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        assertEquals(cluster.nodes().clusterId().toString,
+          admin.describeCluster().clusterId().get())
+      } finally {
+        admin.close()
+      }
+    } finally {
+      cluster.close()
+    }
+  }
+
+  @Test
+  def testCreateClusterAndCreateAndListTopic(): Unit = {

Review comment:
       Maybe we can let this cover topic deletion as well

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.Server;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.MetaProperties;
+import kafka.tools.StorageTool;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<List<String>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    map(e -> String.format("%d@localhost:%d", e.getKey(), e.getValue())).
+                    collect(Collectors.toList()));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String dummyQuorumVotersString = nodes.controllerNodes().keySet().stream().

Review comment:
       Just want to point out that this assumes all controllers are voters. It would be worth a follow-up to support controllers as observers as well.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10220: KAFKA-12383: Get RaftClusterTest.java and other KIP-500 junit tests working

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10220:
URL: https://github.com/apache/kafka/pull/10220#discussion_r597825699



##########
File path: core/src/test/java/kafka/test/annotation/Type.java
##########
@@ -17,12 +17,43 @@
 
 package kafka.test.annotation;
 
+import kafka.test.ClusterConfig;
+import kafka.test.junit.RaftClusterInvocationContext;
+import kafka.test.junit.ZkClusterInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.function.Consumer;
+
 /**
  * The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations.
  */
 public enum Type {
-    // RAFT,
-    ZK,
-    BOTH,
-    DEFAULT
+    RAFT {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+        }
+    },
+    ZK {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
+        }
+    },
+    BOTH {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+            invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
+

Review comment:
       nit: unneeded newline

##########
File path: core/src/test/java/kafka/test/annotation/Type.java
##########
@@ -17,12 +17,43 @@
 
 package kafka.test.annotation;
 
+import kafka.test.ClusterConfig;
+import kafka.test.junit.RaftClusterInvocationContext;
+import kafka.test.junit.ZkClusterInvocationContext;
+import org.junit.jupiter.api.extension.TestTemplateInvocationContext;
+
+import java.util.function.Consumer;
+
 /**
  * The type of cluster config being requested. Used by {@link kafka.test.ClusterConfig} and the test annotations.
  */
 public enum Type {
-    // RAFT,
-    ZK,
-    BOTH,
-    DEFAULT
+    RAFT {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+        }
+    },
+    ZK {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
+        }
+    },
+    BOTH {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+            invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
+
+        }
+    },
+    DEFAULT {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            throw new IllegalStateException("Cannot create invocation contexts for DEFAULT type");

Review comment:
       nit: maybe `UnsupportedOperationException`?

##########
File path: core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.raft.KafkaRaftManager;
+import kafka.server.BrokerServer;
+import kafka.server.ControllerServer;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaRaftServer;
+import kafka.server.MetaProperties;
+import kafka.server.Server;
+import kafka.tools.StorageTool;
+import kafka.utils.Logging;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.Node;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.ThreadUtils;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.Controller;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import org.apache.kafka.metalog.MetaLogManager;
+import org.apache.kafka.raft.RaftConfig;
+import org.apache.kafka.raft.metadata.MetaLogRaftShim;
+import org.apache.kafka.raft.metadata.MetadataRecordSerde;
+import org.apache.kafka.test.TestUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import scala.Option;
+import scala.collection.JavaConverters;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.net.InetSocketAddress;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+
+@SuppressWarnings("deprecation") // Needed for Scala 2.12 compatibility
+public class KafkaClusterTestKit implements AutoCloseable {
+    private final static Logger log = LoggerFactory.getLogger(KafkaClusterTestKit.class);
+
+    /**
+     * This class manages a future which is completed with the proper value for
+     * controller.quorum.voters once the randomly assigned ports for all the controllers are
+     * known.
+     */
+    private static class ControllerQuorumVotersFutureManager implements AutoCloseable {
+        private final int expectedControllers;
+        private final CompletableFuture<Map<Integer, RaftConfig.AddressSpec>> future = new CompletableFuture<>();
+        private final Map<Integer, Integer> controllerPorts = new TreeMap<>();
+
+        ControllerQuorumVotersFutureManager(int expectedControllers) {
+            this.expectedControllers = expectedControllers;
+        }
+
+        synchronized void registerPort(int nodeId, int port) {
+            controllerPorts.put(nodeId, port);
+            if (controllerPorts.size() >= expectedControllers) {
+                future.complete(controllerPorts.entrySet().stream().
+                    collect(Collectors.toMap(
+                        Map.Entry::getKey,
+                        entry -> new RaftConfig.InetAddressSpec(new InetSocketAddress("localhost", entry.getValue()))
+                    )));
+            }
+        }
+
+        void fail(Throwable e) {
+            future.completeExceptionally(e);
+        }
+
+        @Override
+        public void close() {
+            future.cancel(true);
+        }
+    }
+
+    public static class Builder {
+        private TestKitNodes nodes;
+        private Map<String, String> configProps = new HashMap<>();
+
+        public Builder(TestKitNodes nodes) {
+            this.nodes = nodes;
+        }
+
+        public Builder setConfigProp(String key, String value) {
+            this.configProps.put(key, value);
+            return this;
+        }
+
+        public KafkaClusterTestKit build() throws Exception {
+            Map<Integer, ControllerServer> controllers = new HashMap<>();
+            Map<Integer, BrokerServer> kip500Brokers = new HashMap<>();
+            Map<Integer, KafkaRaftManager> raftManagers = new HashMap<>();
+            String uninitializedQuorumVotersString = nodes.controllerNodes().keySet().stream().
+                map(controllerNode -> String.format("%d@0.0.0.0:0", controllerNode)).
+                collect(Collectors.joining(","));
+            /*
+              Number of threads = Total number of brokers + Total number of controllers + Total number of Raft Managers
+                                = Total number of brokers + Total number of controllers * 2
+                                  (Raft Manager per broker/controller)
+             */
+            int numOfExecutorThreads = (nodes.brokerNodes().size() + nodes.controllerNodes().size()) * 2;
+            ExecutorService executorService = null;
+            ControllerQuorumVotersFutureManager connectFutureManager =
+                new ControllerQuorumVotersFutureManager(nodes.controllerNodes().size());
+            File baseDirectory = null;
+
+            try {
+                baseDirectory = TestUtils.tempDirectory();
+                nodes = nodes.copyWithAbsolutePaths(baseDirectory.getAbsolutePath());
+                executorService = Executors.newFixedThreadPool(numOfExecutorThreads,
+                    ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
+                for (ControllerNode node : nodes.controllerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.NodeIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "CONTROLLER://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+                    // Note: we can't accurately set controller.quorum.voters yet, since we don't
+                    // yet know what ports each controller will pick.  Set it to a dummy string \
+                    // for now as a placeholder.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(), Collections.emptyList());
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("controller%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                        metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                        Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    ControllerServer controller = new ControllerServer(
+                        nodes.controllerProperties(node.id()),
+                        config,
+                        metaLogShim,
+                        raftManager,
+                        Time.SYSTEM,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        connectFutureManager.future
+                    );
+                    controllers.put(node.id(), controller);
+                    controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
+                        if (e != null) {
+                            connectFutureManager.fail(e);
+                        } else {
+                            connectFutureManager.registerPort(node.id(), port);
+                        }
+                    });
+                    raftManagers.put(node.id(), raftManager);
+                }
+                for (BrokerNode node : nodes.brokerNodes().values()) {
+                    Map<String, String> props = new HashMap<>(configProps);
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+                    props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
+                        Integer.toString(node.id()));
+                    props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
+                        node.metadataDirectory());
+                    props.put(KafkaConfig$.MODULE$.LogDirsProp(),
+                        String.join(",", node.logDataDirectories()));
+                    props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
+                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
+                        "EXTERNAL://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
+                        nodes.interBrokerListenerName().value());
+                    props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
+                        "CONTROLLER");
+
+                    setupNodeDirectories(baseDirectory, node.metadataDirectory(),
+                        node.logDataDirectories());
+
+                    // Just like above, we set a placeholder voter list here until we
+                    // find out what ports the controllers picked.
+                    props.put(RaftConfig.QUORUM_VOTERS_CONFIG, uninitializedQuorumVotersString);
+                    KafkaConfig config = new KafkaConfig(props, false, Option.empty());
+
+                    String threadNamePrefix = String.format("broker%d_", node.id());
+                    MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId(), node.id());
+                    TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                            metaProperties, config, new MetadataRecordSerde(), metadataPartition,
+                            Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                    MetaLogManager metaLogShim = new MetaLogRaftShim(raftManager.kafkaRaftClient(), config.nodeId());
+                    BrokerServer broker = new BrokerServer(
+                        config,
+                        nodes.brokerProperties(node.id()),
+                        metaLogShim,
+                        Time.SYSTEM,
+                        new Metrics(),
+                        Option.apply(threadNamePrefix),
+                        JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
+                        connectFutureManager.future,
+                        Server.SUPPORTED_FEATURES()
+                    );
+                    kip500Brokers.put(node.id(), broker);
+                    raftManagers.put(node.id(), raftManager);
+                }
+            } catch (Exception e) {
+                if (executorService != null) {
+                    executorService.shutdownNow();
+                    executorService.awaitTermination(5, TimeUnit.MINUTES);
+                }
+                for (ControllerServer controller : controllers.values()) {
+                    controller.shutdown();
+                }
+                for (BrokerServer brokerServer : kip500Brokers.values()) {
+                    brokerServer.shutdown();
+                }
+                for (KafkaRaftManager raftManager : raftManagers.values()) {
+                    raftManager.shutdown();
+                }
+                connectFutureManager.close();
+                if (baseDirectory != null) {
+                    Utils.delete(baseDirectory);
+                }
+                throw e;
+            }
+            return new KafkaClusterTestKit(executorService, nodes, controllers,
+                kip500Brokers, raftManagers, connectFutureManager, baseDirectory);
+        }
+
+        static private void setupNodeDirectories(File baseDirectory,
+                                                 String metadataDirectory,
+                                                 Collection<String> logDataDirectories) throws Exception {
+            Files.createDirectories(new File(baseDirectory, "local").toPath());
+            Files.createDirectories(Paths.get(metadataDirectory));
+            for (String logDataDirectory : logDataDirectories) {
+                Files.createDirectories(Paths.get(logDataDirectory));
+            }
+        }
+    }
+
+    private final ExecutorService executorService;
+    private final TestKitNodes nodes;
+    private final Map<Integer, ControllerServer> controllers;
+    private final Map<Integer, BrokerServer> kip500Brokers;
+    private final Map<Integer, KafkaRaftManager> raftManagers;
+    private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
+    private final File baseDirectory;
+
+    private KafkaClusterTestKit(ExecutorService executorService,
+                                TestKitNodes nodes,
+                                Map<Integer, ControllerServer> controllers,
+                                Map<Integer, BrokerServer> kip500Brokers,
+                                Map<Integer, KafkaRaftManager> raftManagers,
+                                ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
+                                File baseDirectory) {
+        this.executorService = executorService;
+        this.nodes = nodes;
+        this.controllers = controllers;
+        this.kip500Brokers = kip500Brokers;
+        this.raftManagers = raftManagers;
+        this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
+        this.baseDirectory = baseDirectory;
+    }
+
+    public void format() throws Exception {
+        List<Future<?>> futures = new ArrayList<>();
+        try {
+            for (Entry<Integer, ControllerServer> entry : controllers.entrySet()) {
+                int nodeId = entry.getKey();
+                ControllerServer controller = entry.getValue();
+                formatNodeAndLog(nodes.controllerProperties(nodeId), controller.config().metadataLogDir(),
+                    controller, futures::add);
+            }
+            for (Entry<Integer, BrokerServer> entry : kip500Brokers.entrySet()) {
+                int nodeId = entry.getKey();
+                BrokerServer broker = entry.getValue();
+                formatNodeAndLog(nodes.brokerProperties(nodeId), broker.config().metadataLogDir(),
+                    broker, futures::add);
+            }
+            for (Future<?> future: futures) {
+                future.get();
+            }
+        } catch (Exception e) {
+            for (Future<?> future: futures) {
+                future.cancel(true);
+            }
+            throw e;
+        }
+    }
+
+    private void formatNodeAndLog(MetaProperties properties, String metadataLogDir, Logging loggingMixin,
+                                  Consumer<Future<?>> futureConsumer) {
+        futureConsumer.accept(executorService.submit(() -> {
+            try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
+                try (PrintStream out = new PrintStream(stream)) {
+                    StorageTool.formatCommand(out,
+                            JavaConverters.asScalaBuffer(Collections.singletonList(metadataLogDir)).toSeq(),
+                            properties,
+                            false);
+                } finally {
+                    for (String line : stream.toString().split(String.format("%n"))) {
+                        loggingMixin.info(() -> line);
+                    }
+                }
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+        }));
+    }
+
+    public void startup() throws ExecutionException, InterruptedException {
+        List<Future<?>> futures = new ArrayList<>();
+        try {
+            for (ControllerServer controller : controllers.values()) {
+                futures.add(executorService.submit(controller::startup));
+            }
+            for (KafkaRaftManager raftManager : raftManagers.values()) {
+                futures.add(controllerQuorumVotersFutureManager.future.thenRunAsync(raftManager::startup));
+            }
+            for (BrokerServer broker : kip500Brokers.values()) {
+                futures.add(executorService.submit(broker::startup));
+            }
+            for (Future<?> future: futures) {
+                future.get();
+            }
+        } catch (Exception e) {
+            for (Future<?> future: futures) {
+                future.cancel(true);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Wait for a controller to mark all the brokers as ready (registered and unfenced).
+     */
+    public void waitForReadyBrokers() throws ExecutionException, InterruptedException {
+        // We can choose any controller, not just the active controller.
+        // If we choose a standby controller, we will wait slightly longer.
+        ControllerServer controllerServer = controllers.values().iterator().next();
+        Controller controller = controllerServer.controller();
+        controller.waitForReadyBrokers(kip500Brokers.size()).get();
+    }
+
+    public Properties controllerClientProperties() throws ExecutionException, InterruptedException {
+        Properties properties = new Properties();
+        if (!controllers.isEmpty()) {
+            Collection<Node> controllerNodes = RaftConfig.voterConnectionsToNodes(
+                controllerQuorumVotersFutureManager.future.get());
+
+            StringBuilder bld = new StringBuilder();
+            String prefix = "";
+            for (Node node : controllerNodes) {
+                bld.append(prefix).append(node.id()).append('@');
+                bld.append(node.host()).append(":").append(node.port());
+                prefix = ",";
+            }
+            properties.setProperty(RaftConfig.QUORUM_VOTERS_CONFIG, bld.toString());
+            properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
+                controllerNodes.stream().map(n -> n.host() + ":" + n.port()).
+                    collect(Collectors.joining(",")));
+        }
+        return properties;
+    }
+
+    public Properties clientProperties() {
+        Properties properties = new Properties();
+        if (!kip500Brokers.isEmpty()) {
+            StringBuilder bld = new StringBuilder();
+            String prefix = "";
+            for (Entry<Integer, BrokerServer> entry : kip500Brokers.entrySet()) {
+                int brokerId = entry.getKey();
+                BrokerServer broker = entry.getValue();
+                ListenerName listenerName = nodes.externalListenerName();
+                int port = broker.boundPort(listenerName);
+                if (port <= 0) {
+                    throw new RuntimeException("Broker " + brokerId + " does not yet " +
+                        "have a bound port for " + listenerName + ".  Did you start " +
+                        "the cluster yet?");
+                }
+                bld.append(prefix).append("localhost:").append(port);
+                prefix = ",";
+            }
+            properties.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, bld.toString());
+        }
+        return properties;
+    }
+
+    public Map<Integer, ControllerServer> controllers() {
+        return controllers;
+    }
+
+    public Map<Integer, BrokerServer> kip500Brokers() {

Review comment:
       nit: remove `kip500` prefix?

##########
File path: core/src/test/java/kafka/testkit/TestKitNodes.java
##########
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.testkit;
+
+import kafka.server.MetaProperties;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.network.ListenerName;
+
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.NavigableMap;
+import java.util.TreeMap;
+
+public class TestKitNodes {
+    public static class Builder {
+        private Uuid clusterId = null;
+        private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
+        private final NavigableMap<Integer, BrokerNode> brokerNodes = new TreeMap<>();
+
+        public Builder setClusterId(Uuid clusterId) {
+            this.clusterId = clusterId;
+            return this;
+        }
+
+        public Builder addNodes(TestKitNode[] nodes) {
+            for (TestKitNode node : nodes) {
+                addNode(node);
+            }
+            return this;
+        }
+
+        public Builder addNode(TestKitNode node) {
+            if (node instanceof ControllerNode) {
+                ControllerNode controllerNode = (ControllerNode) node;
+                controllerNodes.put(node.id(), controllerNode);
+            } else if (node instanceof BrokerNode) {
+                BrokerNode brokerNode = (BrokerNode) node;
+                brokerNodes.put(node.id(), brokerNode);
+            } else {
+                throw new RuntimeException("Can't handle TestKitNode subclass " +
+                        node.getClass().getSimpleName());
+            }
+            return this;
+        }
+
+        public Builder setNumControllerNodes(int numControllerNodes) {
+            if (numControllerNodes < 0) {
+                throw new RuntimeException("Invalid negative value for numControllerNodes");
+            }
+
+            while (controllerNodes.size() > numControllerNodes) {
+                controllerNodes.pollFirstEntry();
+            }
+            while (controllerNodes.size() < numControllerNodes) {
+                int nextId = 3000;
+                if (!controllerNodes.isEmpty()) {
+                    nextId = controllerNodes.lastKey() + 1;
+                }
+                controllerNodes.put(nextId, new ControllerNode.Builder().
+                    setId(nextId).build());
+            }
+            return this;
+        }
+
+        public Builder setNumKip500BrokerNodes(int numBrokerNodes) {

Review comment:
       nit: kip500 prefix




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org