Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/07/30 00:27:34 UTC

[GitHub] [kafka] mumrah commented on a diff in pull request #12447: KAFKA-14124: improve quorum controller fault handling

mumrah commented on code in PR #12447:
URL: https://github.com/apache/kafka/pull/12447#discussion_r933689628


##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -728,6 +746,23 @@ public void run() throws Exception {
                         "reaches offset {}", this, resultAndOffset.offset());
                 }
             } else {
+                // Start by trying to apply the record to our in-memory state. This should always
+                // succeed; if it does not, that's a fatal error. It is important to do this before
+                // scheduling the record for Raft replication.
+                int i = 1;
+                for (ApiMessageAndVersion message : result.records()) {
+                    try {
+                        replay(message.message(), Optional.empty());
+                    } catch (Throwable e) {
+                        String failureMessage = String.format("Unable to apply %s record, which was " +
+                            "%d of %d record(s) in the batch following last writeOffset %d.",
+                            message.message().getClass().getSimpleName(), i, result.records().size(),
+                            writeOffset);
+                        fatalFaultHandler.handleFault(failureMessage, e);
+                    }
+                    i++;
+                }
+
                 // If the operation returned a batch of records, those records need to be

Review Comment:
   We should update this comment to something like "if the records could be applied ... "



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -862,13 +903,9 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                 try {
                     if (isActiveController()) {
-                        throw new IllegalStateException(
-                            String.format(
-                                "Asked to load snapshot (%s) when it is the active controller (%d)",
-                                reader.snapshotId(),
-                                curClaimEpoch
-                            )
-                        );
+                        fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
+                            "(%s) when it is the active controller (%d)", reader.snapshotId(),
+                            curClaimEpoch), null);

Review Comment:
   We can call the default method on the fault handler here instead of passing null explicitly.
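
   For illustration, a minimal sketch of the shape this suggestion implies, assuming the FaultHandler interface in this PR declares (or could declare) a one-argument default overload that supplies the null cause; the exact signature is an assumption here, not taken from the PR:
   ```java
   public interface FaultHandler {
       // Two-argument form that concrete handlers implement.
       void handleFault(String failureMessage, Throwable cause);

       // Convenience default for call sites that have no Throwable, so they
       // do not have to pass an explicit null cause.
       default void handleFault(String failureMessage) {
           handleFault(failureMessage, null);
       }
   }
   ```
   With that in place, the call site above would simply drop the trailing null argument.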



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -1246,70 +1293,60 @@ private void handleFeatureControlChange() {
     }
 
     @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
-        try {
-            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
-            switch (type) {
-                case REGISTER_BROKER_RECORD:
-                    clusterControl.replay((RegisterBrokerRecord) message);
-                    break;
-                case UNREGISTER_BROKER_RECORD:
-                    clusterControl.replay((UnregisterBrokerRecord) message);
-                    break;
-                case TOPIC_RECORD:
-                    replicationControl.replay((TopicRecord) message);
-                    break;
-                case PARTITION_RECORD:
-                    replicationControl.replay((PartitionRecord) message);
-                    break;
-                case CONFIG_RECORD:
-                    configurationControl.replay((ConfigRecord) message);
-                    break;
-                case PARTITION_CHANGE_RECORD:
-                    replicationControl.replay((PartitionChangeRecord) message);
-                    break;
-                case FENCE_BROKER_RECORD:
-                    clusterControl.replay((FenceBrokerRecord) message);
-                    break;
-                case UNFENCE_BROKER_RECORD:
-                    clusterControl.replay((UnfenceBrokerRecord) message);
-                    break;
-                case REMOVE_TOPIC_RECORD:
-                    replicationControl.replay((RemoveTopicRecord) message);
-                    break;
-                case FEATURE_LEVEL_RECORD:
-                    featureControl.replay((FeatureLevelRecord) message);
-                    handleFeatureControlChange();
-                    break;
-                case CLIENT_QUOTA_RECORD:
-                    clientQuotaControlManager.replay((ClientQuotaRecord) message);
-                    break;
-                case PRODUCER_IDS_RECORD:
-                    producerIdControlManager.replay((ProducerIdsRecord) message);
-                    break;
-                case BROKER_REGISTRATION_CHANGE_RECORD:
-                    clusterControl.replay((BrokerRegistrationChangeRecord) message);
-                    break;
-                case ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
-                    break;
-                case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
-                    break;
-                case NO_OP_RECORD:
-                    // NoOpRecord is an empty record and doesn't need to be replayed
-                    break;
-                default:
-                    throw new RuntimeException("Unhandled record type " + type);
-            }
-        } catch (Exception e) {
-            if (snapshotId.isPresent()) {
-                log.error("Error replaying record {} from snapshot {} at last offset {}.",
-                    message.toString(), snapshotId.get(), offset, e);
-            } else {
-                log.error("Error replaying record {} at last offset {}.",
-                    message.toString(), offset, e);
-            }
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                clusterControl.replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                clusterControl.replay((UnregisterBrokerRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replicationControl.replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replicationControl.replay((PartitionRecord) message);
+                break;
+            case CONFIG_RECORD:
+                configurationControl.replay((ConfigRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replicationControl.replay((PartitionChangeRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                clusterControl.replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                clusterControl.replay((UnfenceBrokerRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replicationControl.replay((RemoveTopicRecord) message);
+                break;
+            case FEATURE_LEVEL_RECORD:
+                featureControl.replay((FeatureLevelRecord) message);
+                handleFeatureControlChange();
+                break;
+            case CLIENT_QUOTA_RECORD:
+                clientQuotaControlManager.replay((ClientQuotaRecord) message);
+                break;
+            case PRODUCER_IDS_RECORD:
+                producerIdControlManager.replay((ProducerIdsRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                clusterControl.replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
+                break;
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
+                break;
+            case NO_OP_RECORD:
+                // NoOpRecord is an empty record and doesn't need to be replayed
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);

Review Comment:
   Should this throw a MetadataFaultException instead?
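
   For illustration, a sketch of the change being hinted at, assuming MetadataFaultException (added elsewhere in this PR) exposes a (String) constructor; only the default branch changes, the per-type cases stay as in the diff:
   ```java
   private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
       MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
       switch (type) {
           // ... per-record-type replay cases unchanged from the diff above ...
           default:
               // Surface an unknown record type as a metadata fault rather than a bare
               // RuntimeException, so fault handlers can classify it consistently.
               throw new MetadataFaultException("Unhandled record type " + type);
       }
   }
   ```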



##########
metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java:
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+
+/**
+ * A metadata fault.

Review Comment:
   Can we elaborate on when it's expected to use this exception? Is it just when applying records?
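
   For illustration, one way the javadoc could be elaborated; the wording and the class body (assumed to be a RuntimeException with the usual constructors) are a sketch, not the PR author's text:
   ```java
   package org.apache.kafka.metadata.fault;

   /**
    * An exception thrown when the metadata state machine encounters an unexpected
    * condition, such as a failure to replay a metadata record or to load a snapshot.
    * It indicates a fault in metadata handling rather than a client-visible API error.
    */
   public class MetadataFaultException extends RuntimeException {
       public MetadataFaultException(String message, Throwable cause) {
           super(message, cause);
       }

       public MetadataFaultException(String message) {
           super(message);
       }
   }
   ```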



##########
metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java:
##########
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles faults in Kafka metadata management.
+ */
+public class MetadataFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class);
+
+    @Override
+    public void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);

Review Comment:
   Is this where we would increment one of the metrics being discussed in KIP-859?
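
   Purely as a sketch of where such a hook could live; the counter, its name, and the accessor below are hypothetical stand-ins, not the metrics actually proposed in KIP-859:
   ```java
   package org.apache.kafka.metadata.fault;

   import java.util.concurrent.atomic.AtomicLong;

   import org.apache.kafka.server.fault.FaultHandler;
   import org.slf4j.Logger;
   import org.slf4j.LoggerFactory;

   public class MetadataFaultHandler implements FaultHandler {
       private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class);

       // Hypothetical fault counter; a KIP-859-style metric would be registered with the
       // controller's metrics registry rather than held in a bare AtomicLong.
       private final AtomicLong faultCount = new AtomicLong(0);

       @Override
       public void handleFault(String failureMessage, Throwable cause) {
           faultCount.incrementAndGet(); // hypothetical metric increment
           FaultHandler.logFailureMessage(log, failureMessage, cause);
       }

       // Hypothetical accessor that a metrics gauge could read.
       public long faultCount() {
           return faultCount.get();
       }
   }
   ```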



##########
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##########
@@ -882,26 +919,28 @@ public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
 
                         if (log.isDebugEnabled()) {
                             if (log.isTraceEnabled()) {
-                                log.trace(
-                                    "Replaying snapshot ({}) batch with last offset of {}: {}",
-                                    reader.snapshotId(),
-                                    offset,
-                                    messages
-                                        .stream()
-                                        .map(ApiMessageAndVersion::toString)
-                                        .collect(Collectors.joining(", "))
-                                );
+                                log.trace("Replaying snapshot ({}) batch with last offset of {}: {}",
+                                    reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).
+                                        collect(Collectors.joining(", ")));
                             } else {
-                                log.debug(
-                                    "Replaying snapshot ({}) batch with last offset of {}",
-                                    reader.snapshotId(),
-                                    offset
-                                );
+                                log.debug("Replaying snapshot ({}) batch with last offset of {}",
+                                    reader.snapshotId(), offset);
                             }
                         }
 
-                        for (ApiMessageAndVersion messageAndVersion : messages) {
-                            replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
+                        int i = 1;
+                        for (ApiMessageAndVersion message : messages) {

Review Comment:
   We now have this pattern in three places in QuorumController. Is there any benefit to refactoring it into a private method? Maybe a helper that iterates the messages along with their index, as sketched below?
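
   For illustration, a sketch of what such a helper might look like inside QuorumController, built from the loop already shown in the diff; the name replayMessages and the batchDescription parameter are made up for this sketch:
   ```java
   private void replayMessages(List<ApiMessageAndVersion> messages,
                               Optional<OffsetAndEpoch> snapshotId,
                               String batchDescription) {
       int i = 1;
       for (ApiMessageAndVersion message : messages) {
           try {
               replay(message.message(), snapshotId);
           } catch (Throwable e) {
               // Any failure to apply a record to in-memory state is a fatal fault; include
               // the record's 1-based position in the batch to aid debugging.
               fatalFaultHandler.handleFault(String.format(
                   "Unable to apply %s record, which was %d of %d record(s) in %s.",
                   message.message().getClass().getSimpleName(), i, messages.size(),
                   batchDescription), e);
           }
           i++;
       }
   }
   ```
   Each call site would then pass its own description, e.g. the snapshot id or the last write offset.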



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org