You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/01/06 18:30:13 UTC
[kafka] branch trunk updated: KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches (#11593)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8e301b4 KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches (#11593)
8e301b4 is described below
commit 8e301b48e937198c65a058518dc7551488aac8fc
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Jan 6 10:28:36 2022 -0800
KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches (#11593)
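The KRaft controller should validate that the cluster ID in a broker registration request matches
the controller's own cluster ID before allowing the broker to register in the cluster. On a
mismatch, the registration is rejected with an InconsistentClusterIdException.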
Reviewers: José Armando García Sancio <js...@gmail.com>
---
.../main/scala/kafka/server/ControllerServer.scala | 2 +-
.../kafka/controller/ClusterControlManager.java | 12 +++++++++
.../apache/kafka/controller/QuorumController.java | 21 ++++++++++++---
.../controller/ClusterControlManagerTest.java | 31 +++++++++++++++++++---
.../controller/ProducerIdControlManagerTest.java | 4 ++-
.../kafka/controller/QuorumControllerTest.java | 16 +++++------
.../kafka/controller/QuorumControllerTestEnv.java | 2 +-
.../controller/ReplicationControlManagerTest.java | 11 +++++---
.../kafka/metalog/LocalLogManagerTestEnv.java | 8 ++++++
9 files changed, 85 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index ede71d4..0402bb3 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -160,7 +160,7 @@ class ControllerServer(
alterConfigPolicy = Option(config.
getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
- controller = new QuorumController.Builder(config.nodeId).
+ controller = new QuorumController.Builder(config.nodeId, metaProperties.clusterId).
setTime(time).
setThreadNamePrefix(threadNamePrefixAsString).
setConfigDefs(configDefs).
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 5916cdc..b041457 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.BrokerRegistrationRequestData;
@@ -92,6 +93,11 @@ public class ClusterControlManager {
private final LogContext logContext;
/**
+ * The ID of this cluster.
+ */
+ private final String clusterId;
+
+ /**
* The SLF4J log object.
*/
private final Logger log;
@@ -133,12 +139,14 @@ public class ClusterControlManager {
private Optional<ReadyBrokersFuture> readyBrokersFuture;
ClusterControlManager(LogContext logContext,
+ String clusterId,
Time time,
SnapshotRegistry snapshotRegistry,
long sessionTimeoutNs,
ReplicaPlacer replicaPlacer,
ControllerMetrics metrics) {
this.logContext = logContext;
+ this.clusterId = clusterId;
this.log = logContext.logger(ClusterControlManager.class);
this.time = time;
this.sessionTimeoutNs = sessionTimeoutNs;
@@ -188,6 +196,10 @@ public class ClusterControlManager {
if (heartbeatManager == null) {
throw new RuntimeException("ClusterControlManager is not active.");
}
+ if (!clusterId.equals(request.clusterId())) {
+ throw new InconsistentClusterIdException("Expected cluster ID " + clusterId +
+ ", but got cluster ID " + request.clusterId());
+ }
int brokerId = request.brokerId();
BrokerRegistration existing = brokerRegistrations.get(brokerId);
if (existing != null) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 662da8a..37df567 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -128,6 +128,7 @@ public final class QuorumController implements Controller {
*/
static public class Builder {
private final int nodeId;
+ private final String clusterId;
private Time time = Time.SYSTEM;
private String threadNamePrefix = null;
private LogContext logContext = null;
@@ -144,8 +145,9 @@ public final class QuorumController implements Controller {
private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
- public Builder(int nodeId) {
+ public Builder(int nodeId, String clusterId) {
this.nodeId = nodeId;
+ this.clusterId = clusterId;
}
public Builder setTime(Time time) {
@@ -241,8 +243,8 @@ public final class QuorumController implements Controller {
KafkaEventQueue queue = null;
try {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
- return new QuorumController(logContext, nodeId, queue, time, configDefs,
- raftClient, supportedFeatures, defaultReplicationFactor,
+ return new QuorumController(logContext, nodeId, clusterId, queue, time,
+ configDefs, raftClient, supportedFeatures, defaultReplicationFactor,
defaultNumPartitions, replicaPlacer, snapshotMaxNewRecordBytes,
sessionTimeoutNs, controllerMetrics, createTopicPolicy,
alterConfigPolicy, configurationValidator);
@@ -1024,6 +1026,11 @@ public final class QuorumController implements Controller {
private final int nodeId;
/**
+ * The ID of this cluster.
+ */
+ private final String clusterId;
+
+ /**
* The single-threaded queue that processes all of our events.
* It also processes timeouts.
*/
@@ -1148,6 +1155,7 @@ public final class QuorumController implements Controller {
private QuorumController(LogContext logContext,
int nodeId,
+ String clusterId,
KafkaEventQueue queue,
Time time,
Map<ConfigResource.Type, ConfigDef> configDefs,
@@ -1165,6 +1173,7 @@ public final class QuorumController implements Controller {
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
+ this.clusterId = clusterId;
this.queue = queue;
this.time = time;
this.controllerMetrics = controllerMetrics;
@@ -1174,7 +1183,7 @@ public final class QuorumController implements Controller {
this.configurationControl = new ConfigurationControlManager(logContext,
snapshotRegistry, configDefs, alterConfigPolicy, configurationValidator);
this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
- this.clusterControl = new ClusterControlManager(logContext, time,
+ this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
@@ -1436,6 +1445,10 @@ public final class QuorumController implements Controller {
return nodeId;
}
+ public String clusterId() {
+ return clusterId;
+ }
+
@Override
public int curClaimEpoch() {
return curClaimEpoch;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 16625b5..124cb3d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -25,7 +25,9 @@ import java.util.Optional;
import java.util.Random;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -35,6 +37,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.FeatureMap;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.timeline.SnapshotRegistry;
@@ -57,7 +61,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
- new LogContext(), time, snapshotRegistry, 1000,
+ new LogContext(), Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));
@@ -85,6 +89,24 @@ public class ClusterControlManagerTest {
}
@Test
+ public void testRegistrationWithIncorrectClusterId() throws Exception {
+ SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+ ClusterControlManager clusterControl = new ClusterControlManager(
+ new LogContext(), "fPZv1VBsRFmnlRvmGcOW9w", new MockTime(0, 0, 0),
+ snapshotRegistry, 1000,
+ new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
+ clusterControl.activate();
+ assertThrows(InconsistentClusterIdException.class, () ->
+ clusterControl.registerBroker(new BrokerRegistrationRequestData().
+ setClusterId("WIjw3grwRZmR2uOpdpVXbg").
+ setBrokerId(0).
+ setRack(null).
+ setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+ 123L,
+ new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), 456L)));
+ }
+
+ @Test
public void testUnregister() throws Exception {
RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
setBrokerId(1).
@@ -98,7 +120,8 @@ public class ClusterControlManagerTest {
setHost("example.com"));
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
- new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000,
+ new LogContext(), Uuid.randomUuid().toString(), new MockTime(0, 0, 0),
+ snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
clusterControl.replay(brokerRecord);
@@ -121,7 +144,7 @@ public class ClusterControlManagerTest {
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
MockRandom random = new MockRandom();
ClusterControlManager clusterControl = new ClusterControlManager(
- new LogContext(), time, snapshotRegistry, 1000,
+ new LogContext(), Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(random), new MockControllerMetrics());
clusterControl.activate();
for (int i = 0; i < numUsableBrokers; i++) {
@@ -158,7 +181,7 @@ public class ClusterControlManagerTest {
MockTime time = new MockTime(0, 0, 0);
SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
ClusterControlManager clusterControl = new ClusterControlManager(
- new LogContext(), time, snapshotRegistry, 1000,
+ new LogContext(), Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
clusterControl.activate();
assertFalse(clusterControl.unfenced(0));
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 990395b..41cee2e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.controller;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.errors.StaleBrokerEpochException;
import org.apache.kafka.common.errors.UnknownServerException;
import org.apache.kafka.common.metadata.ProducerIdsRecord;
@@ -49,11 +50,12 @@ public class ProducerIdControlManagerTest {
@BeforeEach
public void setUp() {
final LogContext logContext = new LogContext();
+ String clusterId = Uuid.randomUuid().toString();
final MockTime time = new MockTime();
final Random random = new Random();
snapshotRegistry = new SnapshotRegistry(logContext);
clusterControl = new ClusterControlManager(
- logContext, time, snapshotRegistry, 1000,
+ logContext, clusterId, time, snapshotRegistry, 1000,
new StripedReplicaPlacer(random), new MockControllerMetrics());
clusterControl.activate();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index b079c16..9d2cf77 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -125,7 +125,7 @@ public class QuorumControllerTest {
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
) {
controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
- setBrokerId(0)).get();
+ setBrokerId(0).setClusterId(logEnv.clusterId())).get();
testConfigurationOperations(controlEnv.activeController());
}
}
@@ -158,7 +158,7 @@ public class QuorumControllerTest {
QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
) {
controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
- setBrokerId(0)).get();
+ setBrokerId(0).setClusterId(logEnv.clusterId())).get();
testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
}
}
@@ -201,7 +201,7 @@ public class QuorumControllerTest {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(brokerId).
- setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+ setClusterId(active.clusterId()).
setIncarnationId(Uuid.randomUuid()).
setListeners(listeners));
brokerEpochs.put(brokerId, reply.get().epoch());
@@ -274,7 +274,7 @@ public class QuorumControllerTest {
CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
new BrokerRegistrationRequestData().
setBrokerId(0).
- setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+ setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
setListeners(listeners));
assertEquals(0L, reply.get().epoch());
@@ -330,7 +330,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(i).
setRack(null).
- setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+ setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
setListeners(new ListenerCollection(Arrays.asList(new Listener().
setName("PLAINTEXT").setHost("localhost").
@@ -402,7 +402,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(i).
setRack(null).
- setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+ setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
setListeners(new ListenerCollection(Arrays.asList(new Listener().
setName("PLAINTEXT").setHost("localhost").
@@ -457,7 +457,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData().
setBrokerId(i).
setRack(null).
- setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+ setClusterId(active.clusterId()).
setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
setListeners(new ListenerCollection(Arrays.asList(new Listener().
setName("PLAINTEXT").setHost("localhost").
@@ -822,7 +822,7 @@ public class QuorumControllerTest {
new BrokerRegistrationRequestData()
.setBrokerId(brokerId)
.setRack(null)
- .setClusterId("06B-K3N1TBCNYFgruEVP0Q")
+ .setClusterId(controller.clusterId())
.setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
.setListeners(
new ListenerCollection(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 7487882..f905621 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -58,7 +58,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
this.controllers = new ArrayList<>(numControllers);
try {
for (int i = 0; i < numControllers; i++) {
- QuorumController.Builder builder = new QuorumController.Builder(i);
+ QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
builder.setRaftClient(logEnv.logManagers().get(i));
if (sessionTimeoutMillis.isPresent()) {
builder.setSessionTimeoutNs(NANOSECONDS.convert(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 94fbe7c..897970c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -133,9 +133,14 @@ public class ReplicationControlManagerTest {
final MockTime time = new MockTime();
final MockRandom random = new MockRandom();
final ControllerMetrics metrics = new MockControllerMetrics();
- final ClusterControlManager clusterControl = new ClusterControlManager(
- logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
- new StripedReplicaPlacer(random), metrics);
+ final String clusterId = Uuid.randomUuid().toString();
+ final ClusterControlManager clusterControl = new ClusterControlManager(logContext,
+ clusterId,
+ time,
+ snapshotRegistry,
+ TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
+ new StripedReplicaPlacer(random),
+ metrics);
final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty(),
(__, ___) -> { });
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 8a22434..ed18ab6 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -17,6 +17,7 @@
package org.apache.kafka.metalog;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
@@ -37,6 +38,8 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
private static final Logger log =
LoggerFactory.getLogger(LocalLogManagerTestEnv.class);
+ private final String clusterId;
+
/**
* The first error we encountered during this test, or the empty string if we have
* not encountered any.
@@ -75,6 +78,7 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
}
public LocalLogManagerTestEnv(int numManagers, Optional<RawSnapshotReader> snapshot) throws Exception {
+ clusterId = Uuid.randomUuid().toString();
dir = TestUtils.tempDirectory();
shared = new SharedLogData(snapshot);
List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
@@ -98,6 +102,10 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
this.logManagers = newLogManagers;
}
+ public String clusterId() {
+ return clusterId;
+ }
+
AtomicReference<String> firstError() {
return firstError;
}