Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/09 20:36:21 UTC

[kafka] 01/07: KAFKA-14124: improve quorum controller fault handling (#12447)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 8db3f0998c61bd45a57ca60d17ec238f9864b7b1
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Aug 4 22:49:45 2022 -0700

    KAFKA-14124: improve quorum controller fault handling (#12447)
    
    Before trying to commit a batch of records to the __cluster_metadata log, the active controller
    should first try to apply them to its current in-memory state. If applying the records fails, the
    active controller process should exit, allowing another node to take leadership. This prevents
    most bad metadata records from ending up in the log and helps surface errors during testing.
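
    Condensed, the new commit path looks like the sketch below (full error messages and offset
    bookkeeping elided; see the QuorumController.java diff further down for the exact code):

        // Pre-apply each record to in-memory state before handing the batch
        // to the Raft client. Any failure here is escalated as fatal.
        int i = 1;
        for (ApiMessageAndVersion message : result.records()) {
            try {
                replay(message.message(), Optional.empty());
            } catch (Throwable e) {
                fatalFaultHandler.handleFault("Unable to apply record " + i +
                    " of " + result.records().size() + " in the batch", e);
            }
            i++;
        }
        // Only records that applied cleanly are scheduled for replication.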
    
    Similarly, if the active controller attempts to renounce leadership and the renunciation itself
    fails, the process should exit. This helps avoid bugs where the active controller continues
    running in an undefined state.
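
    Concretely, renounce() now resigns Raft leadership up front and routes any failure during
    renunciation to the fatal fault handler (condensed from the QuorumController.java diff below):

        private void renounce() {
            try {
                if (curClaimEpoch == -1) {
                    throw new RuntimeException("Cannot renounce leadership because " +
                        "we are not the current leader.");
                }
                raftClient.resign(curClaimEpoch);
                // ... revert in-memory state to the last committed offset ...
            } catch (Throwable e) {
                fatalFaultHandler.handleFault("exception while renouncing leadership", e);
            }
        }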
    
    In contrast, standby controllers that encounter metadata application errors should continue on,
    to avoid a scenario where a single bad record brings down the whole controller cluster. The
    intended effect of these changes is to make it harder to commit a bad record to the metadata
    log, while riding out such a record as gracefully as possible if one does get committed.
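
    On the standby path, replay errors go to the non-fatal metadata fault handler instead
    (condensed from the handleCommit changes in the QuorumController.java diff below):

        for (ApiMessageAndVersion message : messages) {
            try {
                replay(message.message(), Optional.empty());
            } catch (Throwable e) {
                // Unlike the active-controller path, this uses the metadata fault
                // handler, which logs and records the fault but does not exit.
                metadataFaultHandler.handleFault("Unable to apply record on " +
                    "standby controller", e);
            }
        }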
    
    This PR introduces the FaultHandler interface to implement these concepts. In JUnit tests, we
    use a FaultHandler implementation that does not exit the process. This avoids terminating the
    Gradle test runner, which would be very disruptive. It also ensures that tests surface these
    exceptions, which they previously did not (the mock fault handler stores the first exception
    it handles).
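
    The interface itself is small, and tests assert at teardown that no faults were recorded
    (condensed from the new FaultHandler.java and MockFaultHandler.java files below):

        public interface FaultHandler {
            void handleFault(String failureMessage, Throwable cause);

            default void handleFault(String failureMessage) {
                handleFault(failureMessage, null);
            }
        }

        // Typical test usage:
        MockFaultHandler faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler");
        // ... run the test; the controller reports any faults to the handler ...
        faultHandler.maybeRethrowFirstException();  // fails the test if a fault occurred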
    
    In addition to the above, this PR fixes a bug where RaftClient#resign was not being called from
    the renounce() function (visible in the renounce() sketch above). This bug could have left the
    Raft layer uninformed that an active controller had resigned.
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 build.gradle                                       |   2 +
 checkstyle/import-control-core.xml                 |   1 +
 checkstyle/import-control.xml                      |   4 +
 checkstyle/suppressions.xml                        |   2 +
 .../main/scala/kafka/server/ControllerServer.scala |  10 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |   6 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  36 +-
 .../kafka/server/QuorumTestHarness.scala           |   6 +
 .../apache/kafka/controller/QuorumController.java  | 382 ++++++++++++---------
 .../metadata/fault/MetadataFaultException.java     |  32 ++
 .../kafka/metadata/fault/MetadataFaultHandler.java |  36 ++
 .../kafka/controller/QuorumControllerTest.java     |  25 ++
 .../kafka/controller/QuorumControllerTestEnv.java  |  15 +
 .../apache/kafka/server/fault/FaultHandler.java    |  58 ++++
 .../server/fault/ProcessExitingFaultHandler.java   |  37 ++
 .../kafka/server/fault/MockFaultHandler.java       |  65 ++++
 .../server/fault/MockFaultHandlerException.java    |  38 ++
 17 files changed, 586 insertions(+), 169 deletions(-)

diff --git a/build.gradle b/build.gradle
index 73c36c31170..aa930ce1bff 100644
--- a/build.gradle
+++ b/build.gradle
@@ -875,6 +875,7 @@ project(':core') {
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':metadata').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
     testImplementation(libs.apacheda) {
@@ -1157,6 +1158,7 @@ project(':metadata') {
     testImplementation libs.slf4jlog4j
     testImplementation project(':clients').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
+    testImplementation project(':server-common').sourceSets.test.output
     generator project(':generator')
   }
 
diff --git a/checkstyle/import-control-core.xml b/checkstyle/import-control-core.xml
index 28b325b093d..4042cba402f 100644
--- a/checkstyle/import-control-core.xml
+++ b/checkstyle/import-control-core.xml
@@ -54,6 +54,7 @@
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metalog" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
   </subpackage>
 
   <subpackage name="tools">
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 211d23ff60a..4b07a26cba5 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -232,6 +232,7 @@
     <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.authorizer" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
     <allow pkg="org.apache.kafka.server.metrics" />
     <allow pkg="org.apache.kafka.server.policy"/>
     <allow pkg="org.apache.kafka.snapshot" />
@@ -276,6 +277,9 @@
       <allow pkg="org.apache.kafka.controller" />
       <allow pkg="org.apache.kafka.metadata" />
     </subpackage>
+    <subpackage name="fault">
+      <allow pkg="org.apache.kafka.server.fault" />
+    </subpackage>
   </subpackage>
 
   <subpackage name="metalog">
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index f6ca0d02fe3..bec3da1637a 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,8 @@
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
+    <suppress checks="MethodLength"
+              files="(KafkaClusterTestKit).java"/>
 
     <!-- Clients -->
     <suppress id="dontUseSystemExit"
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 67b3f0276d7..28c98643c3a 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -21,7 +21,6 @@ import java.util
 import java.util.OptionalLong
 import java.util.concurrent.locks.ReentrantLock
 import java.util.concurrent.{CompletableFuture, TimeUnit}
-
 import kafka.cluster.Broker.ServerInfo
 import kafka.metrics.{KafkaMetricsGroup, LinuxIoMetricsCollector}
 import kafka.network.{DataPlaneAcceptor, SocketServer}
@@ -45,6 +44,7 @@ import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
+import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 
@@ -64,7 +64,9 @@ class ControllerServer(
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
   val configSchema: KafkaConfigSchema,
   val raftApiVersions: ApiVersions,
-  val bootstrapMetadata: BootstrapMetadata
+  val bootstrapMetadata: BootstrapMetadata,
+  val metadataFaultHandler: FaultHandler,
+  val fatalFaultHandler: FaultHandler,
 ) extends Logging with KafkaMetricsGroup {
   import kafka.server.Server._
 
@@ -194,7 +196,9 @@ class ControllerServer(
           setAlterConfigPolicy(alterConfigPolicy.asJava).
           setConfigurationValidator(new ControllerConfigurationValidator()).
           setStaticConfig(config.originals).
-          setBootstrapMetadata(bootstrapMetadata)
+          setBootstrapMetadata(bootstrapMetadata).
+          setMetadataFaultHandler(metadataFaultHandler).
+          setFatalFaultHandler(fatalFaultHandler)
       }
       authorizer match {
         case Some(a: ClusterMetadataAuthorizer) => controllerBuilder.setAuthorizer(a)
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index 07a31183720..e7cf8f8f1fa 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -29,9 +29,11 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{KafkaException, Uuid}
 import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.metadata.fault.MetadataFaultHandler
 import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.ProcessExitingFaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.nio.file.Paths
@@ -106,7 +108,9 @@ class KafkaRaftServer(
       controllerQuorumVotersFuture,
       KafkaRaftServer.configSchema,
       raftManager.apiVersions,
-      bootstrapMetadata
+      bootstrapMetadata,
+      new MetadataFaultHandler(),
+      new ProcessExitingFaultHandler(),
     ))
   } else {
     None
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index c961d71bbe5..42120324f5f 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -40,6 +40,7 @@ import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,6 +116,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
     public static class Builder {
         private TestKitNodes nodes;
         private Map<String, String> configProps = new HashMap<>();
+        private MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
+        private MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
 
         public Builder(TestKitNodes nodes) {
             this.nodes = nodes;
@@ -190,7 +193,9 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         connectFutureManager.future,
                         KafkaRaftServer.configSchema(),
                         raftManager.apiVersions(),
-                        bootstrapMetadata
+                        bootstrapMetadata,
+                        metadataFaultHandler,
+                        fatalFaultHandler
                     );
                     controllers.put(node.id(), controller);
                     controller.socketServerFirstBoundPortFuture().whenComplete((port, e) -> {
@@ -273,7 +278,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
                 throw e;
             }
             return new KafkaClusterTestKit(executorService, nodes, controllers,
-                brokers, raftManagers, connectFutureManager, baseDirectory);
+                brokers, raftManagers, connectFutureManager, baseDirectory,
+                metadataFaultHandler, fatalFaultHandler);
         }
 
         private String listeners(int node) {
@@ -314,14 +320,20 @@ public class KafkaClusterTestKit implements AutoCloseable {
     private final Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers;
     private final ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager;
     private final File baseDirectory;
-
-    private KafkaClusterTestKit(ExecutorService executorService,
-                                TestKitNodes nodes,
-                                Map<Integer, ControllerServer> controllers,
-                                Map<Integer, BrokerServer> brokers,
-                                Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
-                                ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
-                                File baseDirectory) {
+    private final MockFaultHandler metadataFaultHandler;
+    private final MockFaultHandler fatalFaultHandler;
+
+    private KafkaClusterTestKit(
+        ExecutorService executorService,
+        TestKitNodes nodes,
+        Map<Integer, ControllerServer> controllers,
+        Map<Integer, BrokerServer> brokers,
+        Map<Integer, KafkaRaftManager<ApiMessageAndVersion>> raftManagers,
+        ControllerQuorumVotersFutureManager controllerQuorumVotersFutureManager,
+        File baseDirectory,
+        MockFaultHandler metadataFaultHandler,
+        MockFaultHandler fatalFaultHandler
+    ) {
         this.executorService = executorService;
         this.nodes = nodes;
         this.controllers = controllers;
@@ -329,6 +341,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
         this.raftManagers = raftManagers;
         this.controllerQuorumVotersFutureManager = controllerQuorumVotersFutureManager;
         this.baseDirectory = baseDirectory;
+        this.metadataFaultHandler = metadataFaultHandler;
+        this.fatalFaultHandler = fatalFaultHandler;
     }
 
     public void format() throws Exception {
@@ -520,6 +534,8 @@ public class KafkaClusterTestKit implements AutoCloseable {
             executorService.shutdownNow();
             executorService.awaitTermination(5, TimeUnit.MINUTES);
         }
+        metadataFaultHandler.maybeRethrowFirstException();
+        fatalFaultHandler.maybeRethrowFirstException();
     }
 
     private void waitForAllFutures(List<Entry<String, Future<?>>> futureEntries)
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index b82f86a8cb3..766eb3a7e53 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.controller.BootstrapMetadata
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.MockFaultHandler
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
@@ -179,6 +180,8 @@ abstract class QuorumTestHarness extends Logging {
     }
   }
 
+  val faultHandler = new MockFaultHandler("quorumTestHarnessFaultHandler")
+
   // Note: according to the junit documentation: "JUnit Jupiter does not guarantee the execution
   // order of multiple @BeforeEach methods that are declared within a single test class or test
   // interface." Therefore, if you have things you would like to do before each test case runs, it
@@ -296,6 +299,8 @@ abstract class QuorumTestHarness extends Logging {
         configSchema = KafkaRaftServer.configSchema,
         raftApiVersions = raftManager.apiVersions,
         bootstrapMetadata = BootstrapMetadata.create(metadataVersion, bootstrapRecords.asJava),
+        metadataFaultHandler = faultHandler,
+        fatalFaultHandler = faultHandler,
       )
       controllerServer.socketServerFirstBoundPortFuture.whenComplete((port, e) => {
         if (e != null) {
@@ -362,6 +367,7 @@ abstract class QuorumTestHarness extends Logging {
     }
     System.clearProperty(JaasUtils.JAVA_LOGIN_CONFIG_PARAM)
     Configuration.setConfiguration(null)
+    faultHandler.maybeRethrowFirstException()
   }
 
   // Trigger session expiry by reusing the session id in another client
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 0290e0040c2..a4cc1d92cba 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -91,6 +91,7 @@ import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.snapshot.SnapshotReader;
@@ -149,6 +150,8 @@ public final class QuorumController implements Controller {
     static public class Builder {
         private final int nodeId;
         private final String clusterId;
+        private FaultHandler fatalFaultHandler = null;
+        private FaultHandler metadataFaultHandler = null;
         private Time time = Time.SYSTEM;
         private String threadNamePrefix = null;
         private LogContext logContext = null;
@@ -175,6 +178,16 @@ public final class QuorumController implements Controller {
             this.clusterId = clusterId;
         }
 
+        public Builder setFatalFaultHandler(FaultHandler fatalFaultHandler) {
+            this.fatalFaultHandler = fatalFaultHandler;
+            return this;
+        }
+
+        public Builder setMetadataFaultHandler(FaultHandler metadataFaultHandler) {
+            this.metadataFaultHandler = metadataFaultHandler;
+            return this;
+        }
+
         public int nodeId() {
             return nodeId;
         }
@@ -287,6 +300,10 @@ public final class QuorumController implements Controller {
                 throw new IllegalStateException("You must specify an initial metadata.version using the kafka-storage tool.");
             } else if (quorumFeatures == null) {
                 throw new IllegalStateException("You must specify the quorum features");
+            } else if (fatalFaultHandler == null) {
+                throw new IllegalStateException("You must specify a fatal fault handler.");
+            } else if (metadataFaultHandler == null) {
+                throw new IllegalStateException("You must specify a metadata fault handler.");
             }
 
             if (threadNamePrefix == null) {
@@ -304,6 +321,8 @@ public final class QuorumController implements Controller {
             try {
                 queue = new KafkaEventQueue(time, logContext, threadNamePrefix + "QuorumController");
                 return new QuorumController(
+                    fatalFaultHandler,
+                    metadataFaultHandler,
                     logContext,
                     nodeId,
                     clusterId,
@@ -426,12 +445,18 @@ public final class QuorumController implements Controller {
                 exception.getClass().getSimpleName(), deltaUs);
             return exception;
         }
-        log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  " +
-                "Renouncing leadership and reverting to the last committed offset {}.",
-            name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
-            lastCommittedOffset, exception);
-        raftClient.resign(curClaimEpoch);
-        renounce();
+        if (isActiveController()) {
+            log.warn("{}: failed with unknown server exception {} at epoch {} in {} us.  " +
+                    "Renouncing leadership and reverting to the last committed offset {}.",
+                    name, exception.getClass().getSimpleName(), curClaimEpoch, deltaUs,
+                    lastCommittedOffset, exception);
+            renounce();
+        } else {
+            log.warn("{}: failed with unknown server exception {} in {} us.  " +
+                    "The controller is already in standby mode.",
+                    name, exception.getClass().getSimpleName(), deltaUs,
+                    exception);
+        }
         return new UnknownServerException(exception);
     }
 
@@ -702,7 +727,7 @@ public final class QuorumController implements Controller {
             long now = time.nanoseconds();
             controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs));
             int controllerEpoch = curClaimEpoch;
-            if (controllerEpoch == -1) {
+            if (!isActiveController()) {
                 throw newNotControllerException();
             }
             startProcessingTimeNs = OptionalLong.of(now);
@@ -728,9 +753,26 @@ public final class QuorumController implements Controller {
                         "reaches offset {}", this, resultAndOffset.offset());
                 }
             } else {
-                // If the operation returned a batch of records, those records need to be
-                // written before we can return our result to the user.  Here, we hand off
-                // the batch of records to the raft client.  They will be written out
+                // Start by trying to apply the record to our in-memory state. This should always
+                // succeed; if it does not, that's a fatal error. It is important to do this before
+                // scheduling the record for Raft replication.
+                int i = 1;
+                for (ApiMessageAndVersion message : result.records()) {
+                    try {
+                        replay(message.message(), Optional.empty());
+                    } catch (Throwable e) {
+                        String failureMessage = String.format("Unable to apply %s record, which was " +
+                            "%d of %d record(s) in the batch following last writeOffset %d.",
+                            message.message().getClass().getSimpleName(), i, result.records().size(),
+                            writeOffset);
+                        fatalFaultHandler.handleFault(failureMessage, e);
+                    }
+                    i++;
+                }
+
+                // If the operation returned a batch of records, and those records could be applied,
+                // they need to be written before we can return our result to the user.  Here, we
+                // hand off the batch of records to the raft client.  They will be written out
                 // asynchronously.
                 final long offset;
                 if (result.isAtomic()) {
@@ -741,9 +783,6 @@ public final class QuorumController implements Controller {
                 op.processBatchEndOffset(offset);
                 updateWriteOffset(offset);
                 resultAndOffset = ControllerResultAndOffset.of(offset, result);
-                for (ApiMessageAndVersion message : result.records()) {
-                    replay(message.message(), Optional.empty(), offset);
-                }
                 snapshotRegistry.getOrCreateSnapshot(offset);
 
                 log.debug("Read-write operation {} will be completed when the log " +
@@ -789,9 +828,9 @@ public final class QuorumController implements Controller {
         return event.future();
     }
 
-    private <T> CompletableFuture<T> appendWriteEvent(String name,
-                                                      OptionalLong deadlineNs,
-                                                      ControllerWriteOperation<T> op) {
+    <T> CompletableFuture<T> appendWriteEvent(String name,
+                                              OptionalLong deadlineNs,
+                                              ControllerWriteOperation<T> op) {
         ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op);
         if (deadlineNs.isPresent()) {
             queue.appendWithDeadline(deadlineNs.getAsLong(), event);
@@ -841,11 +880,20 @@ public final class QuorumController implements Controller {
                                         "offset {} and epoch {}.", offset, epoch);
                                 }
                             }
-                            for (ApiMessageAndVersion messageAndVersion : messages) {
-                                replay(messageAndVersion.message(), Optional.empty(), offset);
+                            int i = 1;
+                            for (ApiMessageAndVersion message : messages) {
+                                try {
+                                    replay(message.message(), Optional.empty());
+                                } catch (Throwable e) {
+                                    String failureMessage = String.format("Unable to apply %s record on standby " +
+                                            "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
+                                            message.message().getClass().getSimpleName(), i, messages.size(),
+                                            batch.baseOffset());
+                                    metadataFaultHandler.handleFault(failureMessage, e);
+                                }
+                                i++;
                             }
                         }
-
                         updateLastCommittedState(offset, epoch, batch.appendTimestamp());
                         processedRecordsSize += batch.sizeInBytes();
                     }
@@ -862,13 +910,9 @@ public final class QuorumController implements Controller {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                 try {
                     if (isActiveController()) {
-                        throw new IllegalStateException(
-                            String.format(
-                                "Asked to load snapshot (%s) when it is the active controller (%d)",
-                                reader.snapshotId(),
-                                curClaimEpoch
-                            )
-                        );
+                        fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
+                            "(%s) when it is the active controller (%d)", reader.snapshotId(),
+                            curClaimEpoch));
                     }
                     log.info("Starting to replay snapshot ({}), from last commit offset ({}) and epoch ({})",
                         reader.snapshotId(), lastCommittedOffset, lastCommittedEpoch);
@@ -882,26 +926,28 @@ public final class QuorumController implements Controller {
 
                         if (log.isDebugEnabled()) {
                             if (log.isTraceEnabled()) {
-                                log.trace(
-                                    "Replaying snapshot ({}) batch with last offset of {}: {}",
-                                    reader.snapshotId(),
-                                    offset,
-                                    messages
-                                        .stream()
-                                        .map(ApiMessageAndVersion::toString)
-                                        .collect(Collectors.joining(", "))
-                                );
+                                log.trace("Replaying snapshot ({}) batch with last offset of {}: {}",
+                                    reader.snapshotId(), offset, messages.stream().map(ApiMessageAndVersion::toString).
+                                        collect(Collectors.joining(", ")));
                             } else {
-                                log.debug(
-                                    "Replaying snapshot ({}) batch with last offset of {}",
-                                    reader.snapshotId(),
-                                    offset
-                                );
+                                log.debug("Replaying snapshot ({}) batch with last offset of {}",
+                                    reader.snapshotId(), offset);
                             }
                         }
 
-                        for (ApiMessageAndVersion messageAndVersion : messages) {
-                            replay(messageAndVersion.message(), Optional.of(reader.snapshotId()), offset);
+                        int i = 1;
+                        for (ApiMessageAndVersion message : messages) {
+                            try {
+                                replay(message.message(), Optional.of(reader.snapshotId()));
+                            } catch (Throwable e) {
+                                String failureMessage = String.format("Unable to apply %s record " +
+                                        "from snapshot %s on standby controller, which was %d of " +
+                                        "%d record(s) in the batch with baseOffset %d.",
+                                        message.message().getClass().getSimpleName(), reader.snapshotId(),
+                                        i, messages.size(), batch.baseOffset());
+                                metadataFaultHandler.handleFault(failureMessage, e);
+                            }
+                            i++;
                         }
                     }
                     updateLastCommittedState(
@@ -968,10 +1014,14 @@ public final class QuorumController implements Controller {
                             if (exception != null) {
                                 log.error("Failed to bootstrap metadata.", exception);
                                 appendRaftEvent("bootstrapMetadata[" + curClaimEpoch + "]", () -> {
-                                    log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
-                                             "metadata. Reverting to last committed offset {}.",
-                                        curClaimEpoch, lastCommittedOffset);
-                                    renounce();
+                                    if (isActiveController()) {
+                                        log.warn("Renouncing the leadership at oldEpoch {} since we could not bootstrap " +
+                                                        "metadata. Reverting to last committed offset {}.",
+                                                curClaimEpoch, lastCommittedOffset);
+                                        renounce();
+                                    } else {
+                                        log.warn("Unable to bootstrap metadata on standby controller.");
+                                    }
                                 });
                             }
                         });
@@ -998,10 +1048,12 @@ public final class QuorumController implements Controller {
                 });
             } else if (isActiveController()) {
                 appendRaftEvent("handleRenounce[" + curClaimEpoch + "]", () -> {
-                    log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
-                             "log event. Reverting to last committed offset {}.", curClaimEpoch,
-                        lastCommittedOffset);
-                    renounce();
+                    if (isActiveController()) {
+                        log.warn("Renouncing the leadership at oldEpoch {} due to a metadata " +
+                                "log event. Reverting to last committed offset {}.", curClaimEpoch,
+                                lastCommittedOffset);
+                        renounce();
+                    }
                 });
             }
         }
@@ -1078,26 +1130,34 @@ public final class QuorumController implements Controller {
     }
 
     private void renounce() {
-        curClaimEpoch = -1;
-        controllerMetrics.setActive(false);
-        purgatory.failAll(newNotControllerException());
-
-        if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
-            newBytesSinceLastSnapshot = 0;
-            snapshotRegistry.revertToSnapshot(lastCommittedOffset);
-            authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
-        } else {
-            resetState();
-            raftClient.unregister(metaLogListener);
-            metaLogListener = new QuorumMetaLogListener();
-            raftClient.register(metaLogListener);
+        try {
+            if (curClaimEpoch == -1) {
+                throw new RuntimeException("Cannot renounce leadership because we are not the " +
+                        "current leader.");
+            }
+            raftClient.resign(curClaimEpoch);
+            curClaimEpoch = -1;
+            controllerMetrics.setActive(false);
+            purgatory.failAll(newNotControllerException());
+
+            if (snapshotRegistry.hasSnapshot(lastCommittedOffset)) {
+                newBytesSinceLastSnapshot = 0;
+                snapshotRegistry.revertToSnapshot(lastCommittedOffset);
+                authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
+            } else {
+                resetState();
+                raftClient.unregister(metaLogListener);
+                metaLogListener = new QuorumMetaLogListener();
+                raftClient.register(metaLogListener);
+            }
+            updateWriteOffset(-1);
+            clusterControl.deactivate();
+            cancelMaybeFenceReplicas();
+            cancelMaybeBalancePartitionLeaders();
+            cancelNextWriteNoOpRecord();
+        } catch (Throwable e) {
+            fatalFaultHandler.handleFault("exception while renouncing leadership", e);
         }
-
-        updateWriteOffset(-1);
-        clusterControl.deactivate();
-        cancelMaybeFenceReplicas();
-        cancelMaybeBalancePartitionLeaders();
-        cancelNextWriteNoOpRecord();
     }
 
     private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs,
@@ -1246,70 +1306,60 @@ public final class QuorumController implements Controller {
     }
 
     @SuppressWarnings("unchecked")
-    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId, long offset) {
-        try {
-            MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
-            switch (type) {
-                case REGISTER_BROKER_RECORD:
-                    clusterControl.replay((RegisterBrokerRecord) message);
-                    break;
-                case UNREGISTER_BROKER_RECORD:
-                    clusterControl.replay((UnregisterBrokerRecord) message);
-                    break;
-                case TOPIC_RECORD:
-                    replicationControl.replay((TopicRecord) message);
-                    break;
-                case PARTITION_RECORD:
-                    replicationControl.replay((PartitionRecord) message);
-                    break;
-                case CONFIG_RECORD:
-                    configurationControl.replay((ConfigRecord) message);
-                    break;
-                case PARTITION_CHANGE_RECORD:
-                    replicationControl.replay((PartitionChangeRecord) message);
-                    break;
-                case FENCE_BROKER_RECORD:
-                    clusterControl.replay((FenceBrokerRecord) message);
-                    break;
-                case UNFENCE_BROKER_RECORD:
-                    clusterControl.replay((UnfenceBrokerRecord) message);
-                    break;
-                case REMOVE_TOPIC_RECORD:
-                    replicationControl.replay((RemoveTopicRecord) message);
-                    break;
-                case FEATURE_LEVEL_RECORD:
-                    featureControl.replay((FeatureLevelRecord) message);
-                    handleFeatureControlChange();
-                    break;
-                case CLIENT_QUOTA_RECORD:
-                    clientQuotaControlManager.replay((ClientQuotaRecord) message);
-                    break;
-                case PRODUCER_IDS_RECORD:
-                    producerIdControlManager.replay((ProducerIdsRecord) message);
-                    break;
-                case BROKER_REGISTRATION_CHANGE_RECORD:
-                    clusterControl.replay((BrokerRegistrationChangeRecord) message);
-                    break;
-                case ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
-                    break;
-                case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
-                    aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
-                    break;
-                case NO_OP_RECORD:
-                    // NoOpRecord is an empty record and doesn't need to be replayed
-                    break;
-                default:
-                    throw new RuntimeException("Unhandled record type " + type);
-            }
-        } catch (Exception e) {
-            if (snapshotId.isPresent()) {
-                log.error("Error replaying record {} from snapshot {} at last offset {}.",
-                    message.toString(), snapshotId.get(), offset, e);
-            } else {
-                log.error("Error replaying record {} at last offset {}.",
-                    message.toString(), offset, e);
-            }
+    private void replay(ApiMessage message, Optional<OffsetAndEpoch> snapshotId) {
+        MetadataRecordType type = MetadataRecordType.fromId(message.apiKey());
+        switch (type) {
+            case REGISTER_BROKER_RECORD:
+                clusterControl.replay((RegisterBrokerRecord) message);
+                break;
+            case UNREGISTER_BROKER_RECORD:
+                clusterControl.replay((UnregisterBrokerRecord) message);
+                break;
+            case TOPIC_RECORD:
+                replicationControl.replay((TopicRecord) message);
+                break;
+            case PARTITION_RECORD:
+                replicationControl.replay((PartitionRecord) message);
+                break;
+            case CONFIG_RECORD:
+                configurationControl.replay((ConfigRecord) message);
+                break;
+            case PARTITION_CHANGE_RECORD:
+                replicationControl.replay((PartitionChangeRecord) message);
+                break;
+            case FENCE_BROKER_RECORD:
+                clusterControl.replay((FenceBrokerRecord) message);
+                break;
+            case UNFENCE_BROKER_RECORD:
+                clusterControl.replay((UnfenceBrokerRecord) message);
+                break;
+            case REMOVE_TOPIC_RECORD:
+                replicationControl.replay((RemoveTopicRecord) message);
+                break;
+            case FEATURE_LEVEL_RECORD:
+                featureControl.replay((FeatureLevelRecord) message);
+                handleFeatureControlChange();
+                break;
+            case CLIENT_QUOTA_RECORD:
+                clientQuotaControlManager.replay((ClientQuotaRecord) message);
+                break;
+            case PRODUCER_IDS_RECORD:
+                producerIdControlManager.replay((ProducerIdsRecord) message);
+                break;
+            case BROKER_REGISTRATION_CHANGE_RECORD:
+                clusterControl.replay((BrokerRegistrationChangeRecord) message);
+                break;
+            case ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((AccessControlEntryRecord) message, snapshotId);
+                break;
+            case REMOVE_ACCESS_CONTROL_ENTRY_RECORD:
+                aclControlManager.replay((RemoveAccessControlEntryRecord) message, snapshotId);
+                break;
+            case NO_OP_RECORD:
+                // NoOpRecord is an empty record and doesn't need to be replayed
+                break;
+            default:
+                throw new RuntimeException("Unhandled record type " + type);
         }
     }
 
@@ -1344,8 +1394,24 @@ public final class QuorumController implements Controller {
         updateLastCommittedState(-1, -1, -1);
     }
 
+    /**
+     * Handles faults that should normally be fatal to the process.
+     */
+    private final FaultHandler fatalFaultHandler;
+
+    /**
+     * Handles faults in metadata handling that are normally not fatal.
+     */
+    private final FaultHandler metadataFaultHandler;
+
+    /**
+     * The slf4j log context, used to create new loggers.
+     */
     private final LogContext logContext;
 
+    /**
+     * The slf4j logger.
+     */
     private final Logger log;
 
     /**
@@ -1530,28 +1596,34 @@ public final class QuorumController implements Controller {
 
     private final BootstrapMetadata bootstrapMetadata;
 
-    private QuorumController(LogContext logContext,
-                             int nodeId,
-                             String clusterId,
-                             KafkaEventQueue queue,
-                             Time time,
-                             KafkaConfigSchema configSchema,
-                             RaftClient<ApiMessageAndVersion> raftClient,
-                             QuorumFeatures quorumFeatures,
-                             short defaultReplicationFactor,
-                             int defaultNumPartitions,
-                             ReplicaPlacer replicaPlacer,
-                             long snapshotMaxNewRecordBytes,
-                             OptionalLong leaderImbalanceCheckIntervalNs,
-                             OptionalLong maxIdleIntervalNs,
-                             long sessionTimeoutNs,
-                             ControllerMetrics controllerMetrics,
-                             Optional<CreateTopicPolicy> createTopicPolicy,
-                             Optional<AlterConfigPolicy> alterConfigPolicy,
-                             ConfigurationValidator configurationValidator,
-                             Optional<ClusterMetadataAuthorizer> authorizer,
-                             Map<String, Object> staticConfig,
-                             BootstrapMetadata bootstrapMetadata) {
+    private QuorumController(
+        FaultHandler fatalFaultHandler,
+        FaultHandler metadataFaultHandler,
+        LogContext logContext,
+        int nodeId,
+        String clusterId,
+        KafkaEventQueue queue,
+        Time time,
+        KafkaConfigSchema configSchema,
+        RaftClient<ApiMessageAndVersion> raftClient,
+        QuorumFeatures quorumFeatures,
+        short defaultReplicationFactor,
+        int defaultNumPartitions,
+        ReplicaPlacer replicaPlacer,
+        long snapshotMaxNewRecordBytes,
+        OptionalLong leaderImbalanceCheckIntervalNs,
+        OptionalLong maxIdleIntervalNs,
+        long sessionTimeoutNs,
+        ControllerMetrics controllerMetrics,
+        Optional<CreateTopicPolicy> createTopicPolicy,
+        Optional<AlterConfigPolicy> alterConfigPolicy,
+        ConfigurationValidator configurationValidator,
+        Optional<ClusterMetadataAuthorizer> authorizer,
+        Map<String, Object> staticConfig,
+        BootstrapMetadata bootstrapMetadata
+    ) {
+        this.fatalFaultHandler = fatalFaultHandler;
+        this.metadataFaultHandler = metadataFaultHandler;
         this.logContext = logContext;
         this.log = logContext.logger(QuorumController.class);
         this.nodeId = nodeId;
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
new file mode 100644
index 00000000000..c57ce46fb35
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+
+/**
+ * A fault that we encountered while we replayed cluster metadata.
+ */
+public class MetadataFaultException extends RuntimeException {
+    public MetadataFaultException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public MetadataFaultException(String message) {
+        super(message);
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
new file mode 100644
index 00000000000..e9f71b80e67
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.metadata.fault;
+
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Handles faults in Kafka metadata management.
+ */
+public class MetadataFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class);
+
+    @Override
+    public void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        throw new MetadataFaultException("Encountered metadata fault: " + failureMessage, cause);
+    }
+}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 2cdec699da2..e8392895626 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -45,6 +45,7 @@ import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.message.RequestHeaderData;
+import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.common.config.ConfigResource;
@@ -1182,6 +1183,30 @@ public class QuorumControllerTest {
         }
     }
 
+    @Test
+    public void testFatalMetadataReplayErrorOnActive() throws Throwable {
+        try (LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv(3, Optional.empty())) {
+            try (QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv(logEnv, b -> {
+            })) {
+                QuorumController active = controlEnv.activeController();
+                CompletableFuture<Void> future = active.appendWriteEvent("errorEvent",
+                        OptionalLong.empty(), () -> {
+                            return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(
+                                    new ConfigRecord().
+                                            setName(null).
+                                            setResourceName(null).
+                                            setResourceType((byte) 255).
+                                            setValue(null), (short) 0)), null);
+                        });
+                assertThrows(ExecutionException.class, () -> future.get());
+                assertEquals(NullPointerException.class,
+                        controlEnv.fatalFaultHandler().firstException().getCause().getClass());
+                controlEnv.fatalFaultHandler().setIgnore(true);
+                controlEnv.metadataFaultHandler().setIgnore(true);
+            }
+        }
+    }
+
     private static void assertInitialLoadFuturesNotComplete(List<StandardAuthorizer> authorizers) {
         for (int i = 0; i < authorizers.size(); i++) {
             assertFalse(authorizers.get(i).initialLoadFuture().isDone(),
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
index 4cc45a9774b..40dd21c88d3 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTestEnv.java
@@ -34,6 +34,7 @@ import org.apache.kafka.controller.QuorumController.Builder;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
 import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -44,6 +45,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
 
     private final List<QuorumController> controllers;
     private final LocalLogManagerTestEnv logEnv;
+    private final MockFaultHandler fatalFaultHandler = new MockFaultHandler("fatalFaultHandler");
+    private final MockFaultHandler metadataFaultHandler = new MockFaultHandler("metadataFaultHandler");
 
     public QuorumControllerTestEnv(
         LocalLogManagerTestEnv logEnv,
@@ -84,6 +87,8 @@ public class QuorumControllerTestEnv implements AutoCloseable {
                 sessionTimeoutMillis.ifPresent(timeout -> {
                     builder.setSessionTimeoutNs(NANOSECONDS.convert(timeout, TimeUnit.MILLISECONDS));
                 });
+                builder.setFatalFaultHandler(fatalFaultHandler);
+                builder.setMetadataFaultHandler(metadataFaultHandler);
                 builderConsumer.accept(builder);
                 this.controllers.add(builder.build());
             }
@@ -117,6 +122,14 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         return controllers;
     }
 
+    public MockFaultHandler fatalFaultHandler() {
+        return fatalFaultHandler;
+    }
+
+    public MockFaultHandler metadataFaultHandler() {
+        return metadataFaultHandler;
+    }
+
     @Override
     public void close() throws InterruptedException {
         for (QuorumController controller : controllers) {
@@ -125,5 +138,7 @@ public class QuorumControllerTestEnv implements AutoCloseable {
         for (QuorumController controller : controllers) {
             controller.close();
         }
+        fatalFaultHandler.maybeRethrowFirstException();
+        metadataFaultHandler.maybeRethrowFirstException();
     }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
new file mode 100644
index 00000000000..4c03eacc32f
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+
+
+/**
+ * Handle a server fault.
+ */
+public interface FaultHandler {
+    /**
+     * Handle a fault.
+     *
+     * @param failureMessage        The failure message to log.
+     */
+    default void handleFault(String failureMessage) {
+        handleFault(failureMessage, null);
+    }
+
+    /**
+     * Handle a fault.
+     *
+     * @param failureMessage        The failure message to log.
+     * @param cause                 The exception that caused the problem, or null.
+     */
+    void handleFault(String failureMessage, Throwable cause);
+
+    /**
+     * Log a failure message about a fault.
+     *
+     * @param log               The log4j logger.
+     * @param failureMessage    The failure message.
+     * @param cause             The exception which caused the failure, or null.
+     */
+    static void logFailureMessage(Logger log, String failureMessage, Throwable cause) {
+        if (cause == null) {
+            log.error("Encountered fatal fault: {}", failureMessage);
+        } else {
+            log.error("Encountered fatal fault: {}", failureMessage, cause);
+        }
+    }
+}
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
new file mode 100644
index 00000000000..e3b9f25a3be
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.kafka.common.utils.Exit;
+
+
+/**
+ * This is a fault handler which exits the Java process.
+ */
+public class ProcessExitingFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(ProcessExitingFaultHandler.class);
+
+    @Override
+    public void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        Exit.exit(1);
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
new file mode 100644
index 00000000000..39b3ed07847
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * This is a fault handler suitable for use in JUnit tests. It will store the result of the first
+ * call to handleFault that was made.
+ */
+public class MockFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(MockFaultHandler.class);
+
+    private final String name;
+    private MockFaultHandlerException firstException = null;
+    private boolean ignore = false;
+
+    public MockFaultHandler(String name) {
+        this.name = name;
+    }
+
+    @Override
+    public synchronized void handleFault(String failureMessage, Throwable cause) {
+        FaultHandler.logFailureMessage(log, failureMessage, cause);
+        MockFaultHandlerException e = (cause == null) ?
+                new MockFaultHandlerException(name + ": " + failureMessage) :
+                new MockFaultHandlerException(name + ": " + failureMessage +
+                        ": " + cause.getMessage(), cause);
+        if (firstException == null) {
+            firstException = e;
+        }
+        throw e;
+    }
+
+    public synchronized void maybeRethrowFirstException() {
+        if (firstException != null && !ignore) {
+            throw firstException;
+        }
+    }
+
+    public synchronized MockFaultHandlerException firstException() {
+        return firstException;
+    }
+
+    public synchronized void setIgnore(boolean ignore) {
+        this.ignore = ignore;
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
new file mode 100644
index 00000000000..ef9b11bdeb5
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+
+/**
+ * An exception thrown by MockFaultHandler.
+ */
+public class MockFaultHandlerException extends RuntimeException {
+    public MockFaultHandlerException(String failureMessage, Throwable cause) {
+        super(failureMessage, cause);
+        // If a cause exception was provided, set our stack trace to its stack trace. This is
+        // useful in junit tests where a limited number of stack frames are printed, and usually
+        // the stack frames of cause exceptions get trimmed.
+        if (cause != null) {
+            setStackTrace(cause.getStackTrace());
+        }
+    }
+
+    public MockFaultHandlerException(String failureMessage) {
+        this(failureMessage, null);
+    }
+}