You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by cm...@apache.org on 2022/10/13 18:24:05 UTC
[kafka] 02/02: KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)
This is an automated email from the ASF dual-hosted git repository.
cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7617f1a0c949352512640e947c9b59cf34bdfceb
Author: Niket <ni...@users.noreply.github.com>
AuthorDate: Tue Oct 11 09:46:42 2022 -0700
KAFKA-14275; KRaft Controllers should crash after failing to apply any metadata record (#12709)
Make all faults in metadata processing on standby controllers fatal, matching the behavior of the active controller. This prevents a standby controller from eventually becoming active with incomplete state.
Reviewers: Colin Patrick McCabe <cm...@apache.org>, Jason Gustafson <ja...@confluent.io>
---
.../main/scala/kafka/server/ControllerServer.scala | 1 -
.../kafka/server/QuorumTestHarness.scala | 2 +-
.../apache/kafka/controller/QuorumController.java | 20 +--------
.../kafka/controller/QuorumControllerTest.java | 49 ++++++++++++++++++++--
.../kafka/controller/QuorumControllerTestEnv.java | 31 ++++++++------
5 files changed, 66 insertions(+), 37 deletions(-)
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 441171fe1d9..8e52f30d2ea 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -209,7 +209,6 @@ class ControllerServer(
setConfigurationValidator(new ControllerConfigurationValidator()).
setStaticConfig(config.originals).
setBootstrapMetadata(bootstrapMetadata).
- setMetadataFaultHandler(metadataFaultHandler).
setFatalFaultHandler(fatalFaultHandler)
}
authorizer match {
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 6bb2e769cda..f456770f0f7 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -323,7 +323,7 @@ abstract class QuorumTestHarness extends Logging {
raftApiVersions = raftManager.apiVersions,
bootstrapMetadata = BootstrapMetadata.fromVersion(metadataVersion, "test harness"),
metadataFaultHandler = faultHandler,
- fatalFaultHandler = faultHandler,
+ fatalFaultHandler = faultHandler
)
controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
if (e != null) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 7ad601b0079..034ef1e216c 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -157,7 +157,6 @@ public final class QuorumController implements Controller {
private final int nodeId;
private final String clusterId;
private FaultHandler fatalFaultHandler = null;
- private FaultHandler metadataFaultHandler = null;
private Time time = Time.SYSTEM;
private String threadNamePrefix = null;
private LogContext logContext = null;
@@ -190,11 +189,6 @@ public final class QuorumController implements Controller {
return this;
}
- public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) {
- this.metadataFaultHandler = metadataFaultHandler;
- return this;
- }
-
public int nodeId() {
return nodeId;
}
@@ -314,8 +308,6 @@ public final class QuorumController implements Controller {
throw new IllegalStateException("You must specify the quorum features");
} else if (fatalFaultHandler == null) {
throw new IllegalStateException("You must specify a fatal fault handler.");
- } else if (metadataFaultHandler == null) {
- throw new IllegalStateException("You must specify a metadata fault handler.");
}
if (threadNamePrefix == null) {
@@ -334,7 +326,6 @@ public final class QuorumController implements Controller {
queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
return new QuorumController(
fatalFaultHandler,
- metadataFaultHandler,
logContext,
nodeId,
clusterId,
@@ -983,7 +974,7 @@ public final class QuorumController implements Controller {
"controller, which was %d of %d record(s) in the batch with baseOffset %d.",
message.message().getClass().getSimpleName(), i, messages.size(),
batch.baseOffset());
- throw metadataFaultHandler.handleFault(failureMessage, e);
+ throw fatalFaultHandler.handleFault(failureMessage, e);
}
i++;
}
@@ -1039,7 +1030,7 @@ public final class QuorumController implements Controller {
"%d record(s) in the batch with baseOffset %d.",
message.message().getClass().getSimpleName(), reader.snapshotId(),
i, messages.size(), batch.baseOffset());
- throw metadataFaultHandler.handleFault(failureMessage, e);
+ throw fatalFaultHandler.handleFault(failureMessage, e);
}
i++;
}
@@ -1489,11 +1480,6 @@ public final class QuorumController implements Controller {
*/
private final FaultHandler fatalFaultHandler;
- /**
- * Handles faults in metadata handling that are normally not fatal.
- */
- private final FaultHandler metadataFaultHandler;
-
/**
* The slf4j log context, used to create new loggers.
*/
@@ -1702,7 +1688,6 @@ public final class QuorumController implements Controller {
private QuorumController(
FaultHandler fatalFaultHandler,
- FaultHandler metadataFaultHandler,
LogContext logContext,
int nodeId,
String clusterId,
@@ -1728,7 +1713,6 @@ public final class QuorumController implements Controller {
int maxRecordsPerBatch
) {
this.fatalFaultHandler = fatalFaultHandler;
- this.metadataFaultHandler = metadataFaultHandler;
this.logContext = logContext;
this.log = logContext.logger(QuorumController.class);
this.nodeId = nodeId;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index abd214f7e5d..a5e7258a724 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -1315,10 +1315,51 @@ public class QuorumControllerTest {
setValue(null), (short) 0)), null);
});
assertThrows(ExecutionException.class, () -> future.get());
- assertEquals(NullPointerException.class,
- controlEnv.fatalFaultHandler().firstException().getCause().getClass());
- controlEnv.fatalFaultHandler().setIgnore(true);
- controlEnv.metadataFaultHandler().setIgnore(true);
+ assertEquals(NullPointerException.class, controlEnv.fatalFaultHandler(active.nodeId())
+ .firstException().getCause().getClass());
+ controlEnv.ignoreFatalFaults();
+ }
+ }
+
+ @Test
+ public void testFatalMetadataErrorDuringSnapshotLoading() throws Exception {
+ InitialSnapshot invalidSnapshot = new InitialSnapshot(Collections.unmodifiableList(Arrays.asList(
+ new ApiMessageAndVersion(new PartitionRecord(), (short) 0)))
+ );
+
+ LocalLogManagerTestEnv.Builder logEnvBuilder = new LocalLogManagerTestEnv.Builder(3)
+ .setSnapshotReader(FileRawSnapshotReader.open(
+ invalidSnapshot.tempDir.toPath(),
+ new OffsetAndEpoch(0, 0)
+ ));
+
+ try (LocalLogManagerTestEnv logEnv = logEnvBuilder.build()) {
+ try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
+ TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
+ return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
+ }),
+ "At least one controller failed to detect the fatal fault"
+ );
+ controlEnv.ignoreFatalFaults();
+ }
+ }
+ }
+
+ @Test
+ public void testFatalMetadataErrorDuringLogLoading() throws Exception {
+ try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).build()) {
+ logEnv.appendInitialRecords(Collections.unmodifiableList(Arrays.asList(
+ new ApiMessageAndVersion(new PartitionRecord(), (short) 0))
+ ));
+
+ try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).build()) {
+ TestUtils.waitForCondition(() -> controlEnv.controllers().stream().allMatch(controller -> {
+ return controlEnv.fatalFaultHandler(controller.nodeId()).firstException() != null;
+ }),
+ "At least one controller failed to detect the fatal fault"
+ );
+ controlEnv.ignoreFatalFaults();
+ }
}
}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 993ba848aad..b2fe8785661 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -26,7 +26,9 @@ import org.apache.kafka.server.fault.MockFaultHandler;
import org.apache.kafka.test.TestUtils;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.OptionalInt;
import java.util.OptionalLong;
import java.util.concurrent.TimeUnit;
@@ -40,8 +42,7 @@ import static java.util.concurrent.TimeUnit.NANOSECONDS;
public class QuorumControllerTestEnv implements AutoCloseable {
private final List<QuorumController> controllers;
private final LocalLogManagerTestEnv logEnv;
- private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
- private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
+ private final Map<Integer, MockFaultHandler> fatalFaultHandlers = new HashMap<>();
public static class Builder {
private final LocalLogManagerTestEnv logEnv;
@@ -98,17 +99,18 @@ public class QuorumControllerTestEnv implements AutoCloseable {
try {
ApiVersions apiVersions = new ApiVersions();
List<Integer> nodeIds = IntStream.range(0, numControllers).boxed().collect(Collectors.toList());
- for (int i = 0; i < numControllers; i++) {
- QuorumController.Builder builder = new QuorumController.Builder(i, logEnv.clusterId());
- builder.setRaftClient(logEnv.logManagers().get(i));
+ for (int nodeId = 0; nodeId < numControllers; nodeId++) {
+ QuorumController.Builder builder = new QuorumController.Builder(nodeId, logEnv.clusterId());
+ builder.setRaftClient(logEnv.logManagers().get(nodeId));
builder.setBootstrapMetadata(bootstrapMetadata);
builder.setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs);
- builder.setQuorumFeatures(new QuorumFeatures(i, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
+ builder.setQuorumFeatures(new QuorumFeatures(nodeId, apiVersions, QuorumFeatures.defaultFeatureMap(), nodeIds));
sessionTimeoutMillis.ifPresent(timeout -> {
builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
});
+ MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
builder.setFatalFaultHandler(fatalFaultHandler);
- builder.setMetadataFaultHandler(metadataFaultHandler);
+ fatalFaultHandlers.put(nodeId, fatalFaultHandler);
controllerBuilderInitializer.accept(builder);
this.controllers.add(builder.build());
}
@@ -142,12 +144,14 @@ public class QuorumControllerTestEnv implements AutoCloseable {
return controllers;
}
- public MockFaultHandler fatalFaultHandler() {
- return fatalFaultHandler;
+ public MockFaultHandler fatalFaultHandler(Integer nodeId) {
+ return fatalFaultHandlers.get(nodeId);
}
- public MockFaultHandler metadataFaultHandler() {
- return metadataFaultHandler;
+ public void ignoreFatalFaults() {
+ for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
+ faultHandler.setIgnore(true);
+ }
}
@Override
@@ -158,7 +162,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
for (QuorumController controller : controllers) {
controller.close();
}
- fatalFaultHandler.maybeRethrowFirstException();
- metadataFaultHandler.maybeRethrowFirstException();
+ for (MockFaultHandler faultHandler : fatalFaultHandlers.values()) {
+ faultHandler.maybeRethrowFirstException();
+ }
}
}