You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/25 17:37:38 UTC

[GitHub] [kafka] jsancio commented on a diff in pull request #12195: MINOR: implement BrokerRegistrationChangeRecord

jsancio commented on code in PR #12195:
URL: https://github.com/apache/kafka/pull/12195#discussion_r881937519


##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1114,9 +1143,15 @@ void handleBrokerUnregistered(int brokerId, long brokerEpoch,
                                   List<ApiMessageAndVersion> records) {
         generateLeaderAndIsrUpdates("handleBrokerUnregistered", brokerId, NO_LEADER, records,
             brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
-        records.add(new ApiMessageAndVersion(new UnregisterBrokerRecord().
-            setBrokerId(brokerId).setBrokerEpoch(brokerEpoch),
-            UNREGISTER_BROKER_RECORD.highestSupportedVersion()));
+        if (featureControl.metadataVersion().brokerRegistrationChangeRecordSupported()) {
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                    setBrokerId(brokerId).setBrokerEpoch(brokerEpoch).setFenced((byte) -1),

Review Comment:
   Can we define the `(byte) -1` constant somewhere? This is used a in few places.



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -391,41 +393,60 @@ public void replay(UnregisterBrokerRecord record) {
     }
 
     public void replay(FenceBrokerRecord record) {
-        int brokerId = record.id();
-        BrokerRegistration registration = brokerRegistrations.get(brokerId);
-        if (registration == null) {
-            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration found for that id", record.toString()));
-        } else if (registration.epoch() !=  record.epoch()) {
-            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration with that epoch found", record.toString()));
-        } else {
-            if (heartbeatManager != null) heartbeatManager.register(brokerId, true);
-            brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
-            updateMetrics(registration, brokerRegistrations.get(brokerId));
-            log.info("Fenced broker: {}", record);
-        }
+        replayRegistrationChange(record, record.id(), record.epoch(), Optional.of(true));
     }
 
     public void replay(UnfenceBrokerRecord record) {
-        int brokerId = record.id();
-        BrokerRegistration registration = brokerRegistrations.get(brokerId);
-        if (registration == null) {
+        replayRegistrationChange(record, record.id(), record.epoch(), Optional.of(false));
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) {
+        replayRegistrationChange(record, record.brokerId(), record.brokerEpoch(), getFencingChange(record.fenced()));
+    }
+
+    private Optional<Boolean> getFencingChange(byte value) {
+        switch (value) {
+            case -1:
+                return Optional.of(false);
+            case 0:
+                return Optional.empty();
+            case 1:
+                return Optional.of(true);
+            default:
+                throw new RuntimeException(String.format("Invalid value for fenced: %d", value));
+        }
+    }
+
+    private void replayRegistrationChange(
+        ApiMessage record,
+        int brokerId,
+        long brokerEpoch,
+        Optional<Boolean> fenced
+    ) {
+        BrokerRegistration curRegistration = brokerRegistrations.get(brokerId);
+        if (curRegistration == null) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration found for that id", record.toString()));
-        } else if (registration.epoch() !=  record.epoch()) {
+        } else if (curRegistration.epoch() != brokerEpoch) {
             throw new RuntimeException(String.format("Unable to replay %s: no broker " +
                 "registration with that epoch found", record.toString()));
         } else {
-            if (heartbeatManager != null) heartbeatManager.register(brokerId, false);
-            brokerRegistrations.put(brokerId, registration.cloneWithFencing(false));
-            updateMetrics(registration, brokerRegistrations.get(brokerId));
-            log.info("Unfenced broker: {}", record);
-        }
-        if (readyBrokersFuture.isPresent()) {
-            if (readyBrokersFuture.get().check()) {
-                readyBrokersFuture.get().future.complete(null);
-                readyBrokersFuture = Optional.empty();
+            BrokerRegistration nextRegistration = curRegistration;
+            if (fenced.isPresent()) {
+                nextRegistration = nextRegistration.cloneWithFencing(fenced.get());
+            }
+            if (!curRegistration.equals(nextRegistration)) {
+                brokerRegistrations.put(brokerId, nextRegistration);
+                updateMetrics(curRegistration, nextRegistration);
+            } else {
+                log.info("Ignoring no-op registration change for {}", curRegistration);
+            }
+            if (heartbeatManager != null) heartbeatManager.register(brokerId, nextRegistration.fenced());

Review Comment:
   When will the heartbeatManager be `null`? Is this only in tests?



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -391,41 +393,60 @@ public void replay(UnregisterBrokerRecord record) {
     }
 
     public void replay(FenceBrokerRecord record) {
-        int brokerId = record.id();
-        BrokerRegistration registration = brokerRegistrations.get(brokerId);
-        if (registration == null) {
-            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration found for that id", record.toString()));
-        } else if (registration.epoch() !=  record.epoch()) {
-            throw new RuntimeException(String.format("Unable to replay %s: no broker " +
-                "registration with that epoch found", record.toString()));
-        } else {
-            if (heartbeatManager != null) heartbeatManager.register(brokerId, true);
-            brokerRegistrations.put(brokerId, registration.cloneWithFencing(true));
-            updateMetrics(registration, brokerRegistrations.get(brokerId));
-            log.info("Fenced broker: {}", record);
-        }
+        replayRegistrationChange(record, record.id(), record.epoch(), Optional.of(true));
     }
 
     public void replay(UnfenceBrokerRecord record) {
-        int brokerId = record.id();
-        BrokerRegistration registration = brokerRegistrations.get(brokerId);
-        if (registration == null) {
+        replayRegistrationChange(record, record.id(), record.epoch(), Optional.of(false));
+    }
+
+    public void replay(BrokerRegistrationChangeRecord record) {
+        replayRegistrationChange(record, record.brokerId(), record.brokerEpoch(), getFencingChange(record.fenced()));
+    }
+
+    private Optional<Boolean> getFencingChange(byte value) {
+        switch (value) {
+            case -1:
+                return Optional.of(false);
+            case 0:
+                return Optional.empty();
+            case 1:
+                return Optional.of(true);

Review Comment:
   Can define and create Java symbols for this constants? E.g. a Java enum. Don't we need to use this constants in the broker's metadata listener and delta?



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -1095,9 +1118,15 @@ void handleBrokerFenced(int brokerId, List<ApiMessageAndVersion> records) {
         }
         generateLeaderAndIsrUpdates("handleBrokerFenced", brokerId, NO_LEADER, records,
             brokersToIsrs.partitionsWithBrokerInIsr(brokerId));
-        records.add(new ApiMessageAndVersion(new FenceBrokerRecord().
-            setId(brokerId).setEpoch(brokerRegistration.epoch()),
-            FENCE_BROKER_RECORD.highestSupportedVersion()));
+        if (featureControl.metadataVersion().brokerRegistrationChangeRecordSupported()) {
+            records.add(new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                    setBrokerId(brokerId).setBrokerEpoch(brokerRegistration.epoch()).setFenced((byte) 1),

Review Comment:
   Can we define the `(byte) 1` constant somewhere? This is used a in few places.



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -153,7 +153,10 @@ public enum MetadataVersion {
     IBP_3_2_IV0(4, "3.2", "IV0", false),
 
     // Support for metadata.version feature flag (KIP-778)
-    IBP_3_3_IV0(5, "3.3", "IV0", false);
+    IBP_3_3_IV0(5, "3.3", "IV0", false),
+
+    // In KRaft mode, use BrokerRegistrationChangeRecord instead of UnfenceBrokerRecord and FenceBrokerRecord.
+    IBP_3_3_IV1(6, "3.3", "IV1", false);

Review Comment:
   Shouldn't `didMetadataChange` be set to `true`?



##########
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##########
@@ -225,6 +228,10 @@ public RecordVersion highestSupportedRecordVersion() {
         }
     }
 
+    public boolean brokerRegistrationChangeRecordSupported() {

Review Comment:
   This type tends to prefix this check with "is". For example, "isBrokerRegistrationChangeRecordSupported".



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org