You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/05 22:19:09 UTC

[kafka] branch trunk updated: KAFKA-13228; Ensure ApiVersionRequest is properly handled KRaft co-resident mode (#11784)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 448441a35d KAFKA-13228; Ensure ApiVersionRequest is properly handled KRaft co-resident mode (#11784)
448441a35d is described below

commit 448441a35d2f0c200c0f642f2bb618bd2d43df23
Author: dengziming <de...@gmail.com>
AuthorDate: Wed Jul 6 06:19:00 2022 +0800

    KAFKA-13228; Ensure ApiVersionRequest is properly handled KRaft co-resident mode (#11784)
    
    When brokers are co-resident with controllers in KRaft mode, we incorrectly determine the supported API versions on the controller using `NodeApiVersions.create()`. The patch fixes the problem by using the versions learned from the `ApiVersions` request sent to the controller, even when connecting to the local node.
    
    The patch also improves integration tests by adding support for co-resident mode.
    
    Reviewers: Justine Olshan <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
 .../scala/kafka/server/AlterPartitionManager.scala |  5 +--
 .../src/main/scala/kafka/server/BrokerServer.scala | 49 ++++++++++++----------
 .../server/BrokerToControllerChannelManager.scala  |  6 +--
 core/src/main/scala/kafka/server/KafkaConfig.scala |  6 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala | 19 +++++----
 .../java/kafka/test/ClusterTestExtensionsTest.java |  4 ++
 core/src/test/java/kafka/test/annotation/Type.java | 13 ++++--
 .../test/junit/RaftClusterInvocationContext.java   | 11 +++--
 .../java/kafka/testkit/KafkaClusterTestKit.java    | 43 +++++++++++++++----
 core/src/test/java/kafka/testkit/TestKitNodes.java | 25 ++++++++++-
 .../kafka/admin/LeaderElectionCommandTest.scala    |  2 +-
 .../unit/kafka/server/ApiVersionsRequestTest.scala |  2 +-
 .../unit/kafka/server/BrokerMetricNamesTest.scala  |  2 +-
 .../kafka/server/ClientQuotasRequestTest.scala     |  2 +-
 14 files changed, 128 insertions(+), 61 deletions(-)

diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 3dd6cfb0b1..574df470a3 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -78,15 +78,14 @@ object AlterPartitionManager {
     config: KafkaConfig,
     metadataCache: MetadataCache,
     scheduler: KafkaScheduler,
+    controllerNodeProvider: ControllerNodeProvider,
     time: Time,
     metrics: Metrics,
     threadNamePrefix: Option[String],
     brokerEpochSupplier: () => Long,
   ): AlterPartitionManager = {
-    val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
-
     val channelManager = BrokerToControllerChannelManager(
-      controllerNodeProvider = nodeProvider,
+      controllerNodeProvider,
       time = time,
       metrics = metrics,
       config = config,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b62d118096..d0d2a98b48 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -129,7 +129,7 @@ class BrokerServer(
 
   var forwardingManager: ForwardingManager = null
 
-  var alterIsrManager: AlterPartitionManager = null
+  var alterPartitionManager: AlterPartitionManager = null
 
   var autoTopicCreationManager: AutoTopicCreationManager = null
 
@@ -241,24 +241,17 @@ class BrokerServer(
 
       clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
 
-      val alterIsrChannelManager = BrokerToControllerChannelManager(
-        controllerNodeProvider,
-        time,
-        metrics,
+      alterPartitionManager = AlterPartitionManager(
         config,
-        channelName = "alterIsr",
-        threadNamePrefix,
-        retryTimeoutMs = Long.MaxValue
-      )
-      alterIsrManager = new DefaultAlterPartitionManager(
-        controllerChannelManager = alterIsrChannelManager,
+        metadataCache,
         scheduler = kafkaScheduler,
+        controllerNodeProvider,
         time = time,
-        brokerId = config.nodeId,
-        brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
-        metadataVersionSupplier = () => metadataCache.metadataVersion()
+        metrics,
+        threadNamePrefix,
+        brokerEpochSupplier = () => lifecycleManager.brokerEpoch
       )
-      alterIsrManager.start()
+      alterPartitionManager.start()
 
       this._replicaManager = new ReplicaManager(
         config = config,
@@ -269,7 +262,7 @@ class BrokerServer(
         quotaManagers = quotaManagers,
         metadataCache = metadataCache,
         logDirFailureChannel = logDirFailureChannel,
-        alterPartitionManager = alterIsrManager,
+        alterPartitionManager = alterPartitionManager,
         brokerTopicStats = brokerTopicStats,
         isShuttingDown = isShuttingDown,
         zkClient = None,
@@ -343,10 +336,22 @@ class BrokerServer(
           k -> VersionRange.of(v.min, v.max)
       }.asJava
 
-      lifecycleManager.start(() => metadataListener.highestMetadataOffset,
-        BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
-          "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
-        metaProps.clusterId, networkListeners, featuresRemapped)
+      val brokerLifecycleChannelManager = BrokerToControllerChannelManager(
+        controllerNodeProvider,
+        time,
+        metrics,
+        config,
+        "heartbeat",
+        threadNamePrefix,
+        config.brokerSessionTimeoutMs.toLong
+      )
+      lifecycleManager.start(
+        () => metadataListener.highestMetadataOffset,
+        brokerLifecycleChannelManager,
+        metaProps.clusterId,
+        networkListeners,
+        featuresRemapped
+      )
 
       // Register a listener with the Raft layer to receive metadata event notifications
       raftManager.register(metadataListener)
@@ -544,8 +549,8 @@ class BrokerServer(
       if (replicaManager != null)
         CoreUtils.swallow(replicaManager.shutdown(), this)
 
-      if (alterIsrManager != null)
-        CoreUtils.swallow(alterIsrManager.shutdown(), this)
+      if (alterPartitionManager != null)
+        CoreUtils.swallow(alterPartitionManager.shutdown(), this)
 
       if (clientToControllerChannelManager != null)
         CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 86395f015c..37f3a47e29 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -165,7 +165,6 @@ class BrokerToControllerChannelManagerImpl(
   private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
   private val manualMetadataUpdater = new ManualMetadataUpdater()
   private val apiVersions = new ApiVersions()
-  private val currentNodeApiVersions = NodeApiVersions.create()
   private val requestThread = newRequestThread
 
   def start(): Unit = {
@@ -253,10 +252,7 @@ class BrokerToControllerChannelManagerImpl(
 
   def controllerApiVersions(): Option[NodeApiVersions] = {
     requestThread.activeControllerAddress().flatMap { activeController =>
-      if (activeController.id == config.brokerId)
-        Some(currentNodeApiVersions)
-      else
-        Option(apiVersions.get(activeController.idString))
+      Option(apiVersions.get(activeController.idString))
     }
   }
 }
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b4e0b9449c..a9fbda6c21 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1637,6 +1637,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
     distinctRoles
   }
 
+  def isKRaftCoResidentMode: Boolean = {
+    processRoles == Set(BrokerRole, ControllerRole)
+  }
+
   def metadataLogDir: String = {
     Option(getString(KafkaConfig.MetadataLogDirProp)) match {
       case Some(dir) => dir
@@ -2164,7 +2168,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
       validateControllerQuorumVotersMustContainNodeIdForKRaftController()
       validateControllerListenerExistsForKRaftController()
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
-    } else if (processRoles == Set(BrokerRole, ControllerRole)) {
+    } else if (isKRaftCoResidentMode) {
       // KRaft colocated broker and controller
       validateNonEmptyQuorumVotersForKRaft()
       validateControlPlaneListenerEmptyForKRaft()
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 64cf88d4ee..6b52511c1b 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -140,7 +140,7 @@ class KafkaServer(
 
   var clientToControllerChannelManager: BrokerToControllerChannelManager = null
 
-  var alterIsrManager: AlterPartitionManager = null
+  var alterPartitionManager: AlterPartitionManager = null
 
   var kafkaScheduler: KafkaScheduler = null
 
@@ -263,6 +263,7 @@ class KafkaServer(
         logManager.startup(zkClient.getAllTopicsInCluster())
 
         metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures)
+        val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
 
         /* initialize feature change listener */
         _featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient)
@@ -276,13 +277,14 @@ class KafkaServer(
         credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
 
         clientToControllerChannelManager = BrokerToControllerChannelManager(
-          controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+          controllerNodeProvider = controllerNodeProvider,
           time = time,
           metrics = metrics,
           config = config,
           channelName = "forwarding",
           threadNamePrefix = threadNamePrefix,
-          retryTimeoutMs = config.requestTimeoutMs.longValue)
+          retryTimeoutMs = config.requestTimeoutMs.longValue
+        )
         clientToControllerChannelManager.start()
 
         /* start forwarding manager */
@@ -309,11 +311,12 @@ class KafkaServer(
         socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
 
         // Start alter partition manager based on the IBP version
-        alterIsrManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
+        alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
           AlterPartitionManager(
             config = config,
             metadataCache = metadataCache,
             scheduler = kafkaScheduler,
+            controllerNodeProvider,
             time = time,
             metrics = metrics,
             threadNamePrefix = threadNamePrefix,
@@ -322,7 +325,7 @@ class KafkaServer(
         } else {
           AlterPartitionManager(kafkaScheduler, time, zkClient)
         }
-        alterIsrManager.start()
+        alterPartitionManager.start()
 
         // Start replica manager
         _replicaManager = createReplicaManager(isShuttingDown)
@@ -478,7 +481,7 @@ class KafkaServer(
       quotaManagers = quotaManagers,
       metadataCache = metadataCache,
       logDirFailureChannel = logDirFailureChannel,
-      alterPartitionManager = alterIsrManager,
+      alterPartitionManager = alterPartitionManager,
       brokerTopicStats = brokerTopicStats,
       isShuttingDown = isShuttingDown,
       zkClient = Some(zkClient),
@@ -755,8 +758,8 @@ class KafkaServer(
         if (replicaManager != null)
           CoreUtils.swallow(replicaManager.shutdown(), this)
 
-        if (alterIsrManager != null)
-          CoreUtils.swallow(alterIsrManager.shutdown(), this)
+        if (alterPartitionManager != null)
+          CoreUtils.swallow(alterPartitionManager.shutdown(), this)
 
         if (clientToControllerChannelManager != null)
           CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 33780f795e..63ca137253 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -90,6 +90,10 @@ public class ClusterTestExtensionsTest {
         @ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
             @ClusterConfigProperty(key = "foo", value = "baz"),
             @ClusterConfigProperty(key = "spam", value = "eggz")
+        }),
+        @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
+            @ClusterConfigProperty(key = "foo", value = "baz"),
+            @ClusterConfigProperty(key = "spam", value = "eggz")
         })
     })
     public void testClusterTests() {
diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java
index 0d1a161dab..933ca50113 100644
--- a/core/src/test/java/kafka/test/annotation/Type.java
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -31,7 +31,13 @@ public enum Type {
     KRAFT {
         @Override
         public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
-            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
+        }
+    },
+    CO_KRAFT {
+        @Override
+        public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
         }
     },
     ZK {
@@ -40,10 +46,11 @@ public enum Type {
             invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
         }
     },
-    BOTH {
+    ALL {
         @Override
         public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
-            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
+            invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
             invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
         }
     },
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 73fe67836a..cef71042d3 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -65,18 +65,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
 
     private final ClusterConfig clusterConfig;
     private final AtomicReference<KafkaClusterTestKit> clusterReference;
+    private final boolean isCoResident;
 
-    public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+    public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
         this.clusterConfig = clusterConfig;
         this.clusterReference = new AtomicReference<>();
+        this.isCoResident = isCoResident;
     }
 
     @Override
     public String getDisplayName(int invocationIndex) {
         String clusterDesc = clusterConfig.nameTags().entrySet().stream()
-                .map(Object::toString)
-                .collect(Collectors.joining(", "));
-        return String.format("[%d] Type=Raft, %s", invocationIndex, clusterDesc);
+            .map(Object::toString)
+            .collect(Collectors.joining(", "));
+        return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc);
     }
 
     @Override
@@ -86,6 +88,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
             (BeforeTestExecutionCallback) context -> {
                 TestKitNodes nodes = new TestKitNodes.Builder().
                         setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
+                        setCoResident(isCoResident).
                         setNumBrokerNodes(clusterConfig.numBrokers()).
                         setNumControllerNodes(clusterConfig.numControllers()).build();
                 nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index a930bafde6..c961d71bbe 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -150,15 +150,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
                 for (ControllerNode node : nodes.controllerNodes().values()) {
                     Map<String, String> props = new HashMap<>(configProps);
-                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
                     props.put(KafkaConfig$.MODULE$.NodeIdProp(),
                         Integer.toString(node.id()));
                     props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
                         node.metadataDirectory());
                     props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
-                        "CONTROLLER:PLAINTEXT");
-                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
-                        "CONTROLLER://localhost:0");
+                        "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
+                    props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
+                        nodes.interBrokerListenerName().value());
                     props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
                         "CONTROLLER");
                     // Note: we can't accurately set controller.quorum.voters yet, since we don't
@@ -203,7 +204,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 }
                 for (BrokerNode node : nodes.brokerNodes().values()) {
                     Map<String, String> props = new HashMap<>(configProps);
-                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+                    props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
                     props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
                         Integer.toString(node.id()));
                     props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
@@ -212,8 +213,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         String.join(",", node.logDataDirectories()));
                     props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
                         "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
-                    props.put(KafkaConfig$.MODULE$.ListenersProp(),
-                        "EXTERNAL://localhost:0");
+                    props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
                     props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
                         nodes.interBrokerListenerName().value());
                     props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
@@ -231,9 +231,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
                     String threadNamePrefix = String.format("broker%d_", node.id());
                     MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
                     TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
-                    KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+                    KafkaRaftManager<ApiMessageAndVersion> raftManager;
+                    if (raftManagers.containsKey(node.id())) {
+                        raftManager = raftManagers.get(node.id());
+                    } else {
+                        raftManager = new KafkaRaftManager<>(
                             metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
                             Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+                        raftManagers.put(node.id(), raftManager);
+                    }
                     BrokerServer broker = new BrokerServer(
                         config,
                         nodes.brokerProperties(node.id()),
@@ -245,7 +251,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         connectFutureManager.future
                     );
                     brokers.put(node.id(), broker);
-                    raftManagers.put(node.id(), raftManager);
                 }
             } catch (Exception e) {
                 if (executorService != null) {
@@ -271,6 +276,26 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 brokers, raftManagers, connectFutureManager, baseDirectory);
         }
 
+        private String listeners(int node) {
+            if (nodes.isCoResidentNode(node)) {
+                return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
+            }
+            if (nodes.controllerNodes().containsKey(node)) {
+                return "CONTROLLER://localhost:0";
+            }
+            return "EXTERNAL://localhost:0";
+        }
+
+        private String roles(int node) {
+            if (nodes.isCoResidentNode(node)) {
+                return "broker,controller";
+            }
+            if (nodes.controllerNodes().containsKey(node)) {
+                return "controller";
+            }
+            return "broker";
+        }
+
         static private void setupNodeDirectories(File baseDirectory,
                                                  String metadataDirectory,
                                                  Collection<String> logDataDirectories) throws Exception {
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index f91e62d179..14692ccc96 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
 
 public class TestKitNodes {
     public static class Builder {
+        private boolean coResident = false;
         private Uuid clusterId = null;
         private MetadataVersion bootstrapMetadataVersion = null;
         private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
@@ -48,6 +49,11 @@ public class TestKitNodes {
             return this;
         }
 
+        public Builder setCoResident(boolean coResident) {
+            this.coResident = coResident;
+            return this;
+        }
+
         public Builder addNodes(TestKitNode[] nodes) {
             for (TestKitNode node : nodes) {
                 addNode(node);
@@ -78,7 +84,7 @@ public class TestKitNodes {
                 controllerNodes.pollFirstEntry();
             }
             while (controllerNodes.size() < numControllerNodes) {
-                int nextId = 3000;
+                int nextId = startControllerId();
                 if (!controllerNodes.isEmpty()) {
                     nextId = controllerNodes.lastKey() + 1;
                 }
@@ -96,7 +102,7 @@ public class TestKitNodes {
                 brokerNodes.pollFirstEntry();
             }
             while (brokerNodes.size() < numBrokerNodes) {
-                int nextId = 0;
+                int nextId = startBrokerId();
                 if (!brokerNodes.isEmpty()) {
                     nextId = brokerNodes.lastKey() + 1;
                 }
@@ -115,6 +121,17 @@ public class TestKitNodes {
             }
             return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
         }
+
+        private int startBrokerId() {
+            return 0;
+        }
+
+        private int startControllerId() {
+            if (coResident) {
+                return startBrokerId();
+            }
+            return startBrokerId() + 3000;
+        }
     }
 
     private final Uuid clusterId;
@@ -122,6 +139,10 @@ public class TestKitNodes {
     private final NavigableMap<Integer, ControllerNode> controllerNodes;
     private final NavigableMap<Integer, BrokerNode> brokerNodes;
 
+    public boolean isCoResidentNode(int node) {
+        return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
+    }
+
     private TestKitNodes(Uuid clusterId,
                          MetadataVersion bootstrapMetadataVersion,
                          NavigableMap<Integer, ControllerNode> controllerNodes,
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index 785054901d..aebd479f18 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 import org.junit.jupiter.api.{BeforeEach, Tag}
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
 @Tag("integration")
 final class LeaderElectionCommandTest(cluster: ClusterInstance) {
   import LeaderElectionCommandTest._
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index b1778ba7df..bc45b72077 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 
 
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 1)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
 class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
 
   @BeforeEach
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index c1322fe6fe..dc69076619 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(clusterType = Type.BOTH)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 class BrokerMetricNamesTest(cluster: ClusterInstance) {
   @AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 71321c1f20..904fbbc216 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith
 
 import scala.jdk.CollectionConverters._
 
-@ClusterTestDefaults(clusterType = Type.BOTH)
+@ClusterTestDefaults(clusterType = Type.ALL)
 @ExtendWith(value = Array(classOf[ClusterTestExtensions]))
 @Tag("integration")
 class ClientQuotasRequestTest(cluster: ClusterInstance) {