You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/04/21 21:58:16 UTC

[kafka] branch trunk updated: KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (#12075)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d480c4aa6e KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (#12075)
d480c4aa6e is described below

commit d480c4aa6e513e36050d8e067931de2270525d18
Author: Colin Patrick McCabe <cm...@confluent.io>
AuthorDate: Thu Apr 21 14:58:02 2022 -0700

    KAFKA-13841: Fix a case where we were unable to place on fenced brokers in KRaft mode (#12075)
    
    This PR fixes a case where we were unable to place replicas on fenced brokers in KRaft mode.
    Specifically, if a broker had a registration in the metadata log but no associated heartbeat,
    BrokerHeartbeatManager previously did not track that fenced broker at all. This PR fixes this by
    registering brokers with BrokerHeartbeatManager (and keeping their fencing state in sync) in the
    metadata log replay path of ClusterControlManager.
    
    Reviewers: David Arthur <mu...@gmail.com>, dengziming <de...@gmail.com>
---
 .../kafka/controller/BrokerHeartbeatManager.java   | 16 +++++++++++++++
 .../kafka/controller/ClusterControlManager.java    | 14 +++++++------
 .../controller/ReplicationControlManager.java      |  2 +-
 .../controller/ReplicationControlManagerTest.java  | 24 +++++++++++++++++++++-
 4 files changed, 48 insertions(+), 8 deletions(-)

diff --git a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
index 2e71b76fcb..f31df917d7 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/BrokerHeartbeatManager.java
@@ -342,6 +342,22 @@ public class BrokerHeartbeatManager {
         }
     }
 
+    /**
+     * Register this broker if we haven't already, and make sure its fencing state is
+     * correct.
+     *
+     * @param brokerId          The broker ID.
+     * @param fenced            True only if the broker is currently fenced.
+     */
+    void register(int brokerId, boolean fenced) {
+        BrokerHeartbeatState broker = brokers.get(brokerId);
+        if (broker == null) {
+            touch(brokerId, fenced, -1);
+        } else if (broker.fenced() != fenced) {
+            touch(brokerId, fenced, broker.metadataOffset);
+        }
+    }
+
     /**
      * Update broker state, including lastContactNs.
      *
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index add0a53a76..538e8a13eb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -302,7 +302,6 @@ public class ClusterControlManager {
                 if (!existing.incarnationId().equals(request.incarnationId())) {
                     // Remove any existing session for the old broker incarnation.
                     heartbeatManager.remove(brokerId);
-                    existing = null;
                 }
             }
         }
@@ -334,11 +333,7 @@ public class ClusterControlManager {
                 setMaxSupportedVersion(feature.maxSupportedVersion()));
         }
 
-        if (existing == null) {
-            heartbeatManager.touch(brokerId, true, -1);
-        } else {
-            heartbeatManager.touch(brokerId, existing.fenced(), -1);
-        }
+        heartbeatManager.register(brokerId, record.fenced());
 
         List<ApiMessageAndVersion> records = new ArrayList<>();
         records.add(new ApiMessageAndVersion(record,
@@ -366,6 +361,10 @@ public class ClusterControlManager {
                     record.incarnationId(), listeners, features,
                     Optional.ofNullable(record.rack()), record.fenced()));
         updateMetrics(prevRegistration, brokerRegistrations.get(brokerId));
+        if (heartbeatManager != null) {
+            if (prevRegistration != null) heartbeatManager.remove(brokerId);
+            heartbeatManager.register(brokerId, record.fenced());
+        }
         if (prevRegistration == null) {
             log.info("Registered new broker: {}", record);
         } else if (prevRegistration.incarnationId().equals(record.incarnationId())) {
@@ -385,6 +384,7 @@ public class ClusterControlManager {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration with that epoch found", record.toString()));
         } else {
+            if (heartbeatManager != null) heartbeatManager.remove(brokerId);
             brokerRegistrations.remove(brokerId);
             updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Unregistered broker: {}", record);
@@ -401,6 +401,7 @@ public class ClusterControlManager {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration with that epoch found", record.toString()));
         } else {
+            if (heartbeatManager != null) heartbeatManager.register(brokerId, true);
             brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
             updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Fenced broker: {}", record);
@@ -417,6 +418,7 @@ public class ClusterControlManager {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration with that epoch found", record.toString()));
         } else {
+            if (heartbeatManager != null) heartbeatManager.register(brokerId, false);
             brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
             updateMetrics(registration, brokerRegistrations.get(brokerId));
             log.info("Unfenced broker: {}", record);
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index c9d6a5997f..dfa3009ea2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -355,7 +355,7 @@ public class ReplicationControlManager {
     /**
      * A ClusterDescriber which supplies cluster information to our ReplicaPlacer.
      */
-    private final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
+    final KRaftClusterDescriber clusterDescriber = new KRaftClusterDescriber();
 
     private ReplicationControlManager(
         SnapshotRegistry snapshotRegistry,
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index 0bfb6da7a2..356a627223 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -65,6 +65,7 @@ import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.controller.ReplicationControlManager.KRaftClusterDescriber;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.LeaderRecoveryState;
@@ -73,6 +74,7 @@ import org.apache.kafka.metadata.PartitionRegistration;
 import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.Replicas;
 import org.apache.kafka.metadata.placement.StripedReplicaPlacer;
+import org.apache.kafka.metadata.placement.UsableBroker;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.timeline.SnapshotRegistry;
@@ -251,7 +253,7 @@ public class ReplicationControlManagerTest {
         void registerBrokers(Integer... brokerIds) throws Exception {
             for (int brokerId : brokerIds) {
                 RegisterBrokerRecord brokerRecord = new RegisterBrokerRecord().
-                    setBrokerEpoch(brokerId + 100).setBrokerId(brokerId);
+                    setBrokerEpoch(brokerId + 100).setBrokerId(brokerId).setRack(null);
                 brokerRecord.endPoints().add(new RegisterBrokerRecord.BrokerEndpoint().
                     setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
                     setPort((short) 9092 + brokerId).
@@ -1725,4 +1727,24 @@ public class ReplicationControlManagerTest {
         return response;
     }
 
+    @Test
+    public void testKRaftClusterDescriber() throws Exception {
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3, 4);
+        ctx.unfenceBrokers(2, 3, 4);
+        ctx.createTestTopic("foo", new int[][]{
+            new int[]{1, 2, 3}, new int[]{2, 3, 4}, new int[]{0, 2, 1}}).topicId();
+        ctx.createTestTopic("bar", new int[][]{
+            new int[]{2, 3, 4}, new int[]{3, 4, 2}}).topicId();
+        KRaftClusterDescriber describer = replication.clusterDescriber;
+        HashSet<UsableBroker> brokers = new HashSet<>();
+        describer.usableBrokers().forEachRemaining(broker -> brokers.add(broker));
+        assertEquals(new HashSet<>(Arrays.asList(
+            new UsableBroker(0, Optional.empty(), true),
+            new UsableBroker(1, Optional.empty(), true),
+            new UsableBroker(2, Optional.empty(), false),
+            new UsableBroker(3, Optional.empty(), false),
+            new UsableBroker(4, Optional.empty(), false))), brokers);
+    }
 }