Posted to commits@kafka.apache.org by cm...@apache.org on 2022/01/06 18:30:13 UTC

[kafka] branch trunk updated: KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches (#11593)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8e301b4  KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches (#11593)
8e301b4 is described below

commit 8e301b48e937198c65a058518dc7551488aac8fc
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Jan 6 10:28:36 2022 -0800

    KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches (#11593)
    
    The KRaft controller should validate that the clusterID matches before allowing a broker to register in
    the cluster.
    
    Reviewers: José Armando García Sancio <js...@gmail.com>
---
 .../main/scala/kafka/server/ControllerServer.scala |  2 +-
 .../kafka/controller/ClusterControlManager.java    | 12 +++++++++
 .../apache/kafka/controller/QuorumController.java  | 21 ++++++++++++---
 .../controller/ClusterControlManagerTest.java      | 31 +++++++++++++++++++---
 .../controller/ProducerIdControlManagerTest.java   |  4 ++-
 .../kafka/controller/QuorumControllerTest.java     | 16 +++++------
 .../kafka/controller/QuorumControllerTestEnv.java  |  2 +-
 .../controller/ReplicationControlManagerTest.java  | 11 +++++---
 .../kafka/metalog/LocalLogManagerTestEnv.java      |  8 ++++++
 9 files changed, 85 insertions(+), 22 deletions(-)
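
For readers who want to try the new behavior outside the Kafka tree, here is a
minimal standalone sketch of the check this patch adds in
ClusterControlManager.registerBroker. The class name ClusterIdCheck and the use
of IllegalArgumentException are stand-ins for illustration; the real code path
throws InconsistentClusterIdException, as the diff below shows.

    // Illustrative sketch only: mirrors the cluster ID comparison added by
    // this commit, using plain JDK types instead of Kafka's internals.
    public final class ClusterIdCheck {
        private final String clusterId;

        public ClusterIdCheck(String clusterId) {
            this.clusterId = clusterId;
        }

        public void validateRegistration(String requestClusterId) {
            if (!clusterId.equals(requestClusterId)) {
                // Kafka throws InconsistentClusterIdException at this point.
                throw new IllegalArgumentException("Expected cluster ID " + clusterId +
                    ", but got cluster ID " + requestClusterId);
            }
        }

        public static void main(String[] args) {
            ClusterIdCheck check = new ClusterIdCheck("fPZv1VBsRFmnlRvmGcOW9w");
            check.validateRegistration("fPZv1VBsRFmnlRvmGcOW9w"); // accepted
            check.validateRegistration("WIjw3grwRZmR2uOpdpVXbg"); // throws
        }
    }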

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index ede71d4..0402bb3 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -160,7 +160,7 @@ class ControllerServer(
       alterConfigPolicy = Option(config.
         getConfiguredInstance(AlterConfigPolicyClassNameProp, classOf[AlterConfigPolicy]))
 
-      controller = new QuorumController.Builder(config.nodeId).
+      controller = new QuorumController.Builder(config.nodeId, metaProperties.clusterId).
         setTime(time).
         setThreadNamePrefix(threadNamePrefixAsString).
         setConfigDefs(configDefs).
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 5916cdc..b041457 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -19,6 +19,7 @@ package org.apache.kafka.controller;
 
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.errors.DuplicateBrokerRegistrationException;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.message.BrokerRegistrationRequestData;
@@ -92,6 +93,11 @@ public class ClusterControlManager {
     private final LogContext logContext;
 
     /**
+     * The ID of this cluster.
+     */
+    private final String clusterId;
+
+    /**
      * The SLF4J log object.
      */
     private final Logger log;
@@ -133,12 +139,14 @@ public class ClusterControlManager {
     private Optional<ReadyBrokersFuture> readyBrokersFuture;
 
     ClusterControlManager(LogContext logContext,
+                          String clusterId,
                           Time time,
                           SnapshotRegistry snapshotRegistry,
                           long sessionTimeoutNs,
                           ReplicaPlacer replicaPlacer,
                           ControllerMetrics metrics) {
         this.logContext = logContext;
+        this.clusterId = clusterId;
         this.log = logContext.logger(ClusterControlManager.class);
         this.time = time;
         this.sessionTimeoutNs = sessionTimeoutNs;
@@ -188,6 +196,10 @@ public class ClusterControlManager {
         if (heartbeatManager == null) {
             throw new RuntimeException("ClusterControlManager is not active.");
         }
+        if (!clusterId.equals(request.clusterId())) {
+            throw new InconsistentClusterIdException("Expected cluster ID " + clusterId +
+                ", but got cluster ID " + request.clusterId());
+        }
         int brokerId = request.brokerId();
         BrokerRegistration existing = brokerRegistrations.get(brokerId);
         if (existing != null) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 662da8a..37df567 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -128,6 +128,7 @@ public final class QuorumController implements Controller {
      */
     static public class Builder {
         private final int nodeId;
+        private final String clusterId;
         private Time time = Time.SYSTEM;
         private String threadNamePrefix = null;
         private LogContext logContext = null;
@@ -144,8 +145,9 @@ public final class QuorumController implements Controller {
         private Optional<AlterConfigPolicy> alterConfigPolicy = Optional.empty();
         private ConfigurationValidator configurationValidator = ConfigurationValidator.NO_OP;
 
-        public Builder(int nodeId) {
+        public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
+            this.clusterId = clusterId;
         }
 
         public Builder setTime(Time time) {
@@ -241,8 +243,8 @@ public final class QuorumController implements Controller {
             KafkaEventQueue queue = null;
             try {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
-                return new QuorumController(logContext, nodeId, queue, time, configDefs,
-                    raftClient, supportedFeatures, defaultReplicationFactor,
+                return new QuorumController(logContext, nodeId, clusterId, queue, time,
+                    configDefs, raftClient, supportedFeatures, defaultReplicationFactor,
                     defaultNumPartitions, replicaPlacer, snapshotMaxNewRecordBytes,
                     sessionTimeoutNs, controllerMetrics, createTopicPolicy,
                     alterConfigPolicy, configurationValidator);
@@ -1024,6 +1026,11 @@ public final class QuorumController implements Controller {
     private final int nodeId;
 
     /**
+     * The ID of this cluster.
+     */
+    private final String clusterId;
+
+    /**
      * The single-threaded queue that processes all of our events.
      * It also processes timeouts.
      */
@@ -1148,6 +1155,7 @@ public final class QuorumController implements Controller {
 
     private QuorumController(LogContext logContext,
                              int nodeId,
+                             String clusterId,
                              KafkaEventQueue queue,
                              Time time,
                              Map<ConfigResource.Type, ConfigDef> configDefs,
@@ -1165,6 +1173,7 @@ public final class QuorumController implements Controller {
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
+        this.clusterId = clusterId;
         this.queue = queue;
         this.time = time;
         this.controllerMetrics = controllerMetrics;
@@ -1174,7 +1183,7 @@ public final class QuorumController implements Controller {
         this.configurationControl = new ConfigurationControlManager(logContext,
             snapshotRegistry, configDefs, alterConfigPolicy, configurationValidator);
         this.clientQuotaControlManager = new ClientQuotaControlManager(snapshotRegistry);
-        this.clusterControl = new ClusterControlManager(logContext, time,
+        this.clusterControl = new ClusterControlManager(logContext, clusterId, time,
             snapshotRegistry, sessionTimeoutNs, replicaPlacer, controllerMetrics);
         this.featureControl = new FeatureControlManager(supportedFeatures, snapshotRegistry);
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
@@ -1436,6 +1445,10 @@ public final class QuorumController implements Controller {
         return nodeId;
     }
 
+    public String clusterId() {
+        return clusterId;
+    }
+
     @Override
     public int curClaimEpoch() {
         return curClaimEpoch;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 16625b5..124cb3d 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -25,7 +25,9 @@ import java.util.Optional;
 import java.util.Random;
 import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.InconsistentClusterIdException;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
+import org.apache.kafka.common.message.BrokerRegistrationRequestData;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
@@ -35,6 +37,8 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.metadata.BrokerRegistration;
+import org.apache.kafka.metadata.FeatureMap;
+import org.apache.kafka.metadata.FeatureMapAndEpoch;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -57,7 +61,7 @@ public class ClusterControlManagerTest {
 
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), time, snapshotRegistry, 1000,
+            new LogContext(), Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
                 new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
@@ -85,6 +89,24 @@ public class ClusterControlManagerTest {
     }
 
     @Test
+    public void testRegistrationWithIncorrectClusterId() throws Exception {
+        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
+        ClusterControlManager clusterControl = new ClusterControlManager(
+            new LogContext(), "fPZv1VBsRFmnlRvmGcOW9w", new MockTime(0, 0, 0),
+            snapshotRegistry, 1000,
+            new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
+        clusterControl.activate();
+        assertThrows(InconsistentClusterIdException.class, () ->
+            clusterControl.registerBroker(new BrokerRegistrationRequestData().
+                    setClusterId("WIjw3grwRZmR2uOpdpVXbg").
+                    setBrokerId(0).
+                    setRack(null).
+                    setIncarnationId(Uuid.fromString("0H4fUu1xQEKXFYwB1aBjhg")),
+                123L,
+                new FeatureMapAndEpoch(new FeatureMap(Collections.emptyMap()), 456L)));
+    }
+
+    @Test
     public void testUnregister() throws Exception {
         RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
             setBrokerId(1).
@@ -98,7 +120,8 @@ public class ClusterControlManagerTest {
             setHost("example.com"));
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), new MockTime(0, 0, 0), snapshotRegistry, 1000,
+            new LogContext(), Uuid.randomUuid().toString(), new MockTime(0, 0, 0),
+            snapshotRegistry, 1000,
             new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
         clusterControl.activate();
         clusterControl.replay(brokerRecord);
@@ -121,7 +144,7 @@ public class ClusterControlManagerTest {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         MockRandom random = new MockRandom();
         ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), time, snapshotRegistry, 1000,
+            new LogContext(),  Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
             new StripedReplicaPlacer(random), new MockControllerMetrics());
         clusterControl.activate();
         for (int i = 0; i < numUsableBrokers; i++) {
@@ -158,7 +181,7 @@ public class ClusterControlManagerTest {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         ClusterControlManager clusterControl = new ClusterControlManager(
-            new LogContext(), time, snapshotRegistry, 1000,
+            new LogContext(), Uuid.randomUuid().toString(), time, snapshotRegistry, 1000,
             new StripedReplicaPlacer(new Random()), new MockControllerMetrics());
         clusterControl.activate();
         assertFalse(clusterControl.unfenced(0));
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 990395b..41cee2e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.controller;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.StaleBrokerEpochException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.metadata.ProducerIdsRecord;
@@ -49,11 +50,12 @@ public class ProducerIdControlManagerTest {
     @BeforeEach
     public void setUp() {
         final LogContext logContext = new LogContext();
+        String clusterId = Uuid.randomUuid().toString();
         final MockTime time = new MockTime();
         final Random random = new Random();
         snapshotRegistry = new SnapshotRegistry(logContext);
         clusterControl = new ClusterControlManager(
-            logContext, time, snapshotRegistry, 1000,
+            logContext, clusterId, time, snapshotRegistry, 1000,
             new StripedReplicaPlacer(random), new MockControllerMetrics());
 
         clusterControl.activate();
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index b079c16..9d2cf77 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -125,7 +125,7 @@ public class QuorumControllerTest {
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
         ) {
             controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
-                setBrokerId(0)).get();
+                setBrokerId(0).setClusterId(logEnv.clusterId())).get();
             testConfigurationOperations(controlEnv.activeController());
         }
     }
@@ -158,7 +158,7 @@ public class QuorumControllerTest {
             QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> b.setConfigDefs(CONFIGS))
         ) {
             controlEnv.activeController().registerBroker(new BrokerRegistrationRequestData().
-                setBrokerId(0)).get();
+                setBrokerId(0).setClusterId(logEnv.clusterId())).get();
             testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
         }
     }
@@ -201,7 +201,7 @@ public class QuorumControllerTest {
                 CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
                     new BrokerRegistrationRequestData().
                         setBrokerId(brokerId).
-                        setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                        setClusterId(active.clusterId()).
                         setIncarnationId(Uuid.randomUuid()).
                         setListeners(listeners));
                 brokerEpochs.put(brokerId, reply.get().epoch());
@@ -274,7 +274,7 @@ public class QuorumControllerTest {
                 CompletableFuture<BrokerRegistrationReply> reply = active.registerBroker(
                     new BrokerRegistrationRequestData().
                         setBrokerId(0).
-                        setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                        setClusterId(active.clusterId()).
                         setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
                         setListeners(listeners));
                 assertEquals(0L, reply.get().epoch());
@@ -330,7 +330,7 @@ public class QuorumControllerTest {
                         new BrokerRegistrationRequestData().
                             setBrokerId(i).
                             setRack(null).
-                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            setClusterId(active.clusterId()).
                             setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
                             setListeners(new ListenerCollection(Arrays.asList(new Listener().
                                 setName("PLAINTEXT").setHost("localhost").
@@ -402,7 +402,7 @@ public class QuorumControllerTest {
                         new BrokerRegistrationRequestData().
                             setBrokerId(i).
                             setRack(null).
-                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            setClusterId(active.clusterId()).
                             setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
                             setListeners(new ListenerCollection(Arrays.asList(new Listener().
                                 setName("PLAINTEXT").setHost("localhost").
@@ -457,7 +457,7 @@ public class QuorumControllerTest {
                         new BrokerRegistrationRequestData().
                             setBrokerId(i).
                             setRack(null).
-                            setClusterId("06B-K3N1TBCNYFgruEVP0Q").
+                            setClusterId(active.clusterId()).
                             setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
                             setListeners(new ListenerCollection(Arrays.asList(new Listener().
                                 setName("PLAINTEXT").setHost("localhost").
@@ -822,7 +822,7 @@ public class QuorumControllerTest {
                 new BrokerRegistrationRequestData()
                     .setBrokerId(brokerId)
                     .setRack(null)
-                    .setClusterId("06B-K3N1TBCNYFgruEVP0Q")
+                    .setClusterId(controller.clusterId())
                     .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
                     .setListeners(
                         new ListenerCollection(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 7487882..f905621 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -58,7 +58,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         this.controllers = new ArrayList<>(numControllers);
         try {
             for (int i = 0; i < numControllers; i++) {
-                QuorumController.Builder builder = new QuorumController.Builder(i);
+                QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
                 builder.setRaftClient(logEnv.logManagers().get(i));
                 if (sessionTimeoutMillis.isPresent()) {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 94fbe7c..897970c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -133,9 +133,14 @@ public class ReplicationControlManagerTest {
         final MockTime time = new MockTime();
         final MockRandom random = new MockRandom();
         final ControllerMetrics metrics = new MockControllerMetrics();
-        final ClusterControlManager clusterControl = new ClusterControlManager(
-            logContext, time, snapshotRegistry, TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
-            new StripedReplicaPlacer(random), metrics);
+        final String clusterId = Uuid.randomUuid().toString();
+        final ClusterControlManager clusterControl = new ClusterControlManager(logContext,
+            clusterId,
+            time,
+            snapshotRegistry,
+            TimeUnit.MILLISECONDS.convert(BROKER_SESSION_TIMEOUT_MS, TimeUnit.NANOSECONDS),
+            new StripedReplicaPlacer(random),
+            metrics);
         final ConfigurationControlManager configurationControl = new ConfigurationControlManager(
             new LogContext(), snapshotRegistry, Collections.emptyMap(), Optional.empty(),
                 (__, ___) -> { });
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 8a22434..ed18ab6 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.metalog;
 
+import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.metalog.LocalLogManager.SharedLogData;
@@ -37,6 +38,8 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
     private static final Logger log =
         LoggerFactory.getLogger(LocalLogManagerTestEnv.class);
 
+    private final String clusterId;
+
     /**
      * The first error we encountered during this test, or the empty string if we have
      * not encountered any.
@@ -75,6 +78,7 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
     }
 
     public LocalLogManagerTestEnv(int numManagers, Optional<RawSnapshotReader> snapshot) throws Exception {
+        clusterId = Uuid.randomUuid().toString();
         dir = TestUtils.tempDirectory();
         shared = new SharedLogData(snapshot);
         List<LocalLogManager> newLogManagers = new ArrayList<>(numManagers);
@@ -98,6 +102,10 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
         this.logManagers = newLogManagers;
     }
 
+    public String clusterId() {
+        return clusterId;
+    }
+
     AtomicReference<String> firstError() {
         return firstError;
     }
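
As the test changes above show, the caller-side impact is twofold:
QuorumController.Builder now takes the cluster ID alongside the node ID, and a
BrokerRegistrationRequestData must carry a clusterId that matches the
controller's, or registerBroker fails with InconsistentClusterIdException. A
hedged sketch of filling in such a request, using only setters that appear in
the diff (the helper class name and the wiring of a real controller are
assumed, not part of this patch):

    import org.apache.kafka.common.Uuid;
    import org.apache.kafka.common.message.BrokerRegistrationRequestData;

    // Hypothetical helper: builds a registration request whose clusterId
    // matches the controller's own, so the new validation accepts it.
    public final class RegistrationRequestSketch {
        public static BrokerRegistrationRequestData matchingRequest(String controllerClusterId) {
            return new BrokerRegistrationRequestData().
                setBrokerId(0).
                setRack(null).
                setClusterId(controllerClusterId).
                setIncarnationId(Uuid.randomUuid());
        }
    }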