You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2022/07/05 22:19:09 UTC
[kafka] branch trunk updated: KAFKA-13228; Ensure ApiVersionRequest is properly handled KRaft co-resident mode (#11784)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 448441a35d KAFKA-13228; Ensure ApiVersionRequest is properly handled KRaft co-resident mode (#11784)
448441a35d is described below
commit 448441a35d2f0c200c0f642f2bb618bd2d43df23
Author: dengziming <de...@gmail.com>
AuthorDate: Wed Jul 6 06:19:00 2022 +0800
KAFKA-13228; Ensure ApiVersionRequest is properly handled in KRaft co-resident mode (#11784)
When brokers are co-resident with controllers in KRaft mode, we incorrectly determine the supported API versions on the controller using `NodeApiVersions.create()`. The patch fixes the problem by using the versions obtained from the `ApiVersions` request sent to the controller, even when connecting to the local node.
The patch also improves integration tests by adding support for co-resident mode.
Reviewers: Justine Olshan <jo...@confluent.io>, Jason Gustafson <ja...@confluent.io>
---
.../scala/kafka/server/AlterPartitionManager.scala | 5 +--
.../src/main/scala/kafka/server/BrokerServer.scala | 49 ++++++++++++----------
.../server/BrokerToControllerChannelManager.scala | 6 +--
core/src/main/scala/kafka/server/KafkaConfig.scala | 6 ++-
core/src/main/scala/kafka/server/KafkaServer.scala | 19 +++++----
.../java/kafka/test/ClusterTestExtensionsTest.java | 4 ++
core/src/test/java/kafka/test/annotation/Type.java | 13 ++++--
.../test/junit/RaftClusterInvocationContext.java | 11 +++--
.../java/kafka/testkit/KafkaClusterTestKit.java | 43 +++++++++++++++----
core/src/test/java/kafka/testkit/TestKitNodes.java | 25 ++++++++++-
.../kafka/admin/LeaderElectionCommandTest.scala | 2 +-
.../unit/kafka/server/ApiVersionsRequestTest.scala | 2 +-
.../unit/kafka/server/BrokerMetricNamesTest.scala | 2 +-
.../kafka/server/ClientQuotasRequestTest.scala | 2 +-
14 files changed, 128 insertions(+), 61 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AlterPartitionManager.scala b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
index 3dd6cfb0b1..574df470a3 100644
--- a/core/src/main/scala/kafka/server/AlterPartitionManager.scala
+++ b/core/src/main/scala/kafka/server/AlterPartitionManager.scala
@@ -78,15 +78,14 @@ object AlterPartitionManager {
config: KafkaConfig,
metadataCache: MetadataCache,
scheduler: KafkaScheduler,
+ controllerNodeProvider: ControllerNodeProvider,
time: Time,
metrics: Metrics,
threadNamePrefix: Option[String],
brokerEpochSupplier: () => Long,
): AlterPartitionManager = {
- val nodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
-
val channelManager = BrokerToControllerChannelManager(
- controllerNodeProvider = nodeProvider,
+ controllerNodeProvider,
time = time,
metrics = metrics,
config = config,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index b62d118096..d0d2a98b48 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -129,7 +129,7 @@ class BrokerServer(
var forwardingManager: ForwardingManager = null
- var alterIsrManager: AlterPartitionManager = null
+ var alterPartitionManager: AlterPartitionManager = null
var autoTopicCreationManager: AutoTopicCreationManager = null
@@ -241,24 +241,17 @@ class BrokerServer(
clientQuotaMetadataManager = new ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
- val alterIsrChannelManager = BrokerToControllerChannelManager(
- controllerNodeProvider,
- time,
- metrics,
+ alterPartitionManager = AlterPartitionManager(
config,
- channelName = "alterIsr",
- threadNamePrefix,
- retryTimeoutMs = Long.MaxValue
- )
- alterIsrManager = new DefaultAlterPartitionManager(
- controllerChannelManager = alterIsrChannelManager,
+ metadataCache,
scheduler = kafkaScheduler,
+ controllerNodeProvider,
time = time,
- brokerId = config.nodeId,
- brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
- metadataVersionSupplier = () => metadataCache.metadataVersion()
+ metrics,
+ threadNamePrefix,
+ brokerEpochSupplier = () => lifecycleManager.brokerEpoch
)
- alterIsrManager.start()
+ alterPartitionManager.start()
this._replicaManager = new ReplicaManager(
config = config,
@@ -269,7 +262,7 @@ class BrokerServer(
quotaManagers = quotaManagers,
metadataCache = metadataCache,
logDirFailureChannel = logDirFailureChannel,
- alterPartitionManager = alterIsrManager,
+ alterPartitionManager = alterPartitionManager,
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = None,
@@ -343,10 +336,22 @@ class BrokerServer(
k -> VersionRange.of(v.min, v.max)
}.asJava
- lifecycleManager.start(() => metadataListener.highestMetadataOffset,
- BrokerToControllerChannelManager(controllerNodeProvider, time, metrics, config,
- "heartbeat", threadNamePrefix, config.brokerSessionTimeoutMs.toLong),
- metaProps.clusterId, networkListeners, featuresRemapped)
+ val brokerLifecycleChannelManager = BrokerToControllerChannelManager(
+ controllerNodeProvider,
+ time,
+ metrics,
+ config,
+ "heartbeat",
+ threadNamePrefix,
+ config.brokerSessionTimeoutMs.toLong
+ )
+ lifecycleManager.start(
+ () => metadataListener.highestMetadataOffset,
+ brokerLifecycleChannelManager,
+ metaProps.clusterId,
+ networkListeners,
+ featuresRemapped
+ )
// Register a listener with the Raft layer to receive metadata event notifications
raftManager.register(metadataListener)
@@ -544,8 +549,8 @@ class BrokerServer(
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)
- if (alterIsrManager != null)
- CoreUtils.swallow(alterIsrManager.shutdown(), this)
+ if (alterPartitionManager != null)
+ CoreUtils.swallow(alterPartitionManager.shutdown(), this)
if (clientToControllerChannelManager != null)
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
diff --git a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
index 86395f015c..37f3a47e29 100644
--- a/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
@@ -165,7 +165,6 @@ class BrokerToControllerChannelManagerImpl(
private val logContext = new LogContext(s"[BrokerToControllerChannelManager broker=${config.brokerId} name=$channelName] ")
private val manualMetadataUpdater = new ManualMetadataUpdater()
private val apiVersions = new ApiVersions()
- private val currentNodeApiVersions = NodeApiVersions.create()
private val requestThread = newRequestThread
def start(): Unit = {
@@ -253,10 +252,7 @@ class BrokerToControllerChannelManagerImpl(
def controllerApiVersions(): Option[NodeApiVersions] = {
requestThread.activeControllerAddress().flatMap { activeController =>
- if (activeController.id == config.brokerId)
- Some(currentNodeApiVersions)
- else
- Option(apiVersions.get(activeController.idString))
+ Option(apiVersions.get(activeController.idString))
}
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b4e0b9449c..a9fbda6c21 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -1637,6 +1637,10 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
distinctRoles
}
+ def isKRaftCoResidentMode: Boolean = {
+ processRoles == Set(BrokerRole, ControllerRole)
+ }
+
def metadataLogDir: String = {
Option(getString(KafkaConfig.MetadataLogDirProp)) match {
case Some(dir) => dir
@@ -2164,7 +2168,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
validateControllerQuorumVotersMustContainNodeIdForKRaftController()
validateControllerListenerExistsForKRaftController()
validateControllerListenerNamesMustAppearInListenersForKRaftController()
- } else if (processRoles == Set(BrokerRole, ControllerRole)) {
+ } else if (isKRaftCoResidentMode) {
// KRaft colocated broker and controller
validateNonEmptyQuorumVotersForKRaft()
validateControlPlaneListenerEmptyForKRaft()
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 64cf88d4ee..6b52511c1b 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -140,7 +140,7 @@ class KafkaServer(
var clientToControllerChannelManager: BrokerToControllerChannelManager = null
- var alterIsrManager: AlterPartitionManager = null
+ var alterPartitionManager: AlterPartitionManager = null
var kafkaScheduler: KafkaScheduler = null
@@ -263,6 +263,7 @@ class KafkaServer(
logManager.startup(zkClient.getAllTopicsInCluster())
metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion, brokerFeatures)
+ val controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache)
/* initialize feature change listener */
_featureChangeListener = new FinalizedFeatureChangeListener(metadataCache, _zkClient)
@@ -276,13 +277,14 @@ class KafkaServer(
credentialProvider = new CredentialProvider(ScramMechanism.mechanismNames, tokenCache)
clientToControllerChannelManager = BrokerToControllerChannelManager(
- controllerNodeProvider = MetadataCacheControllerNodeProvider(config, metadataCache),
+ controllerNodeProvider = controllerNodeProvider,
time = time,
metrics = metrics,
config = config,
channelName = "forwarding",
threadNamePrefix = threadNamePrefix,
- retryTimeoutMs = config.requestTimeoutMs.longValue)
+ retryTimeoutMs = config.requestTimeoutMs.longValue
+ )
clientToControllerChannelManager.start()
/* start forwarding manager */
@@ -309,11 +311,12 @@ class KafkaServer(
socketServer = new SocketServer(config, metrics, time, credentialProvider, apiVersionManager)
// Start alter partition manager based on the IBP version
- alterIsrManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
+ alterPartitionManager = if (config.interBrokerProtocolVersion.isAlterPartitionSupported) {
AlterPartitionManager(
config = config,
metadataCache = metadataCache,
scheduler = kafkaScheduler,
+ controllerNodeProvider,
time = time,
metrics = metrics,
threadNamePrefix = threadNamePrefix,
@@ -322,7 +325,7 @@ class KafkaServer(
} else {
AlterPartitionManager(kafkaScheduler, time, zkClient)
}
- alterIsrManager.start()
+ alterPartitionManager.start()
// Start replica manager
_replicaManager = createReplicaManager(isShuttingDown)
@@ -478,7 +481,7 @@ class KafkaServer(
quotaManagers = quotaManagers,
metadataCache = metadataCache,
logDirFailureChannel = logDirFailureChannel,
- alterPartitionManager = alterIsrManager,
+ alterPartitionManager = alterPartitionManager,
brokerTopicStats = brokerTopicStats,
isShuttingDown = isShuttingDown,
zkClient = Some(zkClient),
@@ -755,8 +758,8 @@ class KafkaServer(
if (replicaManager != null)
CoreUtils.swallow(replicaManager.shutdown(), this)
- if (alterIsrManager != null)
- CoreUtils.swallow(alterIsrManager.shutdown(), this)
+ if (alterPartitionManager != null)
+ CoreUtils.swallow(alterPartitionManager.shutdown(), this)
if (clientToControllerChannelManager != null)
CoreUtils.swallow(clientToControllerChannelManager.shutdown(), this)
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 33780f795e..63ca137253 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -90,6 +90,10 @@ public class ClusterTestExtensionsTest {
@ClusterTest(name = "cluster-tests-2", clusterType = Type.KRAFT, serverProperties = {
@ClusterConfigProperty(key = "foo", value = "baz"),
@ClusterConfigProperty(key = "spam", value = "eggz")
+ }),
+ @ClusterTest(name = "cluster-tests-3", clusterType = Type.CO_KRAFT, serverProperties = {
+ @ClusterConfigProperty(key = "foo", value = "baz"),
+ @ClusterConfigProperty(key = "spam", value = "eggz")
})
})
public void testClusterTests() {
diff --git a/core/src/test/java/kafka/test/annotation/Type.java b/core/src/test/java/kafka/test/annotation/Type.java
index 0d1a161dab..933ca50113 100644
--- a/core/src/test/java/kafka/test/annotation/Type.java
+++ b/core/src/test/java/kafka/test/annotation/Type.java
@@ -31,7 +31,13 @@ public enum Type {
KRAFT {
@Override
public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
- invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+ invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
+ }
+ },
+ CO_KRAFT {
+ @Override
+ public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
+ invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
}
},
ZK {
@@ -40,10 +46,11 @@ public enum Type {
invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
}
},
- BOTH {
+ ALL {
@Override
public void invocationContexts(ClusterConfig config, Consumer<TestTemplateInvocationContext> invocationConsumer) {
- invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf()));
+ invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), false));
+ invocationConsumer.accept(new RaftClusterInvocationContext(config.copyOf(), true));
invocationConsumer.accept(new ZkClusterInvocationContext(config.copyOf()));
}
},
diff --git a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
index 73fe67836a..cef71042d3 100644
--- a/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
+++ b/core/src/test/java/kafka/test/junit/RaftClusterInvocationContext.java
@@ -65,18 +65,20 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
private final ClusterConfig clusterConfig;
private final AtomicReference<KafkaClusterTestKit> clusterReference;
+ private final boolean isCoResident;
- public RaftClusterInvocationContext(ClusterConfig clusterConfig) {
+ public RaftClusterInvocationContext(ClusterConfig clusterConfig, boolean isCoResident) {
this.clusterConfig = clusterConfig;
this.clusterReference = new AtomicReference<>();
+ this.isCoResident = isCoResident;
}
@Override
public String getDisplayName(int invocationIndex) {
String clusterDesc = clusterConfig.nameTags().entrySet().stream()
- .map(Object::toString)
- .collect(Collectors.joining(", "));
- return String.format("[%d] Type=Raft, %s", invocationIndex, clusterDesc);
+ .map(Object::toString)
+ .collect(Collectors.joining(", "));
+ return String.format("[%d] Type=Raft-%s, %s", invocationIndex, isCoResident ? "CoReside" : "Distributed", clusterDesc);
}
@Override
@@ -86,6 +88,7 @@ public class RaftClusterInvocationContext implements TestTemplateInvocationConte
(BeforeTestExecutionCallback) context -> {
TestKitNodes nodes = new TestKitNodes.Builder().
setBootstrapMetadataVersion(clusterConfig.metadataVersion()).
+ setCoResident(isCoResident).
setNumBrokerNodes(clusterConfig.numBrokers()).
setNumControllerNodes(clusterConfig.numControllers()).build();
nodes.brokerNodes().forEach((brokerId, brokerNode) -> {
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index a930bafde6..c961d71bbe 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -150,15 +150,16 @@ public class KafkaClusterTestKit implements AutoCloseable {
ThreadUtils.createThreadFactory("KafkaClusterTestKit%d", false));
for (ControllerNode node : nodes.controllerNodes().values()) {
Map<String, String> props = new HashMap<>(configProps);
- props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "controller");
+ props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
props.put(KafkaConfig$.MODULE$.NodeIdProp(),
Integer.toString(node.id()));
props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
node.metadataDirectory());
props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
- "CONTROLLER:PLAINTEXT");
- props.put(KafkaConfig$.MODULE$.ListenersProp(),
- "CONTROLLER://localhost:0");
+ "EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
+ props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
+ props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
+ nodes.interBrokerListenerName().value());
props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
"CONTROLLER");
// Note: we can't accurately set controller.quorum.voters yet, since we don't
@@ -203,7 +204,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
}
for (BrokerNode node : nodes.brokerNodes().values()) {
Map<String, String> props = new HashMap<>(configProps);
- props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), "broker");
+ props.put(KafkaConfig$.MODULE$.ProcessRolesProp(), roles(node.id()));
props.put(KafkaConfig$.MODULE$.BrokerIdProp(),
Integer.toString(node.id()));
props.put(KafkaConfig$.MODULE$.MetadataLogDirProp(),
@@ -212,8 +213,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
String.join(",", node.logDataDirectories()));
props.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(),
"EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT");
- props.put(KafkaConfig$.MODULE$.ListenersProp(),
- "EXTERNAL://localhost:0");
+ props.put(KafkaConfig$.MODULE$.ListenersProp(), listeners(node.id()));
props.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(),
nodes.interBrokerListenerName().value());
props.put(KafkaConfig$.MODULE$.ControllerListenerNamesProp(),
@@ -231,9 +231,15 @@ public class KafkaClusterTestKit implements AutoCloseable {
String threadNamePrefix = String.format("broker%d_", node.id());
MetaProperties metaProperties = MetaProperties.apply(nodes.clusterId().toString(), node.id());
TopicPartition metadataPartition = new TopicPartition(KafkaRaftServer.MetadataTopic(), 0);
- KafkaRaftManager<ApiMessageAndVersion> raftManager = new KafkaRaftManager<>(
+ KafkaRaftManager<ApiMessageAndVersion> raftManager;
+ if (raftManagers.containsKey(node.id())) {
+ raftManager = raftManagers.get(node.id());
+ } else {
+ raftManager = new KafkaRaftManager<>(
metaProperties, config, new MetadataRecordSerde(), metadataPartition, KafkaRaftServer.MetadataTopicId(),
Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
+ raftManagers.put(node.id(), raftManager);
+ }
BrokerServer broker = new BrokerServer(
config,
nodes.brokerProperties(node.id()),
@@ -245,7 +251,6 @@ public class KafkaClusterTestKit implements AutoCloseable {
connectFutureManager.future
);
brokers.put(node.id(), broker);
- raftManagers.put(node.id(), raftManager);
}
} catch (Exception e) {
if (executorService != null) {
@@ -271,6 +276,26 @@ public class KafkaClusterTestKit implements AutoCloseable {
brokers, raftManagers, connectFutureManager, baseDirectory);
}
+ private String listeners(int node) {
+ if (nodes.isCoResidentNode(node)) {
+ return "EXTERNAL://localhost:0,CONTROLLER://localhost:0";
+ }
+ if (nodes.controllerNodes().containsKey(node)) {
+ return "CONTROLLER://localhost:0";
+ }
+ return "EXTERNAL://localhost:0";
+ }
+
+ private String roles(int node) {
+ if (nodes.isCoResidentNode(node)) {
+ return "broker,controller";
+ }
+ if (nodes.controllerNodes().containsKey(node)) {
+ return "controller";
+ }
+ return "broker";
+ }
+
static private void setupNodeDirectories(File baseDirectory,
String metadataDirectory,
Collection<String> logDataDirectories) throws Exception {
diff --git a/core/src/test/java/kafka/testkit/TestKitNodes.java b/core/src/test/java/kafka/testkit/TestKitNodes.java
index f91e62d179..14692ccc96 100644
--- a/core/src/test/java/kafka/testkit/TestKitNodes.java
+++ b/core/src/test/java/kafka/testkit/TestKitNodes.java
@@ -33,6 +33,7 @@ import java.util.TreeMap;
public class TestKitNodes {
public static class Builder {
+ private boolean coResident = false;
private Uuid clusterId = null;
private MetadataVersion bootstrapMetadataVersion = null;
private final NavigableMap<Integer, ControllerNode> controllerNodes = new TreeMap<>();
@@ -48,6 +49,11 @@ public class TestKitNodes {
return this;
}
+ public Builder setCoResident(boolean coResident) {
+ this.coResident = coResident;
+ return this;
+ }
+
public Builder addNodes(TestKitNode[] nodes) {
for (TestKitNode node : nodes) {
addNode(node);
@@ -78,7 +84,7 @@ public class TestKitNodes {
controllerNodes.pollFirstEntry();
}
while (controllerNodes.size() < numControllerNodes) {
- int nextId = 3000;
+ int nextId = startControllerId();
if (!controllerNodes.isEmpty()) {
nextId = controllerNodes.lastKey() + 1;
}
@@ -96,7 +102,7 @@ public class TestKitNodes {
brokerNodes.pollFirstEntry();
}
while (brokerNodes.size() < numBrokerNodes) {
- int nextId = 0;
+ int nextId = startBrokerId();
if (!brokerNodes.isEmpty()) {
nextId = brokerNodes.lastKey() + 1;
}
@@ -115,6 +121,17 @@ public class TestKitNodes {
}
return new TestKitNodes(clusterId, bootstrapMetadataVersion, controllerNodes, brokerNodes);
}
+
+ private int startBrokerId() {
+ return 0;
+ }
+
+ private int startControllerId() {
+ if (coResident) {
+ return startBrokerId();
+ }
+ return startBrokerId() + 3000;
+ }
}
private final Uuid clusterId;
@@ -122,6 +139,10 @@ public class TestKitNodes {
private final NavigableMap<Integer, ControllerNode> controllerNodes;
private final NavigableMap<Integer, BrokerNode> brokerNodes;
+ public boolean isCoResidentNode(int node) {
+ return controllerNodes.containsKey(node) && brokerNodes.containsKey(node);
+ }
+
private TestKitNodes(Uuid clusterId,
MetadataVersion bootstrapMetadataVersion,
NavigableMap<Integer, ControllerNode> controllerNodes,
diff --git a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
index 785054901d..aebd479f18 100644
--- a/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/LeaderElectionCommandTest.scala
@@ -35,7 +35,7 @@ import org.junit.jupiter.api.extension.ExtendWith
import org.junit.jupiter.api.{BeforeEach, Tag}
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 3)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 3)
@Tag("integration")
final class LeaderElectionCommandTest(cluster: ClusterInstance) {
import LeaderElectionCommandTest._
diff --git a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
index b1778ba7df..bc45b72077 100644
--- a/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ApiVersionsRequestTest.scala
@@ -29,7 +29,7 @@ import org.junit.jupiter.api.extension.ExtendWith
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
-@ClusterTestDefaults(clusterType = Type.BOTH, brokers = 1)
+@ClusterTestDefaults(clusterType = Type.ALL, brokers = 1)
class ApiVersionsRequestTest(cluster: ClusterInstance) extends AbstractApiVersionsRequestTest(cluster) {
@BeforeEach
diff --git a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
index c1322fe6fe..dc69076619 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerMetricNamesTest.scala
@@ -28,7 +28,7 @@ import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
-@ClusterTestDefaults(clusterType = Type.BOTH)
+@ClusterTestDefaults(clusterType = Type.ALL)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
class BrokerMetricNamesTest(cluster: ClusterInstance) {
@AfterEach
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
index 71321c1f20..904fbbc216 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotasRequestTest.scala
@@ -37,7 +37,7 @@ import org.junit.jupiter.api.extension.ExtendWith
import scala.jdk.CollectionConverters._
-@ClusterTestDefaults(clusterType = Type.BOTH)
+@ClusterTestDefaults(clusterType = Type.ALL)
@ExtendWith(value = Array(classOf[ClusterTestExtensions]))
@Tag("integration")
class ClientQuotasRequestTest(cluster: ClusterInstance) {