Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/08/11 07:36:39 UTC

[GitHub] [kafka] dengziming commented on a diff in pull request #12274: KAFKA-13959: Controller should unfence Broker with busy metadata log

dengziming commented on code in PR #12274:
URL: https://github.com/apache/kafka/pull/12274#discussion_r943173917


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1305,12 +1305,11 @@ private void handleFeatureControlChange() {
         }
     }
 
-    @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {

Review Comment:
   Done
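With the extra `offset` parameter, `replay` can record where each record sits in the metadata log. The following is a minimal, self-contained sketch, not the PR's code. `ReplaySketch`, `Message`, `RegisterBroker` and `OtherRecord` are hypothetical stand-ins for Kafka's `ApiMessage` and `RegisterBrokerRecord`, and `Optional<Long>` stands in for `Optional<OffsetAndEpoch>`. Only a broker registration record causes an offset to be saved.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class ReplaySketch {
    // Hypothetical, simplified stand-ins for Kafka's metadata record types.
    interface Message {}

    static class RegisterBroker implements Message {
        final int brokerId;

        RegisterBroker(int brokerId) {
            this.brokerId = brokerId;
        }
    }

    static class OtherRecord implements Message {}

    // brokerId -> offset at which that broker's registration record was replayed.
    final Map<Integer, Long> registerBrokerRecordOffsets = new HashMap<>();

    // Same shape as the new signature: replay(message, snapshotId, offset).
    void replay(Message message, Optional<Long> snapshotId, long offset) {
        if (message instanceof RegisterBroker) {
            RegisterBroker reg = (RegisterBroker) message;
            // Remember where this broker's registration landed in the log.
            registerBrokerRecordOffsets.put(reg.brokerId, offset);
        }
        // Other record types would be dispatched to their own handlers here.
    }

    public static void main(String[] args) {
        ReplaySketch s = new ReplaySketch();
        s.replay(new RegisterBroker(1), Optional.empty(), 42L);
        s.replay(new OtherRecord(), Optional.empty(), 43L);
        System.out.println(s.registerBrokerRecordOffsets); // {1=42}
    }
}
```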



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -759,7 +759,7 @@ public void run() throws Exception {
                 int i = 1;
                 for (ApiMessageAndVersion message : result.records()) {
                     try {
-                        replay(message.message(), Optional.empty());
+                        replay(message.message(), Optional.empty(), writeOffset + i);

Review Comment:
   Thank you for pointing this out.
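The call site gives each record in the batch its own offset. A minimal sketch, assuming (as the `int i = 1` starting value suggests) that `writeOffset` is the offset of the last record already written, so the new batch occupies `writeOffset + 1`, `writeOffset + 2`, and so on. `BatchOffsetSketch.assignOffsets` is a hypothetical helper, and records are plain strings here.

```java
import java.util.ArrayList;
import java.util.List;

class BatchOffsetSketch {
    // Hypothetical helper: returns the offset each record in a batch would
    // receive, assuming writeOffset is the offset of the last record already
    // written, so the batch occupies writeOffset + 1, writeOffset + 2, ...
    static List<Long> assignOffsets(long writeOffset, List<String> records) {
        List<Long> offsets = new ArrayList<>();
        int i = 1;
        for (String rec : records) {
            // In the diff, replay(message.message(), Optional.empty(), writeOffset + i)
            // is called at this point.
            offsets.add(writeOffset + i);
            i++; // advance once per record so no two records share an offset
        }
        return offsets;
    }

    public static void main(String[] args) {
        System.out.println(assignOffsets(99L, List.of("a", "b", "c"))); // [100, 101, 102]
    }
}
```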



##########
metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java:
##########
@@ -217,6 +217,14 @@ boolean check() {
      */
     private final TimelineHashMap<Integer, BrokerRegistration> brokerRegistrations;
 
+    /**
+     * Save the offset of each broker registration record. We only unfence a
+     * broker once its high watermark has reached its broker registration record.
+     * The saved value is not necessarily the exact offset of the registration
+     * record, but it should never be smaller than it.
+     */
+    private final TimelineHashMap<Integer, Long> registerBrokerRecordOffsets;

Review Comment:
   Currently, all fields in `BrokerRegistration` are hard state, which means they are all persisted in `RegisterBrokerRecord`, so I prefer to keep the offset in a separate field.
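The unfencing rule from the Javadoc (unfence a broker only once it has caught up to its own registration record) can be sketched with a plain `HashMap` in place of `TimelineHashMap`. This is an illustration under assumptions: `UnfenceCheckSketch`, `onRegisterBrokerRecord` and `canUnfence` are hypothetical names, and `brokerMetadataOffset` stands for whatever offset the broker reports having reached. Keeping the offset in its own map follows the reasoning above: it comes from the record's log position rather than from a field persisted in `RegisterBrokerRecord`.

```java
import java.util.HashMap;
import java.util.Map;

class UnfenceCheckSketch {
    // brokerId -> offset of its registration record. Kept apart from the
    // registration itself because this value is not persisted in the record.
    private final Map<Integer, Long> registerBrokerRecordOffsets = new HashMap<>();

    // Hypothetical hook called when a registration record is replayed at `offset`.
    void onRegisterBrokerRecord(int brokerId, long offset) {
        registerBrokerRecordOffsets.put(brokerId, offset);
    }

    // A broker may be unfenced only if it is registered and the metadata offset
    // it has reached is at or past the offset of its own registration record.
    boolean canUnfence(int brokerId, long brokerMetadataOffset) {
        Long registrationOffset = registerBrokerRecordOffsets.get(brokerId);
        return registrationOffset != null && brokerMetadataOffset >= registrationOffset;
    }

    public static void main(String[] args) {
        UnfenceCheckSketch s = new UnfenceCheckSketch();
        s.onRegisterBrokerRecord(1, 100L);
        System.out.println(s.canUnfence(1, 99L));  // false
        System.out.println(s.canUnfence(1, 100L)); // true
    }
}
```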



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1874,8 +1873,13 @@ public CompletableFuture<BrokerHeartbeatReply> processBrokerHeartbeat(
 
                 @Override
                 public ControllerResult<BrokerHeartbeatReply> generateRecordsAndResult() {
+                    Long offsetForRegisterBrokerRecord = clusterControl.registerBrokerRecordOffset(brokerId);
+                    if (offsetForRegisterBrokerRecord == null) {
+                        throw new RuntimeException(
+                            String.format("Received a heartbeat from broker %d before registration", brokerId));

Review Comment:
   Good catch. We already have a similar check in `ClusterControlManager.checkBrokerEpoch`, which returns `StaleBrokerEpochException` if the `BrokerRegistration` is null, so I now return a `StaleBrokerEpochException` here as well for consistency.
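The revised null check can be sketched as follows. `HeartbeatCheckSketch` and `registrationOffsetForHeartbeat` are hypothetical, and the nested `StaleBrokerEpochException` is a local stand-in for Kafka's exception of the same name so that the example compiles without Kafka on the classpath. A heartbeat from a broker with no saved registration offset is rejected with that exception instead of a generic `RuntimeException`.

```java
import java.util.HashMap;
import java.util.Map;

class HeartbeatCheckSketch {
    // Local stand-in for Kafka's StaleBrokerEpochException (not the real class).
    static class StaleBrokerEpochException extends RuntimeException {
        StaleBrokerEpochException(String message) {
            super(message);
        }
    }

    private final Map<Integer, Long> registerBrokerRecordOffsets = new HashMap<>();

    void register(int brokerId, long offset) {
        registerBrokerRecordOffsets.put(brokerId, offset);
    }

    // Looks up the registration offset for a heartbeating broker and rejects
    // heartbeats from brokers that have not registered yet.
    long registrationOffsetForHeartbeat(int brokerId) {
        Long offset = registerBrokerRecordOffsets.get(brokerId);
        if (offset == null) {
            throw new StaleBrokerEpochException(
                String.format("Received a heartbeat from broker %d before registration", brokerId));
        }
        return offset;
    }

    public static void main(String[] args) {
        HeartbeatCheckSketch s = new HeartbeatCheckSketch();
        s.register(1, 100L);
        System.out.println(s.registrationOffsetForHeartbeat(1)); // 100
        try {
            s.registrationOffsetForHeartbeat(2);
        } catch (StaleBrokerEpochException e) {
            System.out.println(e.getMessage());
        }
    }
}
```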



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
