Posted to commits@kafka.apache.org by cm...@apache.org on 2022/12/16 00:53:14 UTC

[kafka] branch trunk updated: MINOR: ControllerServer should use the new metadata loader and snapshot generator (#12983)

This is an automated email from the ASF dual-hosted git repository.

cmccabe pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 29c09e2ca19 MINOR: ControllerServer should use the new metadata loader and snapshot generator (#12983)
29c09e2ca19 is described below

commit 29c09e2ca199386efbcb05cdfb10b7376b956233
Author: Colin Patrick McCabe <cm...@apache.org>
AuthorDate: Thu Dec 15 16:53:07 2022 -0800

    MINOR: ControllerServer should use the new metadata loader and snapshot generator (#12983)
    
    This PR introduces the new metadata loader and snapshot generator. For the time being, they are
    only used by the controller, but a PR for the broker will come soon.
    
    The new metadata loader supports adding and removing publishers dynamically. (In contrast, the old
    loader only supported adding a single publisher.) It also passes along more information about each
    new image that is published. This information can be found in the LogDeltaManifest and
    SnapshotManifest classes.
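    
    As a rough illustration, the sketch below mirrors the SharedServer.scala wiring that appears in
    the diff further down. The standalone helpers and parameter types are assumptions added for
    illustration only; the Builder calls and installPublishers() are the ones introduced by this
    commit.
    
        import java.util.{Collections, OptionalLong}
        import org.apache.kafka.common.utils.Time
        import org.apache.kafka.image.loader.MetadataLoader
        import org.apache.kafka.image.publisher.MetadataPublisher
        import org.apache.kafka.server.fault.FaultHandler
    
        // Build a loader; the high-water-mark accessor is normally backed by the Raft client.
        def buildLoader(nodeId: Int,
                        faultHandler: FaultHandler,
                        highWaterMark: () => OptionalLong): MetadataLoader = {
          new MetadataLoader.Builder().
            setNodeId(nodeId).
            setTime(Time.SYSTEM).
            setFaultHandler(faultHandler).
            setHighWaterMarkAccessor(() => highWaterMark()).
            build()
        }
    
        // Publishers can be installed (and later removed) while the loader is running.
        // Each image handed to a publisher comes with a LogDeltaManifest or
        // SnapshotManifest describing how the image was produced.
        def install(loader: MetadataLoader, publisher: MetadataPublisher): Unit =
          loader.installPublishers(Collections.singletonList(publisher))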
    
    The new snapshot generator replaces the previous logic for generating snapshots in
    QuorumController.java and associated classes. The new generator is intended to be shared between
    the broker and the controller, so it is decoupled from both.
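    
    The sketch below shows how a server might stand up the emitter and generator, again based on the
    SharedServer.scala changes in this diff. The parameter types and the millisecond-to-nanosecond
    conversion are assumptions added for illustration; the builder methods themselves come from this
    commit.
    
        import java.util.concurrent.TimeUnit
        import java.util.concurrent.atomic.AtomicReference
        import org.apache.kafka.common.utils.Time
        import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
        import org.apache.kafka.raft.RaftClient
        import org.apache.kafka.server.common.ApiMessageAndVersion
        import org.apache.kafka.server.fault.FaultHandler
    
        def buildSnapshotGenerator(nodeId: Int,
                                   raftClient: RaftClient[ApiMessageAndVersion],
                                   faultHandler: FaultHandler,
                                   maxBytesSinceLastSnapshot: Long,
                                   maxIntervalMs: Long): SnapshotGenerator = {
          // The emitter writes snapshots through the Raft client; the generator decides when a
          // new snapshot is needed (size- or time-based) and can be disabled by setting a
          // non-null reason, e.g. after a metadata loading fault.
          val emitter = new SnapshotEmitter.Builder().
            setNodeId(nodeId).
            setRaftClient(raftClient).
            build()
          new SnapshotGenerator.Builder(emitter).
            setNodeId(nodeId).
            setTime(Time.SYSTEM).
            setFaultHandler(faultHandler).
            setMaxBytesSinceLastSnapshot(maxBytesSinceLastSnapshot).
            setMaxTimeSinceLastSnapshotNs(TimeUnit.MILLISECONDS.toNanos(maxIntervalMs)).
            setDisabledReason(new AtomicReference[String](null)).
            build()
        }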
    
    There are also a few small changes on the broker side. Specifically, we move the batch processing
    time and batch size metrics out of BrokerMetadataListener.scala and into BrokerServerMetrics.scala.
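    
    A short sketch of the new metrics surface, drawn from the BrokerServerMetrics and
    BrokerMetadataListener changes in the diff below (the literal values are made up):
    
        import org.apache.kafka.common.metrics.Metrics
        import org.apache.kafka.image.MetadataProvenance
        import kafka.server.metadata.BrokerServerMetrics
    
        val brokerMetrics = BrokerServerMetrics(new Metrics())
        // Per-batch histograms, previously owned by BrokerMetadataListener.
        brokerMetrics.updateBatchSize(42)                  // records in the batch
        brokerMetrics.updateBatchProcessingTime(1500000L)  // elapsed nanoseconds
        // Offset/epoch/timestamp of the last applied image, replacing the old
        // lastAppliedRecordOffset/lastAppliedRecordTimestamp atomics.
        brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(100L, 2, 500L))
        assert(brokerMetrics.lastAppliedOffset() == 100L)
        brokerMetrics.close()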
    
    Finally, fix a case in snapshot_test.py where 'is' was used rather than '==' for a numeric
    comparison. ('is' checks object identity rather than value equality, so it can evaluate to False
    for equal integers.)
    
    Reviewers: David Arthur <mu...@gmail.com>
---
 checkstyle/import-control.xml                      |   6 +-
 .../main/scala/kafka/server/ControllerServer.scala |   3 +-
 .../src/main/scala/kafka/server/SharedServer.scala |  77 ++-
 .../server/metadata/BrokerMetadataListener.scala   |  39 +-
 .../server/metadata/BrokerServerMetrics.scala      |  59 ++-
 core/src/test/java/kafka/test/MockController.java  |   5 -
 .../server/metadata/BrokerServerMetricsTest.scala  |  21 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   4 +-
 .../metadata/BrokerMetadataListenerTest.scala      |   8 +-
 .../apache/kafka/controller/AclControlManager.java |  18 -
 .../controller/ClientQuotaControlManager.java      |  42 --
 .../kafka/controller/ClusterControlManager.java    |  60 ---
 .../controller/ConfigurationControlManager.java    |  36 --
 .../org/apache/kafka/controller/Controller.java    |   8 -
 .../kafka/controller/FeatureControlManager.java    |  44 --
 .../apache/kafka/controller/LogReplayTracker.java  |   3 +-
 .../kafka/controller/ProducerIdControlManager.java |  24 +-
 .../apache/kafka/controller/QuorumController.java  | 307 +----------
 .../controller/ReplicationControlManager.java      |  34 --
 .../apache/kafka/controller/SnapshotGenerator.java | 132 -----
 .../kafka/image/loader/LogDeltaManifest.java       | 106 ++++
 .../apache/kafka/image/loader/MetadataLoader.java  | 572 +++++++++++++++++++++
 .../kafka/image/loader/MetadataLoaderMetrics.java  |  46 ++
 .../kafka/image/loader/SnapshotManifest.java       |  77 +++
 .../kafka/image/publisher/MetadataPublisher.java   |  72 +++
 .../kafka/image/publisher/SnapshotEmitter.java     | 123 +++++
 .../kafka/image/publisher/SnapshotGenerator.java   | 275 ++++++++++
 .../kafka/controller/AclControlManagerTest.java    |  24 +-
 .../controller/ClientQuotaControlManagerTest.java  |  59 ++-
 .../controller/ClusterControlManagerTest.java      |  27 +-
 .../ConfigurationControlManagerTest.java           |   8 -
 .../controller/FeatureControlManagerTest.java      |  38 +-
 .../controller/ProducerIdControlManagerTest.java   |  30 +-
 .../kafka/controller/QuorumControllerTest.java     | 401 ++-------------
 .../controller/ReplicationControlManagerTest.java  |  11 -
 .../kafka/controller/SnapshotGeneratorTest.java    | 112 ----
 .../kafka/image/loader/MetadataLoaderTest.java     | 517 +++++++++++++++++++
 .../kafka/image/publisher/SnapshotEmitterTest.java | 206 ++++++++
 .../image/publisher/SnapshotGeneratorTest.java     | 185 +++++++
 .../org/apache/kafka/metalog/LocalLogManager.java  |  19 +-
 .../kafka/metalog/LocalLogManagerTestEnv.java      |   7 +
 tests/kafkatest/tests/core/snapshot_test.py        |   2 +-
 42 files changed, 2486 insertions(+), 1361 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index df9a2e9adfd..32f13d0e340 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -225,6 +225,7 @@
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.common.resource" />
     <allow pkg="org.apache.kafka.controller" />
+    <allow pkg="org.apache.kafka.image.writer" />
     <allow pkg="org.apache.kafka.metadata" />
     <allow pkg="org.apache.kafka.metadata.authorizer" />
     <allow pkg="org.apache.kafka.metadata.migration" />
@@ -248,15 +249,18 @@
     <allow pkg="org.apache.kafka.common.metadata" />
     <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.common.quota" />
-    <allow pkg="org.apache.kafka.common.resource" />
     <allow pkg="org.apache.kafka.common.requests" />
+    <allow pkg="org.apache.kafka.common.resource" />
     <allow pkg="org.apache.kafka.image" />
     <allow pkg="org.apache.kafka.image.writer" />
     <allow pkg="org.apache.kafka.metadata" />
+    <allow pkg="org.apache.kafka.queue" />
     <allow pkg="org.apache.kafka.raft" />
     <allow pkg="org.apache.kafka.server.common" />
+    <allow pkg="org.apache.kafka.server.fault" />
     <allow pkg="org.apache.kafka.server.util" />
     <allow pkg="org.apache.kafka.snapshot" />
+    <allow pkg="org.apache.kafka.test" />
   </subpackage>
 
   <subpackage name="metadata">
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 03b86b7067e..77f9ba546fd 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -112,6 +112,7 @@ class ControllerServer(
       maybeChangeStatus(STARTING, STARTED)
       this.logIdent = new LogContext(s"[ControllerServer id=${config.nodeId}] ").logPrefix()
 
+
       newGauge("ClusterId", () => clusterId)
       newGauge("yammer-metrics-count", () =>  KafkaYammerMetrics.defaultRegistry.allMetrics.size)
 
@@ -194,8 +195,6 @@ class ControllerServer(
           setDefaultNumPartitions(config.numPartitions.intValue()).
           setSessionTimeoutNs(TimeUnit.NANOSECONDS.convert(config.brokerSessionTimeoutMs.longValue(),
             TimeUnit.MILLISECONDS)).
-          setSnapshotMaxNewRecordBytes(config.metadataSnapshotMaxNewRecordBytes).
-          setSnapshotMaxIntervalMs(config.metadataSnapshotMaxIntervalMs).
           setLeaderImbalanceCheckIntervalNs(leaderImbalanceCheckIntervalNs).
           setMaxIdleIntervalNs(maxIdleIntervalNs).
           setMetrics(sharedServer.controllerMetrics).
diff --git a/core/src/main/scala/kafka/server/SharedServer.scala b/core/src/main/scala/kafka/server/SharedServer.scala
index 8b647e7464f..151429a1797 100644
--- a/core/src/main/scala/kafka/server/SharedServer.scala
+++ b/core/src/main/scala/kafka/server/SharedServer.scala
@@ -25,6 +25,8 @@ import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time}
 import org.apache.kafka.controller.QuorumControllerMetrics
+import org.apache.kafka.image.loader.MetadataLoader
+import org.apache.kafka.image.publisher.{SnapshotEmitter, SnapshotGenerator}
 import org.apache.kafka.metadata.MetadataRecordSerde
 import org.apache.kafka.raft.RaftConfig.AddressSpec
 import org.apache.kafka.server.common.ApiMessageAndVersion
@@ -32,7 +34,9 @@ import org.apache.kafka.server.fault.{FaultHandler, LoggingFaultHandler, Process
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 
 import java.util
+import java.util.Collections
 import java.util.concurrent.CompletableFuture
+import java.util.concurrent.atomic.AtomicReference
 
 
 /**
@@ -97,6 +101,10 @@ class SharedServer(
   @volatile var raftManager: KafkaRaftManager[ApiMessageAndVersion] = _
   @volatile var brokerMetrics: BrokerServerMetrics = _
   @volatile var controllerMetrics: QuorumControllerMetrics = _
+  @volatile var loader: MetadataLoader = _
+  val snapshotsDiabledReason = new AtomicReference[String](null)
+  @volatile var snapshotEmitter: SnapshotEmitter = _
+  @volatile var snapshotGenerator: SnapshotGenerator = _
 
   def isUsed(): Boolean = synchronized {
     usedByController || usedByBroker
@@ -145,39 +153,48 @@ class SharedServer(
   /**
    * The fault handler to use when metadata loading fails.
    */
-  def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build("metadata loading",
+  def metadataLoaderFaultHandler: FaultHandler = faultHandlerFactory.build(
+    name = "metadata loading",
     fatal = sharedServerConfig.processRoles.contains(ControllerRole),
     action = () => SharedServer.this.synchronized {
       if (brokerMetrics != null) brokerMetrics.metadataLoadErrorCount.getAndIncrement()
       if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
+      snapshotsDiabledReason.compareAndSet(null, "metadata loading fault")
     })
 
   /**
    * The fault handler to use when the initial broker metadata load fails.
    */
-  def initialBrokerMetadataLoadFaultHandler: FaultHandler = faultHandlerFactory.build("initial metadata loading",
+  def initialBrokerMetadataLoadFaultHandler: FaultHandler = faultHandlerFactory.build(
+    name = "initial broker metadata loading",
     fatal = true,
     action = () => SharedServer.this.synchronized {
       if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
       if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
+      snapshotsDiabledReason.compareAndSet(null, "initial broker metadata loading fault")
     })
 
   /**
    * The fault handler to use when the QuorumController experiences a fault.
    */
-  def quorumControllerFaultHandler: FaultHandler = faultHandlerFactory.build("quorum controller",
+  def quorumControllerFaultHandler: FaultHandler = faultHandlerFactory.build(
+    name = "quorum controller",
     fatal = true,
-    action = () => {}
-  )
+    action = () => SharedServer.this.synchronized {
+      if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
+      snapshotsDiabledReason.compareAndSet(null, "quorum controller fault")
+    })
 
   /**
    * The fault handler to use when metadata cannot be published.
    */
-  def metadataPublishingFaultHandler: FaultHandler = faultHandlerFactory.build("metadata publishing",
+  def metadataPublishingFaultHandler: FaultHandler = faultHandlerFactory.build(
+    name = "metadata publishing",
     fatal = false,
     action = () => SharedServer.this.synchronized {
       if (brokerMetrics != null) brokerMetrics.metadataApplyErrorCount.getAndIncrement()
       if (controllerMetrics != null) controllerMetrics.incrementMetadataErrorCount()
+      // Note: snapshot generation does not need to be disabled for a publishing fault.
     })
 
   private def start(): Unit = synchronized {
@@ -210,6 +227,40 @@ class SharedServer(
           threadNamePrefix,
           controllerQuorumVotersFuture)
         raftManager.startup()
+
+        if (sharedServerConfig.processRoles.contains(ControllerRole)) {
+          val loaderBuilder = new MetadataLoader.Builder().
+            setNodeId(metaProps.nodeId).
+            setTime(time).
+            setThreadNamePrefix(threadNamePrefix.getOrElse("")).
+            setFaultHandler(metadataLoaderFaultHandler).
+            setHighWaterMarkAccessor(() => raftManager.client.highWatermark())
+          if (brokerMetrics != null) {
+            loaderBuilder.setMetadataLoaderMetrics(brokerMetrics)
+          }
+          loader = loaderBuilder.build()
+          snapshotEmitter = new SnapshotEmitter.Builder().
+            setNodeId(metaProps.nodeId).
+            setRaftClient(raftManager.client).
+            build()
+          snapshotGenerator = new SnapshotGenerator.Builder(snapshotEmitter).
+            setNodeId(metaProps.nodeId).
+            setTime(time).
+            setFaultHandler(metadataPublishingFaultHandler).
+            setMaxBytesSinceLastSnapshot(sharedServerConfig.metadataSnapshotMaxNewRecordBytes).
+            setMaxTimeSinceLastSnapshotNs(sharedServerConfig.metadataSnapshotMaxIntervalMs).
+            setDisabledReason(snapshotsDiabledReason).
+            build()
+          raftManager.register(loader)
+          try {
+            loader.installPublishers(Collections.singletonList(snapshotGenerator))
+          } catch {
+            case t: Throwable => {
+              error("Unable to install metadata publishers", t)
+              throw new RuntimeException("Unable to install metadata publishers.", t)
+            }
+          }
+        }
         debug("Completed SharedServer startup.")
         started = true
       } catch {
@@ -235,6 +286,20 @@ class SharedServer(
       debug("SharedServer is not running.")
     } else {
       info("Stopping SharedServer")
+      if (loader != null) {
+        CoreUtils.swallow(loader.beginShutdown(), this)
+      }
+      if (snapshotGenerator != null) {
+        CoreUtils.swallow(snapshotGenerator.beginShutdown(), this)
+      }
+      if (loader != null) {
+        CoreUtils.swallow(loader.close(), this)
+        loader = null
+      }
+      if (snapshotGenerator != null) {
+        CoreUtils.swallow(snapshotGenerator.close(), this)
+        snapshotGenerator = null
+      }
       if (raftManager != null) {
         CoreUtils.swallow(raftManager.shutdown(), this)
         raftManager = null
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
index 21fc126691f..789ae89f049 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataListener.scala
@@ -18,7 +18,7 @@ package kafka.server.metadata
 
 import java.util
 import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.{CompletableFuture, TimeUnit}
+import java.util.concurrent.CompletableFuture
 import kafka.metrics.KafkaMetricsGroup
 import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.image.writer.{ImageWriterOptions, RecordListWriter}
@@ -30,14 +30,10 @@ import org.apache.kafka.server.common.ApiMessageAndVersion
 import org.apache.kafka.server.fault.FaultHandler
 import org.apache.kafka.snapshot.SnapshotReader
 
+import java.util.concurrent.TimeUnit.NANOSECONDS
 import scala.compat.java8.OptionConverters._
 
 
-object BrokerMetadataListener {
-  val MetadataBatchProcessingTimeUs = "MetadataBatchProcessingTimeUs"
-  val MetadataBatchSizes = "MetadataBatchSizes"
-}
-
 class BrokerMetadataListener(
   val brokerId: Int,
   time: Time,
@@ -65,16 +61,6 @@ class BrokerMetadataListener(
   private val log = logContext.logger(classOf[BrokerMetadataListener])
   logIdent = logContext.logPrefix()
 
-  /**
-   * A histogram tracking the time in microseconds it took to process batches of events.
-   */
-  private val batchProcessingTimeHist = newHistogram(BrokerMetadataListener.MetadataBatchProcessingTimeUs)
-
-  /**
-   * A histogram tracking the sizes of batches that we have processed.
-   */
-  private val metadataBatchSizeHist = newHistogram(BrokerMetadataListener.MetadataBatchSizes)
-
   /**
    * The highest metadata offset that we've seen.  Written only from the event queue thread.
    */
@@ -293,14 +279,14 @@ class BrokerMetadataListener(
         }
       }
       numBytes = numBytes + batch.sizeInBytes()
-      metadataBatchSizeHist.update(batch.records().size())
+      brokerMetrics.updateBatchSize(batch.records().size())
       numBatches = numBatches + 1
     }
 
     val endTimeNs = time.nanoseconds()
-    val elapsedUs = TimeUnit.MICROSECONDS.convert(endTimeNs - startTimeNs, TimeUnit.NANOSECONDS)
-    batchProcessingTimeHist.update(elapsedUs)
-    BatchLoadResults(numBatches, numRecords, elapsedUs, numBytes)
+    val elapsedNs = endTimeNs - startTimeNs
+    brokerMetrics.updateBatchProcessingTime(elapsedNs)
+    BatchLoadResults(numBatches, numRecords, NANOSECONDS.toMicros(elapsedNs), numBytes)
   }
 
   def startPublishing(publisher: MetadataPublisher): CompletableFuture[Void] = {
@@ -367,8 +353,7 @@ class BrokerMetadataListener(
     publisher.publish(delta, _image)
 
     // Update the metrics since the publisher handled the lastest image
-    brokerMetrics.lastAppliedRecordOffset.set(_highestOffset)
-    brokerMetrics.lastAppliedRecordTimestamp.set(_highestTimestamp)
+    brokerMetrics.updateLastAppliedImageProvenance(_image.provenance())
   }
 
   override def handleLeaderChange(leaderAndEpoch: LeaderAndEpoch): Unit = {
@@ -376,15 +361,7 @@ class BrokerMetadataListener(
   }
 
   override def beginShutdown(): Unit = {
-    eventQueue.beginShutdown("beginShutdown", new ShutdownEvent())
-  }
-
-  class ShutdownEvent extends EventQueue.FailureLoggingEvent(log) {
-    override def run(): Unit = {
-      brokerMetrics.close()
-      removeMetric(BrokerMetadataListener.MetadataBatchProcessingTimeUs)
-      removeMetric(BrokerMetadataListener.MetadataBatchSizes)
-    }
+    eventQueue.beginShutdown("beginShutdown")
   }
 
   def close(): Unit = {
diff --git a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
index 3e68ae85f92..465b10f1e54 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerServerMetrics.scala
@@ -17,17 +17,48 @@
 
 package kafka.server.metadata
 
-import java.util.concurrent.atomic.AtomicLong
+import kafka.metrics.KafkaMetricsGroup
+
+import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.Gauge
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.MetricConfig
+import org.apache.kafka.image.MetadataProvenance
+import org.apache.kafka.image.loader.MetadataLoaderMetrics
+import org.apache.kafka.server.metrics.KafkaYammerMetrics
+
+import java.util.concurrent.TimeUnit.NANOSECONDS
 
-final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable {
+final class BrokerServerMetrics private (
+  metrics: Metrics
+) extends MetadataLoaderMetrics with KafkaMetricsGroup {
   import BrokerServerMetrics._
 
-  val lastAppliedRecordOffset: AtomicLong = new AtomicLong(0)
-  val lastAppliedRecordTimestamp: AtomicLong = new AtomicLong(0)
+  private val batchProcessingTimeHistName = explicitMetricName("kafka.server",
+    "BrokerMetadataListener",
+    "MetadataBatchProcessingTimeUs",
+    Map.empty)
+
+  /**
+   * A histogram tracking the time in microseconds it took to process batches of events.
+   */
+  private val batchProcessingTimeHist =
+    KafkaYammerMetrics.defaultRegistry().newHistogram(batchProcessingTimeHistName, true)
+
+  private val batchSizeHistName = explicitMetricName("kafka.server",
+    "BrokerMetadataListener",
+    "MetadataBatchSizes",
+    Map.empty)
+
+  /**
+   * A histogram tracking the sizes of batches that we have processed.
+   */
+  private val batchSizeHist =
+    KafkaYammerMetrics.defaultRegistry().newHistogram(batchSizeHistName, true)
+
+  val lastAppliedImageProvenance: AtomicReference[MetadataProvenance] =
+    new AtomicReference[MetadataProvenance](MetadataProvenance.EMPTY)
   val metadataLoadErrorCount: AtomicLong = new AtomicLong(0)
   val metadataApplyErrorCount: AtomicLong = new AtomicLong(0)
 
@@ -62,15 +93,15 @@ final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable
   )
 
   addMetric(metrics, lastAppliedRecordOffsetName) { _ =>
-    lastAppliedRecordOffset.get
+    lastAppliedImageProvenance.get.offset()
   }
 
   addMetric(metrics, lastAppliedRecordTimestampName) { _ =>
-    lastAppliedRecordTimestamp.get
+    lastAppliedImageProvenance.get.lastContainedLogTimeMs()
   }
 
   addMetric(metrics, lastAppliedRecordLagMsName) { now =>
-    now - lastAppliedRecordTimestamp.get
+    now - lastAppliedImageProvenance.get.lastContainedLogTimeMs()
   }
 
   addMetric(metrics, metadataLoadErrorCountName) { _ =>
@@ -82,6 +113,8 @@ final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable
   }
 
   override def close(): Unit = {
+    KafkaYammerMetrics.defaultRegistry().removeMetric(batchProcessingTimeHistName)
+    KafkaYammerMetrics.defaultRegistry().removeMetric(batchSizeHistName)
     List(
       lastAppliedRecordOffsetName,
       lastAppliedRecordTimestampName,
@@ -90,6 +123,18 @@ final class BrokerServerMetrics private (metrics: Metrics) extends AutoCloseable
       metadataApplyErrorCountName
     ).foreach(metrics.removeMetric)
   }
+
+  override def updateBatchProcessingTime(elapsedNs: Long): Unit =
+    batchProcessingTimeHist.update(NANOSECONDS.toMicros(elapsedNs))
+
+  override def updateBatchSize(size: Int): Unit = batchSizeHist.update(size)
+
+  override def updateLastAppliedImageProvenance(provenance: MetadataProvenance): Unit =
+    lastAppliedImageProvenance.set(provenance)
+
+  override def lastAppliedOffset(): Long = lastAppliedImageProvenance.get().offset()
+
+  def lastAppliedTimestamp(): Long = lastAppliedImageProvenance.get().lastContainedLogTimeMs()
 }
 
 
diff --git a/core/src/test/java/kafka/test/MockController.java b/core/src/test/java/kafka/test/MockController.java
index ff1154d2119..061e19213e3 100644
--- a/core/src/test/java/kafka/test/MockController.java
+++ b/core/src/test/java/kafka/test/MockController.java
@@ -436,11 +436,6 @@ public class MockController implements Controller {
         return CompletableFuture.completedFuture(results);
     }
 
-    @Override
-    public CompletableFuture<Long> beginWritingSnapshot() {
-        throw new UnsupportedOperationException();
-    }
-
     @Override
     public void beginShutdown() {
         this.active = false;
diff --git a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
index ea2b439c166..200deed4270 100644
--- a/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
+++ b/core/src/test/scala/kafka/server/metadata/BrokerServerMetricsTest.scala
@@ -22,9 +22,11 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.MetricName
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.utils.MockTime
+import org.apache.kafka.image.MetadataProvenance
 import org.junit.jupiter.api.Assertions.assertEquals
 import org.junit.jupiter.api.Assertions.assertTrue
 import org.junit.jupiter.api.Test
+
 import scala.jdk.CollectionConverters._
 
 final class BrokerServerMetricsTest {
@@ -59,11 +61,14 @@ final class BrokerServerMetricsTest {
     val metrics = new Metrics()
     TestUtils.resource(BrokerServerMetrics(metrics)) { brokerMetrics =>
       val offsetMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordOffsetName)
-      assertEquals(0, offsetMetric.metricValue.asInstanceOf[Long])
+      assertEquals(-1L, offsetMetric.metricValue.asInstanceOf[Long])
 
       // Update metric value and check
       val expectedValue = 1000
-      brokerMetrics.lastAppliedRecordOffset.set(expectedValue)
+      brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
+        expectedValue,
+        brokerMetrics.lastAppliedImageProvenance.get().epoch(),
+        brokerMetrics.lastAppliedTimestamp()));
       assertEquals(expectedValue, offsetMetric.metricValue.asInstanceOf[Long])
     }
   }
@@ -77,12 +82,16 @@ final class BrokerServerMetricsTest {
       val timestampMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordTimestampName)
       val lagMetric = metrics.metrics().get(brokerMetrics.lastAppliedRecordLagMsName)
 
-      assertEquals(0, timestampMetric.metricValue.asInstanceOf[Long])
-      assertEquals(time.milliseconds, lagMetric.metricValue.asInstanceOf[Long])
+      assertEquals(-1L, timestampMetric.metricValue.asInstanceOf[Long])
+      assertEquals(time.milliseconds + 1, lagMetric.metricValue.asInstanceOf[Long])
 
       // Update metric value and check
-      val timestamp = 500
-      brokerMetrics.lastAppliedRecordTimestamp.set(timestamp)
+      val timestamp = 500L
+
+      brokerMetrics.updateLastAppliedImageProvenance(new MetadataProvenance(
+        brokerMetrics.lastAppliedOffset(),
+        brokerMetrics.lastAppliedImageProvenance.get().epoch(),
+        timestamp))
       assertEquals(timestamp, timestampMetric.metricValue.asInstanceOf[Long])
       assertEquals(time.milliseconds - timestamp, lagMetric.metricValue.asInstanceOf[Long])
     }
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 84d6f5a2ef9..bd1b506e27c 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -22,7 +22,6 @@ import java.util
 import java.util.Collections.{singletonList, singletonMap}
 import java.util.{Collections, Properties}
 import java.util.concurrent.ExecutionException
-
 import kafka.integration.KafkaServerTestHarness
 import kafka.log.LogConfig._
 import kafka.utils._
@@ -43,7 +42,7 @@ import org.apache.kafka.common.record.{CompressionType, RecordVersion}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1
 import org.junit.jupiter.api.Assertions._
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.api.{Test, Timeout}
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 import org.mockito.ArgumentMatchers.{any, anyString}
@@ -53,6 +52,7 @@ import scala.annotation.nowarn
 import scala.collection.{Map, Seq}
 import scala.jdk.CollectionConverters._
 
+@Timeout(100)
 class DynamicConfigChangeTest extends KafkaServerTestHarness {
   def generateConfigs = List(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnectOrNull)))
 
diff --git a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
index a589c0572dd..e559a6b753d 100644
--- a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataListenerTest.scala
@@ -86,8 +86,8 @@ class BrokerMetadataListenerTest {
       val imageRecords = listener.getImageRecords().get()
       assertEquals(0, imageRecords.size())
       assertEquals(100L, listener.highestMetadataOffset)
-      assertEquals(0L, metrics.lastAppliedRecordOffset.get)
-      assertEquals(0L, metrics.lastAppliedRecordTimestamp.get)
+      assertEquals(-1L, metrics.lastAppliedOffset())
+      assertEquals(-1L, metrics.lastAppliedTimestamp())
       assertEquals(0L, metrics.metadataLoadErrorCount.get)
       assertEquals(0L, metrics.metadataApplyErrorCount.get)
 
@@ -121,8 +121,8 @@ class BrokerMetadataListenerTest {
         override def publishedOffset: Long = -1
       }).get()
 
-      assertEquals(fencedLastOffset, metrics.lastAppliedRecordOffset.get)
-      assertEquals(fencedTimestamp, metrics.lastAppliedRecordTimestamp.get)
+      assertEquals(fencedLastOffset, metrics.lastAppliedOffset())
+      assertEquals(fencedTimestamp, metrics.lastAppliedTimestamp())
       assertEquals(0L, metrics.metadataLoadErrorCount.get)
       assertEquals(0L, metrics.metadataApplyErrorCount.get)
     } finally {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
index d3fc0fe76ed..313927fc537 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/AclControlManager.java
@@ -28,7 +28,6 @@ import org.apache.kafka.common.metadata.RemoveAccessControlEntryRecord;
 import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer;
 import org.apache.kafka.metadata.authorizer.StandardAcl;
-import org.apache.kafka.metadata.authorizer.StandardAclRecordIterator;
 import org.apache.kafka.metadata.authorizer.StandardAclWithId;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.authorizer.AclCreateResult;
@@ -42,7 +41,6 @@ import org.apache.kafka.timeline.TimelineHashSet;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -216,20 +214,4 @@ public class AclControlManager {
     Map<Uuid, StandardAcl> idToAcl() {
         return Collections.unmodifiableMap(idToAcl);
     }
-
-    Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
-        Iterator<Entry<Uuid, StandardAcl>> iterator = idToAcl.entrySet(epoch).iterator();
-        return new StandardAclRecordIterator(new Iterator<StandardAclWithId>() {
-            @Override
-            public boolean hasNext() {
-                return iterator.hasNext();
-            }
-
-            @Override
-            public StandardAclWithId next() {
-                Entry<Uuid, StandardAcl> entry = iterator.next();
-                return new StandardAclWithId(entry.getKey(), entry.getValue());
-            }
-        });
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
index 504994b3a5b..b859bbfd65d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClientQuotaControlManager.java
@@ -36,11 +36,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
@@ -288,44 +286,4 @@ public class ClientQuotaControlManager {
 
         return ApiError.NONE;
     }
-
-    class ClientQuotaControlIterator implements Iterator<List<ApiMessageAndVersion>> {
-        private final long epoch;
-        private final Iterator<Entry<ClientQuotaEntity, TimelineHashMap<String, Double>>> iterator;
-
-        ClientQuotaControlIterator(long epoch) {
-            this.epoch = epoch;
-            this.iterator = clientQuotaData.entrySet(epoch).iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public List<ApiMessageAndVersion> next() {
-            if (!hasNext()) throw new NoSuchElementException();
-            Entry<ClientQuotaEntity, TimelineHashMap<String, Double>> entry = iterator.next();
-            ClientQuotaEntity entity = entry.getKey();
-            List<ApiMessageAndVersion> records = new ArrayList<>();
-            for (Entry<String, Double> quotaEntry : entry.getValue().entrySet(epoch)) {
-                ClientQuotaRecord record = new ClientQuotaRecord();
-                for (Entry<String, String> entityEntry : entity.entries().entrySet()) {
-                    record.entity().add(new EntityData().
-                        setEntityType(entityEntry.getKey()).
-                        setEntityName(entityEntry.getValue()));
-                }
-                record.setKey(quotaEntry.getKey());
-                record.setValue(quotaEntry.getValue());
-                record.setRemove(false);
-                records.add(new ApiMessageAndVersion(record, (short) 0));
-            }
-            return records;
-        }
-    }
-
-    ClientQuotaControlIterator iterator(long epoch) {
-        return new ClientQuotaControlIterator(epoch);
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 95aae773fb4..ba6c0e1d1c2 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -29,9 +29,7 @@ import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.FenceBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpoint;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointCollection;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeature;
-import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerFeatureCollection;
 import org.apache.kafka.common.metadata.UnfenceBrokerRecord;
 import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.protocol.ApiMessage;
@@ -59,7 +57,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.OptionalLong;
 import java.util.Random;
@@ -68,7 +65,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
-import static java.util.Collections.singletonList;
 import static java.util.concurrent.TimeUnit.NANOSECONDS;
 
 
@@ -654,60 +650,4 @@ public class ClusterControlManager {
             readyBrokersFuture = Optional.empty();
         }
     }
-
-    class ClusterControlIterator implements Iterator<List<ApiMessageAndVersion>> {
-        private final Iterator<Entry<Integer, BrokerRegistration>> iterator;
-        private final MetadataVersion metadataVersion;
-
-        ClusterControlIterator(long epoch) {
-            this.iterator = brokerRegistrations.entrySet(epoch).iterator();
-            this.metadataVersion = featureControl.metadataVersion();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public List<ApiMessageAndVersion> next() {
-            if (!hasNext()) throw new NoSuchElementException();
-            Entry<Integer, BrokerRegistration> entry = iterator.next();
-            int brokerId = entry.getKey();
-            BrokerRegistration registration = entry.getValue();
-            BrokerEndpointCollection endpoints = new BrokerEndpointCollection();
-            for (Entry<String, Endpoint> endpointEntry : registration.listeners().entrySet()) {
-                endpoints.add(new BrokerEndpoint().setName(endpointEntry.getKey()).
-                    setHost(endpointEntry.getValue().host()).
-                    setPort(endpointEntry.getValue().port()).
-                    setSecurityProtocol(endpointEntry.getValue().securityProtocol().id));
-            }
-            BrokerFeatureCollection features = new BrokerFeatureCollection();
-            for (Entry<String, VersionRange> featureEntry : registration.supportedFeatures().entrySet()) {
-                features.add(new BrokerFeature().setName(featureEntry.getKey()).
-                    setMaxSupportedVersion(featureEntry.getValue().max()).
-                    setMinSupportedVersion(featureEntry.getValue().min()));
-            }
-            RegisterBrokerRecord record = new RegisterBrokerRecord().
-                setBrokerId(brokerId).
-                setIncarnationId(registration.incarnationId()).
-                setBrokerEpoch(registration.epoch()).
-                setEndPoints(endpoints).
-                setFeatures(features).
-                setRack(registration.rack().orElse(null)).
-                setFenced(registration.fenced());
-            if (metadataVersion.isInControlledShutdownStateSupported()) {
-                record.setInControlledShutdown(registration.inControlledShutdown());
-            }
-            if (metadataVersion.isMigrationSupported()) {
-                record.setIsMigratingZkBroker(registration.isMigratingZkBroker());
-            }
-            return singletonList(new ApiMessageAndVersion(record,
-                metadataVersion.registerBrokerRecordVersion()));
-        }
-    }
-
-    ClusterControlIterator iterator(long epoch) {
-        return new ClusterControlIterator(epoch);
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
index d569ae1a6ca..b5d71230cf4 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ConfigurationControlManager.java
@@ -43,7 +43,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.function.Consumer;
@@ -474,39 +473,4 @@ public class ConfigurationControlManager {
         Map<String, String> result = configData.get(currentController);
         return (result == null) ? Collections.emptyMap() : result;
     }
-
-    class ConfigurationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
-        private final long epoch;
-        private final Iterator<Entry<ConfigResource, TimelineHashMap<String, String>>> iterator;
-
-        ConfigurationControlIterator(long epoch) {
-            this.epoch = epoch;
-            this.iterator = configData.entrySet(epoch).iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public List<ApiMessageAndVersion> next() {
-            if (!hasNext()) throw new NoSuchElementException();
-            List<ApiMessageAndVersion> records = new ArrayList<>();
-            Entry<ConfigResource, TimelineHashMap<String, String>> entry = iterator.next();
-            ConfigResource resource = entry.getKey();
-            for (Entry<String, String> configEntry : entry.getValue().entrySet(epoch)) {
-                records.add(new ApiMessageAndVersion(new ConfigRecord().
-                    setResourceName(resource.name()).
-                    setResourceType(resource.type().id()).
-                    setName(configEntry.getKey()).
-                    setValue(configEntry.getValue()), (short) 0));
-            }
-            return records;
-        }
-    }
-
-    ConfigurationControlIterator iterator(long epoch) {
-        return new ConfigurationControlIterator(epoch);
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/Controller.java b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
index 3622fe225dc..ed6c5237533 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/Controller.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/Controller.java
@@ -316,14 +316,6 @@ public interface Controller extends AclMutator, AutoCloseable {
         UpdateFeaturesRequestData request
     );
 
-    /**
-     * Begin writing a controller snapshot.  If there was already an ongoing snapshot, it
-     * simply returns information about that snapshot rather than starting a new one.
-     *
-     * @return              A future yielding the epoch of the snapshot.
-     */
-    CompletableFuture<Long> beginWritingSnapshot();
-
     /**
      * Create partitions on certain topics.
      *
diff --git a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
index b3758586cb8..7b2a9308e96 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/FeatureControlManager.java
@@ -20,11 +20,9 @@ package org.apache.kafka.controller;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.TreeMap;
 import java.util.function.Consumer;
@@ -306,48 +304,6 @@ public class FeatureControlManager {
         }
     }
 
-    class FeatureControlIterator implements Iterator<List<ApiMessageAndVersion>> {
-        private final Iterator<Entry<String, Short>> iterator;
-        private final MetadataVersion metadataVersion;
-        private boolean wroteVersion = false;
-
-        FeatureControlIterator(long epoch) {
-            this.iterator = finalizedVersions.entrySet(epoch).iterator();
-            this.metadataVersion = FeatureControlManager.this.metadataVersion.get(epoch);
-        }
-
-        @Override
-        public boolean hasNext() {
-            return needsWriteMetadataVersion() || iterator.hasNext();
-        }
-
-        private boolean needsWriteMetadataVersion() {
-            return !wroteVersion && metadataVersion.isAtLeast(minimumBootstrapVersion);
-        }
-
-        @Override
-        public List<ApiMessageAndVersion> next() {
-            // Write the metadata.version first
-            if (needsWriteMetadataVersion()) {
-                wroteVersion = true;
-                return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
-                    .setName(MetadataVersion.FEATURE_NAME)
-                    .setFeatureLevel(metadataVersion.featureLevel()), FEATURE_LEVEL_RECORD.lowestSupportedVersion()));
-            }
-
-            // Then write the rest of the features
-            if (!iterator.hasNext()) throw new NoSuchElementException();
-            Entry<String, Short> entry = iterator.next();
-            return Collections.singletonList(new ApiMessageAndVersion(new FeatureLevelRecord()
-                .setName(entry.getKey())
-                .setFeatureLevel(entry.getValue()), (short) 0));
-        }
-    }
-
-    FeatureControlIterator iterator(long epoch) {
-        return new FeatureControlIterator(epoch);
-    }
-
     boolean isControllerId(int nodeId) {
         return quorumFeatures.isControllerId(nodeId);
     }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java b/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
index 2e29a2a52f9..41f881245fc 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/LogReplayTracker.java
@@ -24,8 +24,7 @@ import org.slf4j.Logger;
 
 /**
  * The LogReplayTracker manages state associated with replaying the metadata log, such as whether
- * we have seen any records and whether we have seen any metadata version records. It is accessed
- * solely from the quorum controller thread.
+ * we have seen any records. It is accessed solely from the quorum controller thread.
  */
 public class LogReplayTracker {
     public static class Builder {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
index 178ef46bdb5..47e4e1b430f 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ProducerIdControlManager.java
@@ -25,14 +25,10 @@ import org.apache.kafka.timeline.SnapshotRegistry;
 import org.apache.kafka.timeline.TimelineLong;
 import org.apache.kafka.timeline.TimelineObject;
 
-import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
 
 
 public class ProducerIdControlManager {
-
     private final ClusterControlManager clusterControlManager;
     private final TimelineObject<ProducerIdsBlock> nextProducerBlock;
     private final TimelineLong brokerEpoch;
@@ -62,6 +58,11 @@ public class ProducerIdControlManager {
         return ControllerResult.of(Collections.singletonList(new ApiMessageAndVersion(record, (short) 0)), block);
     }
 
+    // VisibleForTesting
+    ProducerIdsBlock nextProducerBlock() {
+        return nextProducerBlock.get();
+    }
+
     void replay(ProducerIdsRecord record) {
         long currentNextProducerId = nextProducerBlock.get().firstProducerId();
         if (record.nextProducerId() <= currentNextProducerId) {
@@ -72,19 +73,4 @@ public class ProducerIdControlManager {
             brokerEpoch.set(record.brokerEpoch());
         }
     }
-
-    Iterator<List<ApiMessageAndVersion>> iterator(long epoch) {
-        List<ApiMessageAndVersion> records = new ArrayList<>(1);
-
-        ProducerIdsBlock producerIdBlock = nextProducerBlock.get(epoch);
-        if (producerIdBlock.firstProducerId() > 0) {
-            records.add(new ApiMessageAndVersion(
-                new ProducerIdsRecord()
-                    .setNextProducerId(producerIdBlock.firstProducerId())
-                    .setBrokerId(producerIdBlock.assignedBrokerId())
-                    .setBrokerEpoch(brokerEpoch.get(epoch)),
-                (short) 0));
-        }
-        return Collections.singleton(records).iterator();
-    }
 }
diff --git a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 6716044bef5..642d6976dba 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -72,7 +72,6 @@ import org.apache.kafka.common.requests.ApiError;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.controller.SnapshotGenerator.Section;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
@@ -89,7 +88,6 @@ import org.apache.kafka.raft.BatchReader;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.raft.RaftClient;
-import org.apache.kafka.metadata.util.SnapshotReason;
 import org.apache.kafka.server.authorizer.AclCreateResult;
 import org.apache.kafka.server.authorizer.AclDeleteResult;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -98,7 +96,6 @@ import org.apache.kafka.server.fault.FaultHandler;
 import org.apache.kafka.server.policy.AlterConfigPolicy;
 import org.apache.kafka.server.policy.CreateTopicPolicy;
 import org.apache.kafka.snapshot.SnapshotReader;
-import org.apache.kafka.snapshot.SnapshotWriter;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.slf4j.Logger;
 
@@ -107,7 +104,6 @@ import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map.Entry;
 import java.util.Map;
@@ -168,8 +164,6 @@ public final class QuorumController implements Controller {
         private short defaultReplicationFactor = 3;
         private int defaultNumPartitions = 1;
         private ReplicaPlacer replicaPlacer = new StripedReplicaPlacer(new Random());
-        private long snapshotMaxNewRecordBytes = Long.MAX_VALUE;
-        private long snapshotMaxIntervalMs = 0;
         private OptionalLong leaderImbalanceCheckIntervalNs = OptionalLong.empty();
         private OptionalLong maxIdleIntervalNs = OptionalLong.empty();
         private long sessionTimeoutNs = ClusterControlManager.DEFAULT_SESSION_TIMEOUT_NS;
@@ -242,16 +236,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setSnapshotMaxNewRecordBytes(long value) {
-            this.snapshotMaxNewRecordBytes = value;
-            return this;
-        }
-
-        public Builder setSnapshotMaxIntervalMs(long value) {
-            this.snapshotMaxIntervalMs = value;
-            return this;
-        }
-
         public Builder setLeaderImbalanceCheckIntervalNs(OptionalLong value) {
             this.leaderImbalanceCheckIntervalNs = value;
             return this;
@@ -351,8 +335,6 @@ public final class QuorumController implements Controller {
                     defaultReplicationFactor,
                     defaultNumPartitions,
                     replicaPlacer,
-                    snapshotMaxNewRecordBytes,
-                    snapshotMaxIntervalMs,
                     leaderImbalanceCheckIntervalNs,
                     maxIdleIntervalNs,
                     sessionTimeoutNs,
@@ -521,132 +503,6 @@ public final class QuorumController implements Controller {
         queue.append(event);
     }
 
-    private static final String GENERATE_SNAPSHOT = "generateSnapshot";
-
-    private static final int MAX_BATCHES_PER_GENERATE_CALL = 10;
-
-    class SnapshotGeneratorManager implements Runnable {
-        private SnapshotGenerator generator = null;
-
-        void createSnapshotGenerator(long committedOffset, int committedEpoch, long committedTimestamp) {
-            if (snapshotInProgress()) {
-                throw new IllegalStateException("Snapshot generator already exists");
-            }
-            if (!snapshotRegistry.hasSnapshot(committedOffset)) {
-                throw new IllegalStateException(
-                    String.format(
-                        "Cannot generate a snapshot at committed offset %d because it does not exists in the snapshot registry.",
-                        committedOffset
-                    )
-                );
-            }
-
-            OffsetAndEpoch snapshotId = new OffsetAndEpoch(
-                committedOffset + 1,
-                committedEpoch
-            );
-
-            Optional<SnapshotWriter<ApiMessageAndVersion>> writer = raftClient.createSnapshot(
-                snapshotId,
-                committedTimestamp
-            );
-            if (writer.isPresent()) {
-                generator = new SnapshotGenerator(
-                    logContext,
-                    writer.get(),
-                    MAX_BATCHES_PER_GENERATE_CALL,
-                    Arrays.asList(
-                        new Section("features", featureControl.iterator(committedOffset)),
-                        new Section("cluster", clusterControl.iterator(committedOffset)),
-                        new Section("replication", replicationControl.iterator(committedOffset)),
-                        new Section("configuration", configurationControl.iterator(committedOffset)),
-                        new Section("clientQuotas", clientQuotaControlManager.iterator(committedOffset)),
-                        new Section("producerIds", producerIdControlManager.iterator(committedOffset)),
-                        new Section("acls", aclControlManager.iterator(committedOffset))
-                    )
-                );
-                reschedule(0);
-            } else {
-                log.info(
-                    "Skipping generation of snapshot for committed offset {} and epoch {} since it already exists",
-                    committedOffset,
-                    committedEpoch
-                );
-            }
-        }
-
-        void cancel() {
-            if (!snapshotInProgress()) return;
-            log.error("Cancelling snapshot {}", generator.lastContainedLogOffset());
-            generator.writer().close();
-            generator = null;
-
-            // Delete every in-memory snapshot up to the committed offset. They are not needed since this
-            // snapshot generation was canceled.
-            snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
-
-            queue.cancelDeferred(GENERATE_SNAPSHOT);
-        }
-
-        void reschedule(long delayNs) {
-            ControllerEvent event = new ControllerEvent(GENERATE_SNAPSHOT, this);
-            queue.scheduleDeferred(event.name,
-                new EarliestDeadlineFunction(time.nanoseconds() + delayNs), event);
-        }
-
-        void handleSnapshotFinished(Optional<Exception> exception) {
-            if (exception.isPresent()) {
-                log.error("Error while generating snapshot {}", generator.lastContainedLogOffset(), exception.get());
-            } else {
-                log.info("Finished generating snapshot {}.", generator.lastContainedLogOffset());
-            }
-
-            generator.writer().close();
-            generator = null;
-
-            // Delete every in-memory snapshot up to the committed offset. They are not needed since this
-            // snapshot generation finished.
-            snapshotRegistry.deleteSnapshotsUpTo(lastCommittedOffset);
-
-            // The snapshot counters for size-based and time-based snapshots could have changed to cause a new
-            // snapshot to get generated.
-            maybeGenerateSnapshot();
-        }
-
-        @Override
-        public void run() {
-            if (!snapshotInProgress()) {
-                log.debug("No snapshot is in progress because it was previously canceled");
-                return;
-            }
-
-            OptionalLong nextDelay;
-            try {
-                nextDelay = generator.generateBatches();
-            } catch (Exception e) {
-                handleSnapshotFinished(Optional.of(e));
-                return;
-            }
-
-            if (nextDelay.isPresent()) {
-                reschedule(nextDelay.getAsLong());
-            } else {
-                handleSnapshotFinished(Optional.empty());
-            }
-        }
-
-        OptionalLong snapshotLastOffsetFromLog() {
-            if (!snapshotInProgress()) {
-                return OptionalLong.empty();
-            }
-            return OptionalLong.of(generator.lastContainedLogOffset());
-        }
-
-        public boolean snapshotInProgress() {
-            return generator != null;
-        }
-    }
-
     /**
      * A controller event that reads the committed internal state in order to expose it
      * to an API.
@@ -985,12 +841,8 @@ public final class QuorumController implements Controller {
                             // Complete any events in the purgatory that were waiting for this offset.
                             purgatory.completeUpTo(offset);
 
-                            // Delete all the in-memory snapshots that are no longer needed.
-                            //
-                            // If the active controller has a snapshot in progress, it needs to keep that in-memory
-                            // snapshot. Otherwise, the active controller can delete up to the current committed offset.
-                            snapshotRegistry.deleteSnapshotsUpTo(
-                                snapshotGeneratorManager.snapshotLastOffsetFromLog().orElse(offset));
+                            // The active controller can delete up to the current committed offset.
+                            snapshotRegistry.deleteSnapshotsUpTo(offset);
                         } else {
                             // If the controller is a standby, replay the records that were
                             // created by the active controller.
@@ -1022,8 +874,7 @@ public final class QuorumController implements Controller {
                         updateLastCommittedState(
                             offset,
                             epoch,
-                            batch.appendTimestamp(),
-                            committedBytesSinceLastSnapshot + batch.sizeInBytes()
+                            batch.appendTimestamp()
                         );
 
                         if (offset >= raftClient.latestSnapshotId().map(OffsetAndEpoch::offset).orElse(0L)) {
@@ -1033,8 +884,6 @@ public final class QuorumController implements Controller {
                             );
                         }
                     }
-
-                    maybeGenerateSnapshot();
                 } finally {
                     reader.close();
                 }
@@ -1086,12 +935,9 @@ public final class QuorumController implements Controller {
                             i++;
                         }
                     }
-                    updateLastCommittedState(
-                        reader.lastContainedLogOffset(),
+                    updateLastCommittedState(reader.lastContainedLogOffset(),
                         reader.lastContainedLogEpoch(),
-                        reader.lastContainedLogTimestamp(),
-                        0
-                    );
+                        reader.lastContainedLogTimestamp());
                     snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
                     authorizer.ifPresent(a -> a.loadSnapshot(aclControlManager.idToAcl()));
                 } finally {
@@ -1247,13 +1093,11 @@ public final class QuorumController implements Controller {
     private void updateLastCommittedState(
         long offset,
         int epoch,
-        long timestamp,
-        long bytesSinceLastSnapshot
+        long timestamp
     ) {
         lastCommittedOffset = offset;
         lastCommittedEpoch = epoch;
         lastCommittedTimestamp = timestamp;
-        committedBytesSinceLastSnapshot = bytesSinceLastSnapshot;
 
         controllerMetrics.setLastCommittedRecordOffset(offset);
         if (!isActiveController()) {
@@ -1426,38 +1270,6 @@ public final class QuorumController implements Controller {
         queue.cancelDeferred(WRITE_NO_OP_RECORD);
     }
 
-    private static final String MAYBE_GENERATE_SNAPSHOT = "maybeGenerateSnapshot";
-
-    private void maybeScheduleNextGenerateSnapshot() {
-        if (!generateSnapshotScheduled) {
-            long now = time.milliseconds();
-            long delayMs = Math.min(
-                0,
-                snapshotMaxIntervalMs + oldestNonSnapshottedTimestamp - now
-            );
-
-            log.debug(
-                "Scheduling write event for {} because snapshotMaxIntervalMs ({}), " +
-                "oldestNonSnapshottedTimestamp ({}) and now ({})",
-                MAYBE_GENERATE_SNAPSHOT,
-                snapshotMaxIntervalMs,
-                oldestNonSnapshottedTimestamp,
-                now
-            );
-
-            ControllerEvent event = new ControllerEvent(MAYBE_GENERATE_SNAPSHOT, this::maybeGenerateSnapshot);
-
-            long scheduleNs = time.nanoseconds() + TimeUnit.MILLISECONDS.toNanos(delayMs);
-            queue.scheduleDeferred(MAYBE_GENERATE_SNAPSHOT, new EarliestDeadlineFunction(scheduleNs), event);
-            generateSnapshotScheduled = true;
-        }
-    }
-
-    private void cancelNextGenerateSnapshot() {
-        queue.cancelDeferred(MAYBE_GENERATE_SNAPSHOT);
-        generateSnapshotScheduled = false;
-    }
-
     private void handleFeatureControlChange() {
         // The feature control maybe have changed. On the active controller cancel or schedule noop
         // record writes accordingly.
@@ -1536,72 +1348,13 @@ public final class QuorumController implements Controller {
         }
     }
 
-    private void maybeGenerateSnapshot() {
-        if (snapshotGeneratorManager.snapshotInProgress()) {
-            /* Skip snapshot generation if there is a snaphshot in progress.
-             *
-             * When the in-progress snapshot completes it will call this method to check if the controller should
-             * generate another snapshot due to any of the reasons supported by this method.
-             */
-        } else {
-            Set<SnapshotReason> snapshotReasons = new HashSet<>();
-            // Check if a snapshot should be generated because of committed bytes
-            if (committedBytesSinceLastSnapshot >= snapshotMaxNewRecordBytes) {
-                snapshotReasons.add(
-                    SnapshotReason.maxBytesExceeded(committedBytesSinceLastSnapshot, snapshotMaxNewRecordBytes)
-                );
-            }
-
-            // Check if a snapshot should be generated because of committed append times
-            if (snapshotMaxIntervalMs > 0) {
-                // Time base snasphots are enabled
-                long snapshotIntervalMs = time.milliseconds() - oldestNonSnapshottedTimestamp;
-                if (snapshotIntervalMs >= snapshotMaxIntervalMs) {
-                    snapshotReasons.add(SnapshotReason.maxIntervalExceeded(snapshotIntervalMs, snapshotMaxIntervalMs));
-                } else {
-                    maybeScheduleNextGenerateSnapshot();
-                }
-            }
-
-            if (!snapshotReasons.isEmpty()) {
-                if (!isActiveController()) {
-                    // The inactive controllers only create an in-memory snapshot when generating a snapshot. This is
-                    // unlike the active controller which creates in-memory snapshots every time an uncommitted batch
-                    // gets replayed.
-                    snapshotRegistry.getOrCreateSnapshot(lastCommittedOffset);
-                }
-
-                log.info(
-                    "Generating a snapshot that includes (epoch={}, offset={}) because: {}",
-                    lastCommittedEpoch,
-                    lastCommittedOffset,
-                    SnapshotReason.stringFromReasons(snapshotReasons)
-                );
-
-                snapshotGeneratorManager.createSnapshotGenerator(
-                    lastCommittedOffset,
-                    lastCommittedEpoch,
-                    lastCommittedTimestamp
-                );
-
-                // Reset all of the snapshot counters
-                committedBytesSinceLastSnapshot = 0;
-                oldestNonSnapshottedTimestamp = Long.MAX_VALUE;
-
-                // Starting a snapshot invalidates any scheduled snapshot generation
-                cancelNextGenerateSnapshot();
-            }
-        }
-    }
-
     /**
      * Clear all data structures and reset all KRaft state.
      */
     private void resetToEmptyState() {
-        snapshotGeneratorManager.cancel();
         snapshotRegistry.reset();
 
-        updateLastCommittedState(-1, -1, -1, 0);
+        updateLastCommittedState(-1, -1, -1);
     }
 
     /**
@@ -1715,11 +1468,6 @@ public final class QuorumController implements Controller {
      */
     private final LogReplayTracker logReplayTracker;
 
-    /**
-     * Manages generating controller snapshots.
-     */
-    private final SnapshotGeneratorManager snapshotGeneratorManager = new SnapshotGeneratorManager();
-
     /**
      * The interface that we use to mutate the Raft log.
      */
@@ -1766,21 +1514,6 @@ public final class QuorumController implements Controller {
      */
     private long writeOffset;
 
-    /**
-     * Maximum number of bytes processed through handling commits before generating a snapshot.
-     */
-    private final long snapshotMaxNewRecordBytes;
-
-    /**
-     * Number of bytes processed through handling commits since the last snapshot was generated.
-     */
-    private long committedBytesSinceLastSnapshot = 0;
-
-    /**
-     * Maximum amount of to wait for a record in the log to get included in a snapshot.
-     */
-    private final long snapshotMaxIntervalMs;
-
     /**
      * Timestamp for the oldest record that was committed but not included in a snapshot.
      */
@@ -1843,8 +1576,6 @@ public final class QuorumController implements Controller {
         short defaultReplicationFactor,
         int defaultNumPartitions,
         ReplicaPlacer replicaPlacer,
-        long snapshotMaxNewRecordBytes,
-        long snapshotMaxIntervalMs,
         OptionalLong leaderImbalanceCheckIntervalNs,
         OptionalLong maxIdleIntervalNs,
         long sessionTimeoutNs,
@@ -1903,8 +1634,6 @@ public final class QuorumController implements Controller {
             setZkMigrationEnabled(zkMigrationEnabled).
             build();
         this.producerIdControlManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
-        this.snapshotMaxNewRecordBytes = snapshotMaxNewRecordBytes;
-        this.snapshotMaxIntervalMs = snapshotMaxIntervalMs;
         this.leaderImbalanceCheckIntervalNs = leaderImbalanceCheckIntervalNs;
         this.maxIdleIntervalNs = maxIdleIntervalNs;
         this.replicationControl = new ReplicationControlManager.Builder().
@@ -2243,28 +1972,6 @@ public final class QuorumController implements Controller {
         });
     }
 
-    @Override
-    public CompletableFuture<Long> beginWritingSnapshot() {
-        CompletableFuture<Long> future = new CompletableFuture<>();
-        appendControlEvent("beginWritingSnapshot", () -> {
-            if (!snapshotGeneratorManager.snapshotInProgress()) {
-                log.info(
-                    "Generating a snapshot that includes (epoch={}, offset={}) because, {}.",
-                    lastCommittedEpoch,
-                    lastCommittedOffset,
-                    SnapshotReason.UNKNOWN
-                );
-                snapshotGeneratorManager.createSnapshotGenerator(
-                    lastCommittedOffset,
-                    lastCommittedEpoch,
-                    lastCommittedTimestamp
-                );
-            }
-            future.complete(snapshotGeneratorManager.generator.lastContainedLogOffset());
-        });
-        return future;
-    }
-
     @Override
     public CompletableFuture<List<AclCreateResult>> createAcls(
         ControllerRequestContext context,
diff --git a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
index dbe23218d48..5fd9af85e06 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
@@ -108,7 +108,6 @@ import java.util.List;
 import java.util.ListIterator;
 import java.util.Map.Entry;
 import java.util.Map;
-import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Set;
@@ -1911,39 +1910,6 @@ public class ReplicationControlManager {
             setReplicas(Replicas.toList(partition.replicas)));
     }
 
-    class ReplicationControlIterator implements Iterator<List<ApiMessageAndVersion>> {
-        private final long epoch;
-        private final Iterator<TopicControlInfo> iterator;
-
-        ReplicationControlIterator(long epoch) {
-            this.epoch = epoch;
-            this.iterator = topics.values(epoch).iterator();
-        }
-
-        @Override
-        public boolean hasNext() {
-            return iterator.hasNext();
-        }
-
-        @Override
-        public List<ApiMessageAndVersion> next() {
-            if (!hasNext()) throw new NoSuchElementException();
-            TopicControlInfo topic = iterator.next();
-            List<ApiMessageAndVersion> records = new ArrayList<>();
-            records.add(new ApiMessageAndVersion(new TopicRecord().
-                setName(topic.name).
-                setTopicId(topic.id), (short) 0));
-            for (Entry<Integer, PartitionRegistration> entry : topic.parts.entrySet(epoch)) {
-                records.add(entry.getValue().toRecord(topic.id, entry.getKey()));
-            }
-            return records;
-        }
-    }
-
-    ReplicationControlIterator iterator(long epoch) {
-        return new ReplicationControlIterator(epoch);
-    }
-
     private static final class IneligibleReplica {
         private final int replicaId;
         private final String reason;
diff --git a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
deleted file mode 100644
index d34696ef44f..00000000000
--- a/metadata/src/main/java/org/apache/kafka/controller/SnapshotGenerator.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.OptionalLong;
-
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.snapshot.SnapshotWriter;
-import org.slf4j.Logger;
-
-
-final class SnapshotGenerator {
-    static class Section {
-        private final String name;
-        private final Iterator<List<ApiMessageAndVersion>> iterator;
-
-        Section(String name, Iterator<List<ApiMessageAndVersion>> iterator) {
-            this.name = name;
-            this.iterator = iterator;
-        }
-
-        String name() {
-            return name;
-        }
-
-        Iterator<List<ApiMessageAndVersion>> iterator() {
-            return iterator;
-        }
-    }
-
-    private final Logger log;
-    private final SnapshotWriter<ApiMessageAndVersion> writer;
-    private final int maxBatchesPerGenerateCall;
-    private final List<Section> sections;
-    private final Iterator<Section> sectionIterator;
-    private Iterator<List<ApiMessageAndVersion>> batchIterator;
-    private List<ApiMessageAndVersion> batch;
-    private Section section;
-    private long numRecords;
-
-    SnapshotGenerator(LogContext logContext,
-                      SnapshotWriter<ApiMessageAndVersion> writer,
-                      int maxBatchesPerGenerateCall,
-                      List<Section> sections) {
-        this.log = logContext.logger(SnapshotGenerator.class);
-        this.writer = writer;
-        this.maxBatchesPerGenerateCall = maxBatchesPerGenerateCall;
-        this.sections = sections;
-        this.sectionIterator = this.sections.iterator();
-        this.batchIterator = Collections.emptyIterator();
-        this.batch = null;
-        this.section = null;
-        this.numRecords = 0;
-    }
-
-    /**
-     * Returns the last offset from the log that will be included in the snapshot.
-     */
-    long lastContainedLogOffset() {
-        return writer.lastContainedLogOffset();
-    }
-
-    SnapshotWriter writer() {
-        return writer;
-    }
-
-    /**
-     * Generate and write the next batch of records.
-     *
-     * @return true if the last batch was generated, otherwise false
-     */
-    private boolean generateBatch() throws Exception {
-        if (batch == null) {
-            while (!batchIterator.hasNext()) {
-                if (section != null) {
-                    log.info("Generated {} record(s) for the {} section of snapshot {}.",
-                             numRecords, section.name(), writer.snapshotId());
-                    section = null;
-                    numRecords = 0;
-                }
-                if (!sectionIterator.hasNext()) {
-                    writer.freeze();
-                    return true;
-                }
-                section = sectionIterator.next();
-                log.info("Generating records for the {} section of snapshot {}.",
-                         section.name(), writer.snapshotId());
-                batchIterator = section.iterator();
-            }
-            batch = batchIterator.next();
-        }
-
-        writer.append(batch);
-        numRecords += batch.size();
-        batch = null;
-        return false;
-    }
-
-    /**
-     * Generate the next few batches of records.
-     *
-     * @return  The number of nanoseconds to delay before rescheduling the
-     *          generateBatches event, or empty if the snapshot is done.
-     */
-    OptionalLong generateBatches() throws Exception {
-        for (int numBatches = 0; numBatches < maxBatchesPerGenerateCall; numBatches++) {
-            if (generateBatch()) {
-                return OptionalLong.empty();
-            }
-        }
-        return OptionalLong.of(0);
-    }
-}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java b/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
new file mode 100644
index 00000000000..c3ab72b3b79
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/LogDeltaManifest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.image.MetadataProvenance;
+
+import java.util.Objects;
+
+
+/**
+ * Contains information about a set of changes that were loaded from the metadata log.
+ */
+public class LogDeltaManifest {
+    /**
+     * The highest offset and epoch included in this delta, inclusive.
+     */
+    private final MetadataProvenance provenance;
+
+    /**
+     * The number of batches that were loaded.
+     */
+    private final int numBatches;
+
+    /**
+     * The time in nanoseconds that it took to load the changes.
+     */
+    private final long elapsedNs;
+
+    /**
+     * The total size of the records in bytes that we read while creating the delta.
+     */
+    private final long numBytes;
+
+    public LogDeltaManifest(
+        MetadataProvenance provenance,
+        int numBatches,
+        long elapsedNs,
+        long numBytes
+    ) {
+        this.provenance = provenance;
+        this.numBatches = numBatches;
+        this.elapsedNs = elapsedNs;
+        this.numBytes = numBytes;
+    }
+
+
+    public MetadataProvenance provenance() {
+        return provenance;
+    }
+
+    public int numBatches() {
+        return numBatches;
+    }
+
+    public long elapsedNs() {
+        return elapsedNs;
+    }
+
+    public long numBytes() {
+        return numBytes;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                provenance,
+                numBatches,
+                elapsedNs,
+                numBytes);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !o.getClass().equals(this.getClass())) return false;
+        LogDeltaManifest other = (LogDeltaManifest) o;
+        return provenance.equals(other.provenance) &&
+                numBatches == other.numBatches &&
+                elapsedNs == other.elapsedNs &&
+                numBytes == other.numBytes;
+    }
+
+    @Override
+    public String toString() {
+        return "LogDeltaManifest(" +
+                "provenance=" + provenance +
+                ", numBatches=" + numBatches +
+                ", elapsedNs=" + elapsedNs +
+                ", numBytes=" + numBytes +
+                ")";
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
new file mode 100644
index 00000000000..9bfe5a5884f
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoader.java
@@ -0,0 +1,572 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.image.writer.ImageReWriter;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.apache.kafka.server.fault.FaultHandlerException;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.slf4j.Logger;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+import static java.util.concurrent.TimeUnit.NANOSECONDS;
+
+
+/**
+ * The MetadataLoader follows changes provided by a RaftClient, and packages them into metadata
+ * deltas and images that can be consumed by publishers.
+ *
+ * The Loader maintains its own thread, which is used to make all callbacks into publishers. If a
+ * publisher A is installed before B, A will receive all callbacks before B. This is also true if
+ * A and B are installed as part of a list [A, B].
+ *
+ * Publishers should not modify any data structures passed to them.
+ *
+ * It is possible to change the list of publishers dynamically over time. Whenever a new publisher is
+ * added, it receives a catch-up delta which contains the full state. Any publisher installed when the
+ * loader is closed will itself be closed.
+ */
+public class MetadataLoader implements RaftClient.Listener<ApiMessageAndVersion>, AutoCloseable {
+    public static class Builder {
+        private int nodeId = -1;
+        private Time time = Time.SYSTEM;
+        private LogContext logContext = null;
+        private String threadNamePrefix = "";
+        private FaultHandler faultHandler = (m, e) -> new FaultHandlerException(m, e);
+        private MetadataLoaderMetrics metrics = new MetadataLoaderMetrics() {
+            private volatile long lastAppliedOffset = -1L;
+
+            @Override
+            public void updateBatchProcessingTime(long elapsedNs) { }
+
+            @Override
+            public void updateBatchSize(int size) { }
+
+            @Override
+            public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
+                this.lastAppliedOffset = provenance.offset();
+            }
+
+            @Override
+            public long lastAppliedOffset() {
+                return lastAppliedOffset;
+            }
+
+            @Override
+            public void close() throws Exception { }
+        };
+        private Supplier<OptionalLong> highWaterMarkAccessor = null;
+
+        public Builder setNodeId(int nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+
+        public Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public Builder setThreadNamePrefix(String threadNamePrefix) {
+            this.threadNamePrefix = threadNamePrefix;
+            return this;
+        }
+
+        public Builder setFaultHandler(FaultHandler faultHandler) {
+            this.faultHandler = faultHandler;
+            return this;
+        }
+
+        public Builder setMetadataLoaderMetrics(MetadataLoaderMetrics metrics) {
+            this.metrics = metrics;
+            return this;
+        }
+
+        public Builder setHighWaterMarkAccessor(Supplier<OptionalLong> highWaterMarkAccessor) {
+            this.highWaterMarkAccessor = highWaterMarkAccessor;
+            return this;
+        }
+
+        public MetadataLoader build() {
+            if (logContext == null) {
+                logContext = new LogContext("[MetadataLoader " + nodeId + "] ");
+            }
+            if (highWaterMarkAccessor == null) {
+                throw new RuntimeException("You must set the high water mark accessor.");
+            }
+            return new MetadataLoader(
+                time,
+                logContext,
+                threadNamePrefix,
+                faultHandler,
+                metrics,
+                highWaterMarkAccessor);
+        }
+    }
+
+    /**
+     * The log4j logger for this loader.
+     */
+    private final Logger log;
+
+    /**
+     * The clock used by this loader.
+     */
+    private final Time time;
+
+    /**
+     * The fault handler to use if metadata loading fails.
+     */
+    private final FaultHandler faultHandler;
+
+    /**
+     * Callbacks for updating metrics.
+     */
+    private final MetadataLoaderMetrics metrics;
+
+    /**
+     * A function which supplies the current high water mark, or empty if it is not known.
+     */
+    private final Supplier<OptionalLong> highWaterMarkAccessor;
+
+    /**
+     * Publishers which haven't been initialized yet.
+     */
+    private final LinkedHashMap<String, MetadataPublisher> uninitializedPublishers;
+
+    /**
+     * Publishers which are receiving updates.
+     */
+    private final LinkedHashMap<String, MetadataPublisher> publishers;
+
+    /**
+     * True while the loader is still catching up with the initial high water mark.
+     */
+    private boolean catchingUp = true;
+
+    /**
+     * The current leader and epoch.
+     */
+    private LeaderAndEpoch currentLeaderAndEpoch = LeaderAndEpoch.UNKNOWN;
+
+    /**
+     * The current metadata image. Accessed only from the event queue thread.
+     */
+    private MetadataImage image;
+
+    /**
+     * The event queue which runs this loader.
+     */
+    private final KafkaEventQueue eventQueue;
+
+    private MetadataLoader(
+        Time time,
+        LogContext logContext,
+        String threadNamePrefix,
+        FaultHandler faultHandler,
+        MetadataLoaderMetrics metrics,
+        Supplier<OptionalLong> highWaterMarkAccessor
+    ) {
+        this.log = logContext.logger(MetadataLoader.class);
+        this.time = time;
+        this.faultHandler = faultHandler;
+        this.metrics = metrics;
+        this.highWaterMarkAccessor = highWaterMarkAccessor;
+        this.uninitializedPublishers = new LinkedHashMap<>();
+        this.publishers = new LinkedHashMap<>();
+        this.image = MetadataImage.EMPTY;
+        this.eventQueue = new KafkaEventQueue(time, logContext, threadNamePrefix);
+    }
+
+    private boolean stillNeedToCatchUp(long offset) {
+        if (!catchingUp) {
+            log.trace("We are not in the initial catching up state.");
+            return false;
+        }
+        OptionalLong highWaterMark = highWaterMarkAccessor.get();
+        if (!highWaterMark.isPresent()) {
+            log.info("The loader is still catching up because we still don't know the high " +
+                    "water mark yet.");
+            return true;
+        }
+        if (highWaterMark.getAsLong() > offset) {
+            log.info("The loader is still catching up because we have loaded up to offset " +
+                    offset + ", but the high water mark is " + highWaterMark.getAsLong());
+            return true;
+        }
+        log.info("The loader finished catch up to the current high water mark of " +
+                highWaterMark.getAsLong());
+        catchingUp = true;
+        return false;
+    }
+
+    private void maybeInitializeNewPublishers() {
+        if (uninitializedPublishers.isEmpty()) {
+            log.trace("There are no uninitialized publishers to initialize.");
+            return;
+        }
+        long startNs = time.nanoseconds();
+        MetadataDelta delta = new MetadataDelta.Builder().
+                setImage(image).
+                build();
+        ImageReWriter writer = new ImageReWriter(delta);
+        image.write(writer, new ImageWriterOptions.Builder().
+                setMetadataVersion(image.features().metadataVersion()).
+                build());
+        SnapshotManifest manifest = new SnapshotManifest(
+                image.provenance(),
+                time.nanoseconds() - startNs);
+        for (Iterator<MetadataPublisher> iter = uninitializedPublishers.values().iterator();
+                iter.hasNext(); ) {
+            MetadataPublisher publisher = iter.next();
+            iter.remove();
+            try {
+                log.info("Publishing initial snapshot at offset {} to {}",
+                        image.highestOffsetAndEpoch().offset(), publisher.name());
+                publisher.publishSnapshot(delta, image, manifest);
+                publishers.put(publisher.name(), publisher);
+            } catch (Throwable e) {
+                faultHandler.handleFault("Unhandled error publishing the initial metadata " +
+                        "image from snapshot at offset " + image.highestOffsetAndEpoch().offset() +
+                        " with publisher " + publisher.name(), e);
+            }
+        }
+    }
+
+    @Override
+    public void handleCommit(BatchReader<ApiMessageAndVersion> reader) {
+        eventQueue.append(() -> {
+            try {
+                MetadataDelta delta = new MetadataDelta.Builder().
+                        setImage(image).
+                        build();
+                LogDeltaManifest manifest = loadLogDelta(delta, reader);
+                if (log.isDebugEnabled()) {
+                    log.debug("Generated a metadata delta between {} and {} from {} batch(es) " +
+                            "in {} us.", image.offset(), manifest.provenance().offset(),
+                            manifest.numBatches(), NANOSECONDS.toMicros(manifest.elapsedNs()));
+                }
+                try {
+                    image = delta.apply(manifest.provenance());
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Error generating new metadata image from " +
+                        "metadata delta between offset " + image.offset() +
+                            " and " + manifest.provenance().offset(), e);
+                    return;
+                }
+                if (stillNeedToCatchUp(manifest.provenance().offset())) {
+                    return;
+                }
+                log.debug("Publishing new image with provenance {}.", image.provenance());
+                for (MetadataPublisher publisher : publishers.values()) {
+                    try {
+                        publisher.publishLogDelta(delta, image, manifest);
+                    } catch (Throwable e) {
+                        faultHandler.handleFault("Unhandled error publishing the new metadata " +
+                            "image ending at " + manifest.provenance().offset() +
+                                " with publisher " + publisher.name(), e);
+                    }
+                }
+                maybeInitializeNewPublishers();
+                metrics.updateLastAppliedImageProvenance(image.provenance());
+            } catch (Throwable e) {
+                // This is a general catch-all block where we don't expect to end up;
+                // failure-prone operations should have individual try/catch blocks around them.
+                faultHandler.handleFault("Unhandled fault in MetadataLoader#handleCommit. " +
+                    "Last image offset was " + image.offset(), e);
+            } finally {
+                reader.close();
+            }
+        });
+    }
+
+    /**
+     * Load some batches of records from the log. We have to do some bookkeeping here to
+     * translate between batch offsets and record offsets, and track the number of bytes we
+     * have read. Additionally, there is the chance that one of the records is a metadata
+     * version change which needs to be handled differently.
+     *
+     * @param delta     The metadata delta we are preparing.
+     * @param reader    The reader which yields the batches.
+     * @return          A manifest of what was loaded.
+     */
+    LogDeltaManifest loadLogDelta(
+        MetadataDelta delta,
+        BatchReader<ApiMessageAndVersion> reader
+    ) {
+        long startNs = time.nanoseconds();
+        int numBatches = 0;
+        long numBytes = 0L;
+        long lastOffset = image.provenance().offset();
+        int lastEpoch = image.provenance().epoch();
+        long lastContainedLogTimeMs = image.provenance().lastContainedLogTimeMs();
+
+        while (reader.hasNext()) {
+            Batch<ApiMessageAndVersion> batch = reader.next();
+            int indexWithinBatch = 0;
+            for (ApiMessageAndVersion record : batch.records()) {
+                try {
+                    delta.replay(record.message());
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Error loading metadata log record from offset " +
+                            batch.baseOffset() + indexWithinBatch, e);
+                }
+                indexWithinBatch++;
+            }
+            metrics.updateBatchSize(batch.records().size());
+            lastOffset = batch.lastOffset();
+            lastEpoch = batch.epoch();
+            lastContainedLogTimeMs = batch.appendTimestamp();
+            numBytes += batch.sizeInBytes();
+            numBatches++;
+        }
+        MetadataProvenance provenance =
+                new MetadataProvenance(lastOffset, lastEpoch, lastContainedLogTimeMs);
+        long elapsedNs = time.nanoseconds() - startNs;
+        metrics.updateBatchProcessingTime(elapsedNs);
+        return new LogDeltaManifest(provenance,
+                numBatches,
+                elapsedNs,
+                numBytes);
+    }
+
+    @Override
+    public void handleSnapshot(SnapshotReader<ApiMessageAndVersion> reader) {
+        eventQueue.append(() -> {
+            try {
+                MetadataDelta delta = new MetadataDelta.Builder().
+                        setImage(image).
+                        build();
+                SnapshotManifest manifest = loadSnapshot(delta, reader);
+                if (log.isDebugEnabled()) {
+                    log.debug("Generated a metadata delta from a snapshot at offset {} " +
+                            "in {} us.", manifest.provenance().offset(),
+                            NANOSECONDS.toMicros(manifest.elapsedNs()));
+                }
+                try {
+                    image = delta.apply(manifest.provenance());
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Error generating new metadata image from " +
+                            "snapshot at offset " + reader.lastContainedLogOffset(), e);
+                    return;
+                }
+                if (stillNeedToCatchUp(manifest.provenance().offset())) {
+                    return;
+                }
+                log.debug("Publishing new snapshot image with provenance {}.", image.provenance());
+                for (MetadataPublisher publisher : publishers.values()) {
+                    try {
+                        publisher.publishSnapshot(delta, image, manifest);
+                    } catch (Throwable e) {
+                        faultHandler.handleFault("Unhandled error publishing the new metadata " +
+                                "image from snapshot at offset " + reader.lastContainedLogOffset() +
+                                    " with publisher " + publisher.name(), e);
+                    }
+                }
+                maybeInitializeNewPublishers();
+                metrics.updateLastAppliedImageProvenance(image.provenance());
+            } catch (Throwable e) {
+                // This is a general catch-all block where we don't expect to end up;
+                // failure-prone operations should have individual try/catch blocks around them.
+                faultHandler.handleFault("Unhandled fault in MetadataLoader#handleSnapshot. " +
+                        "Snapshot offset was " + reader.lastContainedLogOffset(), e);
+            } finally {
+                reader.close();
+            }
+        });
+    }
+
+    /**
+     * Load a snapshot. This is relatively straightforward since we don't track as many things as
+     * we do in loadLogDelta. The main complication here is that we have to maintain an index
+     * of what record we are processing so that we can give useful error messages.
+     *
+     * @param delta     The metadata delta we are preparing.
+     * @param reader    The reader which yields the snapshot batches.
+     * @return          A manifest of what was loaded.
+     */
+    SnapshotManifest loadSnapshot(
+            MetadataDelta delta,
+            SnapshotReader<ApiMessageAndVersion> reader
+    ) {
+        long startNs = time.nanoseconds();
+        int snapshotIndex = 0;
+        while (reader.hasNext()) {
+            Batch<ApiMessageAndVersion> batch = reader.next();
+            for (ApiMessageAndVersion record : batch.records()) {
+                try {
+                    delta.replay(record.message());
+                } catch (Throwable e) {
+                    faultHandler.handleFault("Error loading metadata log record " + snapshotIndex +
+                            " in snapshot at offset " + reader.lastContainedLogOffset(), e);
+                }
+                snapshotIndex++;
+            }
+        }
+        MetadataProvenance provenance = new MetadataProvenance(reader.lastContainedLogOffset(),
+                reader.lastContainedLogEpoch(), reader.lastContainedLogTimestamp());
+        return new SnapshotManifest(provenance,
+                time.nanoseconds() - startNs);
+    }
+
+    @Override
+    public void handleLeaderChange(LeaderAndEpoch leaderAndEpoch) {
+        eventQueue.append(() -> {
+            currentLeaderAndEpoch = leaderAndEpoch;
+        });
+    }
+
+    /**
+     * Install a list of publishers. When a publisher is installed, we will publish a MetadataDelta
+     * to it which contains the entire current image.
+     *
+     * @param newPublishers     The publishers to install.
+     *
+     * @return                  A future which yields null when the publishers have been added, or
+     *                          an exception if the installation failed.
+     */
+    public CompletableFuture<Void> installPublishers(List<? extends MetadataPublisher> newPublishers) {
+        if (newPublishers.isEmpty()) return CompletableFuture.completedFuture(null);
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            try {
+                installNewPublishers(newPublishers);
+                future.complete(null);
+            } catch (Throwable e) {
+                future.completeExceptionally(faultHandler.handleFault("Unhandled fault in " +
+                        "MetadataLoader#installPublishers", e));
+            }
+        });
+        return future;
+    }
+
+    void installNewPublishers(
+        List<? extends MetadataPublisher> newPublishers
+    ) {
+        // Publishers can't be re-installed if they're already present.
+        for (MetadataPublisher newPublisher : newPublishers) {
+            MetadataPublisher prev = publishers.get(newPublisher.name());
+            if (prev == null) {
+                prev = uninitializedPublishers.get(newPublisher.name());
+            }
+            if (prev != null) {
+                if (prev == newPublisher) {
+                    throw faultHandler.handleFault("Attempted to install publisher " +
+                            newPublisher.name() + ", which is already installed.");
+                } else {
+                    throw faultHandler.handleFault("Attempted to install a new publisher " +
+                            "named " + newPublisher.name() + ", but there is already a publisher " +
+                            "with that name.");
+                }
+            }
+            uninitializedPublishers.put(newPublisher.name(), newPublisher);
+        }
+    }
+
+    // VisibleForTesting
+    void waitForAllEventsToBeHandled() throws Exception {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> future.complete(null));
+        future.get();
+    }
+
+    /**
+     * Remove a publisher and close it.
+     *
+     * @param publisher         The publisher to remove and close.
+     *
+     * @return                  A future which yields null when the publisher has been removed
+     *                          and closed, or an exception if the removal failed.
+     */
+    public CompletableFuture<Void> removeAndClosePublisher(MetadataPublisher publisher) {
+        CompletableFuture<Void> future = new CompletableFuture<>();
+        eventQueue.append(() -> {
+            try {
+                if (!publishers.remove(publisher.name(), publisher)) {
+                    if (!uninitializedPublishers.remove(publisher.name(), publisher)) {
+                        throw faultHandler.handleFault("Attempted to remove publisher " + publisher.name() +
+                                ", which is not installed.");
+                    }
+                }
+                closePublisher(publisher);
+                future.complete(null);
+            } catch (Throwable e) {
+                future.completeExceptionally(e);
+            }
+        });
+        return future;
+    }
+
+    public long lastAppliedOffset() {
+        return metrics.lastAppliedOffset();
+    }
+
+    @Override
+    public void beginShutdown() {
+        eventQueue.beginShutdown("beginShutdown", () -> {
+            for (Iterator<MetadataPublisher> iter = uninitializedPublishers.values().iterator();
+                    iter.hasNext(); ) {
+                closePublisher(iter.next());
+                iter.remove();
+            }
+            for (Iterator<MetadataPublisher> iter = publishers.values().iterator();
+                 iter.hasNext(); ) {
+                closePublisher(iter.next());
+                iter.remove();
+            }
+        });
+    }
+
+    Time time() {
+        return time;
+    }
+
+    private void closePublisher(MetadataPublisher publisher) {
+        try {
+            publisher.close();
+        } catch (Throwable e) {
+            faultHandler.handleFault("Got unexpected exception while closing " +
+                    "publisher " + publisher.name(), e);
+        }
+    }
+
+    @Override
+    public void close() throws Exception {
+        beginShutdown();
+        eventQueue.close();
+    }
+}
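
(Illustration, not part of this patch.) The new MetadataLoader is constructed through its
Builder and then handed publishers to drive. A minimal wiring sketch, assuming a placeholder
high-water-mark supplier and an already-constructed publisher instance:

    import java.util.Collections;
    import java.util.OptionalLong;

    import org.apache.kafka.common.utils.Time;
    import org.apache.kafka.image.loader.MetadataLoader;
    import org.apache.kafka.image.publisher.MetadataPublisher;
    import org.apache.kafka.server.fault.FaultHandlerException;

    public class MetadataLoaderWiringSketch {
        static MetadataLoader startLoader(MetadataPublisher publisher) throws Exception {
            MetadataLoader loader = new MetadataLoader.Builder().
                setNodeId(3000).
                setTime(Time.SYSTEM).
                setThreadNamePrefix("loader-3000-").
                setFaultHandler((m, e) -> new FaultHandlerException(m, e)).
                // Placeholder: a real accessor would report the Raft layer's high water mark.
                setHighWaterMarkAccessor(() -> OptionalLong.empty()).
                build();
            // A newly installed publisher first receives a catch-up delta containing the full state.
            loader.installPublishers(Collections.singletonList(publisher)).get();
            return loader;
        }
    }

After the catch-up delta, each installed publisher receives incremental log deltas and any
snapshots the loader reads, in installation order.
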
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
new file mode 100644
index 00000000000..654bc9dd505
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/MetadataLoaderMetrics.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.image.MetadataProvenance;
+
+
+/**
+ * An interface for the metadata loader metrics.
+ */
+public interface MetadataLoaderMetrics extends AutoCloseable {
+    /**
+     * Update the batch processing time histogram.
+     */
+    void updateBatchProcessingTime(long elapsedNs);
+
+    /**
+     * Update the batch size histogram.
+     */
+    void updateBatchSize(int size);
+
+    /**
+     * Set the provenance of the last image which has been processed by all publishers.
+     */
+    void updateLastAppliedImageProvenance(MetadataProvenance provenance);
+
+    /**
+     * Retrieve the last offset which has been processed by all publishers.
+     */
+    long lastAppliedOffset();
+}
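
(Illustration, not part of this patch.) A MetadataLoaderMetrics implementation only needs to
record what the callbacks hand it; a minimal sketch that tracks the last applied offset and a
batch count could look like this:

    import java.util.concurrent.atomic.AtomicLong;

    import org.apache.kafka.image.MetadataProvenance;
    import org.apache.kafka.image.loader.MetadataLoaderMetrics;

    public class SimpleMetadataLoaderMetrics implements MetadataLoaderMetrics {
        private final AtomicLong lastAppliedOffset = new AtomicLong(-1L);
        private final AtomicLong batchesLoaded = new AtomicLong(0L);

        @Override
        public void updateBatchProcessingTime(long elapsedNs) {
            // A real implementation would feed a histogram here.
        }

        @Override
        public void updateBatchSize(int size) {
            batchesLoaded.incrementAndGet();
        }

        @Override
        public void updateLastAppliedImageProvenance(MetadataProvenance provenance) {
            lastAppliedOffset.set(provenance.offset());
        }

        @Override
        public long lastAppliedOffset() {
            return lastAppliedOffset.get();
        }

        @Override
        public void close() {
            // Nothing to clean up in this sketch.
        }

        public long batchesLoaded() {
            return batchesLoaded.get();
        }
    }
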
diff --git a/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java b/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
new file mode 100644
index 00000000000..b6c6dcce4d5
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/loader/SnapshotManifest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.image.MetadataProvenance;
+
+import java.util.Objects;
+
+
+/**
+ * Contains information about a snapshot that was loaded.
+ */
+public class SnapshotManifest {
+    /**
+     * The source of this snapshot.
+     */
+    private final MetadataProvenance provenance;
+
+    /**
+     * The time in nanoseconds that it took to load the snapshot.
+     */
+    private final long elapsedNs;
+
+    public SnapshotManifest(
+        MetadataProvenance provenance,
+        long elapsedNs
+    ) {
+        this.provenance = provenance;
+        this.elapsedNs = elapsedNs;
+    }
+
+    public MetadataProvenance provenance() {
+        return provenance;
+    }
+
+    public long elapsedNs() {
+        return elapsedNs;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(
+                provenance,
+                elapsedNs);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !o.getClass().equals(this.getClass())) return false;
+        SnapshotManifest other = (SnapshotManifest) o;
+        return provenance.equals(other.provenance) &&
+                elapsedNs == other.elapsedNs;
+    }
+
+    @Override
+    public String toString() {
+        return "SnapshotManifest(" +
+                "provenance=" + provenance +
+                ", elapsedNs=" + elapsedNs +
+                ")";
+    }
+}
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java b/metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java
new file mode 100644
index 00000000000..8dfba7a99ab
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/MetadataPublisher.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.image.loader.SnapshotManifest;
+
+
+/**
+ * Publishes metadata deltas which we have loaded from the log and snapshots.
+ *
+ * Publishers receive a stream of callbacks from the metadata loader which keeps them notified
+ * of the latest cluster metadata. This interface abstracts away some of the complications of
+ * following the cluster metadata. For example, if the loader needs to read a snapshot, it will
+ * present the contents of the snapshot in the form of a delta from the previous state.
+ */
+public interface MetadataPublisher extends AutoCloseable {
+    /**
+     * Returns the name of this publisher.
+     *
+     * @return The publisher name.
+     */
+    String name();
+
+    /**
+     * Publish a new cluster metadata snapshot that we loaded.
+     *
+     * @param delta    The delta between the previous state and the new one.
+     * @param newImage The complete new state.
+     * @param manifest A manifest describing what was published.
+     */
+    void publishSnapshot(
+            MetadataDelta delta,
+            MetadataImage newImage,
+            SnapshotManifest manifest
+    );
+
+    /**
+     * Publish a change to the cluster metadata.
+     *
+     * @param delta    The delta between the previous state and the new one.
+     * @param newImage The complete new state.
+     * @param manifest A manifest describing what was published.
+     */
+    void publishLogDelta(
+            MetadataDelta delta,
+            MetadataImage newImage,
+            LogDeltaManifest manifest
+    );
+
+    /**
+     * Close this metadata publisher.
+     */
+    void close() throws Exception;
+}
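
(Illustration, not part of this patch.) A minimal MetadataPublisher that just logs what it
receives shows the shape of the two callbacks; the class and logger names here are hypothetical:

    import org.apache.kafka.image.MetadataDelta;
    import org.apache.kafka.image.MetadataImage;
    import org.apache.kafka.image.loader.LogDeltaManifest;
    import org.apache.kafka.image.loader.SnapshotManifest;
    import org.apache.kafka.image.publisher.MetadataPublisher;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class LoggingMetadataPublisher implements MetadataPublisher {
        private static final Logger log = LoggerFactory.getLogger(LoggingMetadataPublisher.class);

        @Override
        public String name() {
            return "LoggingMetadataPublisher";
        }

        @Override
        public void publishSnapshot(MetadataDelta delta, MetadataImage newImage, SnapshotManifest manifest) {
            log.info("Loaded a snapshot image with provenance {}.", manifest.provenance());
        }

        @Override
        public void publishLogDelta(MetadataDelta delta, MetadataImage newImage, LogDeltaManifest manifest) {
            log.info("Loaded a log delta ending at offset {}.", manifest.provenance().offset());
        }

        @Override
        public void close() {
            // Nothing to clean up.
        }
    }
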
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
new file mode 100644
index 00000000000..31ac2169be4
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotEmitter.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RaftSnapshotWriter;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.SnapshotWriter;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+
+
+public class SnapshotEmitter implements SnapshotGenerator.Emitter {
+    /**
+     * The maximum number of records we will put in each snapshot batch by default.
+     *
+     * From the perspective of the Raft layer, the limit on batch size is specified in terms of
+     * bytes, not number of records. See MAX_BATCH_SIZE_BYTES in KafkaRaftClient for details.
+     * However, it's more convenient to limit the batch size here in terms of number of records.
+     * So we chose a low number that will not cause problems.
+     */
+    private final static int DEFAULT_BATCH_SIZE = 1024;
+
+    public static class Builder {
+        private int nodeId = 0;
+        private RaftClient<ApiMessageAndVersion> raftClient = null;
+        private int batchSize = DEFAULT_BATCH_SIZE;
+
+        public Builder setNodeId(int nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+
+        public Builder setRaftClient(RaftClient<ApiMessageAndVersion> raftClient) {
+            this.raftClient = raftClient;
+            return this;
+        }
+
+        public Builder setBatchSize(int batchSize) {
+            this.batchSize = batchSize;
+            return this;
+        }
+
+        public SnapshotEmitter build() {
+            if (raftClient == null) throw new RuntimeException("You must set the raftClient.");
+            return new SnapshotEmitter(nodeId,
+                    raftClient,
+                    batchSize);
+        }
+    }
+
+    /**
+     * The slf4j logger to use.
+     */
+    private final Logger log;
+
+    /**
+     * The RaftClient to use.
+     */
+    private final RaftClient<ApiMessageAndVersion> raftClient;
+
+    /**
+     * The maximum number of records to put in each batch.
+     */
+    private final int batchSize;
+
+    private SnapshotEmitter(
+            int nodeId,
+            RaftClient<ApiMessageAndVersion> raftClient,
+            int batchSize
+    ) {
+        this.log = new LogContext("[SnapshotEmitter id=" + nodeId + "] ").logger(SnapshotEmitter.class);
+        this.raftClient = raftClient;
+        this.batchSize = batchSize;
+    }
+
+    @Override
+    public void maybeEmit(MetadataImage image) {
+        MetadataProvenance provenance = image.provenance();
+        Optional<SnapshotWriter<ApiMessageAndVersion>> snapshotWriter =
+            raftClient.createSnapshot(provenance.offsetAndEpoch(),
+                    provenance.lastContainedLogTimeMs());
+        if (!snapshotWriter.isPresent()) {
+            log.error("Not generating {} because it already exists.", provenance.snapshotName());
+            return;
+        }
+        RaftSnapshotWriter writer = new RaftSnapshotWriter(snapshotWriter.get(), batchSize);
+        try {
+            image.write(writer, new ImageWriterOptions.Builder().
+                    setMetadataVersion(image.features().metadataVersion()).
+                    build());
+            writer.close(true);
+        } catch (Throwable e) {
+            log.error("Encountered error while writing {}", provenance.snapshotName(), e);
+            throw e;
+        } finally {
+            Utils.closeQuietly(writer, "RaftSnapshotWriter");
+            Utils.closeQuietly(snapshotWriter.get(), "SnapshotWriter");
+        }
+        log.info("Successfully wrote {}", provenance.snapshotName());
+    }
+}
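
For reference, a minimal sketch of how the emitter above is meant to be constructed and
invoked (the Builder and maybeEmit methods are those defined in this file; the raftClient
and image variables are assumed to already exist in the caller):

    // Illustrative sketch only, not part of the patch.
    SnapshotEmitter emitter = new SnapshotEmitter.Builder()
        .setNodeId(1)
        .setRaftClient(raftClient)   // the controller's RaftClient<ApiMessageAndVersion>
        .setBatchSize(1024)          // optional; defaults to DEFAULT_BATCH_SIZE
        .build();

    // Writes a snapshot at the image's provenance offset/epoch, or logs an error and
    // returns if a snapshot already exists there.
    emitter.maybeEmit(image);
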
diff --git a/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
new file mode 100644
index 00000000000..43809de3898
--- /dev/null
+++ b/metadata/src/main/java/org/apache/kafka/image/publisher/SnapshotGenerator.java
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.image.loader.SnapshotManifest;
+import org.apache.kafka.queue.EventQueue;
+import org.apache.kafka.queue.KafkaEventQueue;
+import org.apache.kafka.server.fault.FaultHandler;
+import org.slf4j.Logger;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+
+/**
+ * A metadata publisher that generates snapshots when appropriate.
+ */
+public class SnapshotGenerator implements MetadataPublisher {
+    public static class Builder {
+        private final Emitter emitter;
+        private int nodeId = 0;
+        private Time time = Time.SYSTEM;
+        private FaultHandler faultHandler = (m, e) -> null;
+        private long maxBytesSinceLastSnapshot = 100 * 1024L * 1024L;
+        private long maxTimeSinceLastSnapshotNs = TimeUnit.DAYS.toNanos(1);
+        private AtomicReference<String> disabledReason = null;
+
+        public Builder(Emitter emitter) {
+            this.emitter = emitter;
+        }
+
+        public Builder setNodeId(int nodeId) {
+            this.nodeId = nodeId;
+            return this;
+        }
+
+        public Builder setTime(Time time) {
+            this.time = time;
+            return this;
+        }
+
+        public Builder setFaultHandler(FaultHandler faultHandler) {
+            this.faultHandler = faultHandler;
+            return this;
+        }
+
+        public Builder setMaxBytesSinceLastSnapshot(long maxBytesSinceLastSnapshot) {
+            this.maxBytesSinceLastSnapshot = maxBytesSinceLastSnapshot;
+            return this;
+        }
+
+        public Builder setMaxTimeSinceLastSnapshotNs(long maxTimeSinceLastSnapshotNs) {
+            this.maxTimeSinceLastSnapshotNs = maxTimeSinceLastSnapshotNs;
+            return this;
+        }
+
+        public Builder setDisabledReason(AtomicReference<String> disabledReason) {
+            this.disabledReason = disabledReason;
+            return this;
+        }
+
+        public SnapshotGenerator build() {
+            if (disabledReason == null) {
+                disabledReason = new AtomicReference<>();
+            }
+            return new SnapshotGenerator(
+                nodeId,
+                time,
+                emitter,
+                faultHandler,
+                maxBytesSinceLastSnapshot,
+                maxTimeSinceLastSnapshotNs,
+                disabledReason
+            );
+        }
+    }
+
+    /**
+     * The callback which actually generates the snapshot.
+     */
+    public interface Emitter {
+        /**
+         * Emit a snapshot for the given image.
+         *
+         * Note: if a snapshot has already been emitted for the given offset and epoch pair, this
+         * function will not recreate it.
+         *
+         * @param image     The metadata image to emit.
+         */
+        void maybeEmit(MetadataImage image);
+    }
+
+    /**
+     * The node ID.
+     */
+    private final int nodeId;
+
+    /**
+     * The clock to use.
+     */
+    private final Time time;
+
+    /**
+     * The emitter callback, which actually generates the snapshot.
+     */
+    private final Emitter emitter;
+
+    /**
+     * The slf4j logger to use.
+     */
+    private final Logger log;
+
+    /**
+     * The fault handler to use.
+     */
+    private final FaultHandler faultHandler;
+
+    /**
+     * The maximum number of new metadata log bytes we will read before triggering a new snapshot.
+     */
+    private final long maxBytesSinceLastSnapshot;
+
+    /**
+     * The maximum amount of time we will wait before triggering a snapshot, or 0 to disable
+     * time-based snapshotting.
+     */
+    private final long maxTimeSinceLastSnapshotNs;
+
+    /**
+     * If non-null, the reason why snapshots have been disabled.
+     */
+    private final AtomicReference<String> disabledReason;
+
+    /**
+     * The event queue used to schedule emitting snapshots.
+     */
+    private final EventQueue eventQueue;
+
+    /**
+     * The number of log bytes we have read since the last snapshot.
+     */
+    private long bytesSinceLastSnapshot;
+
+    /**
+     * The time at which we created the last snapshot.
+     */
+    private long lastSnapshotTimeNs;
+
+    private SnapshotGenerator(
+        int nodeId,
+        Time time,
+        Emitter emitter,
+        FaultHandler faultHandler,
+        long maxBytesSinceLastSnapshot,
+        long maxTimeSinceLastSnapshotNs,
+        AtomicReference<String> disabledReason
+    ) {
+        this.nodeId = nodeId;
+        this.time = time;
+        this.emitter = emitter;
+        this.faultHandler = faultHandler;
+        this.maxBytesSinceLastSnapshot = maxBytesSinceLastSnapshot;
+        this.maxTimeSinceLastSnapshotNs = maxTimeSinceLastSnapshotNs;
+        LogContext logContext = new LogContext("[SnapshotGenerator " + nodeId + "] ");
+        this.log = logContext.logger(SnapshotGenerator.class);
+        this.disabledReason = disabledReason;
+        this.eventQueue = new KafkaEventQueue(time, logContext, "SnapshotGenerator" + nodeId);
+        resetSnapshotCounters();
+        log.debug("Starting SnapshotGenerator.");
+    }
+
+    @Override
+    public String name() {
+        return "SnapshotGenerator";
+    }
+
+    void resetSnapshotCounters() {
+        this.bytesSinceLastSnapshot = 0L;
+        this.lastSnapshotTimeNs = time.nanoseconds();
+    }
+
+    @Override
+    public void publishSnapshot(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        SnapshotManifest manifest
+    ) {
+        log.debug("Resetting the snapshot counters because we just read a snapshot at offset {}.",
+                newImage.provenance().offset());
+        resetSnapshotCounters();
+    }
+
+    @Override
+    public void publishLogDelta(
+        MetadataDelta delta,
+        MetadataImage newImage,
+        LogDeltaManifest manifest
+    ) {
+        bytesSinceLastSnapshot += manifest.numBytes();
+        if (bytesSinceLastSnapshot >= maxBytesSinceLastSnapshot) {
+            if (eventQueue.isEmpty()) {
+                scheduleEmit("we have replayed at least " + maxBytesSinceLastSnapshot +
+                    " bytes", newImage);
+            } else if (log.isTraceEnabled()) {
+                log.trace("Not scheduling bytes-based snapshot because event queue is not empty yet.");
+            }
+        } else if (maxTimeSinceLastSnapshotNs != 0 &&
+                (time.nanoseconds() - lastSnapshotTimeNs >= maxTimeSinceLastSnapshotNs)) {
+            if (eventQueue.isEmpty()) {
+                scheduleEmit("we have waited at least " +
+                    TimeUnit.NANOSECONDS.toMinutes(maxTimeSinceLastSnapshotNs) + " minute(s)", newImage);
+            } else if (log.isTraceEnabled()) {
+                log.trace("Not scheduling time-based snapshot because event queue is not empty yet.");
+            }
+        } else if (log.isTraceEnabled()) {
+            log.trace("Neither time-based nor bytes-based criteria are met; not scheduling snapshot.");
+        }
+    }
+
+    void scheduleEmit(
+        String reason,
+        MetadataImage image
+    ) {
+        resetSnapshotCounters();
+        eventQueue.append(() -> {
+            String currentDisabledReason = disabledReason.get();
+            if (currentDisabledReason != null) {
+                log.error("Not emitting {} despite the fact that {} because snapshots are " +
+                    "disabled; {}", image.provenance().snapshotName(), reason,
+                        currentDisabledReason);
+            } else {
+                log.info("Creating new KRaft snapshot file {} because {}.",
+                        image.provenance().snapshotName(), reason);
+                try {
+                    emitter.maybeEmit(image);
+                } catch (Throwable e) {
+                    faultHandler.handleFault("KRaft snapshot file generation error", e);
+                }
+            }
+        });
+    }
+
+    public void beginShutdown() {
+        log.debug("Beginning shutdown of SnapshotGenerator.");
+        this.disabledReason.compareAndSet(null, "we are shutting down");
+        eventQueue.beginShutdown("beginShutdown");
+    }
+
+    @Override
+    public void close() throws InterruptedException {
+        eventQueue.beginShutdown("close");
+        log.debug("Closing SnapshotGenerator.");
+        eventQueue.close();
+    }
+}
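
Similarly, a minimal sketch of how the generator above might be assembled around an
emitter (the setter names come from the Builder in this file; the emitter and faultHandler
values are assumed to already exist, and the thresholds shown are the Builder defaults):

    // Illustrative sketch only, not part of the patch. In the real wiring, the metadata
    // loader is what invokes publishLogDelta() / publishSnapshot() on this publisher.
    SnapshotGenerator generator = new SnapshotGenerator.Builder(emitter)
        .setNodeId(1)
        .setTime(Time.SYSTEM)
        .setFaultHandler(faultHandler)
        .setMaxBytesSinceLastSnapshot(100 * 1024L * 1024L)        // 100 MiB
        .setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(1))  // 1 day
        .build();

    // On shutdown, disable further snapshot scheduling and drain the event queue.
    generator.beginShutdown();
    generator.close();
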
diff --git a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
index fdc03276451..566fa4acd54 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/AclControlManagerTest.java
@@ -215,11 +215,9 @@ public class AclControlManagerTest {
 
         // Verify that the ACLs stored in the AclControlManager match the ones we expect.
         Set<ApiMessageAndVersion> foundAcls = new HashSet<>();
-        for (Iterator<List<ApiMessageAndVersion>> iterator = manager.iterator(Long.MAX_VALUE);
-                 iterator.hasNext(); ) {
-            for (ApiMessageAndVersion apiMessageAndVersion : iterator.next()) {
-                assertTrue(foundAcls.add(apiMessageAndVersion));
-            }
+        for (Map.Entry<Uuid, StandardAcl> entry : manager.idToAcl().entrySet()) {
+            foundAcls.add(new ApiMessageAndVersion(
+                    new StandardAclWithId(entry.getKey(), entry.getValue()).toRecord(), (short) 0));
         }
         assertEquals(loadedAcls, foundAcls);
 
@@ -233,7 +231,7 @@ public class AclControlManagerTest {
         // a cluster metadata authorizer.
         snapshotRegistry.revertToSnapshot(0);
         authorizer.loadSnapshot(manager.idToAcl());
-        assertFalse(manager.iterator(Long.MAX_VALUE).hasNext());
+        assertTrue(manager.idToAcl().isEmpty());
     }
 
     @Test
@@ -243,11 +241,9 @@ public class AclControlManagerTest {
         MockClusterMetadataAuthorizer authorizer = new MockClusterMetadataAuthorizer();
         authorizer.loadSnapshot(manager.idToAcl());
         manager.replay(StandardAclWithIdTest.TEST_ACLS.get(0).toRecord(), Optional.empty());
-        assertEquals(new ApiMessageAndVersion(TEST_ACLS.get(0).toRecord(), (short) 0),
-            manager.iterator(Long.MAX_VALUE).next().get(0));
         manager.replay(new RemoveAccessControlEntryRecord().
             setId(TEST_ACLS.get(0).id()), Optional.empty());
-        assertFalse(manager.iterator(Long.MAX_VALUE).hasNext());
+        assertTrue(manager.idToAcl().isEmpty());
     }
 
     @Test
@@ -284,7 +280,7 @@ public class AclControlManagerTest {
             }
         }
         RecordTestUtils.replayAll(manager, createResult.records());
-        assertTrue(manager.iterator(Long.MAX_VALUE).hasNext());
+        assertFalse(manager.idToAcl().isEmpty());
 
         ControllerResult<List<AclDeleteResult>> deleteResult =
             manager.deleteAcls(Arrays.asList(
@@ -308,12 +304,8 @@ public class AclControlManagerTest {
             deleteResult.response().get(1).exception().get().getClass());
         RecordTestUtils.replayAll(manager, deleteResult.records());
 
-        Iterator<List<ApiMessageAndVersion>> iterator = manager.iterator(Long.MAX_VALUE);
-        assertTrue(iterator.hasNext());
-        List<ApiMessageAndVersion> list = iterator.next();
-        assertEquals(1, list.size());
-        assertEquals(TEST_ACLS.get(1).toBinding(), StandardAcl.fromRecord(
-            (AccessControlEntryRecord) list.get(0).message()).toBinding());
+        Iterator<Map.Entry<Uuid, StandardAcl>> iterator = manager.idToAcl().entrySet().iterator();
+        assertEquals(TEST_ACLS.get(1).acl(), iterator.next().getValue());
         assertFalse(iterator.hasNext());
     }
 
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
index b915db3fe0d..1fb81cbf7a4 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClientQuotaControlManagerTest.java
@@ -205,49 +205,50 @@ public class ClientQuotaControlManagerTest {
 
         List<ClientQuotaAlteration> alters = new ArrayList<>();
         quotasToTest.forEach((entity, quota) -> entityQuotaToAlterations(entity, quota, alters::add));
-        alterQuotas(alters, manager);
-
-        RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+        List<ApiMessageAndVersion> records = alterQuotas(alters, manager);
+        List<ApiMessageAndVersion> expectedRecords = Arrays.asList(
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-1"),
                 new EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-                    setKey("request_percentage").setValue(50.5).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(50.5).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-2"),
                 new EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-                    setKey("request_percentage").setValue(51.51).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(51.51).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-3"),
                 new EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
-                    setKey("request_percentage").setValue(52.52).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(52.52).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName(null),
                 new EntityData().setEntityType("client-id").setEntityName("client-id-1"))).
-                    setKey("request_percentage").setValue(53.53).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(53.53).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-1"),
                 new EntityData().setEntityType("client-id").setEntityName(null))).
-                    setKey("request_percentage").setValue(54.54).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(54.54).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-3"),
                 new EntityData().setEntityType("client-id").setEntityName(null))).
-                    setKey("request_percentage").setValue(55.55).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(55.55).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-1"))).
-                    setKey("request_percentage").setValue(56.56).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(56.56).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-2"))).
-                    setKey("request_percentage").setValue(57.57).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(57.57).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName("user-3"))).
-                    setKey("request_percentage").setValue(58.58).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(58.58).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("user").setEntityName(null))).
-                    setKey("request_percentage").setValue(59.59).setRemove(false), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
+                    setKey("request_percentage").setValue(59.59).setRemove(false), (short) 0),
+            new ApiMessageAndVersion(new ClientQuotaRecord().setEntity(Arrays.asList(
                 new EntityData().setEntityType("client-id").setEntityName("client-id-2"))).
-                    setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0))),
-            manager.iterator(Long.MAX_VALUE));
+                    setKey("request_percentage").setValue(60.60).setRemove(false), (short) 0));
+        RecordTestUtils.deepSortRecords(records);
+        RecordTestUtils.deepSortRecords(expectedRecords);
+        assertEquals(expectedRecords, records);
     }
 
     static void entityQuotaToAlterations(ClientQuotaEntity entity, Map<String, Double> quota,
@@ -258,11 +259,15 @@ public class ClientQuotaControlManagerTest {
         acceptor.accept(new ClientQuotaAlteration(entity, ops));
     }
 
-    static void alterQuotas(List<ClientQuotaAlteration> alterations, ClientQuotaControlManager manager) {
+    static List<ApiMessageAndVersion> alterQuotas(
+        List<ClientQuotaAlteration> alterations,
+        ClientQuotaControlManager manager
+    ) {
         ControllerResult<Map<ClientQuotaEntity, ApiError>> result = manager.alterClientQuotas(alterations);
         assertTrue(result.response().values().stream().allMatch(ApiError::isSuccess));
         result.records().forEach(apiMessageAndVersion ->
                 manager.replay((ClientQuotaRecord) apiMessageAndVersion.message()));
+        return result.records();
     }
 
     static Map<String, Double> quotas(String key, Double value) {
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
index 72eea5427e8..86918d5513c 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ClusterControlManagerTest.java
@@ -34,12 +34,12 @@ import org.apache.kafka.common.metadata.UnregisterBrokerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.image.writer.ImageWriterOptions;
 import org.apache.kafka.metadata.BrokerRegistration;
 import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationInControlledShutdownChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
-import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.VersionRange;
 import org.apache.kafka.metadata.placement.PartitionAssignment;
 import org.apache.kafka.metadata.placement.PlacementSpec;
@@ -418,7 +418,7 @@ public class ClusterControlManagerTest {
 
     @ParameterizedTest
     @EnumSource(value = MetadataVersion.class, names = {"IBP_3_3_IV2", "IBP_3_3_IV3"})
-    public void testIterator(MetadataVersion metadataVersion) throws Exception {
+    public void testRegistrationsToRecords(MetadataVersion metadataVersion) throws Exception {
         MockTime time = new MockTime(0, 0, 0);
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
         FeatureControlManager featureControl = new FeatureControlManager.Builder().
@@ -460,8 +460,12 @@ public class ClusterControlManagerTest {
                     IN_CONTROLLED_SHUTDOWN.value());
         clusterControl.replay(registrationChangeRecord);
         short expectedVersion = metadataVersion.registerBrokerRecordVersion();
-        RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
-            Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+
+        ImageWriterOptions options = new ImageWriterOptions.Builder().
+                setMetadataVersion(metadataVersion).
+                setLossHandler(__ -> { }).
+                build();
+        assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(0).setRack(null).
                 setEndPoints(new BrokerEndpointCollection(Collections.singleton(
                     new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
@@ -469,27 +473,28 @@ public class ClusterControlManagerTest {
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
                 setInControlledShutdown(metadataVersion.isInControlledShutdownStateSupported()).
-                setFenced(false), expectedVersion)),
-            Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+                setFenced(false), expectedVersion),
+            clusterControl.brokerRegistrations().get(0).toRecord(options));
+        assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(1).setRack(null).
                 setEndPoints(new BrokerEndpointCollection(Collections.singleton(
                     new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
                         setPort((short) 9093).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(false), expectedVersion)),
-            Arrays.asList(new ApiMessageAndVersion(new RegisterBrokerRecord().
+                setFenced(false), expectedVersion),
+            clusterControl.brokerRegistrations().get(1).toRecord(options));
+        assertEquals(new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerEpoch(100).setBrokerId(2).setRack(null).
                 setEndPoints(new BrokerEndpointCollection(Collections.singleton(
                     new BrokerEndpoint().setSecurityProtocol(SecurityProtocol.PLAINTEXT.id).
                         setPort((short) 9094).
                         setName("PLAINTEXT").
                         setHost("example.com")).iterator())).
-                setFenced(true), expectedVersion))),
-                clusterControl.iterator(Long.MAX_VALUE));
+                        setFenced(true), expectedVersion),
+            clusterControl.brokerRegistrations().get(2).toRecord(options));
     }
 
-
     @Test
     public void testRegistrationWithUnsupportedMetadataVersion() {
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(new LogContext());
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
index 02bd6e4d7e9..bbca7bfbf6b 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java
@@ -135,14 +135,6 @@ public class ConfigurationControlManagerTest {
             setName("def").setValue("blah"));
         assertEquals(toMap(entry("abc", "x,y,z"), entry("def", "blah")),
             manager.getConfigs(MYTOPIC));
-        RecordTestUtils.assertBatchIteratorContains(asList(
-            asList(new ApiMessageAndVersion(new ConfigRecord().
-                    setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("abc").setValue("x,y,z"), CONFIG_RECORD.highestSupportedVersion()),
-                new ApiMessageAndVersion(new ConfigRecord().
-                    setResourceType(TOPIC.id()).setResourceName("mytopic").
-                    setName("def").setValue("blah"), CONFIG_RECORD.highestSupportedVersion()))),
-            manager.iterator(Long.MAX_VALUE));
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
index 2478f4ce164..8345b4e3149 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/FeatureControlManagerTest.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
@@ -202,25 +203,7 @@ public class FeatureControlManagerTest {
     }
 
     @Test
-    public void testFeatureControlIteratorWithOldMetadataVersion() throws Exception {
-        // We require minimum of IBP_3_3_IV0 to write metadata version in the snapshot.
-
-        LogContext logContext = new LogContext();
-        SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
-        FeatureControlManager manager = new FeatureControlManager.Builder()
-            .setLogContext(logContext)
-            .setSnapshotRegistry(snapshotRegistry)
-            .setMetadataVersion(MetadataVersion.IBP_3_2_IV0)
-            .build();
-
-        RecordTestUtils.assertBatchIteratorContains(
-            Collections.emptyList(),
-            manager.iterator(Long.MAX_VALUE)
-        );
-    }
-
-    @Test
-    public void testFeatureControlIterator() throws Exception {
+    public void testReplayRecords() throws Exception {
         LogContext logContext = new LogContext();
         SnapshotRegistry snapshotRegistry = new SnapshotRegistry(logContext);
         FeatureControlManager manager = new FeatureControlManager.Builder().
@@ -233,17 +216,12 @@ public class FeatureControlManagerTest {
             updateFeatures(updateMap("foo", 5, "bar", 1),
                 Collections.emptyMap(), Collections.emptyMap(), false);
         RecordTestUtils.replayAll(manager, result.records());
-        RecordTestUtils.assertBatchIteratorContains(Arrays.asList(
-            Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
-                    setName("metadata.version").
-                    setFeatureLevel((short) 4), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
-                setName("foo").
-                setFeatureLevel((short) 5), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new FeatureLevelRecord().
-                setName("bar").
-                setFeatureLevel((short) 1), (short) 0))),
-            manager.iterator(Long.MAX_VALUE));
+        assertEquals(MetadataVersion.IBP_3_3_IV0, manager.metadataVersion());
+        assertEquals(Optional.of((short) 5), manager.finalizedFeatures(Long.MAX_VALUE).get("foo"));
+        assertEquals(Optional.of((short) 1), manager.finalizedFeatures(Long.MAX_VALUE).get("bar"));
+        assertEquals(new HashSet<>(Arrays.asList(
+            MetadataVersion.FEATURE_NAME, "foo", "bar")),
+                manager.finalizedFeatures(Long.MAX_VALUE).featureNames());
     }
 
     private static final FeatureControlManager.Builder TEST_MANAGER_BUILDER1 =
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
index 80c5c505ae0..d0c16d1e3b0 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ProducerIdControlManagerTest.java
@@ -26,20 +26,14 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.server.common.ProducerIdsBlock;
 import org.apache.kafka.timeline.SnapshotRegistry;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import java.util.Iterator;
-import java.util.List;
-
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 
 public class ProducerIdControlManagerTest {
@@ -149,29 +143,11 @@ public class ProducerIdControlManagerTest {
     }
 
     @Test
-    public void testSnapshotIterator() {
-        ProducerIdsBlock range = null;
+    public void testGenerateProducerIds() {
         for (int i = 0; i < 100; i++) {
-            range = generateProducerIds(producerIdControlManager, i % 4, 100);
-        }
-
-        Iterator<List<ApiMessageAndVersion>> snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE);
-        assertTrue(snapshotIterator.hasNext());
-        List<ApiMessageAndVersion> batch = snapshotIterator.next();
-        assertEquals(1, batch.size(), "Producer IDs record batch should only contain a single record");
-        assertEquals(range.firstProducerId() + range.size(), ((ProducerIdsRecord) batch.get(0).message()).nextProducerId());
-        assertFalse(snapshotIterator.hasNext(), "Producer IDs iterator should only contain a single batch");
-
-        ProducerIdControlManager newProducerIdManager = new ProducerIdControlManager(clusterControl, snapshotRegistry);
-        snapshotIterator = producerIdControlManager.iterator(Long.MAX_VALUE);
-        while (snapshotIterator.hasNext()) {
-            snapshotIterator.next().forEach(message -> newProducerIdManager.replay((ProducerIdsRecord) message.message()));
+            generateProducerIds(producerIdControlManager, i % 4, 100);
         }
-
-        // Verify that after reloading state from this "snapshot", we don't produce any overlapping IDs
-        long lastProducerID = range.firstProducerId() + range.size() - 1;
-        range = generateProducerIds(producerIdControlManager, 1, 100);
-        assertTrue(range.firstProducerId() > lastProducerID);
+        assertEquals(new ProducerIdsBlock(3, 100000, 1000), producerIdControlManager.nextProducerBlock());
     }
 
     static ProducerIdsBlock generateProducerIds(
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 10921c77fbd..9d0508d876a 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -31,8 +31,6 @@ import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.OptionalLong;
 import java.util.Set;
-import java.util.Spliterator;
-import java.util.Spliterators;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -42,7 +40,6 @@ import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import java.util.stream.StreamSupport;
 
 import org.apache.kafka.common.Uuid;
 import org.apache.kafka.common.errors.BrokerIdNotRegisteredException;
@@ -85,27 +82,22 @@ import org.apache.kafka.common.metadata.RegisterBrokerRecord.BrokerEndpointColle
 import org.apache.kafka.common.metadata.TopicRecord;
 import org.apache.kafka.common.protocol.Errors;
 import org.apache.kafka.common.requests.ApiError;
-import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.controller.QuorumController.ConfigResourceExistenceChecker;
 import org.apache.kafka.metadata.BrokerHeartbeatReply;
+import org.apache.kafka.metadata.BrokerRegistrationFencingChange;
 import org.apache.kafka.metadata.BrokerRegistrationReply;
 import org.apache.kafka.metadata.FinalizedControllerFeatures;
-import org.apache.kafka.metadata.MetadataRecordSerde;
 import org.apache.kafka.metadata.PartitionRegistration;
-import org.apache.kafka.metadata.RecordTestUtils;
 import org.apache.kafka.metadata.authorizer.StandardAuthorizer;
 import org.apache.kafka.metadata.bootstrap.BootstrapMetadata;
 import org.apache.kafka.metadata.util.BatchFileWriter;
 import org.apache.kafka.metalog.LocalLogManager;
 import org.apache.kafka.metalog.LocalLogManagerTestEnv;
-import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.MetadataVersion;
 import org.apache.kafka.snapshot.FileRawSnapshotReader;
-import org.apache.kafka.snapshot.SnapshotReader;
 import org.apache.kafka.snapshot.RawSnapshotReader;
-import org.apache.kafka.snapshot.RecordsSnapshotReader;
 import org.apache.kafka.snapshot.Snapshots;
 import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Disabled;
@@ -128,7 +120,6 @@ import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
 
 
 @Timeout(value = 40)
@@ -669,324 +660,43 @@ public class QuorumControllerTest {
             fooId = fooData.topics().find("foo").topicId();
             active.allocateProducerIds(ANONYMOUS_CONTEXT,
                 new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
-            long snapshotLogOffset = active.beginWritingSnapshot().get();
-            reader = logEnv.waitForSnapshot(snapshotLogOffset);
-            SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(reader);
-            assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset());
-            checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot);
-        }
-
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
-                setSnapshotReader(reader).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setConfigSchema(SCHEMA);
-                }).
-                build();
-        ) {
-            QuorumController active = controlEnv.activeController();
-            long snapshotLogOffset = active.beginWritingSnapshot().get();
-            SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(
-                logEnv.waitForSnapshot(snapshotLogOffset)
-            );
-            assertEquals(snapshotLogOffset, snapshot.lastContainedLogOffset());
-            checkSnapshotContent(expectedSnapshotContent(fooId, brokerEpochs), snapshot);
-        }
-    }
-
-    @Test
-    public void testSnapshotConfiguration() throws Throwable {
-        final int numBrokers = 4;
-        final int maxNewRecordBytes = 4;
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        Uuid fooId;
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setConfigSchema(SCHEMA);
-                    controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
-                    controllerBuilder.setBootstrapMetadata(SIMPLE_BOOTSTRAP);
-                }).
-                build();
-        ) {
-            QuorumController active = controlEnv.activeController();
-            for (int i = 0; i < numBrokers; i++) {
-                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
-                    new BrokerRegistrationRequestData().
-                        setBrokerId(i).
-                        setRack(null).
-                        setClusterId(active.clusterId()).
-                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
-                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
-                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
-                            setName("PLAINTEXT").setHost("localhost").
-                            setPort(9092 + i)).iterator()))).get();
-                brokerEpochs.put(i, reply.epoch());
-            }
-            for (int i = 0; i < numBrokers - 1; i++) {
-                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
-                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
-            }
-            CreateTopicsResponseData fooData = active.createTopics(ANONYMOUS_CONTEXT,
-                new CreateTopicsRequestData().setTopics(
-                    new CreatableTopicCollection(Collections.singleton(
-                        new CreatableTopic().setName("foo").setNumPartitions(-1).
-                            setReplicationFactor((short) -1).
-                            setAssignments(new CreatableReplicaAssignmentCollection(
-                                Arrays.asList(new CreatableReplicaAssignment().
-                                    setPartitionIndex(0).
-                                    setBrokerIds(Arrays.asList(0, 1, 2)),
-                                new CreatableReplicaAssignment().
-                                    setPartitionIndex(1).
-                                    setBrokerIds(Arrays.asList(1, 2, 0))).
-                                        iterator()))).iterator())),
-                Collections.singleton("foo")).get();
-            fooId = fooData.topics().find("foo").topicId();
-            active.allocateProducerIds(ANONYMOUS_CONTEXT,
-                    new AllocateProducerIdsRequestData().setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0))).get();
-
-            SnapshotReader<ApiMessageAndVersion> snapshot = createSnapshotReader(logEnv.waitForLatestSnapshot());
-            checkSnapshotSubcontent(
-                expectedSnapshotContent(fooId, brokerEpochs),
-                snapshot
-            );
+            controlEnv.close();
+            assertEquals(generateTestRecords(fooId, brokerEpochs), logEnv.allRecords());
         }
     }
 
-    @Test
-    public void testSnapshotOnlyAfterConfiguredMinBytes() throws Throwable {
-        final int numBrokers = 4;
-        final int maxNewRecordBytes = 1000;
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setConfigSchema(SCHEMA);
-                    controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
-                }).
-                build();
-        ) {
-            QuorumController active = controlEnv.activeController();
-            for (int i = 0; i < numBrokers; i++) {
-                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
-                    new BrokerRegistrationRequestData().
-                        setBrokerId(i).
-                        setRack(null).
-                        setClusterId(active.clusterId()).
-                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0)).
-                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
-                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
-                            setName("PLAINTEXT").setHost("localhost").
-                            setPort(9092 + i)).iterator()))).get();
-                brokerEpochs.put(i, reply.epoch());
-                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
-                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
-            }
-
-            assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
-                String.format("%s appended bytes is not less than %s max new record bytes",
-                    logEnv.appendedBytes(),
-                    maxNewRecordBytes));
-
-            // Keep creating topic until we reached the max bytes limit
-            int counter = 0;
-            while (logEnv.appendedBytes() < maxNewRecordBytes) {
-                counter += 1;
-                String topicName = String.format("foo-%s", counter);
-                active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(
-                        new CreatableTopicCollection(Collections.singleton(
-                            new CreatableTopic().setName(topicName).setNumPartitions(-1).
-                                setReplicationFactor((short) -1).
-                                setAssignments(new CreatableReplicaAssignmentCollection(
-                                    Arrays.asList(new CreatableReplicaAssignment().
-                                        setPartitionIndex(0).
-                                        setBrokerIds(Arrays.asList(0, 1, 2)),
-                                    new CreatableReplicaAssignment().
-                                        setPartitionIndex(1).
-                                        setBrokerIds(Arrays.asList(1, 2, 0))).
-                                            iterator()))).iterator())),
-                    Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
-            }
-            logEnv.waitForLatestSnapshot();
-        }
-    }
-
-    @Test
-    public void testSnapshotAfterConfiguredMaxInterval() throws Throwable {
-        final int numBrokers = 4;
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(3).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setConfigSchema(SCHEMA);
-                    controllerBuilder.setSnapshotMaxIntervalMs(100);
-                    // Disable snapshot generation due to bytes committed
-                    controllerBuilder.setSnapshotMaxNewRecordBytes(Long.MAX_VALUE);
-                }).
-                build();
-        ) {
-            QuorumController active = controlEnv.activeController();
-            for (int i = 0; i < numBrokers; i++) {
-                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
-                    new BrokerRegistrationRequestData().
-                        setBrokerId(i).
-                        setRack(null).
-                        setClusterId(active.clusterId()).
-                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0)).
-                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
-                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
-                            setName("PLAINTEXT").setHost("localhost").
-                            setPort(9092 + i)).iterator()))).get();
-                brokerEpochs.put(i, reply.epoch());
-                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
-                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
-            }
-
-            logEnv.waitForLatestSnapshot();
-        }
-    }
-
-    @Test
-    public void testSnapshotAfterRepeatedResign() throws Throwable {
-        final int numBrokers = 4;
-        final int maxNewRecordBytes = 1000;
-        Map<Integer, Long> brokerEpochs = new HashMap<>();
-        try (
-            LocalLogManagerTestEnv logEnv = new LocalLogManagerTestEnv.Builder(1).
-                build();
-            QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
-                setControllerBuilderInitializer(controllerBuilder -> {
-                    controllerBuilder.setConfigSchema(SCHEMA);
-                    controllerBuilder.setSnapshotMaxNewRecordBytes(maxNewRecordBytes);
-                }).
-                build();
-        ) {
-            QuorumController active = controlEnv.activeController();
-            for (int i = 0; i < numBrokers; i++) {
-                BrokerRegistrationReply reply = active.registerBroker(ANONYMOUS_CONTEXT,
-                    new BrokerRegistrationRequestData().
-                        setBrokerId(i).
-                        setRack(null).
-                        setClusterId(active.clusterId()).
-                        setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0)).
-                        setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + i)).
-                        setListeners(new ListenerCollection(Arrays.asList(new Listener().
-                            setName("PLAINTEXT").setHost("localhost").
-                            setPort(9092 + i)).iterator()))).get();
-                brokerEpochs.put(i, reply.epoch());
-                assertEquals(new BrokerHeartbeatReply(true, false, false, false),
-                    active.processBrokerHeartbeat(ANONYMOUS_CONTEXT, new BrokerHeartbeatRequestData().
-                        setWantFence(false).setBrokerEpoch(brokerEpochs.get(i)).
-                        setBrokerId(i).setCurrentMetadataOffset(100000L)).get());
-            }
-
-            assertTrue(logEnv.appendedBytes() < maxNewRecordBytes,
-                String.format("%s appended bytes is not less than %s max new record bytes",
-                    logEnv.appendedBytes(),
-                    maxNewRecordBytes));
-
-            // Keep creating topic and resign leader until we reached the max bytes limit
-            int counter = 0;
-            while (logEnv.appendedBytes() < maxNewRecordBytes) {
-                active = controlEnv.activeController();
-
-                counter += 1;
-                String topicName = String.format("foo-%s", counter);
-                active.createTopics(ANONYMOUS_CONTEXT, new CreateTopicsRequestData().setTopics(
-                        new CreatableTopicCollection(Collections.singleton(
-                            new CreatableTopic().setName(topicName).setNumPartitions(-1).
-                                setReplicationFactor((short) -1).
-                                setAssignments(new CreatableReplicaAssignmentCollection(
-                                    Arrays.asList(new CreatableReplicaAssignment().
-                                        setPartitionIndex(0).
-                                        setBrokerIds(Arrays.asList(0, 1, 2)),
-                                    new CreatableReplicaAssignment().
-                                        setPartitionIndex(1).
-                                        setBrokerIds(Arrays.asList(1, 2, 0))).
-                                            iterator()))).iterator())),
-                    Collections.singleton(topicName)).get(60, TimeUnit.SECONDS);
-
-                LocalLogManager activeLocalLogManager = logEnv.logManagers().get(active.nodeId());
-                activeLocalLogManager.resign(activeLocalLogManager.leaderAndEpoch().epoch());
-            }
-            logEnv.waitForLatestSnapshot();
-        }
-    }
-
-    private SnapshotReader<ApiMessageAndVersion> createSnapshotReader(RawSnapshotReader reader) {
-        return RecordsSnapshotReader.of(
-            reader,
-            new MetadataRecordSerde(),
-            BufferSupplier.create(),
-            Integer.MAX_VALUE,
-            true
-        );
-    }
-
-    private List<ApiMessageAndVersion> expectedSnapshotContent(Uuid fooId, Map<Integer, Long> brokerEpochs) {
+    private List<ApiMessageAndVersion> generateTestRecords(Uuid fooId, Map<Integer, Long> brokerEpochs) {
         return Arrays.asList(
             new ApiMessageAndVersion(new FeatureLevelRecord().
                 setName(MetadataVersion.FEATURE_NAME).
                 setFeatureLevel(MetadataVersion.IBP_3_3_IV3.featureLevel()), (short) 0),
-            new ApiMessageAndVersion(new TopicRecord().
-                setName("foo").setTopicId(fooId), (short) 0),
-            new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
-                setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
-                setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(Collections.emptyList()).
-                setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
-                setPartitionEpoch(0), (short) 0),
-            new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
-                setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
-                setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).
-                setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
-                setPartitionEpoch(0), (short) 0),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(0).setBrokerEpoch(brokerEpochs.get(0)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB0")).
-                setEndPoints(
-                    new BrokerEndpointCollection(
-                        Arrays.asList(
-                            new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
-                            setPort(9092).setSecurityProtocol((short) 0)).iterator())).
+                setEndPoints(new BrokerEndpointCollection(
+                    Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
+                        setPort(9092).setSecurityProtocol((short) 0)).iterator())).
                 setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                 setRack(null).
-                setFenced(false), (short) 1),
+                setFenced(true), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(1).setBrokerEpoch(brokerEpochs.get(1)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB1")).
-                setEndPoints(
-                    new BrokerEndpointCollection(
-                        Arrays.asList(
-                            new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
-                            setPort(9093).setSecurityProtocol((short) 0)).iterator())).
+                setEndPoints(new BrokerEndpointCollection(Arrays.asList(
+                    new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
+                        setPort(9093).setSecurityProtocol((short) 0)).iterator())).
                 setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                 setRack(null).
-                setFenced(false), (short) 1),
+                setFenced(true), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(2).setBrokerEpoch(brokerEpochs.get(2)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB2")).
-                setEndPoints(
-                    new BrokerEndpointCollection(
-                        Arrays.asList(
-                            new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
-                            setPort(9094).setSecurityProtocol((short) 0)).iterator())).
+                setEndPoints(new BrokerEndpointCollection(
+                    Arrays.asList(new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
+                        setPort(9094).setSecurityProtocol((short) 0)).iterator())).
                 setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
                 setRack(null).
-                setFenced(false), (short) 1),
+                setFenced(true), (short) 1),
             new ApiMessageAndVersion(new RegisterBrokerRecord().
                 setBrokerId(3).setBrokerEpoch(brokerEpochs.get(3)).
                 setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB3")).
@@ -994,59 +704,36 @@ public class QuorumControllerTest {
                     new BrokerEndpoint().setName("PLAINTEXT").setHost("localhost").
                         setPort(9095).setSecurityProtocol((short) 0)).iterator())).
                 setFeatures(registrationFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_3_IV3)).
-                setRack(null), (short) 1),
+                setRack(null).
+                setFenced(true), (short) 1),
+            new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                setBrokerId(0).
+                setBrokerEpoch(brokerEpochs.get(0)).
+                setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), (short) 0),
+            new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                setBrokerId(1).
+                setBrokerEpoch(brokerEpochs.get(1)).
+                setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), (short) 0),
+            new ApiMessageAndVersion(new BrokerRegistrationChangeRecord().
+                setBrokerId(2).
+                setBrokerEpoch(brokerEpochs.get(2)).
+                setFenced(BrokerRegistrationFencingChange.UNFENCE.value()), (short) 0),
+            new ApiMessageAndVersion(new TopicRecord().
+                setName("foo").setTopicId(fooId), (short) 0),
+            new ApiMessageAndVersion(new PartitionRecord().setPartitionId(0).
+                setTopicId(fooId).setReplicas(Arrays.asList(0, 1, 2)).
+                setIsr(Arrays.asList(0, 1, 2)).setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()).setLeader(0).setLeaderEpoch(0).
+                setPartitionEpoch(0), (short) 0),
+            new ApiMessageAndVersion(new PartitionRecord().setPartitionId(1).
+                setTopicId(fooId).setReplicas(Arrays.asList(1, 2, 0)).
+                setIsr(Arrays.asList(1, 2, 0)).setRemovingReplicas(Collections.emptyList()).
+                setAddingReplicas(Collections.emptyList()).setLeader(1).setLeaderEpoch(0).
+                setPartitionEpoch(0), (short) 0),
             new ApiMessageAndVersion(new ProducerIdsRecord().
                 setBrokerId(0).
                 setBrokerEpoch(brokerEpochs.get(0)).
-                setNextProducerId(1000), (short) 0)
-        );
-    }
-
-    private void checkSnapshotContent(
-        List<ApiMessageAndVersion> expected,
-        Iterator<Batch<ApiMessageAndVersion>> iterator
-    ) throws Exception {
-        RecordTestUtils.assertBatchIteratorContains(
-            Arrays.asList(expected),
-            Arrays.asList(
-                StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
-                             .flatMap(batch ->  batch.records().stream())
-                             .collect(Collectors.toList())
-            ).iterator()
-        );
-    }
-
-    /**
-     * This function checks that the iterator is a subset of the expected list.
-     *
-     * This is needed because when generating snapshots through configuration is difficult to control exactly when a
-     * snapshot will be generated and which committed offset will be included in the snapshot.
-     */
-    private void checkSnapshotSubcontent(
-        List<ApiMessageAndVersion> expected,
-        Iterator<Batch<ApiMessageAndVersion>> iterator
-    ) throws Exception {
-        RecordTestUtils.deepSortRecords(expected);
-
-        List<ApiMessageAndVersion> actual = StreamSupport
-            .stream(Spliterators.spliteratorUnknownSize(iterator, Spliterator.ORDERED), false)
-            .flatMap(batch ->  batch.records().stream())
-            .collect(Collectors.toList());
-
-        RecordTestUtils.deepSortRecords(actual);
-
-        int expectedIndex = 0;
-        for (ApiMessageAndVersion current : actual) {
-            while (expectedIndex < expected.size() && !expected.get(expectedIndex).equals(current)) {
-                expectedIndex += 1;
-            }
-
-            if (expectedIndex >= expected.size()) {
-                fail("Failed to find record " + current + " in the expected record set: " + expected);
-            }
-
-            expectedIndex += 1;
-        }
+                setNextProducerId(1000), (short) 0));
     }
 
     /**
@@ -1380,7 +1067,7 @@ public class QuorumControllerTest {
                 }).
                 build()
         ) {
-            logEnv.appendInitialRecords(expectedSnapshotContent(FOO_ID, ALL_ZERO_BROKER_EPOCHS));
+            logEnv.appendInitialRecords(generateTestRecords(FOO_ID, ALL_ZERO_BROKER_EPOCHS));
             logEnv.logManagers().forEach(m -> m.setMaxReadOffset(2));
             try (
                 QuorumControllerTestEnv controlEnv = new QuorumControllerTestEnv.Builder(logEnv).
diff --git a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
index c81f25e4c00..0997792cd90 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java
@@ -58,7 +58,6 @@ import org.apache.kafka.common.message.ListPartitionReassignmentsResponseData.On
 import org.apache.kafka.common.metadata.BrokerRegistrationChangeRecord;
 import org.apache.kafka.common.metadata.ConfigRecord;
 import org.apache.kafka.common.metadata.PartitionChangeRecord;
-import org.apache.kafka.common.metadata.PartitionRecord;
 import org.apache.kafka.common.metadata.RegisterBrokerRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
@@ -519,16 +518,6 @@ public class ReplicationControlManagerTest {
                 setErrorCode(Errors.TOPIC_ALREADY_EXISTS.code()).
                 setErrorMessage("Topic 'foo' already exists."));
         assertEquals(expectedResponse4, result4.response());
-        Uuid fooId = result3.response().topics().find("foo").topicId();
-        RecordTestUtils.assertBatchIteratorContains(asList(
-            asList(new ApiMessageAndVersion(new PartitionRecord().
-                    setPartitionId(0).setTopicId(fooId).
-                    setReplicas(asList(1, 2, 0)).setIsr(asList(1, 2, 0)).
-                    setRemovingReplicas(Collections.emptyList()).setAddingReplicas(Collections.emptyList()).setLeader(1).
-                    setLeaderEpoch(0).setPartitionEpoch(0), (short) 0),
-                new ApiMessageAndVersion(new TopicRecord().
-                    setTopicId(fooId).setName("foo"), (short) 0))),
-            ctx.replicationControl.iterator(Long.MAX_VALUE));
     }
 
     @Test
diff --git a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
deleted file mode 100644
index f7fa18f20a4..00000000000
--- a/metadata/src/test/java/org/apache/kafka/controller/SnapshotGeneratorTest.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.controller;
-
-import org.apache.kafka.common.Uuid;
-import org.apache.kafka.common.config.ConfigResource;
-import org.apache.kafka.common.memory.MemoryPool;
-import org.apache.kafka.common.metadata.ConfigRecord;
-import org.apache.kafka.common.metadata.TopicRecord;
-import org.apache.kafka.common.record.CompressionType;
-import org.apache.kafka.common.utils.LogContext;
-import org.apache.kafka.common.utils.MockTime;
-import org.apache.kafka.controller.SnapshotGenerator.Section;
-import org.apache.kafka.metadata.MetadataRecordSerde;
-import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.ApiMessageAndVersion;
-import org.apache.kafka.snapshot.SnapshotWriter;
-import org.apache.kafka.snapshot.MockRawSnapshotWriter;
-import org.apache.kafka.snapshot.RawSnapshotWriter;
-import org.apache.kafka.snapshot.RecordsSnapshotWriter;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.Timeout;
-
-import java.util.Arrays;
-import java.util.List;
-import java.util.OptionalLong;
-import java.util.Optional;
-
-import static org.apache.kafka.raft.KafkaRaftClient.MAX_BATCH_SIZE_BYTES;
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-
-@Timeout(40)
-public class SnapshotGeneratorTest {
-    private static final List<List<ApiMessageAndVersion>> BATCHES;
-
-    static {
-        BATCHES = Arrays.asList(
-            Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
-                setName("foo").setTopicId(Uuid.randomUuid()), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
-                setName("bar").setTopicId(Uuid.randomUuid()), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new TopicRecord().
-                setName("baz").setTopicId(Uuid.randomUuid()), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ConfigRecord().
-                    setResourceName("foo").setResourceType(ConfigResource.Type.TOPIC.id()).
-                    setName("retention.ms").setValue("10000000"), (short) 0),
-                new ApiMessageAndVersion(new ConfigRecord().
-                    setResourceName("foo").setResourceType(ConfigResource.Type.TOPIC.id()).
-                    setName("max.message.bytes").setValue("100000000"), (short) 0)),
-            Arrays.asList(new ApiMessageAndVersion(new ConfigRecord().
-                setResourceName("bar").setResourceType(ConfigResource.Type.TOPIC.id()).
-                setName("retention.ms").setValue("5000000"), (short) 0)));
-    }
-
-    @Test
-    public void testGenerateBatches() throws Exception {
-        SnapshotWriter<ApiMessageAndVersion> writer = createSnapshotWriter(123, 0);
-        List<Section> sections = Arrays.asList(new Section("replication",
-                Arrays.asList(BATCHES.get(0), BATCHES.get(1), BATCHES.get(2)).iterator()),
-            new Section("configuration",
-                Arrays.asList(BATCHES.get(3), BATCHES.get(4)).iterator()));
-        SnapshotGenerator generator = new SnapshotGenerator(new LogContext(),
-            writer, 2, sections);
-        assertFalse(writer.isFrozen());
-        assertEquals(123L, generator.lastContainedLogOffset());
-        assertEquals(writer, generator.writer());
-        assertEquals(OptionalLong.of(0L), generator.generateBatches());
-        assertEquals(OptionalLong.of(0L), generator.generateBatches());
-        assertFalse(writer.isFrozen());
-        assertEquals(OptionalLong.empty(), generator.generateBatches());
-        assertTrue(writer.isFrozen());
-    }
-
-    private SnapshotWriter<ApiMessageAndVersion> createSnapshotWriter(
-        long committedOffset,
-        long lastContainedLogTime
-    ) {
-        return RecordsSnapshotWriter.createWithHeader(
-            () -> createNewSnapshot(new OffsetAndEpoch(committedOffset + 1, 1)),
-            MAX_BATCH_SIZE_BYTES,
-            MemoryPool.NONE,
-            new MockTime(),
-            lastContainedLogTime,
-            CompressionType.NONE,
-            new MetadataRecordSerde()
-        ).get();
-    }
-
-    private Optional<RawSnapshotWriter> createNewSnapshot(
-            OffsetAndEpoch snapshotId
-    ) {
-        return Optional.of(new MockRawSnapshotWriter(snapshotId, buffer -> { }));
-    }
-}
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
new file mode 100644
index 00000000000..585f1dc40a7
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -0,0 +1,517 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.loader;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.metadata.FeatureLevelRecord;
+import org.apache.kafka.common.metadata.RemoveTopicRecord;
+import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.fault.MockFaultHandler;
+import org.apache.kafka.snapshot.SnapshotReader;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.CsvSource;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
+import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class MetadataLoaderTest {
+    @Test
+    public void testCreateAndClose() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("testCreateAndClose");
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.empty()).
+                build()) {
+            assertEquals(-1L, loader.lastAppliedOffset());
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    static class MockPublisher implements MetadataPublisher {
+        private final String name;
+        MetadataDelta latestDelta = null;
+        MetadataImage latestImage = null;
+        LogDeltaManifest latestLogDeltaManifest = null;
+        SnapshotManifest latestSnapshotManifest = null;
+        boolean closed = false;
+
+        MockPublisher() {
+            this("MockPublisher");
+        }
+
+        MockPublisher(String name) {
+            this.name = name;
+        }
+
+        @Override
+        public String name() {
+            return name;
+        }
+
+        @Override
+        public void publishSnapshot(
+            MetadataDelta delta,
+            MetadataImage newImage,
+            SnapshotManifest manifest
+        ) {
+            latestDelta = delta;
+            latestImage = newImage;
+            latestSnapshotManifest = manifest;
+        }
+
+        @Override
+        public void publishLogDelta(
+            MetadataDelta delta,
+            MetadataImage newImage,
+            LogDeltaManifest manifest
+        ) {
+            latestDelta = delta;
+            latestImage = newImage;
+            latestLogDeltaManifest = manifest;
+        }
+
+        @Override
+        public void close() throws Exception {
+            closed = true;
+        }
+    }
+
+    /**
+     * Install 2 publishers and check that the publishers that were installed are closed when the
+     * loader is closed.
+     */
+    @Test
+    public void testInstallPublishers() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("testInstallPublishers");
+        List<MockPublisher> publishers = asList(new MockPublisher("a"),
+                new MockPublisher("b"),
+                new MockPublisher("c"));
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.empty()).
+                build()) {
+            loader.installPublishers(publishers.subList(0, 2)).get();
+        }
+        assertTrue(publishers.get(0).closed);
+        assertNull(publishers.get(0).latestImage);
+        assertTrue(publishers.get(1).closed);
+        assertNull(publishers.get(1).latestImage);
+        assertFalse(publishers.get(2).closed);
+        assertNull(publishers.get(2).latestImage);
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    static class MockSnapshotReader implements SnapshotReader<ApiMessageAndVersion> {
+        private final MetadataProvenance provenance;
+        private final Iterator<Batch<ApiMessageAndVersion>> iterator;
+        private MockTime time = null;
+        boolean closed = false;
+
+        static MockSnapshotReader fromRecordLists(
+            MetadataProvenance provenance,
+            List<List<ApiMessageAndVersion>> lists
+        ) {
+            List<Batch<ApiMessageAndVersion>> batches = new ArrayList<>();
+            lists.forEach(records -> batches.add(Batch.data(
+                provenance.offset(),
+                provenance.epoch(),
+                provenance.lastContainedLogTimeMs(),
+                0,
+                records)));
+            return new MockSnapshotReader(provenance, batches);
+        }
+
+        MockSnapshotReader(
+            MetadataProvenance provenance,
+            List<Batch<ApiMessageAndVersion>> batches
+        ) {
+            this.provenance = provenance;
+            this.iterator = batches.iterator();
+        }
+
+        MockSnapshotReader setTime(MockTime time) {
+            this.time = time;
+            return this;
+        }
+
+        @Override
+        public OffsetAndEpoch snapshotId() {
+            return provenance.offsetAndEpoch();
+        }
+
+        @Override
+        public long lastContainedLogOffset() {
+            return provenance.offset();
+        }
+
+        @Override
+        public int lastContainedLogEpoch() {
+            return provenance.epoch();
+        }
+
+        @Override
+        public long lastContainedLogTimestamp() {
+            return provenance.lastContainedLogTimeMs();
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (time != null) time.sleep(1);
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Batch<ApiMessageAndVersion> next() {
+            if (time != null) time.sleep(1);
+            return iterator.next();
+        }
+    }
+
+    /**
+     * Test that a publisher cannot be installed more than once.
+     */
+    @ParameterizedTest
+    @CsvSource(value = {"false,false", "false,true", "true,false", "true,true"})
+    public void testPublisherCannotBeInstalledMoreThanOnce(
+        boolean loadSnapshot,
+        boolean sameObject
+    ) throws Exception {
+        MockFaultHandler faultHandler =
+                new MockFaultHandler("testPublisherCannotBeInstalledMoreThanOnce");
+        MockPublisher publisher = new MockPublisher();
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                build()) {
+            loader.installPublishers(asList(publisher)).get();
+            if (loadSnapshot) {
+                MockSnapshotReader snapshotReader = new MockSnapshotReader(
+                        new MetadataProvenance(200, 100, 4000),
+                        asList(Batch.control(200, 100, 4000, 10, 200)));
+                loader.handleSnapshot(snapshotReader);
+            }
+            loader.waitForAllEventsToBeHandled();
+            if (sameObject) {
+                assertEquals("testPublisherCannotBeInstalledMoreThanOnce: Attempted to install " +
+                    "publisher MockPublisher, which is already installed.",
+                        assertThrows(ExecutionException.class,
+                                () -> loader.installPublishers(asList(publisher)).get()).
+                                getCause().getMessage());
+            } else {
+                assertEquals("testPublisherCannotBeInstalledMoreThanOnce: Attempted to install " +
+                    "a new publisher named MockPublisher, but there is already a publisher with that name.",
+                        assertThrows(ExecutionException.class,
+                                () -> loader.installPublishers(asList(new MockPublisher())).get()).
+                                getCause().getMessage());
+            }
+        }
+    }
+
+    /**
+     * Install 2 publishers and remove one.
+     */
+    @Test
+    public void testRemovePublisher() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("testRemovePublisher");
+        List<MockPublisher> publishers = asList(new MockPublisher("a"),
+                new MockPublisher("b"),
+                new MockPublisher("c"));
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                build()) {
+            loader.installPublishers(publishers.subList(0, 2)).get();
+            loader.removeAndClosePublisher(publishers.get(1)).get();
+            MockSnapshotReader snapshotReader = MockSnapshotReader.fromRecordLists(
+                new MetadataProvenance(100, 50, 2000),
+                asList(asList(new ApiMessageAndVersion(
+                    new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(IBP_3_3_IV2.featureLevel()), (short) 0))));
+            assertFalse(snapshotReader.closed);
+            loader.handleSnapshot(snapshotReader);
+            loader.waitForAllEventsToBeHandled();
+            assertTrue(snapshotReader.closed);
+            loader.removeAndClosePublisher(publishers.get(0)).get();
+        }
+        assertTrue(publishers.get(0).closed);
+        assertEquals(IBP_3_3_IV2,
+                publishers.get(0).latestImage.features().metadataVersion());
+        assertTrue(publishers.get(1).closed);
+        assertNull(publishers.get(1).latestImage);
+        assertFalse(publishers.get(2).closed);
+        assertNull(publishers.get(2).latestImage);
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    /**
+     * Test loading a snapshot with 0 records.
+     */
+    @Test
+    public void testLoadEmptySnapshot() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptySnapshot");
+        MockTime time = new MockTime();
+        List<MockPublisher> publishers = asList(new MockPublisher());
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setTime(time).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                build()) {
+            loader.installPublishers(publishers).get();
+            loadEmptySnapshot(loader, 200);
+            assertEquals(200L, loader.lastAppliedOffset());
+            loadEmptySnapshot(loader, 300);
+            assertEquals(300L, loader.lastAppliedOffset());
+            assertEquals(new SnapshotManifest(new MetadataProvenance(300, 100, 4000), 3000000L),
+                publishers.get(0).latestSnapshotManifest);
+        }
+        assertTrue(publishers.get(0).closed);
+        assertEquals(MetadataVersion.IBP_3_0_IV1,
+                publishers.get(0).latestImage.features().metadataVersion());
+        assertTrue(publishers.get(0).latestImage.isEmpty());
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    private void loadEmptySnapshot(
+        MetadataLoader loader,
+        long offset
+    ) throws Exception {
+        MockSnapshotReader snapshotReader = new MockSnapshotReader(
+                new MetadataProvenance(offset, 100, 4000),
+                asList(Batch.control(200, 100, 4000, 10, 200)));
+        if (loader.time() instanceof MockTime) {
+            snapshotReader.setTime((MockTime) loader.time());
+        }
+        loader.handleSnapshot(snapshotReader);
+        loader.waitForAllEventsToBeHandled();
+    }
+
+    static class MockBatchReader implements BatchReader<ApiMessageAndVersion> {
+        private final long baseOffset;
+        private final Iterator<Batch<ApiMessageAndVersion>> iterator;
+        private boolean closed = false;
+        private MockTime time = null;
+
+        static Batch<ApiMessageAndVersion> newBatch(
+            long batchBaseOffset,
+            int epoch,
+            List<ApiMessageAndVersion> records
+        ) {
+            return Batch.data(batchBaseOffset, epoch, 0, 0, records);
+        }
+
+        MockBatchReader(
+            long baseOffset,
+            List<Batch<ApiMessageAndVersion>> batches
+        ) {
+            this.baseOffset = baseOffset;
+            this.iterator = batches.iterator();
+        }
+
+        private MockBatchReader setTime(MockTime time) {
+            this.time = time;
+            return this;
+        }
+
+        @Override
+        public long baseOffset() {
+            return baseOffset;
+        }
+
+        @Override
+        public OptionalLong lastOffset() {
+            return OptionalLong.empty();
+        }
+
+        @Override
+        public void close() {
+            this.closed = true;
+        }
+
+        @Override
+        public boolean hasNext() {
+            if (time != null) time.sleep(1);
+            return iterator.hasNext();
+        }
+
+        @Override
+        public Batch<ApiMessageAndVersion> next() {
+            if (time != null) time.sleep(1);
+            return iterator.next();
+        }
+    }
+
+    /**
+     * Test loading a batch with 0 records.
+     */
+    @Test
+    public void testLoadEmptyBatch() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("testLoadEmptyBatch");
+        MockTime time = new MockTime();
+        List<MockPublisher> publishers = asList(new MockPublisher());
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setTime(time).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                build()) {
+            loader.installPublishers(publishers).get();
+            loadTestSnapshot(loader, 200);
+            MockBatchReader batchReader = new MockBatchReader(300, asList(
+                Batch.control(300, 100, 4000, 10, 400))).
+                    setTime(time);
+            loader.handleCommit(batchReader);
+            loader.waitForAllEventsToBeHandled();
+            assertTrue(batchReader.closed);
+            assertEquals(400L, loader.lastAppliedOffset());
+        }
+        assertTrue(publishers.get(0).closed);
+        assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), 1,
+                        3000000L, 10),
+            publishers.get(0).latestLogDeltaManifest);
+        assertEquals(MetadataVersion.IBP_3_3_IV1,
+            publishers.get(0).latestImage.features().metadataVersion());
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    /**
+     * Test that the lastAppliedOffset moves forward as expected.
+     */
+    @Test
+    public void testLastAppliedOffset() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
+        List<MockPublisher> publishers = asList(new MockPublisher("a"),
+                new MockPublisher("b"));
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
+                build()) {
+            loader.installPublishers(publishers).get();
+            loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+                new MetadataProvenance(200, 100, 4000), asList(
+                    asList(new ApiMessageAndVersion(new FeatureLevelRecord().
+                        setName(MetadataVersion.FEATURE_NAME).
+                        setFeatureLevel(IBP_3_3_IV1.featureLevel()), (short) 0)),
+                    asList(new ApiMessageAndVersion(new TopicRecord().
+                        setName("foo").
+                        setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
+                )));
+            loader.waitForAllEventsToBeHandled();
+            assertEquals(200L, loader.lastAppliedOffset());
+            loader.handleCommit(new MockBatchReader(201, asList(
+                MockBatchReader.newBatch(201, 100, asList(
+                    new ApiMessageAndVersion(new RemoveTopicRecord().
+                        setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))))));
+            loader.waitForAllEventsToBeHandled();
+            assertEquals(201L, loader.lastAppliedOffset());
+        }
+        for (int i = 0; i < 2; i++) {
+            assertTrue(publishers.get(i).closed);
+            assertEquals(IBP_3_3_IV1,
+                    publishers.get(i).latestImage.features().metadataVersion());
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    /**
+     * Test that we do not leave the catchingUp state until we have loaded up to the high
+     * water mark.
+     */
+    @Test
+    public void testCatchingUpState() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("testLastAppliedOffset");
+        List<MockPublisher> publishers = asList(new MockPublisher("a"),
+                new MockPublisher("b"));
+        AtomicReference<OptionalLong> highWaterMark = new AtomicReference<>(OptionalLong.empty());
+        try (MetadataLoader loader = new MetadataLoader.Builder().
+                setFaultHandler(faultHandler).
+                setHighWaterMarkAccessor(() -> highWaterMark.get()).
+                build()) {
+            loader.installPublishers(publishers).get();
+            loadTestSnapshot(loader, 200);
+
+            // We don't update lastAppliedOffset because we're still in catchingUp state due to
+            // highWaterMark being OptionalLong.empty (aka unknown).
+            assertEquals(-1L, loader.lastAppliedOffset());
+
+            // Setting the high water mark here doesn't do anything because we only check it when
+            // we're publishing an update. This is OK because we know that we'll get updates
+            // frequently. If there is no other activity, there will at least be NoOpRecords.
+            highWaterMark.set(OptionalLong.of(0));
+            assertEquals(-1L, loader.lastAppliedOffset());
+
+            // This still doesn't advance lastAppliedOffset since the high water mark at 220
+            // is greater than our snapshot at 210.
+            highWaterMark.set(OptionalLong.of(220));
+            loadTestSnapshot(loader, 210);
+            assertEquals(-1L, loader.lastAppliedOffset());
+
+            // Loading a test snapshot at 220 allows us to leave the catchingUp state.
+            loadTestSnapshot(loader, 220);
+            assertEquals(220L, loader.lastAppliedOffset());
+        }
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    private void loadTestSnapshot(
+        MetadataLoader loader,
+        long offset
+    ) throws Exception {
+        loader.handleSnapshot(MockSnapshotReader.fromRecordLists(
+                new MetadataProvenance(offset, 100, 4000), asList(
+                        asList(new ApiMessageAndVersion(new FeatureLevelRecord().
+                                setName(MetadataVersion.FEATURE_NAME).
+                                setFeatureLevel(IBP_3_3_IV1.featureLevel()), (short) 0)),
+                        asList(new ApiMessageAndVersion(new TopicRecord().
+                                setName("foo").
+                                setTopicId(Uuid.fromString("Uum7sfhHQP-obSvfywmNUA")), (short) 0))
+                )));
+        loader.waitForAllEventsToBeHandled();
+    }
+}
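
Note: the tests above exercise the new MetadataLoader API end to end: building the loader with a fault handler and a high-water-mark accessor, installing and removing publishers, and feeding it snapshots and log batches. As a rough, non-authoritative sketch of that same call pattern outside a test (the class name LoaderWiringSketch and the myPublisher parameter are illustrative, and the exact builder parameter types are assumed):

    import java.util.Collections;
    import java.util.OptionalLong;
    import org.apache.kafka.image.loader.MetadataLoader;
    import org.apache.kafka.image.publisher.MetadataPublisher;
    import org.apache.kafka.server.fault.FaultHandler;

    public class LoaderWiringSketch {
        public static void wire(MetadataPublisher myPublisher, FaultHandler faultHandler) throws Exception {
            try (MetadataLoader loader = new MetadataLoader.Builder().
                    setFaultHandler(faultHandler).
                    setHighWaterMarkAccessor(() -> OptionalLong.of(0L)).
                    build()) {
                // installPublishers is asynchronous; get() waits until the publishers are registered.
                loader.installPublishers(Collections.singletonList(myPublisher)).get();
                // ... the raft layer would then drive loader.handleSnapshot(...) and loader.handleCommit(...) ...
                loader.removeAndClosePublisher(myPublisher).get();
            } // closing the loader closes any publishers that are still installed
        }
    }

The loader owns whatever publishers remain installed when it is closed, which is what testInstallPublishers verifies above.
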
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
new file mode 100644
index 00000000000..ef40d714604
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotEmitterTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.image.MetadataImageTest;
+import org.apache.kafka.raft.LeaderAndEpoch;
+import org.apache.kafka.raft.OffsetAndEpoch;
+import org.apache.kafka.raft.RaftClient;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.snapshot.SnapshotWriter;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.NavigableSet;
+import java.util.Optional;
+import java.util.OptionalInt;
+import java.util.OptionalLong;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+
+@Timeout(value = 40)
+public class SnapshotEmitterTest {
+    static class MockRaftClient implements RaftClient<ApiMessageAndVersion> {
+        TreeMap<OffsetAndEpoch, MockSnapshotWriter> writers = new TreeMap<>();
+
+        @Override
+        public void initialize() {
+            // nothing to do
+        }
+
+        @Override
+        public void register(Listener<ApiMessageAndVersion> listener) {
+            // nothing to do
+        }
+
+        @Override
+        public void unregister(Listener<ApiMessageAndVersion> listener) {
+            // nothing to do
+        }
+
+        @Override
+        public OptionalLong highWatermark() {
+            return OptionalLong.empty();
+        }
+
+        @Override
+        public LeaderAndEpoch leaderAndEpoch() {
+            return LeaderAndEpoch.UNKNOWN;
+        }
+
+        @Override
+        public OptionalInt nodeId() {
+            return OptionalInt.empty();
+        }
+
+        @Override
+        public long scheduleAppend(int epoch, List<ApiMessageAndVersion> records) {
+            return 0;
+        }
+
+        @Override
+        public long scheduleAtomicAppend(int epoch, List<ApiMessageAndVersion> records) {
+            return 0;
+        }
+
+        @Override
+        public CompletableFuture<Void> shutdown(int timeoutMs) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        @Override
+        public void resign(int epoch) {
+            // nothing to do
+        }
+
+        @Override
+        public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(
+            OffsetAndEpoch snapshotId,
+            long lastContainedLogTime
+        ) {
+            if (writers.containsKey(snapshotId)) {
+                return Optional.empty();
+            }
+            MockSnapshotWriter writer = new MockSnapshotWriter(snapshotId);
+            writers.put(snapshotId, writer);
+            return Optional.of(writer);
+        }
+
+        @Override
+        public Optional<OffsetAndEpoch> latestSnapshotId() {
+            NavigableSet<OffsetAndEpoch> descendingSet = writers.descendingKeySet();
+            if (descendingSet.isEmpty()) {
+                return Optional.empty();
+            } else {
+                return Optional.of(descendingSet.first());
+            }
+        }
+
+        @Override
+        public void close() throws Exception {
+            // nothing to do
+        }
+    }
+
+    static class MockSnapshotWriter implements SnapshotWriter<ApiMessageAndVersion> {
+        private final OffsetAndEpoch snapshotId;
+        private boolean frozen = false;
+        private boolean closed = false;
+        private final List<List<ApiMessageAndVersion>> batches;
+
+        MockSnapshotWriter(OffsetAndEpoch snapshotId) {
+            this.snapshotId = snapshotId;
+            this.batches = new ArrayList<>();
+        }
+
+        @Override
+        public OffsetAndEpoch snapshotId() {
+            return snapshotId;
+        }
+
+        @Override
+        public long lastContainedLogOffset() {
+            return snapshotId.offset();
+        }
+
+        @Override
+        public int lastContainedLogEpoch() {
+            return snapshotId.epoch();
+        }
+
+        @Override
+        public boolean isFrozen() {
+            return frozen;
+        }
+
+        @Override
+        public void append(List<ApiMessageAndVersion> records) {
+            batches.add(records);
+        }
+
+        List<List<ApiMessageAndVersion>> batches() {
+            List<List<ApiMessageAndVersion>> results = new ArrayList<>();
+            batches.forEach(batch -> results.add(new ArrayList<>(batch)));
+            return results;
+        }
+
+        @Override
+        public void freeze() {
+            frozen = true;
+        }
+
+        @Override
+        public void close() {
+            closed = true;
+        }
+
+        boolean isClosed() {
+            return closed;
+        }
+    }
+
+    @Test
+    public void testEmit() throws Exception {
+        MockRaftClient mockRaftClient = new MockRaftClient();
+        SnapshotEmitter emitter = new SnapshotEmitter.Builder().
+            setBatchSize(2).
+            setRaftClient(mockRaftClient).
+            build();
+        emitter.maybeEmit(MetadataImageTest.IMAGE1);
+        MockSnapshotWriter writer = mockRaftClient.writers.get(
+                MetadataImageTest.IMAGE1.highestOffsetAndEpoch());
+        assertNotNull(writer);
+        assertEquals(MetadataImageTest.IMAGE1.highestOffsetAndEpoch().offset(),
+                writer.lastContainedLogOffset());
+        assertEquals(MetadataImageTest.IMAGE1.highestOffsetAndEpoch().epoch(),
+                writer.lastContainedLogEpoch());
+        assertTrue(writer.isFrozen());
+        assertTrue(writer.isClosed());
+
+        // Second call to emit does nothing because we already have a snapshot at that offset and epoch.
+        emitter.maybeEmit(MetadataImageTest.IMAGE1);
+        assertEquals(1, mockRaftClient.writers.size());
+    }
+}
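
Note: SnapshotEmitter writes a MetadataImage out through RaftClient.createSnapshot() and skips emitting when a snapshot already exists for the image's offset and epoch. A rough fragment showing the call pattern testEmit exercises (raftClient and image are placeholders, not defined here):

    // Rough sketch, not part of this commit; 'raftClient' (a RaftClient<ApiMessageAndVersion>)
    // and 'image' (a MetadataImage) are placeholders.
    SnapshotEmitter emitter = new SnapshotEmitter.Builder().
        setRaftClient(raftClient).   // used to create the SnapshotWriter
        setBatchSize(2).             // records per appended batch
        build();
    emitter.maybeEmit(image);        // writes, freezes, and closes a snapshot at image.highestOffsetAndEpoch()
    emitter.maybeEmit(image);        // no-op: a snapshot already exists at that offset and epoch
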
diff --git a/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
new file mode 100644
index 00000000000..42fb8d92f6e
--- /dev/null
+++ b/metadata/src/test/java/org/apache/kafka/image/publisher/SnapshotGeneratorTest.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image.publisher;
+
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.MetadataProvenance;
+import org.apache.kafka.image.loader.LogDeltaManifest;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.server.fault.FaultHandlerException;
+import org.apache.kafka.server.fault.MockFaultHandler;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+
+@Timeout(value = 40)
+public class SnapshotGeneratorTest {
+    static class MockEmitter implements SnapshotGenerator.Emitter {
+        private final CountDownLatch latch = new CountDownLatch(1);
+        private final List<MetadataImage> images = new ArrayList<>();
+        private RuntimeException problem = null;
+
+        MockEmitter setReady() {
+            latch.countDown();
+            return this;
+        }
+
+        synchronized MockEmitter setProblem(RuntimeException problem) {
+            this.problem = problem;
+            return this;
+        }
+
+        @Override
+        public synchronized void maybeEmit(MetadataImage image) {
+            RuntimeException currentProblem = problem;
+            if (currentProblem != null) {
+                throw currentProblem;
+            }
+            try {
+                latch.await();
+            } catch (Throwable e) {
+                throw new RuntimeException(e);
+            }
+            images.add(image);
+        }
+
+        synchronized List<MetadataImage> images() {
+            return new ArrayList<>(images);
+        }
+    }
+
+    private final static MetadataDelta TEST_DELTA;
+
+    static {
+        TEST_DELTA = new MetadataDelta.Builder().
+                setImage(MetadataImage.EMPTY).
+                build();
+        TEST_DELTA.replay(RecordTestUtils.testRecord(0).message());
+    }
+
+    private final static MetadataImage TEST_IMAGE = TEST_DELTA.apply(MetadataProvenance.EMPTY);
+
+    @Test
+    public void testCreateSnapshot() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("SnapshotGenerator");
+        MockEmitter emitter = new MockEmitter();
+        try (SnapshotGenerator generator = new SnapshotGenerator.Builder(emitter).
+                setFaultHandler(faultHandler).
+                setMaxBytesSinceLastSnapshot(200).
+                setMaxTimeSinceLastSnapshotNs(TimeUnit.DAYS.toNanos(10)).
+                build()) {
+            // Publish a log delta batch. This one will not trigger a snapshot yet.
+            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                    new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
+            // Publish a log delta batch. This will trigger a snapshot.
+            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                    new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
+            // Publish a log delta batch. This one will be ignored because there are other images
+            // queued for writing.
+            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                    new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 2000));
+            assertEquals(Collections.emptyList(), emitter.images());
+            emitter.setReady();
+        }
+        assertEquals(Arrays.asList(TEST_IMAGE), emitter.images());
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    @Test
+    public void testSnapshotsDisabled() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("SnapshotGenerator");
+        MockEmitter emitter = new MockEmitter().setReady();
+        AtomicReference<String> disabledReason = new AtomicReference<>();
+        try (SnapshotGenerator generator = new SnapshotGenerator.Builder(emitter).
+                setFaultHandler(faultHandler).
+                setMaxBytesSinceLastSnapshot(1).
+                setMaxTimeSinceLastSnapshotNs(0).
+                setDisabledReason(disabledReason).
+                build()) {
+            disabledReason.compareAndSet(null, "we are testing disable()");
+            // No snapshots are generated because snapshots are disabled.
+            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                    new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 100));
+        }
+        assertEquals(Collections.emptyList(), emitter.images());
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    @Test
+    public void testTimeBasedSnapshots() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("SnapshotGenerator");
+        MockEmitter emitter = new MockEmitter().setReady();
+        MockTime mockTime = new MockTime();
+        try (SnapshotGenerator generator = new SnapshotGenerator.Builder(emitter).
+                setTime(mockTime).
+                setFaultHandler(faultHandler).
+                setMaxBytesSinceLastSnapshot(200).
+                setMaxTimeSinceLastSnapshotNs(TimeUnit.MINUTES.toNanos(30)).
+                build()) {
+            // This image isn't published yet.
+            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                    new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 50));
+            assertEquals(Collections.emptyList(), emitter.images());
+            mockTime.sleep(TimeUnit.MINUTES.toNanos(40));
+            // Next image is published because of the time delay.
+            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                    new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 50));
+            TestUtils.waitForCondition(() -> emitter.images().size() == 1, "images.size == 1");
+            // bytesSinceLastSnapshot was reset to 0 by the previous snapshot,
+            // so this does not trigger a new snapshot.
+            generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                    new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 100, 150));
+        }
+        assertEquals(Arrays.asList(TEST_IMAGE), emitter.images());
+        faultHandler.maybeRethrowFirstException();
+    }
+
+    @Test
+    public void testEmitterProblem() throws Exception {
+        MockFaultHandler faultHandler = new MockFaultHandler("SnapshotGenerator");
+        MockEmitter emitter = new MockEmitter().setProblem(new RuntimeException("oops"));
+        try (SnapshotGenerator generator = new SnapshotGenerator.Builder(emitter).
+                setFaultHandler(faultHandler).
+                setMaxBytesSinceLastSnapshot(200).
+                build()) {
+            for (int i = 0; i < 2; i++) {
+                generator.publishLogDelta(TEST_DELTA, TEST_IMAGE,
+                        new LogDeltaManifest(MetadataProvenance.EMPTY, 1, 10000, 50000));
+            }
+        }
+        assertEquals(Collections.emptyList(), emitter.images());
+        assertNotNull(faultHandler.firstException());
+        assertEquals(FaultHandlerException.class, faultHandler.firstException().getClass());
+        assertEquals("SnapshotGenerator: KRaft snapshot file generation error: oops",
+                faultHandler.firstException().getMessage());
+    }
+}
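
Note: SnapshotGenerator decides when a snapshot should be taken, based on bytes appended since the last snapshot, time since the last snapshot, and an optional disabled reason, and delegates the actual writing to an Emitter such as SnapshotEmitter. A rough, non-authoritative configuration fragment based on the builder calls in the tests above (placeholders are noted in the comments; the thresholds are made up for illustration):

    // Rough sketch, not part of this commit; 'emitter', 'faultHandler', 'delta', 'image',
    // and 'manifest' are placeholders, and the thresholds are illustrative values.
    AtomicReference<String> disabledReason = new AtomicReference<>();
    SnapshotGenerator generator = new SnapshotGenerator.Builder(emitter).
            setFaultHandler(faultHandler).
            setMaxBytesSinceLastSnapshot(100 * 1024 * 1024).           // size-based trigger
            setMaxTimeSinceLastSnapshotNs(TimeUnit.HOURS.toNanos(1)).  // time-based trigger
            setDisabledReason(disabledReason).                         // set a non-null reason to pause snapshots
            build();
    generator.publishLogDelta(delta, image, manifest);  // may hand the image to emitter.maybeEmit() asynchronously
    generator.close();
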
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
index 3bc07c06af0..a90e2687200 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManager.java
@@ -46,6 +46,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.AbstractMap.SimpleImmutableEntry;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.IdentityHashMap;
@@ -318,9 +319,9 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
         }
 
         /**
-         * Returns the snapshot whos last offset is the committed offset.
+         * Returns the snapshot whose last offset is the committed offset.
          *
-         * If such snapshot doesn't exists, it waits until it does.
+         * If such snapshot doesn't exist, it waits until it does.
          */
         synchronized RawSnapshotReader waitForSnapshot(long committedOffset) throws InterruptedException {
             while (true) {
@@ -380,6 +381,20 @@ public final class LocalLogManager implements RaftClient<ApiMessageAndVersion>,
         public long initialMaxReadOffset() {
             return initialMaxReadOffset;
         }
+
+        /**
+         * Return all records in the log as a list.
+         */
+        public synchronized List<ApiMessageAndVersion> allRecords() {
+            List<ApiMessageAndVersion> allRecords = new ArrayList<>();
+            for (LocalBatch batch : batches.values()) {
+                if (batch instanceof LocalRecordBatch) {
+                    LocalRecordBatch recordBatch = (LocalRecordBatch) batch;
+                    allRecords.addAll(recordBatch.records);
+                }
+            }
+            return allRecords;
+        }
     }
 
     private static class MetaLogListenerData {
diff --git a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
index 1693b62be1a..d72b7557b48 100644
--- a/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
+++ b/metadata/src/test/java/org/apache/kafka/metalog/LocalLogManagerTestEnv.java
@@ -140,6 +140,13 @@ public class LocalLogManagerTestEnv implements AutoCloseable {
         this.logManagers = newLogManagers;
     }
 
+    /**
+     * Return all records in the log as a list.
+     */
+    public List<ApiMessageAndVersion> allRecords() {
+        return shared.allRecords();
+    }
+
     /**
      * Append some records to the log. This method is meant to be called before the
      * controllers are started, to simulate a pre-existing metadata log.
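
Note: the new allRecords() helpers on LocalLogManager and LocalLogManagerTestEnv let a test assert against everything committed to the shared log rather than against a generated snapshot. A hedged fragment of how that might look in QuorumControllerTest, reusing the generateTestRecords() helper renamed above (the exact assertion is illustrative):

    // Rough sketch, not part of this commit; assumes a QuorumController test that has already
    // committed the records produced by generateTestRecords(FOO_ID, ALL_ZERO_BROKER_EPOCHS).
    List<ApiMessageAndVersion> expected = generateTestRecords(FOO_ID, ALL_ZERO_BROKER_EPOCHS);
    assertTrue(logEnv.allRecords().containsAll(expected));  // the committed log contains every expected record
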
diff --git a/tests/kafkatest/tests/core/snapshot_test.py b/tests/kafkatest/tests/core/snapshot_test.py
index 1218cff319c..cb62cb93f55 100644
--- a/tests/kafkatest/tests/core/snapshot_test.py
+++ b/tests/kafkatest/tests/core/snapshot_test.py
@@ -118,7 +118,7 @@ class TestSnapshots(ProduceConsumeValidateTest):
         cmd = "ls %s" % file_path
         files = node.account.ssh_output(cmd, allow_fail=True, combine_stderr=False)
 
-        if len(files) is 0:
+        if len(files) == 0:
             self.logger.debug("File %s does not exist" % file_path)
             return False
         else: