You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/13 18:24:05 UTC

[kafka] 02/02: KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 7617f1a0c949352512640e947c9b59cf34bdfceb
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Tue Oct 11 09:46:42 2022 -0700

    KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)
    
    Make all faults in metadata processing on standby controllers fatal, matching the behavior of the active controller. This prevents a standby controller from eventually becoming active with incomplete state.
    
    Reviewers: Colin Patrick McCabe <cm...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
 .../main/scala/kafka/server/ControllerServer.scala |  1 -
 .../kafka/server/QuorumTestHarness.scala           |  2 +-
 .../apache/kafka/controller/QuorumController.java  | 20 +--------
 .../kafka/controller/QuorumControllerTest.java     | 49 ++++++++++++++++++++--
 .../kafka/controller/QuorumControllerTestEnv.java  | 31 ++++++++------
 5 files changed, 66 insertions(+), 37 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 441171fe1d9..8e52f30d2ea 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -209,7 +209,6 @@ class ControllerServer(
           setConfigurationValidator(new ControllerConfigurationValidator()).
           setStaticConfig(config.originals).
           setBootstrapMetadata(bootstrapMetadata).
-          setMetadataFaultHandler(metadataFaultHandler).
           setFatalFaultHandler(fatalFaultHandler)
       }
       authorizer match {
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 6bb2e769cda..f456770f0f7 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -323,7 +323,7 @@ abstract class QuorumTestHarness extends Logging {
         raftApiVersions = raftManager.apiVersions,
         bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "test harness"),
         metadataFaultHandler = faultHandler,
-        fatalFaultHandler = faultHandler,
+        fatalFaultHandler = faultHandler
       )
       controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
         if (e != null) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7ad601b0079..034ef1e216c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -157,7 +157,6 @@ public final class QuorumController implements Controller {
         private final int nodeId;
         private final String clusterId;
         private FaultHandler fatalFaultHandler = null;
-        private FaultHandler metadataFaultHandler = null;
         private Time time = Time.SYSTEM;
         private String threadNamePrefix = null;
         private LogContext logContext = null;
@@ -190,11 +189,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) {
-            this.metadataFaultHandler = metadataFaultHandler;
-            return this;
-        }
-
         public int nodeId() {
             return nodeId;
         }
@@ -314,8 +308,6 @@ public final class QuorumController implements Controller {
                 throw new IllegalStateException("You must specify the quorum features");
             } else if (fatalFaultHandler == null) {
                 throw new IllegalStateException("You must specify a fatal fault handler.");
-            } else if (metadataFaultHandler == null) {
-                throw new IllegalStateException("You must specify a metadata fault handler.");
             }
 
             if (threadNamePrefix == null) {
@@ -334,7 +326,6 @@ public final class QuorumController implements Controller {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
                 return new QuorumController(
                     fatalFaultHandler,
-                    metadataFaultHandler,
                     logContext,
                     nodeId,
                     clusterId,
@@ -983,7 +974,7 @@ public final class QuorumController implements Controller {
                                             "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
                                             message.message().getClass().getSimpleName(), i, messages.size(),
                                             batch.baseOffset());
-                                    throw metadataFaultHandler.handleFault(failureMessage, e);
+                                    throw fatalFaultHandler.handleFault(failureMessage, e);
                                 }
                                 i++;
                             }
@@ -1039,7 +1030,7 @@ public final class QuorumController implements Controller {
                                         "%d record(s) in the batch with baseOffset %d.",
                                         message.message().getClass().getSimpleName(), reader.snapshotId(),
                                         i, messages.size(), batch.baseOffset());
-                                throw metadataFaultHandler.handleFault(failureMessage, e);
+                                throw fatalFaultHandler.handleFault(failureMessage, e);
                             }
                             i++;
                         }
@@ -1489,11 +1480,6 @@ public final class QuorumController implements Controller {
      */
     private final FaultHandler fatalFaultHandler;
 
-    /**
-     * Handles faults in metadata handling that are normally not fatal.
-     */
-    private final FaultHandler metadataFaultHandler;
-
     /**
      * The slf4j log context, used to create new loggers.
      */
@@ -1702,7 +1688,6 @@ public final class QuorumController implements Controller {
 
     private QuorumController(
         FaultHandler fatalFaultHandler,
-        FaultHandler metadataFaultHandler,
         LogContext logContext,
         int nodeId,
         String clusterId,
@@ -1728,7 +1713,6 @@ public final class QuorumController implements Controller {
         int maxRecordsPerBatch
     ) {
         this.fatalFaultHandler = fatalFaultHandler;
-        this.metadataFaultHandler = metadataFaultHandler;
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index abd214f7e5d..a5e7258a724 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1315,10 +1315,51 @@ public class QuorumControllerTest {
                                         setValue(null), (short) 0)), null);
                     });
             assertThrows(ExecutionException.class, () -> future.get());
-            assertEquals(NullPointerException.class,
-                    controlEnv.fatalFaultHandler().firstException().getCause().getClass());
-            controlEnv.fatalFaultHandler().setIgnore(true);
-            controlEnv.metadataFaultHandler().setIgnore(true);
+            assertEquals(NullPointerException.class, controlEnv.fatalFaultHandler(active.nodeId())
+                .firstException().getCause().getClass());
+            controlEnv.ignoreFatalFaults();
+        }
+    }
+
+    @Test
+    public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
+        InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.unmodifiableList(Arrays.asList(
+            new ApiMessageAndVersion(new PartitionRecord(), (short) 0)))
+        );
+
+        LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3)
+            .setSnapshotReader(FileRawSnapshotReader.open(
+                invalidSnapshot.tempDir.toPath(),
+                new OffsetAndEpoch(0, 0)
+            ));
+
+        try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) {
+            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
+                TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
+                    return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
+                }),
+                    "At least one controller failed to detect the fatal fault"
+                );
+                controlEnv.ignoreFatalFaults();
+            }
+        }
+    }
+
+    @Test
+    public void testFatalMetadataErrorDuringLogLoading() throws Exception {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) {
+            logEnv.appendInitialRecords(Collections.unmodifiableList(Arrays.asList(
+                new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
+            ));
+
+            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
+                TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
+                    return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
+                }),
+                    "At least one controller failed to detect the fatal fault"
+                );
+                controlEnv.ignoreFatalFaults();
+            }
         }
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 993ba848aad..b2fe8785661 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -26,7 +26,9 @@ import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.concurrent.TimeUnit;
@@ -40,8 +42,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
 public class QuorumControllerTestEnv implements AutoCloseable {
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
-    private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
-    private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
+    private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new HashMap<>();
 
     public static class Builder {
         private final LocalLogManagerTestEnv logEnv;
@@ -98,17 +99,18 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         try {
             ApiVersions apiVersions = new ApiVersions();
             List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
-            for (int i = 0; i < numControllers; i++) {
-                QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
-                builder.setRaftClient(logEnv.logManagers().get(i));
+            for (int nodeId = 0; nodeId < numControllers; nodeId++) {
+                QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId());
+                builder.setRaftClient(logEnv.logManagers().get(nodeId));
                 builder.setBootstrapMetadata(bootstrapMetadata);
                 builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
-                builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
+                builder.setQuorumFeatures(new QuorumFeatures(nodeId, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                 });
+                MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
                 builder.setFatalFaultHandler(fatalFaultHandler);
-                builder.setMetadataFaultHandler(metadataFaultHandler);
+                fatalFaultHandlers.put(nodeId, fatalFaultHandler);
                 controllerBuilderInitializer.accept(builder);
                 this.controllers.add(builder.build());
             }
@@ -142,12 +144,14 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         return controllers;
     }
 
-    public MockFaultHandler fatalFaultHandler() {
-        return fatalFaultHandler;
+    public MockFaultHandler fatalFaultHandler(Integer nodeId) {
+        return fatalFaultHandlers.get(nodeId);
     }
 
-    public MockFaultHandler metadataFaultHandler() {
-        return metadataFaultHandler;
+    public void ignoreFatalFaults() {
+        for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
+            faultHandler.setIgnore(true);
+        }
     }
 
     @Override
@@ -158,7 +162,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         for (QuorumController controller : controllers) {
             controller.close();
         }
-        fatalFaultHandler.maybeRethrowFirstException();
-        metadataFaultHandler.maybeRethrowFirstException();
+        for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
+            faultHandler.maybeRethrowFirstException();
+        }
     }
 }