Posted to commits@kafka.apache.org by cm...@apache.org on 2022/08/09 22:34:56 UTC

[kafka] branch 3.3 updated: KAFKA-14114: Add Metadata Error Related Metrics

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new f3cf6db3a53 KAFKA-14114: Add Metadata Error Related Metrics
f3cf6db3a53 is described below

commit f3cf6db3a53a032fb0480258b29ecf416e8385d3
Author: Niket Goel <ni...@users.noreply.github.com>
AuthorDate: Wed Jul 27 18:52:33 2022 -0700

    KAFKA-14114: Add Metadata Error Related Metrics
    
    This PR adds 3 metrics, as described in KIP-859 (a JMX read sketch follows the list):
     kafka.server:type=broker-metadata-metrics,name=metadata-load-error-count
     kafka.server:type=broker-metadata-metrics,name=metadata-apply-error-count
     kafka.controller:type=KafkaController,name=MetadataErrorCount
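
    Once a broker is running, the broker-side counters can be read over JMX. A
    minimal probe sketch (an assumption, not part of this commit: it relies on the
    default JmxReporter being enabled, which exposes each metric in a group as an
    attribute of the group MBean, using the KIP-859 names above):

        import java.lang.management.ManagementFactory
        import javax.management.ObjectName

        // In-process JMX read, for brevity; an external monitor would attach
        // through a remote JMXConnector instead.
        val server = ManagementFactory.getPlatformMBeanServer
        val broker = new ObjectName("kafka.server:type=broker-metadata-metrics")
        val loadErrors = server.getAttribute(broker, "metadata-load-error-count")
        val applyErrors = server.getAttribute(broker, "metadata-apply-error-count")
        println(s"load errors: $loadErrors, apply errors: $applyErrors")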
    
    These metrics are incremented by the fault handlers when the corresponding fault
    occurs. Broker-side load errors happen in BrokerMetadataListener; broker-side
    apply errors happen in BrokerMetadataPublisher. The controller-side metric is
    incremented when a standby (non-active) controller encounters a metadata error.
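
    The wiring lives in KafkaRaftServer (see the diff below): each fault handler is
    a LoggingFaultHandler constructed with a Runnable that bumps the matching
    counter. Excerpted from this patch ("metrics" is the surrounding Metrics
    instance):

        import kafka.server.metadata.BrokerServerMetrics
        import org.apache.kafka.server.fault.LoggingFaultHandler

        // Each handler's callback increments the matching error counter.
        val brokerMetrics = BrokerServerMetrics(metrics)
        val metadataLoadingFaultHandler = new LoggingFaultHandler("metadata loading",
          () => brokerMetrics.metadataLoadErrorCount.getAndIncrement())
        val metadataApplyingFaultHandler = new LoggingFaultHandler("metadata application",
          () => brokerMetrics.metadataApplyErrorCount.getAndIncrement())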
    
    In BrokerMetadataPublisher, limit the damage caused by an exception by introducing
    more catch blocks. The only failures treated as fatal are those that happen while
    initializing the manager objects (these would also be fatal in ZK mode).
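
    Concretely, each independently applicable step in BrokerMetadataPublisher.publish
    now gets its own try/catch that delegates to the handler and moves on, in this
    shape (excerpted from the diff below):

        try {
          // Notify the replica manager about changes to topics.
          replicaManager.applyDelta(topicsDelta, newImage)
        } catch {
          case t: Throwable => metadataPublishingFaultHandler.handleFault(
            s"Error applying topics delta in ${deltaName}", t)
        }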
    
    In BrokerMetadataListener, improve the logging of faults, especially ones that
    happen while replaying a snapshot, and likewise limit the damage caused by an
    exception.
    
    Replace MetadataFaultHandler with LoggingFaultHandler, which is more flexible
    because it takes a Runnable to invoke on each fault. Add LoggingFaultHandlerTest.
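
    Roughly, the new handler's shape is the following (a sketch only; the actual
    Java class added by this commit is LoggingFaultHandler.java in the diffstat,
    and its details may differ):

        // Sketch: log the fault, run the supplied action (e.g. a metric
        // increment), and return an exception the caller may throw or ignore.
        class LoggingFaultHandlerSketch(kind: String, action: Runnable) {
          def handleFault(what: String, cause: Throwable): RuntimeException = {
            System.err.println(s"Encountered $kind fault: $what")
            action.run()
            new RuntimeException(what, cause)
          }
        }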
    
    Make QuorumControllerMetricsTest stricter. Fix a bug in QuorumControllerMetrics
    where some metrics were not being removed from the yammer registry on close.
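
    The cleanup fix follows the same pattern BrokerServerMetrics uses for Kafka
    metrics (see its close() in the diff below), applied to the yammer registry.
    A sketch, with placeholder metric names:

        // Sketch: every yammer MetricName the class registered must also be
        // removed on close, or stale MBeans leak across in-JVM restarts.
        override def close(): Unit = {
          List(activeControllerCountName, metadataErrorCountName) // placeholders
            .foreach(registry.removeMetric)
        }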
    
    Co-author: Colin P. McCabe <cm...@apache.org>
---
 .../src/main/scala/kafka/server/BrokerServer.scala |  15 +-
 .../main/scala/kafka/server/ControllerServer.scala |   5 +-
 .../main/scala/kafka/server/KafkaRaftServer.scala  |  27 +-
 .../server/metadata/BrokerMetadataListener.scala   |  56 ++--
 .../server/metadata/BrokerMetadataPublisher.scala  | 300 +++++++++++++--------
 .../server/metadata/BrokerServerMetrics.scala      |  26 +-
 .../java/kafka/testkit/KafkaClusterTestKit.java    |  17 +-
 .../kafka/server/QuorumTestHarness.scala           |  34 ++-
 .../server/metadata/BrokerServerMetricsTest.scala  |  38 ++-
 .../scala/unit/kafka/metrics/MetricsTest.scala     |  25 +-
 .../metadata/BrokerMetadataListenerTest.scala      |  17 +-
 .../metadata/BrokerMetadataPublisherTest.scala     |  77 ++++--
 .../apache/kafka/controller/ControllerMetrics.java |   4 +
 .../apache/kafka/controller/QuorumController.java  |   8 +-
 .../kafka/controller/QuorumControllerMetrics.java  |  24 ++
 .../metadata/fault/MetadataFaultException.java     |  32 ---
 .../kafka/metadata/fault/MetadataFaultHandler.java |  36 ---
 .../kafka/controller/MockControllerMetrics.java    |  13 +
 .../controller/QuorumControllerMetricsTest.java    |  30 +++
 .../apache/kafka/server/fault/FaultHandler.java    |  25 +-
 .../kafka/server/fault/FaultHandlerException.java} |   8 +-
 .../kafka/server/fault/LoggingFaultHandler.java    |  54 ++++
 .../server/fault/ProcessExitingFaultHandler.java   |   9 +-
 .../server/fault/LoggingFaultHandlerTest.java      |  57 ++++
 .../kafka/server/fault/MockFaultHandler.java       |  20 +-
 25 files changed, 669 insertions(+), 288 deletions(-)

diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 0bdd6734975..1008decadb1 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -50,6 +50,7 @@ import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.raft.{RaftClient, RaftConfig}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.snapshot.SnapshotWriter
 
@@ -76,9 +77,13 @@ class BrokerServer(
   val raftManager: RaftManager[ApiMessageAndVersion],
   val time: Time,
   val metrics: Metrics,
+  val brokerMetrics: BrokerServerMetrics,
   val threadNamePrefix: Option[String],
   val initialOfflineDirs: Seq[String],
-  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]]
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+  val fatalFaultHandler: FaultHandler,
+  val metadataLoadingFaultHandler: FaultHandler,
+  val metadataPublishingFaultHandler: FaultHandler
 ) extends KafkaBroker {
 
   override def brokerState: BrokerState = Option(lifecycleManager).
@@ -315,8 +320,8 @@ class BrokerServer(
         threadNamePrefix,
         config.metadataSnapshotMaxNewRecordBytes,
         metadataSnapshotter,
-        BrokerServerMetrics(metrics)
-      )
+        brokerMetrics,
+        metadataLoadingFaultHandler)
 
       val networkListeners = new ListenerCollection()
       config.effectiveAdvertisedListeners.foreach { ep =>
@@ -432,7 +437,9 @@ class BrokerServer(
         transactionCoordinator,
         clientQuotaMetadataManager,
         dynamicConfigHandlers.toMap,
-        authorizer)
+        authorizer,
+        fatalFaultHandler,
+        metadataPublishingFaultHandler)
 
       // Tell the metadata listener to start publishing its output, and wait for the first
       // publish operation to complete. This first operation will initialize logManager,
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 212c092e1ab..19a6e307d62 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -37,7 +37,7 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{ClusterResource, Endpoint}
-import org.apache.kafka.controller.{BootstrapMetadata, Controller, QuorumController, QuorumControllerMetrics, QuorumFeatures}
+import org.apache.kafka.controller.{BootstrapMetadata, Controller, ControllerMetrics, QuorumController, QuorumFeatures}
 import org.apache.kafka.metadata.KafkaConfigSchema
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.raft.RaftConfig.AddressSpec
@@ -61,6 +61,7 @@ class ControllerServer(
   val raftManager: RaftManager[ApiMessageAndVersion],
   val time: Time,
   val metrics: Metrics,
+  val controllerMetrics: ControllerMetrics,
   val threadNamePrefix: Option[String],
   val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
   val configSchema: KafkaConfigSchema,
@@ -201,7 +202,7 @@ class ControllerServer(
           setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
           setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
           setMaxIdleIntervalNs(maxIdleIntervalNs).
-          setMetrics(new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)).
+          setMetrics(controllerMetrics).
           setCreateTopicPolicy(createTopicPolicy.asJava).
           setAlterConfigPolicy(alterConfigPolicy.asJava).
           setConfigurationValidator(new ControllerConfigurationValidator()).
diff --git a/core/src/main/scala/kafka/server/KafkaRaftServer.scala b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
index e7cf8f8f1fa..2338ef5e7c4 100644
--- a/core/src/main/scala/kafka/server/KafkaRaftServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaRaftServer.scala
@@ -23,17 +23,17 @@ import kafka.log.{LogConfig, UnifiedLog}
 import kafka.metrics.KafkaMetricsReporter
 import kafka.raft.KafkaRaftManager
 import kafka.server.KafkaRaftServer.{BrokerRole, ControllerRole}
+import kafka.server.metadata.BrokerServerMetrics
 import kafka.utils.{CoreUtils, Logging, Mx4jLoader, VerifiableProperties}
 import org.apache.kafka.common.config.{ConfigDef, ConfigResource}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{KafkaException, Uuid}
-import org.apache.kafka.controller.BootstrapMetadata
-import org.apache.kafka.metadata.fault.MetadataFaultHandler
+import org.apache.kafka.controller.{BootstrapMetadata, QuorumControllerMetrics}
 import org.apache.kafka.metadata.{KafkaConfigSchema, MetadataRecordSerde}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.ProcessExitingFaultHandler
+import org.apache.kafka.server.fault.{LoggingFaultHandler, ProcessExitingFaultHandler}
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.nio.file.Paths
@@ -83,34 +83,49 @@ class KafkaRaftServer(
   )
 
   private val broker: Option[BrokerServer] = if (config.processRoles.contains(BrokerRole)) {
+    val brokerMetrics = BrokerServerMetrics(metrics)
+    val fatalFaultHandler = new ProcessExitingFaultHandler()
+    val metadataLoadingFaultHandler = new LoggingFaultHandler("metadata loading",
+        () => brokerMetrics.metadataLoadErrorCount.getAndIncrement())
+    val metadataApplyingFaultHandler = new LoggingFaultHandler("metadata application",
+      () => brokerMetrics.metadataApplyErrorCount.getAndIncrement())
     Some(new BrokerServer(
       config,
       metaProps,
       raftManager,
       time,
       metrics,
+      brokerMetrics,
       threadNamePrefix,
       offlineDirs,
-      controllerQuorumVotersFuture
+      controllerQuorumVotersFuture,
+      fatalFaultHandler,
+      metadataLoadingFaultHandler,
+      metadataApplyingFaultHandler
     ))
   } else {
     None
   }
 
   private val controller: Option[ControllerServer] = if (config.processRoles.contains(ControllerRole)) {
+    val controllerMetrics = new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), time)
+    val metadataFaultHandler = new LoggingFaultHandler("controller metadata",
+      () => controllerMetrics.incrementMetadataErrorCount())
+    val fatalFaultHandler = new ProcessExitingFaultHandler()
     Some(new ControllerServer(
       metaProps,
       config,
       raftManager,
       time,
       metrics,
+      controllerMetrics,
       threadNamePrefix,
       controllerQuorumVotersFuture,
       KafkaRaftServer.configSchema,
       raftManager.apiVersions,
       bootstrapMetadata,
-      new MetadataFaultHandler(),
-      new ProcessExitingFaultHandler(),
+      metadataFaultHandler,
+      fatalFaultHandler
     ))
   } else {
     None
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 3b79526a954..3984f467edd 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -19,13 +19,13 @@ package kafka.server.metadata
 import java.util
 import java.util.concurrent.{CompletableFuture, TimeUnit}
 import java.util.function.Consumer
-
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.queue.{EventQueue, KafkaEventQueue}
 import org.apache.kafka.raft.{Batch, BatchReader, LeaderAndEpoch, RaftClient}
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
 
 
@@ -40,7 +40,8 @@ class BrokerMetadataListener(
   threadNamePrefix: Option[String],
   val maxBytesBetweenSnapshots: Long,
   val snapshotter: Option[MetadataSnapshotter],
-  brokerMetrics: BrokerServerMetrics
+  brokerMetrics: BrokerServerMetrics,
+  metadataLoadingFaultHandler: FaultHandler
 ) extends RaftClient.Listener[ApiMessageAndVersion] with KafkaMetricsGroup {
   private val logContext = new LogContext(s"[BrokerMetadataListener id=$brokerId] ")
   private val log = logContext.logger(classOf[BrokerMetadataListener])
@@ -109,11 +110,16 @@ class BrokerMetadataListener(
       extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
       val results = try {
-        val loadResults = loadBatches(_delta, reader, None, None, None)
+        val loadResults = loadBatches(_delta, reader, None, None, None, None)
         if (isDebugEnabled) {
           debug(s"Loaded new commits: $loadResults")
         }
         loadResults
+      } catch {
+        case e: Throwable =>
+          metadataLoadingFaultHandler.handleFault(s"Unable to load metadata commits " +
+            s"from the BatchReader starting at base offset ${reader.baseOffset()}", e)
+          return
       } finally {
         reader.close()
       }
@@ -156,19 +162,26 @@ class BrokerMetadataListener(
   class HandleSnapshotEvent(reader: SnapshotReader[ApiMessageAndVersion])
     extends EventQueue.FailureLoggingEvent(log) {
     override def run(): Unit = {
+      val snapshotName = s"${reader.snapshotId().offset}-${reader.snapshotId().epoch}"
       try {
-        info(s"Loading snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}.")
+        info(s"Loading snapshot ${snapshotName}")
         _delta = new MetadataDelta(_image) // Discard any previous deltas.
-        val loadResults = loadBatches(
-          _delta,
+        val loadResults = loadBatches(_delta,
           reader,
           Some(reader.lastContainedLogTimestamp),
           Some(reader.lastContainedLogOffset),
-          Some(reader.lastContainedLogEpoch)
-        )
-        _delta.finishSnapshot()
-        info(s"Loaded snapshot ${reader.snapshotId().offset}-${reader.snapshotId().epoch}: " +
-          s"$loadResults")
+          Some(reader.lastContainedLogEpoch),
+          Some(snapshotName))
+        try {
+          _delta.finishSnapshot()
+        } catch {
+          case e: Throwable => metadataLoadingFaultHandler.handleFault(
+              s"Error finishing snapshot ${snapshotName}", e)
+        }
+        info(s"Loaded snapshot ${snapshotName}: ${loadResults}")
+      } catch {
+        case t: Throwable => metadataLoadingFaultHandler.handleFault("Uncaught exception while " +
+          s"loading broker metadata from Metadata snapshot ${snapshotName}", t)
       } finally {
         reader.close()
       }
@@ -201,7 +214,8 @@ class BrokerMetadataListener(
     iterator: util.Iterator[Batch[ApiMessageAndVersion]],
     lastAppendTimestamp: Option[Long],
     lastCommittedOffset: Option[Long],
-    lastCommittedEpoch: Option[Int]
+    lastCommittedEpoch: Option[Int],
+    snapshotName: Option[String]
   ): BatchLoadResults = {
     val startTimeNs = time.nanoseconds()
     var numBatches = 0
@@ -220,12 +234,20 @@ class BrokerMetadataListener(
           trace(s"Metadata batch ${batch.lastOffset}: processing [${index + 1}/${batch.records.size}]:" +
             s" ${messageAndVersion.message}")
         }
-
         _highestOffset = lastCommittedOffset.getOrElse(batch.baseOffset() + index)
-
-        delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
-        numRecords += 1
-        index += 1
+        try {
+          delta.replay(highestMetadataOffset, epoch, messageAndVersion.message())
+        } catch {
+          case e: Throwable => snapshotName match {
+            case None => metadataLoadingFaultHandler.handleFault(
+              s"Error replaying metadata log record at offset ${_highestOffset}", e)
+            case Some(name) => metadataLoadingFaultHandler.handleFault(
+              s"Error replaying record ${index} from snapshot ${name} at offset ${_highestOffset}", e)
+          }
+        } finally {
+          numRecords += 1
+          index += 1
+        }
       }
       numBytes = numBytes + batch.sizeInBytes()
       metadataBatchSizeHist.update(batch.records().size())
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 212f188504e..0192bb4afcf 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -31,6 +31,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta, TopicsImage}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.server.authorizer.Authorizer
+import org.apache.kafka.server.fault.FaultHandler
 
 import scala.collection.mutable
 
@@ -94,15 +95,19 @@ object BrokerMetadataPublisher extends Logging {
   }
 }
 
-class BrokerMetadataPublisher(conf: KafkaConfig,
-                              metadataCache: KRaftMetadataCache,
-                              logManager: LogManager,
-                              replicaManager: ReplicaManager,
-                              groupCoordinator: GroupCoordinator,
-                              txnCoordinator: TransactionCoordinator,
-                              clientQuotaMetadataManager: ClientQuotaMetadataManager,
-                              dynamicConfigHandlers: Map[String, ConfigHandler],
-                              private val _authorizer: Option[Authorizer]) extends MetadataPublisher with Logging {
+class BrokerMetadataPublisher(
+  conf: KafkaConfig,
+  metadataCache: KRaftMetadataCache,
+  logManager: LogManager,
+  replicaManager: ReplicaManager,
+  groupCoordinator: GroupCoordinator,
+  txnCoordinator: TransactionCoordinator,
+  clientQuotaMetadataManager: ClientQuotaMetadataManager,
+  dynamicConfigHandlers: Map[String, ConfigHandler],
+  private val _authorizer: Option[Authorizer],
+  fatalFaultHandler: FaultHandler,
+  metadataPublishingFaultHandler: FaultHandler
+) extends MetadataPublisher with Logging {
   logIdent = s"[BrokerMetadataPublisher id=${conf.nodeId}] "
 
   import BrokerMetadataPublisher._
@@ -125,8 +130,15 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   override def publish(delta: MetadataDelta, newImage: MetadataImage): Unit = {
     val highestOffsetAndEpoch = newImage.highestOffsetAndEpoch()
 
+    val deltaName = if (_firstPublish) {
+      s"initial MetadataDelta up to ${highestOffsetAndEpoch.offset}"
+    } else {
+      s"MetadataDelta up to ${highestOffsetAndEpoch.offset}"
+    }
     try {
-      trace(s"Publishing delta $delta with highest offset $highestOffsetAndEpoch")
+      if (isTraceEnabled) {
+        trace(s"Publishing delta $delta with highest offset $highestOffsetAndEpoch")
+      }
 
       // Publish the new metadata image to the metadata cache.
       metadataCache.setImage(newImage)
@@ -151,37 +163,50 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
 
       // Apply topic deltas.
       Option(delta.topicsDelta()).foreach { topicsDelta =>
-        // Notify the replica manager about changes to topics.
-        replicaManager.applyDelta(topicsDelta, newImage)
-
-        // Update the group coordinator of local changes
-        updateCoordinator(
-          newImage,
-          delta,
-          Topic.GROUP_METADATA_TOPIC_NAME,
-          groupCoordinator.onElection,
-          groupCoordinator.onResignation
-        )
-
-        // Update the transaction coordinator of local changes
-        updateCoordinator(
-          newImage,
-          delta,
-          Topic.TRANSACTION_STATE_TOPIC_NAME,
-          txnCoordinator.onElection,
-          txnCoordinator.onResignation
-        )
-
-        // Notify the group coordinator about deleted topics.
-        val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
-        topicsDelta.deletedTopicIds().forEach { id =>
-          val topicImage = topicsDelta.image().getTopic(id)
-          topicImage.partitions().keySet().forEach {
-            id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id)
-          }
+        try {
+          // Notify the replica manager about changes to topics.
+          replicaManager.applyDelta(topicsDelta, newImage)
+        } catch {
+          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error applying topics " +
+            s"delta in ${deltaName}", t)
+        }
+        try {
+          // Update the group coordinator of local changes
+          updateCoordinator(newImage,
+            delta,
+            Topic.GROUP_METADATA_TOPIC_NAME,
+            groupCoordinator.onElection,
+            groupCoordinator.onResignation)
+        } catch {
+          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
+            s"coordinator with local changes in ${deltaName}", t)
+        }
+        try {
+          // Update the transaction coordinator of local changes
+          updateCoordinator(newImage,
+            delta,
+            Topic.TRANSACTION_STATE_TOPIC_NAME,
+            txnCoordinator.onElection,
+            txnCoordinator.onResignation)
+        } catch {
+          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating txn " +
+            s"coordinator with local changes in ${deltaName}", t)
         }
-        if (deletedTopicPartitions.nonEmpty) {
-          groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching)
+        try {
+          // Notify the group coordinator about deleted topics.
+          val deletedTopicPartitions = new mutable.ArrayBuffer[TopicPartition]()
+          topicsDelta.deletedTopicIds().forEach { id =>
+            val topicImage = topicsDelta.image().getTopic(id)
+            topicImage.partitions().keySet().forEach {
+              id => deletedTopicPartitions += new TopicPartition(topicImage.name(), id)
+            }
+          }
+          if (deletedTopicPartitions.nonEmpty) {
+            groupCoordinator.handleDeletedPartitions(deletedTopicPartitions, RequestLocal.NoCaching)
+          }
+        } catch {
+          case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating group " +
+            s"coordinator with deleted partitions in ${deltaName}", t)
         }
       }
 
@@ -191,39 +216,62 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
           val props = newImage.configs().configProperties(resource)
           resource.`type`() match {
             case TOPIC =>
-              // Apply changes to a topic's dynamic configuration.
-              info(s"Updating topic ${resource.name()} with new configuration : " +
-                toLoggableProps(resource, props).mkString(","))
-              dynamicConfigHandlers(ConfigType.Topic).
-                processConfigChanges(resource.name(), props)
+              try {
+                // Apply changes to a topic's dynamic configuration.
+                info(s"Updating topic ${resource.name()} with new configuration : " +
+                  toLoggableProps(resource, props).mkString(","))
+                dynamicConfigHandlers(ConfigType.Topic).
+                  processConfigChanges(resource.name(), props)
+              } catch {
+                case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating topic " +
+                  s"${resource.name()} with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+                  s"in ${deltaName}", t)
+              }
             case BROKER =>
               if (resource.name().isEmpty) {
-                // Apply changes to "cluster configs" (also known as default BROKER configs).
-                // These are stored in KRaft with an empty name field.
-                info("Updating cluster configuration : " +
-                  toLoggableProps(resource, props).mkString(","))
-                dynamicConfigHandlers(ConfigType.Broker).
-                  processConfigChanges(ConfigEntityName.Default, props)
+                try {
+                  // Apply changes to "cluster configs" (also known as default BROKER configs).
+                  // These are stored in KRaft with an empty name field.
+                  info("Updating cluster configuration : " +
+                    toLoggableProps(resource, props).mkString(","))
+                  dynamicConfigHandlers(ConfigType.Broker).
+                    processConfigChanges(ConfigEntityName.Default, props)
+                } catch {
+                  case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " +
+                    s"cluster with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+                    s"in ${deltaName}", t)
+                }
               } else if (resource.name() == brokerId.toString) {
-                // Apply changes to this broker's dynamic configuration.
-                info(s"Updating broker $brokerId with new configuration : " +
-                  toLoggableProps(resource, props).mkString(","))
-                dynamicConfigHandlers(ConfigType.Broker).
-                  processConfigChanges(resource.name(), props)
-                // When applying a per broker config (not a cluster config), we also
-                // reload any associated file. For example, if the ssl.keystore is still
-                // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
-                // have changed. This doesn't apply to topic configs or cluster configs.
-                reloadUpdatedFilesWithoutConfigChange(props)
+                try {
+                  // Apply changes to this broker's dynamic configuration.
+                  info(s"Updating broker $brokerId with new configuration : " +
+                    toLoggableProps(resource, props).mkString(","))
+                  dynamicConfigHandlers(ConfigType.Broker).
+                    processConfigChanges(resource.name(), props)
+                  // When applying a per broker config (not a cluster config), we also
+                  // reload any associated file. For example, if the ssl.keystore is still
+                  // set to /tmp/foo, we still want to reload /tmp/foo in case its contents
+                  // have changed. This doesn't apply to topic configs or cluster configs.
+                  reloadUpdatedFilesWithoutConfigChange(props)
+                } catch {
+                  case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating " +
+                    s"broker with new configuration: ${toLoggableProps(resource, props).mkString(",")} " +
+                    s"in ${deltaName}", t)
+                }
               }
             case _ => // nothing to do
           }
         }
       }
 
-      // Apply client quotas delta.
-      Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
-        clientQuotaMetadataManager.update(clientQuotasDelta)
+      try {
+        // Apply client quotas delta.
+        Option(delta.clientQuotasDelta()).foreach { clientQuotasDelta =>
+          clientQuotaMetadataManager.update(clientQuotasDelta)
+        }
+      } catch {
+        case t: Throwable => metadataPublishingFaultHandler.handleFault("Error updating client " +
+          s"quotas in ${deltaName}", t)
       }
 
       // Apply changes to ACLs. This needs to be handled carefully because while we are
@@ -235,20 +283,30 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       Option(delta.aclsDelta()).foreach( aclsDelta =>
         _authorizer match {
           case Some(authorizer: ClusterMetadataAuthorizer) => if (aclsDelta.isSnapshotDelta) {
-            // If the delta resulted from a snapshot load, we want to apply the new changes
-            // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
-            // first snapshot load, it will also complete the futures returned by
-           // Authorizer#start (which we wait for before processing RPCs).
-            authorizer.loadSnapshot(newImage.acls().acls())
+            try {
+              // If the delta resulted from a snapshot load, we want to apply the new changes
+              // all at once using ClusterMetadataAuthorizer#loadSnapshot. If this is the
+              // first snapshot load, it will also complete the futures returned by
+              // Authorizer#start (which we wait for before processing RPCs).
+              authorizer.loadSnapshot(newImage.acls().acls())
+            } catch {
+              case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " +
+                s"authorizer snapshot in ${deltaName}", t)
+            }
           } else {
-            // Because the changes map is a LinkedHashMap, the deltas will be returned in
-            // the order they were performed.
-            aclsDelta.changes().entrySet().forEach(e =>
-              if (e.getValue.isPresent) {
-                authorizer.addAcl(e.getKey, e.getValue.get())
-              } else {
-                authorizer.removeAcl(e.getKey)
-              })
+            try {
+              // Because the changes map is a LinkedHashMap, the deltas will be returned in
+              // the order they were performed.
+              aclsDelta.changes().entrySet().forEach(e =>
+                if (e.getValue.isPresent) {
+                  authorizer.addAcl(e.getKey, e.getValue.get())
+                } else {
+                  authorizer.removeAcl(e.getKey)
+                })
+            } catch {
+              case t: Throwable => metadataPublishingFaultHandler.handleFault("Error loading " +
+                s"authorizer changes in ${deltaName}", t)
+            }
           }
           case _ => // No ClusterMetadataAuthorizer is configured. There is nothing to do.
         })
@@ -258,8 +316,8 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
       }
       publishedOffsetAtomic.set(newImage.highestOffsetAndEpoch().offset)
     } catch {
-      case t: Throwable => error(s"Error publishing broker metadata at $highestOffsetAndEpoch", t)
-        throw t
+      case t: Throwable => metadataPublishingFaultHandler.handleFault("Uncaught exception while " +
+        s"publishing broker metadata from ${deltaName}", t)
     } finally {
       _firstPublish = false
     }
@@ -282,7 +340,7 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
    * @param resignation function to call on resignation; the first parameter is the partition id;
    *                    the second parameter is the leader epoch
    */
-  private def updateCoordinator(
+  def updateCoordinator(
     image: MetadataImage,
     delta: MetadataDelta,
     topicName: String,
@@ -317,38 +375,60 @@ class BrokerMetadataPublisher(conf: KafkaConfig,
   }
 
   private def initializeManagers(): Unit = {
-    // Start log manager, which will perform (potentially lengthy)
-    // recovery-from-unclean-shutdown if required.
-    logManager.startup(metadataCache.getAllTopics())
-
-    // Make the LogCleaner available for reconfiguration. We can't do this prior to this
-    // point because LogManager#startup creates the LogCleaner object, if
-    // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
-    Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
-
-    // Start the replica manager.
-    replicaManager.startup()
-
-    // Start the group coordinator.
-    groupCoordinator.startup(() => metadataCache.numPartitions(
-      Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(conf.offsetsTopicPartitions))
-
-    // Start the transaction coordinator.
-    txnCoordinator.startup(() => metadataCache.numPartitions(
-      Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(conf.transactionTopicPartitions))
+    try {
+      // Start log manager, which will perform (potentially lengthy)
+      // recovery-from-unclean-shutdown if required.
+      logManager.startup(metadataCache.getAllTopics())
+
+      // Make the LogCleaner available for reconfiguration. We can't do this prior to this
+      // point because LogManager#startup creates the LogCleaner object, if
+      // log.cleaner.enable is true. TODO: improve this (see KAFKA-13610)
+      Option(logManager.cleaner).foreach(conf.dynamicConfig.addBrokerReconfigurable)
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting LogManager", t)
+    }
+    try {
+      // Start the replica manager.
+      replicaManager.startup()
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting ReplicaManager", t)
+    }
+    try {
+      // Start the group coordinator.
+      groupCoordinator.startup(() => metadataCache.numPartitions(
+        Topic.GROUP_METADATA_TOPIC_NAME).getOrElse(conf.offsetsTopicPartitions))
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting GroupCoordinator", t)
+    }
+    try {
+      // Start the transaction coordinator.
+      txnCoordinator.startup(() => metadataCache.numPartitions(
+        Topic.TRANSACTION_STATE_TOPIC_NAME).getOrElse(conf.transactionTopicPartitions))
+    } catch {
+      case t: Throwable => fatalFaultHandler.handleFault("Error starting TransactionCoordinator", t)
+    }
   }
 
   private def finishInitializingReplicaManager(newImage: MetadataImage): Unit = {
-    // Delete log directories which we're not supposed to have, according to the
-    // latest metadata. This is only necessary to do when we're first starting up. If
-    // we have to load a snapshot later, these topics will appear in deletedTopicIds.
-    val strayPartitions = findStrayPartitions(brokerId, newImage.topics, logManager.allLogs)
-    if (strayPartitions.nonEmpty) {
-      replicaManager.deleteStrayReplicas(strayPartitions)
+    try {
+      // Delete log directories which we're not supposed to have, according to the
+      // latest metadata. This is only necessary to do when we're first starting up. If
+      // we have to load a snapshot later, these topics will appear in deletedTopicIds.
+      val strayPartitions = findStrayPartitions(brokerId, newImage.topics, logManager.allLogs)
+      if (strayPartitions.nonEmpty) {
+        replicaManager.deleteStrayReplicas(strayPartitions)
+      }
+    } catch {
+      case t: Throwable => metadataPublishingFaultHandler.handleFault("Error deleting stray " +
+        "partitions during startup", t)
     }
-
-    // Make sure that the high water mark checkpoint thread is running for the replica
-    // manager.
-    replicaManager.startHighWatermarkCheckPointThread()
-  }
+    try {
+      // Make sure that the high water mark checkpoint thread is running for the replica
+      // manager.
+      replicaManager.startHighWatermarkCheckPointThread()
+    } catch {
+      case t: Throwable => metadataPublishingFaultHandler.handleFault("Error starting high " +
+        "watermark checkpoint thread during startup", t)
+    }
+}
 }
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 0db6f0071c4..3e68ae85f92 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -28,6 +28,8 @@ final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable
 
   val lastAppliedRecordOffset: AtomicLong = new AtomicLong(0)
   val lastAppliedRecordTimestamp: AtomicLong = new AtomicLong(0)
+  val metadataLoadErrorCount: AtomicLong = new AtomicLong(0)
+  val metadataApplyErrorCount: AtomicLong = new AtomicLong(0)
 
   val lastAppliedRecordOffsetName = metrics.metricName(
     "last-applied-record-offset",
@@ -47,6 +49,18 @@ final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable
     "The difference between now and the timestamp of the last record from the cluster metadata partition that was applied by the broker"
   )
 
+  val metadataLoadErrorCountName = metrics.metricName(
+    "metadata-load-error-count",
+    metricGroupName,
+    "The number of errors encountered by the BrokerMetadataListener while loading the metadata log and generating a new MetadataDelta based on it."
+  )
+
+  val metadataApplyErrorCountName = metrics.metricName(
+    "metadata-apply-error-count",
+    metricGroupName,
+    "The number of errors encountered by the BrokerMetadataPublisher while applying a new MetadataImage based on the latest MetadataDelta."
+  )
+
   addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
     lastAppliedRecordOffset.get
   }
@@ -59,11 +73,21 @@ final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable
     now - lastAppliedRecordTimestamp.get
   }
 
+  addMetric(metrics, metadataLoadErrorCountName) { _ =>
+    metadataLoadErrorCount.get
+  }
+
+  addMetric(metrics, metadataApplyErrorCountName) { _ =>
+    metadataApplyErrorCount.get
+  }
+
   override def close(): Unit = {
     List(
       lastAppliedRecordOffsetName,
       lastAppliedRecordTimestampName,
-      lastAppliedRecordLagMsName
+      lastAppliedRecordLagMsName,
+      metadataLoadErrorCountName,
+      metadataApplyErrorCountName
     ).foreach(metrics.removeMetric)
   }
 }
diff --git a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
index 42120324f5f..ecee13c4982 100644
--- a/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
+++ b/core/src/test/java/kafka/testkit/KafkaClusterTestKit.java
@@ -24,6 +24,7 @@ import kafka.server.KafkaConfig;
 import kafka.server.KafkaConfig$;
 import kafka.server.KafkaRaftServer;
 import kafka.server.MetaProperties;
+import kafka.server.metadata.BrokerServerMetrics$;
 import kafka.tools.StorageTool;
 import kafka.utils.Logging;
 import org.apache.kafka.clients.CommonClientConfigs;
@@ -36,6 +37,7 @@ import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.controller.BootstrapMetadata;
 import org.apache.kafka.controller.Controller;
+import org.apache.kafka.controller.MockControllerMetrics;
 import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.raft.RaftConfig;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -128,6 +130,11 @@ public class KafkaClusterTestKit implements AutoCloseable {
             return this;
         }
 
+        public Builder setMetadataFaultHandler(MockFaultHandler metadataFaultHandler) {
+            this.metadataFaultHandler = metadataFaultHandler;
+            return this;
+        }
+
         public KafkaClusterTestKit build() throws Exception {
             Map<Integer, ControllerServer> controllers = new HashMap<>();
             Map<Integer, BrokerServer> brokers = new HashMap<>();
@@ -189,6 +196,7 @@ public class KafkaClusterTestKit implements AutoCloseable {
                         raftManager,
                         Time.SYSTEM,
                         new Metrics(),
+                        new MockControllerMetrics(),
                         Option.apply(threadNamePrefix),
                         connectFutureManager.future,
                         KafkaRaftServer.configSchema(),
@@ -245,15 +253,20 @@ public class KafkaClusterTestKit implements AutoCloseable {
                             Time.SYSTEM, new Metrics(), Option.apply(threadNamePrefix), connectFutureManager.future);
                         raftManagers.put(node.id(), raftManager);
                     }
+                    Metrics metrics = new Metrics();
                     BrokerServer broker = new BrokerServer(
                         config,
                         nodes.brokerProperties(node.id()),
                         raftManager,
                         Time.SYSTEM,
-                        new Metrics(),
+                        metrics,
+                        BrokerServerMetrics$.MODULE$.apply(metrics),
                         Option.apply(threadNamePrefix),
                         JavaConverters.asScalaBuffer(Collections.<String>emptyList()).toSeq(),
-                        connectFutureManager.future
+                        connectFutureManager.future,
+                        fatalFaultHandler,
+                        metadataFaultHandler,
+                        metadataFaultHandler
                     );
                     brokers.put(node.id(), broker);
                 }
diff --git a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
index 9894df9c5f7..c4ca966f9ab 100755
--- a/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
+++ b/core/src/test/scala/integration/kafka/server/QuorumTestHarness.scala
@@ -24,6 +24,7 @@ import java.util.{Collections, Properties}
 import java.util.concurrent.CompletableFuture
 import javax.security.auth.login.Configuration
 import kafka.raft.KafkaRaftManager
+import kafka.server.metadata.BrokerServerMetrics
 import kafka.tools.StorageTool
 import kafka.utils.{CoreUtils, Logging, TestInfoUtils, TestUtils}
 import kafka.zk.{AdminZkClient, EmbeddedZookeeper, KafkaZkClient}
@@ -32,11 +33,12 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.common.security.JaasUtils
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.{Exit, Time}
-import org.apache.kafka.controller.BootstrapMetadata
+import org.apache.kafka.controller.{BootstrapMetadata, QuorumControllerMetrics}
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.{AddressSpec, InetAddressSpec}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
-import org.apache.kafka.server.fault.MockFaultHandler
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.zookeeper.client.ZKClientConfig
 import org.apache.zookeeper.{WatchedEvent, Watcher, ZooKeeper}
 import org.junit.jupiter.api.Assertions._
@@ -82,26 +84,34 @@ class ZooKeeperQuorumImplementation(
   }
 }
 
-class KRaftQuorumImplementation(val raftManager: KafkaRaftManager[ApiMessageAndVersion],
-                                val controllerServer: ControllerServer,
-                                val metadataDir: File,
-                                val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
-                                val clusterId: String,
-                                val log: Logging) extends QuorumImplementation {
+class KRaftQuorumImplementation(
+  val raftManager: KafkaRaftManager[ApiMessageAndVersion],
+  val controllerServer: ControllerServer,
+  val metadataDir: File,
+  val controllerQuorumVotersFuture: CompletableFuture[util.Map[Integer, AddressSpec]],
+  val clusterId: String,
+  val log: Logging,
+  val faultHandler: FaultHandler
+) extends QuorumImplementation {
   override def createBroker(
     config: KafkaConfig,
     time: Time,
     startup: Boolean,
     threadNamePrefix: Option[String],
   ): KafkaBroker = {
+    val metrics = new Metrics()
     val broker = new BrokerServer(config = config,
       metaProps = new MetaProperties(clusterId, config.nodeId),
       raftManager = raftManager,
       time = time,
-      metrics = new Metrics(),
+      metrics = metrics,
+      brokerMetrics = BrokerServerMetrics(metrics),
       threadNamePrefix = Some("Broker%02d_".format(config.nodeId)),
       initialOfflineDirs = Seq(),
-      controllerQuorumVotersFuture = controllerQuorumVotersFuture)
+      controllerQuorumVotersFuture = controllerQuorumVotersFuture,
+      fatalFaultHandler = faultHandler,
+      metadataLoadingFaultHandler = faultHandler,
+      metadataPublishingFaultHandler = faultHandler)
     if (startup) broker.startup()
     broker
   }
@@ -306,6 +316,7 @@ abstract class QuorumTestHarness extends Logging {
         raftManager = raftManager,
         time = Time.SYSTEM,
         metrics = controllerMetrics,
+        controllerMetrics = new QuorumControllerMetrics(KafkaYammerMetrics.defaultRegistry(), Time.SYSTEM),
         threadNamePrefix = Option(threadNamePrefix),
         controllerQuorumVotersFuture = controllerQuorumVotersFuture,
         configSchema = KafkaRaftServer.configSchema,
@@ -336,7 +347,8 @@ abstract class QuorumTestHarness extends Logging {
       metadataDir,
       controllerQuorumVotersFuture,
       metaProperties.clusterId,
-      this)
+      this,
+      faultHandler)
   }
 
   private def newZooKeeperQuorum(): ZooKeeperQuorumImplementation = {
diff --git a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
index df114ef59e5..ea2b439c166 100644
--- a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
+++ b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
@@ -37,12 +37,14 @@ final class BrokerServerMetricsTest {
     val expectedMetrics = Set(
       new MetricName("last-applied-record-offset", expectedGroup, "", Collections.emptyMap()),
       new MetricName("last-applied-record-timestamp", expectedGroup, "", Collections.emptyMap()),
-      new MetricName("last-applied-record-lag-ms", expectedGroup, "", Collections.emptyMap())
+      new MetricName("last-applied-record-lag-ms", expectedGroup, "", Collections.emptyMap()),
+      new MetricName("metadata-load-error-count", expectedGroup, "", Collections.emptyMap()),
+      new MetricName("metadata-apply-error-count", expectedGroup, "", Collections.emptyMap())
     )
      
     TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
       val metricsMap = metrics.metrics().asScala.filter{ case (name, _) => name.group == expectedGroup }
-      assertEquals(3, metricsMap.size)
+      assertEquals(expectedMetrics.size, metricsMap.size)
       metricsMap.foreach { case (name, metric) =>
         assertTrue(expectedMetrics.contains(name))
       }
@@ -85,4 +87,36 @@ final class BrokerServerMetricsTest {
       assertEquals(time.milliseconds - timestamp, lagMetric.metricValue.asInstanceOf[Long])
     }
   }
+
+  @Test
+  def testMetadataLoadErrorCount(): Unit = {
+    val time = new MockTime()
+    val metrics = new Metrics(time)
+    TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+      val metadataLoadErrorCountMetric = metrics.metrics().get(brokerMetrics.metadataLoadErrorCountName)
+
+      assertEquals(0L, metadataLoadErrorCountMetric.metricValue.asInstanceOf[Long])
+
+      // Update metric value and check
+      val errorCount = 100
+      brokerMetrics.metadataLoadErrorCount.set(errorCount)
+      assertEquals(errorCount, metadataLoadErrorCountMetric.metricValue.asInstanceOf[Long])
+    }
+  }
+
+  @Test
+  def testMetadataApplyErrorCount(): Unit = {
+    val time = new MockTime()
+    val metrics = new Metrics(time)
+    TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
+      val metadataApplyErrorCountMetric = metrics.metrics().get(brokerMetrics.metadataApplyErrorCountName)
+
+      assertEquals(0L, metadataApplyErrorCountMetric.metricValue.asInstanceOf[Long])
+
+      // Update metric value and check
+      val errorCount = 100
+      brokerMetrics.metadataApplyErrorCount.set(errorCount)
+      assertEquals(errorCount, metadataApplyErrorCountMetric.metricValue.asInstanceOf[Long])
+    }
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
index b21fe877f20..29de3c0f242 100644
--- a/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
+++ b/core/src/test/scala/unit/kafka/metrics/MetricsTest.scala
@@ -233,16 +233,21 @@ class MetricsTest extends KafkaServerTestHarness with Logging {
   @ValueSource(strings = Array("kraft"))
   def testKRaftControllerMetrics(quorum: String): Unit = {
     val metrics = KafkaYammerMetrics.defaultRegistry.allMetrics
-
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=ActiveControllerCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=OfflinePartitionsCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalTopicCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=GlobalPartitionCount"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp"), 1)
-    assertEquals(metrics.keySet.asScala.count(_.getMBeanName == "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs"), 1)
+    Set(
+      "kafka.controller:type=KafkaController,name=ActiveControllerCount",
+      "kafka.controller:type=KafkaController,name=GlobalPartitionCount",
+      "kafka.controller:type=KafkaController,name=GlobalTopicCount",
+      "kafka.controller:type=KafkaController,name=LastAppliedRecordLagMs",
+      "kafka.controller:type=KafkaController,name=LastAppliedRecordOffset",
+      "kafka.controller:type=KafkaController,name=LastAppliedRecordTimestamp",
+      "kafka.controller:type=KafkaController,name=LastCommittedRecordOffset",
+      "kafka.controller:type=KafkaController,name=MetadataErrorCount",
+      "kafka.controller:type=KafkaController,name=OfflinePartitionsCount",
+      "kafka.controller:type=KafkaController,name=PreferredReplicaImbalanceCount",
+    ).foreach(expected => {
+      assertEquals(1, metrics.keySet.asScala.count(_.getMBeanName.equals(expected)),
+        s"Unable to find ${expected}")
+    })
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index 6de448f2802..6c8c2599d29 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -27,12 +27,20 @@ import org.apache.kafka.common.{Endpoint, Uuid}
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
 import org.apache.kafka.metadata.{BrokerRegistration, RecordTestUtils, VersionRange}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
+import org.apache.kafka.server.fault.MockFaultHandler
 import org.junit.jupiter.api.Assertions.{assertEquals, assertTrue}
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{AfterEach, Test}
 
 import scala.jdk.CollectionConverters._
 
 class BrokerMetadataListenerTest {
+  private val metadataLoadingFaultHandler = new MockFaultHandler("metadata loading")
+
+  @AfterEach
+  def verifyNoFaults(): Unit = {
+    metadataLoadingFaultHandler.maybeRethrowFirstException()
+  }
+
   private def newBrokerMetadataListener(
     metrics: BrokerServerMetrics = BrokerServerMetrics(new Metrics()),
     snapshotter: Option[MetadataSnapshotter] = None,
@@ -44,7 +52,8 @@ class BrokerMetadataListenerTest {
       threadNamePrefix = None,
       maxBytesBetweenSnapshots = maxBytesBetweenSnapshots,
       snapshotter = snapshotter,
-      brokerMetrics = metrics
+      brokerMetrics = metrics,
+      metadataLoadingFaultHandler = metadataLoadingFaultHandler
     )
   }
 
@@ -77,6 +86,8 @@ class BrokerMetadataListenerTest {
       assertEquals(100L, listener.highestMetadataOffset)
       assertEquals(0L, metrics.lastAppliedRecordOffset.get)
       assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
+      assertEquals(0L, metrics.metadataLoadErrorCount.get)
+      assertEquals(0L, metrics.metadataApplyErrorCount.get)
 
       val fencedTimestamp = 500L
       val fencedLastOffset = 200L
@@ -110,6 +121,8 @@ class BrokerMetadataListenerTest {
 
       assertEquals(fencedLastOffset, metrics.lastAppliedRecordOffset.get)
       assertEquals(fencedTimestamp, metrics.lastAppliedRecordTimestamp.get)
+      assertEquals(0L, metrics.metadataLoadErrorCount.get)
+      assertEquals(0L, metrics.metadataApplyErrorCount.get)
     } finally {
       listener.close()
     }
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 6742530ef51..652b8b3a0c2 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -17,17 +17,16 @@
 
 package unit.kafka.server.metadata
 
-import java.util.Collections.{singleton, singletonMap}
+import java.util.Collections.{singleton, singletonList, singletonMap}
 import java.util.Properties
 import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}
-
 import kafka.log.UnifiedLog
-import kafka.server.KafkaConfig
+import kafka.server.{BrokerServer, KafkaConfig}
 import kafka.server.metadata.BrokerMetadataPublisher
 import kafka.testkit.{KafkaClusterTestKit, TestKitNodes}
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.admin.AlterConfigOp.OpType.SET
-import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry}
+import org.apache.kafka.clients.admin.{Admin, AlterConfigOp, ConfigEntry, NewTopic}
 import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.BROKER
 import org.apache.kafka.common.utils.Exit
@@ -35,10 +34,12 @@ import org.apache.kafka.common.{TopicPartition, Uuid}
 import org.apache.kafka.image.{MetadataImageTest, TopicImage, TopicsImage}
 import org.apache.kafka.metadata.LeaderRecoveryState
 import org.apache.kafka.metadata.PartitionRegistration
-import org.junit.jupiter.api.Assertions.assertEquals
+import org.apache.kafka.server.fault.{FaultHandler, MockFaultHandler}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertNotNull, assertTrue}
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito
+import org.mockito.Mockito.doThrow
 import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 
@@ -176,6 +177,25 @@ class BrokerMetadataPublisherTest {
     new TopicsImage(idsMap.asJava, namesMap.asJava)
   }
 
+  private def newMockPublisher(
+    broker: BrokerServer,
+    errorHandler: FaultHandler = new MockFaultHandler("publisher")
+  ): BrokerMetadataPublisher = {
+    Mockito.spy(new BrokerMetadataPublisher(
+      conf = broker.config,
+      metadataCache = broker.metadataCache,
+      logManager = broker.logManager,
+      replicaManager = broker.replicaManager,
+      groupCoordinator = broker.groupCoordinator,
+      txnCoordinator = broker.transactionCoordinator,
+      clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
+      dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
+      _authorizer = Option.empty,
+      errorHandler,
+      errorHandler
+    ))
+  }
+
   @Test
   def testReloadUpdatedFilesWithoutConfigChange(): Unit = {
     val cluster = new KafkaClusterTestKit.Builder(
@@ -187,17 +207,7 @@ class BrokerMetadataPublisherTest {
       cluster.startup()
       cluster.waitForReadyBrokers()
       val broker = cluster.brokers().values().iterator().next()
-      val publisher = Mockito.spy(new BrokerMetadataPublisher(
-        conf = broker.config,
-        metadataCache = broker.metadataCache,
-        logManager = broker.logManager,
-        replicaManager = broker.replicaManager,
-        groupCoordinator = broker.groupCoordinator,
-        txnCoordinator = broker.transactionCoordinator,
-        clientQuotaMetadataManager = broker.clientQuotaMetadataManager,
-        dynamicConfigHandlers = broker.dynamicConfigHandlers.toMap,
-        _authorizer = Option.empty
-      ))
+      val publisher = newMockPublisher(broker)
       val numTimesReloadCalled = new AtomicInteger(0)
       Mockito.when(publisher.reloadUpdatedFilesWithoutConfigChange(any[Properties]())).
         thenAnswer(new Answer[Unit]() {
@@ -227,4 +237,39 @@ class BrokerMetadataPublisherTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testExceptionInUpdateCoordinator(): Unit = {
+    val errorHandler = new MockFaultHandler("publisher")
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build()).
+      setMetadataFaultHandler(errorHandler).build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.waitForReadyBrokers()
+      val broker = cluster.brokers().values().iterator().next()
+      TestUtils.retry(60000) {
+        assertNotNull(broker.metadataPublisher)
+      }
+      val publisher = Mockito.spy(broker.metadataPublisher)
+      doThrow(new RuntimeException("injected failure")).when(publisher).updateCoordinator(any(), any(), any(), any(), any())
+      broker.metadataListener.alterPublisher(publisher).get()
+      val admin = Admin.create(cluster.clientProperties())
+      try {
+        admin.createTopics(singletonList(new NewTopic("foo", 1, 1.toShort))).all().get()
+      } finally {
+        admin.close()
+      }
+      TestUtils.retry(60000) {
+        assertTrue(Option(errorHandler.firstException()).
+          flatMap(e => Option(e.getMessage())).getOrElse("(none)").contains("injected failure"))
+      }
+    } finally {
+      errorHandler.setIgnore(true)
+      cluster.close()
+    }
+  }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
index 6b470664d6e..ff243aebfcb 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ControllerMetrics.java
@@ -51,6 +51,10 @@ public interface ControllerMetrics extends AutoCloseable {
 
     int preferredReplicaImbalanceCount();
 
+    void incrementMetadataErrorCount();
+
+    int metadataErrorCount();
+
     void setLastAppliedRecordOffset(long offset);
 
     long lastAppliedRecordOffset();
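
The interface gains a mutator and an accessor for the new counter; QuorumControllerMetrics (below) exposes it as a gauge named MetadataErrorCount under the KafkaController type. As a hedged illustration only, here is a minimal sketch of reading that gauge back over JMX from inside the process, assuming the usual kafka.controller registry group and the yammer "Value" attribute; the probe class name is hypothetical and not part of this patch.

    import java.lang.management.ManagementFactory;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    // Hypothetical probe; verify the object name against a live deployment.
    public class MetadataErrorCountProbe {
        public static void main(String[] args) throws Exception {
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            ObjectName name = new ObjectName(
                "kafka.controller:type=KafkaController,name=MetadataErrorCount");
            // Yammer gauges surface their current value as the "Value" attribute.
            Integer errors = (Integer) server.getAttribute(name, "Value");
            System.out.println("MetadataErrorCount = " + errors);
        }
    }
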
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index a4cc1d92cba..ef87248f134 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -765,7 +765,7 @@ public final class QuorumController implements Controller {
                             "%d of %d record(s) in the batch following last writeOffset %d.",
                             message.message().getClass().getSimpleName(), i, result.records().size(),
                             writeOffset);
-                        fatalFaultHandler.handleFault(failureMessage, e);
+                        throw fatalFaultHandler.handleFault(failureMessage, e);
                     }
                     i++;
                 }
@@ -889,7 +889,7 @@ public final class QuorumController implements Controller {
                                             "controller, which was %d of %d record(s) in the batch with baseOffset %d.",
                                             message.message().getClass().getSimpleName(), i, messages.size(),
                                             batch.baseOffset());
-                                    metadataFaultHandler.handleFault(failureMessage, e);
+                                    throw metadataFaultHandler.handleFault(failureMessage, e);
                                 }
                                 i++;
                             }
@@ -910,7 +910,7 @@ public final class QuorumController implements Controller {
             appendRaftEvent(String.format("handleSnapshot[snapshotId=%s]", reader.snapshotId()), () -> {
                 try {
                     if (isActiveController()) {
-                        fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
+                        throw fatalFaultHandler.handleFault(String.format("Asked to load snapshot " +
                             "(%s) when it is the active controller (%d)", reader.snapshotId(),
                             curClaimEpoch));
                     }
@@ -945,7 +945,7 @@ public final class QuorumController implements Controller {
                                         "%d record(s) in the batch with baseOffset %d.",
                                         message.message().getClass().getSimpleName(), reader.snapshotId(),
                                         i, messages.size(), batch.baseOffset());
-                                metadataFaultHandler.handleFault(failureMessage, e);
+                                throw metadataFaultHandler.handleFault(failureMessage, e);
                             }
                             i++;
                         }
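
Each hunk above turns a bare handleFault(...) statement into throw handleFault(...). Because handleFault now returns the RuntimeException it builds, throwing the return value makes the abort explicit to the compiler and stops replay at the failed record even when the installed handler only logs and returns. A minimal sketch of the idiom follows; the replay wrapper and its names are hypothetical stand-ins, not the actual QuorumController code.

    import org.apache.kafka.server.fault.FaultHandler;

    public class ReplayIdiomSketch {
        // Hypothetical wrapper around one record-replay step.
        static void replayOne(FaultHandler faultHandler, String recordName, Runnable replay) {
            try {
                replay.run();
            } catch (Throwable e) {
                // handleFault logs the failure (and may bump a metric) and
                // returns an exception; throwing the return value guarantees
                // control flow stops here even if the handler did not throw.
                throw faultHandler.handleFault("error replaying " + recordName, e);
            }
        }
    }
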
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
index 5abf0d97706..b96a687b0f3 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumControllerMetrics.java
@@ -26,6 +26,7 @@ import org.apache.kafka.server.metrics.KafkaYammerMetrics;
 
 import java.util.Arrays;
 import java.util.Objects;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 
 public final class QuorumControllerMetrics implements ControllerMetrics {
@@ -47,6 +48,8 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
         "KafkaController", "OfflinePartitionsCount");
     private final static MetricName PREFERRED_REPLICA_IMBALANCE_COUNT = getMetricName(
         "KafkaController", "PreferredReplicaImbalanceCount");
+    private final static MetricName METADATA_ERROR_COUNT = getMetricName(
+            "KafkaController", "MetadataErrorCount");
     private final static MetricName LAST_APPLIED_RECORD_OFFSET = getMetricName(
         "KafkaController", "LastAppliedRecordOffset");
     private final static MetricName LAST_COMMITTED_RECORD_OFFSET = getMetricName(
@@ -64,6 +67,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     private volatile int globalPartitionCount;
     private volatile int offlinePartitionCount;
     private volatile int preferredReplicaImbalanceCount;
+    private final AtomicInteger metadataErrorCount;
     private final AtomicLong lastAppliedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastCommittedRecordOffset = new AtomicLong(0);
     private final AtomicLong lastAppliedRecordTimestamp = new AtomicLong(0);
@@ -74,6 +78,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
     private final Gauge<Integer> globalTopicCountGauge;
     private final Gauge<Integer> offlinePartitionCountGauge;
     private final Gauge<Integer> preferredReplicaImbalanceCountGauge;
+    private final Gauge<Integer> metadataErrorCountGauge;
     private final Gauge<Long> lastAppliedRecordOffsetGauge;
     private final Gauge<Long> lastCommittedRecordOffsetGauge;
     private final Gauge<Long> lastAppliedRecordTimestampGauge;
@@ -93,6 +98,7 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
         this.globalPartitionCount = 0;
         this.offlinePartitionCount = 0;
         this.preferredReplicaImbalanceCount = 0;
+        this.metadataErrorCount = new AtomicInteger(0);
         this.activeControllerCount = registry.newGauge(ACTIVE_CONTROLLER_COUNT, new Gauge<Integer>() {
             @Override
             public Integer value() {
@@ -137,6 +143,12 @@ public final class QuorumControllerMetrics implements ControllerMetrics {
                 return preferredReplicaImbalanceCount;
             }
         });
+        this.metadataErrorCountGauge = registry.newGauge(METADATA_ERROR_COUNT, new Gauge<Integer>() {
+            @Override
+            public Integer value() {
+                return metadataErrorCount.get();
+            }
+        });
         lastAppliedRecordOffsetGauge = registry.newGauge(LAST_APPLIED_RECORD_OFFSET, new Gauge<Long>() {
             @Override
             public Long value() {
@@ -242,6 +254,16 @@
         return this.preferredReplicaImbalanceCount;
     }
 
+    @Override
+    public void incrementMetadataErrorCount() {
+        this.metadataErrorCount.getAndIncrement();
+    }
+
+    @Override
+    public int metadataErrorCount() {
+        return this.metadataErrorCount.get();
+    }
+
     @Override
     public void setLastAppliedRecordOffset(long offset) {
         lastAppliedRecordOffset.set(offset);
@@ -276,12 +298,15 @@
     public void close() {
         Arrays.asList(
             ACTIVE_CONTROLLER_COUNT,
+            FENCED_BROKER_COUNT,
+            ACTIVE_BROKER_COUNT,
             EVENT_QUEUE_TIME_MS,
             EVENT_QUEUE_PROCESSING_TIME_MS,
             GLOBAL_TOPIC_COUNT,
             GLOBAL_PARTITION_COUNT,
             OFFLINE_PARTITION_COUNT,
             PREFERRED_REPLICA_IMBALANCE_COUNT,
+            METADATA_ERROR_COUNT,
             LAST_APPLIED_RECORD_OFFSET,
             LAST_COMMITTED_RECORD_OFFSET,
             LAST_APPLIED_RECORD_TIMESTAMP,
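
The close() hunk restores register/remove symmetry: every MetricName registered in the constructor must also be removed on close, or it lingers in the shared yammer registry after the controller shuts down, which is the leak the added names fix. Below is a minimal sketch of the pattern, assuming the metrics-core MetricsRegistry API; the ErrorCountMetrics class is hypothetical.

    import com.yammer.metrics.core.Gauge;
    import com.yammer.metrics.core.MetricName;
    import com.yammer.metrics.core.MetricsRegistry;
    import java.util.Arrays;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ErrorCountMetrics implements AutoCloseable {
        private static final MetricName METADATA_ERROR_COUNT =
            new MetricName("kafka.controller", "KafkaController", "MetadataErrorCount");
        private final MetricsRegistry registry;
        private final AtomicInteger metadataErrorCount = new AtomicInteger(0);

        public ErrorCountMetrics(MetricsRegistry registry) {
            this.registry = registry;
            registry.newGauge(METADATA_ERROR_COUNT, new Gauge<Integer>() {
                @Override
                public Integer value() {
                    return metadataErrorCount.get();
                }
            });
        }

        @Override
        public void close() {
            // Every name registered above must appear here, or it leaks.
            Arrays.asList(METADATA_ERROR_COUNT).forEach(registry::removeMetric);
        }
    }
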
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
deleted file mode 100644
index c57ce46fb35..00000000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.fault;
-
-
-/**
- * A fault that we encountered while we replayed cluster metadata.
- */
-public class MetadataFaultException extends RuntimeException {
-    public MetadataFaultException(String message, Throwable cause) {
-        super(message, cause);
-    }
-
-    public MetadataFaultException(String message) {
-        super(message);
-    }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java b/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
deleted file mode 100644
index e9f71b80e67..00000000000
--- a/metadata/src/main/java/org/apache/kafka/metadata/fault/MetadataFaultHandler.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.metadata.fault;
-
-import org.apache.kafka.server.fault.FaultHandler;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Handles faults in Kafka metadata management.
- */
-public class MetadataFaultHandler implements FaultHandler {
-    private static final Logger log = LoggerFactory.getLogger(MetadataFaultHandler.class);
-
-    @Override
-    public void handleFault(String failureMessage, Throwable cause) {
-        FaultHandler.logFailureMessage(log, failureMessage, cause);
-        throw new MetadataFaultException("Encountered metadata fault: " + failureMessage, cause);
-    }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
index 5991fcc34f3..ca13d90ddea 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/MockControllerMetrics.java
@@ -17,6 +17,8 @@
 
 package org.apache.kafka.controller;
 
+import java.util.concurrent.atomic.AtomicInteger;
+
 public final class MockControllerMetrics implements ControllerMetrics {
     private volatile boolean active = false;
     private volatile int fencedBrokers = 0;
@@ -25,6 +27,7 @@ public final class MockControllerMetrics implements ControllerMetrics {
     private volatile int partitions = 0;
     private volatile int offlinePartitions = 0;
     private volatile int preferredReplicaImbalances = 0;
+    private final AtomicInteger metadataErrors = new AtomicInteger(0);
     private volatile long lastAppliedRecordOffset = 0;
     private volatile long lastCommittedRecordOffset = 0;
     private volatile long lastAppliedRecordTimestamp = 0;
@@ -111,6 +114,16 @@ public final class MockControllerMetrics implements ControllerMetrics {
         return this.preferredReplicaImbalances;
     }
 
+    @Override
+    public void incrementMetadataErrorCount() {
+        this.metadataErrors.getAndIncrement();
+    }
+
+    @Override
+    public int metadataErrorCount() {
+        return this.metadataErrors.get();
+    }
+
     @Override
     public void setLastAppliedRecordOffset(long offset) {
         lastAppliedRecordOffset = offset;
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
index 2ab99955943..400b860197e 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerMetricsTest.java
@@ -36,10 +36,13 @@ public class QuorumControllerMetricsTest {
         String expectedType = "KafkaController";
         Set<String> expectedMetricNames = Utils.mkSet(
             "ActiveControllerCount",
+            "FencedBrokerCount",
+            "ActiveBrokerCount",
             "GlobalTopicCount",
             "GlobalPartitionCount",
             "OfflinePartitionsCount",
             "PreferredReplicaImbalanceCount",
+            "MetadataErrorCount",
             "LastAppliedRecordLagMs",
             "LastAppliedRecordOffset",
             "LastAppliedRecordTimestamp",
@@ -125,6 +128,25 @@ public class QuorumControllerMetricsTest {
         }
     }
 
+    @Test
+    public void testMetadataErrorCount() {
+        MetricsRegistry registry = new MetricsRegistry();
+        MockTime time = new MockTime();
+        try {
+            try (QuorumControllerMetrics quorumControllerMetrics = new QuorumControllerMetrics(registry, time)) {
+                @SuppressWarnings("unchecked")
+                Gauge<Integer> metadataErrorCount = (Gauge<Integer>) registry
+                        .allMetrics()
+                        .get(metricName("KafkaController", "MetadataErrorCount"));
+                assertEquals(0, metadataErrorCount.value());
+                quorumControllerMetrics.incrementMetadataErrorCount();
+                assertEquals(1, metadataErrorCount.value());
+            }
+        } finally {
+            registry.shutdown();
+        }
+    }
+
     private static void assertMetricsCreatedAndRemovedUponClose(String expectedType, Set<String> expectedMetricNames) {
         MetricsRegistry registry = new MetricsRegistry();
         MockTime time = new MockTime();
@@ -151,10 +173,18 @@ public class QuorumControllerMetricsTest {
     }
 
     private static void assertMetricsCreated(MetricsRegistry registry, Set<String> expectedMetricNames, String expectedType) {
+        assertEquals(expectedMetricNames.size(),
+                registry.allMetrics().keySet().stream()
+                        .filter(k -> k.getType().equals(expectedType)).count());
         expectedMetricNames.forEach(expectedName -> {
             MetricName expectMetricName = metricName(expectedType, expectedName);
             assertTrue(registry.allMetrics().containsKey(expectMetricName), "Missing metric: " + expectMetricName);
         });
+        registry.allMetrics().forEach((actualMetricName, actualMetric) -> {
+            if (actualMetricName.getType().equals(expectedType)) {
+                assertTrue(expectedMetricNames.contains(actualMetricName.getName()), "Unexpected metric: " + actualMetricName);
+            }
+        });
     }
 
     private static void assertMetricsRemoved(MetricsRegistry registry, Set<String> expectedMetricNames, String expectedType) {
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
index 4c03eacc32f..5efc145ea94 100644
--- a/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandler.java
@@ -17,8 +17,6 @@
 
 package org.apache.kafka.server.fault;
 
-import org.slf4j.Logger;
-
 
 /**
  * Handle a server fault.
@@ -28,9 +26,11 @@ public interface FaultHandler {
      * Handle a fault.
      *
      * @param failureMessage        The failure message to log.
+     *
+     * @return                      The fault exception.
      */
-    default void handleFault(String failureMessage) {
-        handleFault(failureMessage, null);
+    default RuntimeException handleFault(String failureMessage) {
+        return handleFault(failureMessage, null);
     }
 
     /**
@@ -38,21 +38,8 @@ public interface FaultHandler {
      *
      * @param failureMessage        The failure message to log.
      * @param cause                 The exception that caused the problem, or null.
-     */
-    void handleFault(String failureMessage, Throwable cause);
-
-    /**
-     * Log a failure message about a fault.
      *
-     * @param log               The log4j logger.
-     * @param failureMessage    The failure message.
-     * @param cause             The exception which caused the failure, or null.
+     * @return                      The fault exception.
      */
-    static void logFailureMessage(Logger log, String failureMessage, Throwable cause) {
-        if (cause == null) {
-            log.error("Encountered fatal fault: {}", failureMessage);
-        } else {
-            log.error("Encountered fatal fault: {}", failureMessage, cause);
-        }
-    }
+    RuntimeException handleFault(String failureMessage, Throwable cause);
 }
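
With the static logFailureMessage helper removed, each implementation owns its own logging, and the contract becomes: build an exception describing the fault and return it, leaving the caller to decide whether to throw. A minimal conforming implementation, sketched against only the interface above; RecordingFaultHandler is a hypothetical name and not part of this patch.

    import org.apache.kafka.server.fault.FaultHandler;
    import org.apache.kafka.server.fault.FaultHandlerException;

    public class RecordingFaultHandler implements FaultHandler {
        private volatile FaultHandlerException lastFault = null;

        @Override
        public RuntimeException handleFault(String failureMessage, Throwable cause) {
            // Record and return; callers that must abort write
            // "throw handler.handleFault(message, cause)" themselves.
            FaultHandlerException e = new FaultHandlerException(failureMessage, cause);
            lastFault = e;
            return e;
        }

        public FaultHandlerException lastFault() {
            return lastFault;
        }
    }
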
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
similarity index 83%
rename from server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
rename to server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
index ef9b11bdeb5..ec3b7dc4b0c 100644
--- a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandlerException.java
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/FaultHandlerException.java
@@ -19,10 +19,10 @@ package org.apache.kafka.server.fault;
 
 
 /**
- * An exception thrown by MockFaultHandler.
+ * An exception thrown by a fault handler.
  */
-public class MockFaultHandlerException extends RuntimeException {
-    public MockFaultHandlerException(String failureMessage, Throwable cause) {
+public class FaultHandlerException extends RuntimeException {
+    public FaultHandlerException(String failureMessage, Throwable cause) {
         super(failureMessage, cause);
         // If a cause exception was provided, set our stack trace to its stack trace. This is
         // useful in junit tests where a limited number of stack frames are printed, and usually
@@ -32,7 +32,7 @@ public class MockFaultHandlerException extends RuntimeException {
         }
     }
 
-    public MockFaultHandlerException(String failureMessage) {
+    public FaultHandlerException(String failureMessage) {
         this(failureMessage, null);
     }
 }
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/LoggingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/LoggingFaultHandler.java
new file mode 100644
index 00000000000..9242cef4eb9
--- /dev/null
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/LoggingFaultHandler.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A fault handler which logs an error message and executes a runnable.
+ */
+public class LoggingFaultHandler implements FaultHandler {
+    private static final Logger log = LoggerFactory.getLogger(LoggingFaultHandler.class);
+    private final String type;
+    private final Runnable action;
+
+    public LoggingFaultHandler(
+        String type,
+        Runnable action
+    ) {
+        this.type = type;
+        this.action = action;
+    }
+
+    @Override
+    public RuntimeException handleFault(String failureMessage, Throwable cause) {
+        if (cause == null) {
+            log.error("Encountered {} fault: {}", type, failureMessage);
+        } else {
+            log.error("Encountered {} fault: {}", type, failureMessage, cause);
+        }
+        try {
+            action.run();
+        } catch (Throwable e) {
+            log.error("Failed to run LoggingFaultHandler action.", e);
+        }
+        return new FaultHandlerException(failureMessage, cause);
+    }
+}
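
A hedged usage sketch of the new handler: the Runnable is a natural place to bump an error counter, matching how this patch increments its error-count metrics from fault handlers. The class and variable names below are illustrative only.

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.kafka.server.fault.FaultHandler;
    import org.apache.kafka.server.fault.LoggingFaultHandler;

    public class LoggingFaultHandlerExample {
        public static void main(String[] args) {
            AtomicInteger errorCount = new AtomicInteger(0);
            FaultHandler handler =
                new LoggingFaultHandler("metadata", errorCount::incrementAndGet);
            // Logs the fault and runs the action, but does not throw.
            handler.handleFault("example fault", new RuntimeException("boom"));
            System.out.println("errors = " + errorCount.get()); // prints: errors = 1
        }
    }
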
diff --git a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
index e3b9f25a3be..b7c0d241a2a 100644
--- a/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
+++ b/server-common/src/main/java/org/apache/kafka/server/fault/ProcessExitingFaultHandler.java
@@ -30,8 +30,13 @@ public class ProcessExitingFaultHandler implements FaultHandler {
     private static final Logger log = LoggerFactory.getLogger(ProcessExitingFaultHandler.class);
 
     @Override
-    public void handleFault(String failureMessage, Throwable cause) {
-        FaultHandler.logFailureMessage(log, failureMessage, cause);
+    public RuntimeException handleFault(String failureMessage, Throwable cause) {
+        if (cause == null) {
+            log.error("Encountered fatal fault: {}", failureMessage);
+        } else {
+            log.error("Encountered fatal fault: {}", failureMessage, cause);
+        }
         Exit.exit(1);
+        return null;
     }
 }
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/LoggingFaultHandlerTest.java b/server-common/src/test/java/org/apache/kafka/server/fault/LoggingFaultHandlerTest.java
new file mode 100644
index 00000000000..1a11098a21b
--- /dev/null
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/LoggingFaultHandlerTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.fault;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+
+/**
+ * Tests LoggingFaultHandler
+ */
+public class LoggingFaultHandlerTest {
+    /**
+     * Test handling faults with and without exceptions.
+     */
+    @Test
+    public void testHandleFault() {
+        AtomicInteger counter = new AtomicInteger(0);
+        LoggingFaultHandler handler = new LoggingFaultHandler("test", () -> {
+            counter.incrementAndGet();
+        });
+        handler.handleFault("uh oh");
+        assertEquals(1, counter.get());
+        handler.handleFault("uh oh", new RuntimeException("yikes"));
+        assertEquals(2, counter.get());
+    }
+
+    /**
+     * Test handling an exception in the action callback.
+     */
+    @Test
+    public void testHandleExceptionInAction() {
+        LoggingFaultHandler handler = new LoggingFaultHandler("test", () -> {
+            throw new RuntimeException("action failed");
+        });
+        handler.handleFault("uh oh"); // should not throw
+        handler.handleFault("uh oh", new RuntimeException("yikes")); // should not throw
+    }
+}
diff --git a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
index 39b3ed07847..e49f2bdc6c2 100644
--- a/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
+++ b/server-common/src/test/java/org/apache/kafka/server/fault/MockFaultHandler.java
@@ -29,7 +29,7 @@ public class MockFaultHandler implements FaultHandler {
     private static final Logger log = LoggerFactory.getLogger(MockFaultHandler.class);
 
     private final String name;
-    private MockFaultHandlerException firstException = null;
+    private FaultHandlerException firstException = null;
     private boolean ignore = false;
 
     public MockFaultHandler(String name) {
@@ -37,16 +37,20 @@ public class MockFaultHandler implements FaultHandler {
     }
 
     @Override
-    public synchronized void handleFault(String failureMessage, Throwable cause) {
-        FaultHandler.logFailureMessage(log, failureMessage, cause);
-        MockFaultHandlerException e = (cause == null) ?
-                new MockFaultHandlerException(name + ": " + failureMessage) :
-                new MockFaultHandlerException(name + ": " + failureMessage +
+    public synchronized RuntimeException handleFault(String failureMessage, Throwable cause) {
+        if (cause == null) {
+            log.error("Encountered {} fault: {}", name, failureMessage);
+        } else {
+            log.error("Encountered {} fault: {}", name, failureMessage, cause);
+        }
+        FaultHandlerException e = (cause == null) ?
+                new FaultHandlerException(name + ": " + failureMessage) :
+                new FaultHandlerException(name + ": " + failureMessage +
                         ": " + cause.getMessage(), cause);
         if (firstException == null) {
             firstException = e;
         }
-        throw e;
+        return firstException;
     }
 
     public synchronized void maybeRethrowFirstException() {
@@ -55,7 +59,7 @@ public class MockFaultHandler implements FaultHandler {
         }
     }
 
-    public synchronized MockFaultHandlerException firstException() {
+    public synchronized FaultHandlerException firstException() {
         return firstException;
     }