You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by js...@apache.org on 2023/04/07 16:26:02 UTC

[kafka] branch trunk updated: KAFKA-13020; Implement reading Snapshot log append timestamp (#13345)

This is an automated email from the ASF dual-hosted git repository.

jsancio pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 672dd3ab6ae KAFKA-13020; Implement reading Snapshot log append timestamp (#13345)
672dd3ab6ae is described below

commit 672dd3ab6aea413eaa8170236f351a0f2a35a89c
Author: José Armando García Sancio <js...@users.noreply.github.com>
AuthorDate: Fri Apr 7 09:25:54 2023 -0700

    KAFKA-13020; Implement reading Snapshot log append timestamp (#13345)
    
    The SnapshotReader exposes the "last contained log time". This is mainly used during snapshot cleanup. The previous implementation used the append time of the snapshot record. This is not accurate as this is the time when the snapshot was created and not the log append time of the last record included in the snapshot.
    
    The log append time of the last record included in the snapshot is store in the header control record of the snapshot. The header control record is the first record of the snapshot.
    
    To be able to read this record, this change extends the RecordsIterator to decode and expose the control records in the Records type.
    
    Reviewers: Colin Patrick McCabe <cm...@apache.org>
---
 checkstyle/import-control.xml                      |   1 +
 .../kafka/common/record/ControlRecordUtils.java    |  20 +--
 .../main/scala/kafka/raft/KafkaMetadataLog.scala   |  36 +++---
 core/src/main/scala/kafka/raft/RaftManager.scala   |   1 +
 .../main/scala/kafka/tools/DumpLogSegments.scala   |   4 +-
 .../scala/kafka/raft/KafkaMetadataLogTest.scala    |   1 +
 .../unit/kafka/tools/DumpLogSegmentsTest.scala     |   5 +-
 .../kafka/image/loader/MetadataLoaderTest.java     |  48 +++++--
 .../src/main/java/org/apache/kafka/raft/Batch.java |  39 ++++--
 .../java/org/apache/kafka/raft/ControlRecord.java  | 104 +++++++++++++++
 .../kafka/raft/internals/RecordsIterator.java      | 139 +++++++++++++++------
 .../kafka/snapshot/RecordsSnapshotReader.java      |  24 +++-
 .../kafka/snapshot/RecordsSnapshotWriter.java      |   2 +-
 .../java/org/apache/kafka/ControlRecordTest.java   |  59 +++++++++
 .../kafka/raft/KafkaRaftClientSnapshotTest.java    |   4 +-
 .../kafka/raft/internals/RecordsIteratorTest.java  |  91 ++++++++++++--
 .../kafka/snapshot/SnapshotWriterReaderTest.java   |   6 +-
 17 files changed, 482 insertions(+), 102 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 18189e3eaad..23791d527f6 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -251,6 +251,7 @@
     <allow pkg="org.apache.kafka.common.metadata" />
     <allow pkg="org.apache.kafka.common.protocol" />
     <allow pkg="org.apache.kafka.common.quota" />
+    <allow pkg="org.apache.kafka.common.record" />
     <allow pkg="org.apache.kafka.common.requests" />
     <allow pkg="org.apache.kafka.common.resource" />
     <allow pkg="org.apache.kafka.image" />
diff --git a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
index a9407217605..3b1fd21f787 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/ControlRecordUtils.java
@@ -37,39 +37,39 @@ public class ControlRecordUtils {
             throw new IllegalArgumentException(
                 "Expected LEADER_CHANGE control record type(2), but found " + recordType.toString());
         }
-        return deserializeLeaderChangeMessage(record.value().duplicate());
+        return deserializeLeaderChangeMessage(record.value());
     }
 
     public static LeaderChangeMessage deserializeLeaderChangeMessage(ByteBuffer data) {
-        ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
+        ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
         return new LeaderChangeMessage(byteBufferAccessor, LEADER_CHANGE_CURRENT_VERSION);
     }
 
-    public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(Record record) {
+    public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(Record record) {
         ControlRecordType recordType = ControlRecordType.parse(record.key());
         if (recordType != ControlRecordType.SNAPSHOT_HEADER) {
             throw new IllegalArgumentException(
                 "Expected SNAPSHOT_HEADER control record type(3), but found " + recordType.toString());
         }
-        return deserializedSnapshotHeaderRecord(record.value().duplicate());
+        return deserializeSnapshotHeaderRecord(record.value());
     }
 
-    public static SnapshotHeaderRecord deserializedSnapshotHeaderRecord(ByteBuffer data) {
-        ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
+    public static SnapshotHeaderRecord deserializeSnapshotHeaderRecord(ByteBuffer data) {
+        ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
         return new SnapshotHeaderRecord(byteBufferAccessor, SNAPSHOT_HEADER_CURRENT_VERSION);
     }
 
-    public static SnapshotFooterRecord deserializedSnapshotFooterRecord(Record record) {
+    public static SnapshotFooterRecord deserializeSnapshotFooterRecord(Record record) {
         ControlRecordType recordType = ControlRecordType.parse(record.key());
         if (recordType != ControlRecordType.SNAPSHOT_FOOTER) {
             throw new IllegalArgumentException(
                 "Expected SNAPSHOT_FOOTER control record type(4), but found " + recordType.toString());
         }
-        return deserializedSnapshotFooterRecord(record.value().duplicate());
+        return deserializeSnapshotFooterRecord(record.value());
     }
 
-    public static SnapshotFooterRecord deserializedSnapshotFooterRecord(ByteBuffer data) {
-        ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.duplicate());
+    public static SnapshotFooterRecord deserializeSnapshotFooterRecord(ByteBuffer data) {
+        ByteBufferAccessor byteBufferAccessor = new ByteBufferAccessor(data.slice());
         return new SnapshotFooterRecord(byteBufferAccessor, SNAPSHOT_FOOTER_CURRENT_VERSION);
     }
 }
diff --git a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
index 277b3e1c900..69df7552e15 100644
--- a/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
+++ b/core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
@@ -22,12 +22,15 @@ import kafka.server.{BrokerTopicStats, KafkaConfig, RequestLocal}
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.config.{AbstractConfig, TopicConfig}
 import org.apache.kafka.common.errors.InvalidConfigurationException
-import org.apache.kafka.common.record.{ControlRecordUtils, MemoryRecords, Records}
-import org.apache.kafka.common.utils.{BufferSupplier, Time}
+import org.apache.kafka.common.record.{MemoryRecords, Records}
+import org.apache.kafka.common.utils.BufferSupplier.GrowableBufferSupplier
+import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.raft.{Isolation, KafkaRaftClient, LogAppendInfo, LogFetchInfo, LogOffsetMetadata, OffsetAndEpoch, OffsetMetadata, ReplicatedLog, ValidOffsetAndEpoch}
+import org.apache.kafka.server.common.serialization.RecordSerde
 import org.apache.kafka.server.util.Scheduler
-import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter, SnapshotPath, Snapshots}
+import org.apache.kafka.snapshot.{FileRawSnapshotReader, FileRawSnapshotWriter, RawSnapshotReader, RawSnapshotWriter,
+RecordsSnapshotReader, SnapshotPath, Snapshots}
 import org.apache.kafka.storage.internals
 import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchIsolation, LogConfig, LogDirFailureChannel, LogStartOffsetIncrementReason, ProducerStateManagerConfig}
 
@@ -40,6 +43,7 @@ import scala.compat.java8.OptionConverters._
 
 final class KafkaMetadataLog private (
   val log: UnifiedLog,
+  recordSerde: RecordSerde[_],
   time: Time,
   scheduler: Scheduler,
   // Access to this object needs to be synchronized because it is used by the snapshotting thread to notify the
@@ -363,17 +367,19 @@ final class KafkaMetadataLog private (
    * Return the max timestamp of the first batch in a snapshot, if the snapshot exists and has records
    */
   private def readSnapshotTimestamp(snapshotId: OffsetAndEpoch): Option[Long] = {
-    readSnapshot(snapshotId).asScala.flatMap { reader =>
-      val batchIterator = reader.records().batchIterator()
-
-      val firstBatch = batchIterator.next()
-      val records = firstBatch.streamingIterator(new BufferSupplier.GrowableBufferSupplier())
-      if (firstBatch.isControlBatch) {
-        val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(records.next())
-        Some(header.lastContainedLogTimestamp())
-      } else {
-        warn("Did not find control record at beginning of snapshot")
-        None
+    readSnapshot(snapshotId).asScala.map { reader =>
+      val recordsSnapshotReader = RecordsSnapshotReader.of(
+        reader,
+        recordSerde,
+        new GrowableBufferSupplier(),
+        KafkaRaftClient.MAX_BATCH_SIZE_BYTES,
+        true
+      )
+
+      try {
+        recordsSnapshotReader.lastContainedLogTimestamp
+      } finally {
+        recordsSnapshotReader.close()
       }
     }
   }
@@ -548,6 +554,7 @@ object KafkaMetadataLog extends Logging {
     topicPartition: TopicPartition,
     topicId: Uuid,
     dataDir: File,
+    recordSerde: RecordSerde[_],
     time: Time,
     scheduler: Scheduler,
     config: MetadataLogConfig
@@ -597,6 +604,7 @@ object KafkaMetadataLog extends Logging {
 
     val metadataLog = new KafkaMetadataLog(
       log,
+      recordSerde,
       time,
       scheduler,
       recoverSnapshots(log),
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index ea0f453c9e5..e6e5aa0bb36 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -264,6 +264,7 @@ class KafkaRaftManager[T](
       topicPartition,
       topicId,
       dataDir,
+      recordSerde,
       time,
       scheduler,
       config = MetadataLogConfig(config, KafkaRaftClient.MAX_BATCH_SIZE_BYTES, KafkaRaftClient.MAX_FETCH_SIZE_BYTES)
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index bb773d86ade..8a37e659220 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -302,10 +302,10 @@ object DumpLogSegments {
                     val endTxnMarker = EndTransactionMarker.deserialize(record)
                     print(s" endTxnMarker: ${endTxnMarker.controlType} coordinatorEpoch: ${endTxnMarker.coordinatorEpoch}")
                   case ControlRecordType.SNAPSHOT_HEADER =>
-                    val header = ControlRecordUtils.deserializedSnapshotHeaderRecord(record)
+                    val header = ControlRecordUtils.deserializeSnapshotHeaderRecord(record)
                     print(s" SnapshotHeader ${SnapshotHeaderRecordJsonConverter.write(header, header.version())}")
                   case ControlRecordType.SNAPSHOT_FOOTER =>
-                    val footer = ControlRecordUtils.deserializedSnapshotFooterRecord(record)
+                    val footer = ControlRecordUtils.deserializeSnapshotFooterRecord(record)
                     print(s" SnapshotFooter ${SnapshotFooterRecordJsonConverter.write(footer, footer.version())}")
                   case controlType =>
                     print(s" controlType: $controlType($controlTypeId)")
diff --git a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
index 0f8f9ebc9fc..7852a0dc2a0 100644
--- a/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
+++ b/core/src/test/scala/kafka/raft/KafkaMetadataLogTest.scala
@@ -1041,6 +1041,7 @@ object KafkaMetadataLogTest {
       KafkaRaftServer.MetadataPartition,
       KafkaRaftServer.MetadataTopicId,
       logDir,
+      new ByteArraySerde,
       time,
       time.scheduler,
       metadataLogConfig
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index ea6d591cf33..4114330b058 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -250,7 +250,7 @@ class DumpLogSegmentsTest {
     )
 
     val records: Array[SimpleRecord] = metadataRecords.map(message => {
-      val serde = new MetadataRecordSerde()
+      val serde = MetadataRecordSerde.INSTANCE
       val cache = new ObjectSerializationCache
       val size = serde.recordSize(message, cache)
       val buf = ByteBuffer.allocate(size)
@@ -303,6 +303,7 @@ class DumpLogSegmentsTest {
       KafkaRaftServer.MetadataPartition,
       KafkaRaftServer.MetadataTopicId,
       logDir,
+      MetadataRecordSerde.INSTANCE,
       time,
       time.scheduler,
       MetadataLogConfig(
@@ -328,7 +329,7 @@ class DumpLogSegmentsTest {
         new MockTime,
         lastContainedLogTimestamp,
         CompressionType.NONE,
-        new MetadataRecordSerde
+        MetadataRecordSerde.INSTANCE,
       ).get()
     ) { snapshotWriter =>
       snapshotWriter.append(metadataRecords.asJava)
diff --git a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
index c7f651cf895..1738471692f 100644
--- a/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
+++ b/metadata/src/test/java/org/apache/kafka/image/loader/MetadataLoaderTest.java
@@ -18,9 +18,11 @@
 package org.apache.kafka.image.loader;
 
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.metadata.FeatureLevelRecord;
 import org.apache.kafka.common.metadata.RemoveTopicRecord;
 import org.apache.kafka.common.metadata.TopicRecord;
+import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.image.MetadataDelta;
 import org.apache.kafka.image.MetadataImage;
@@ -28,6 +30,7 @@ import org.apache.kafka.image.MetadataProvenance;
 import org.apache.kafka.image.publisher.MetadataPublisher;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.BatchReader;
+import org.apache.kafka.raft.ControlRecord;
 import org.apache.kafka.raft.LeaderAndEpoch;
 import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -236,8 +239,17 @@ public class MetadataLoaderTest {
             loader.installPublishers(asList(publisher)).get();
             if (loadSnapshot) {
                 MockSnapshotReader snapshotReader = new MockSnapshotReader(
-                        new MetadataProvenance(200, 100, 4000),
-                        asList(Batch.control(200, 100, 4000, 10, 200)));
+                    new MetadataProvenance(200, 100, 4000),
+                    asList(
+                        Batch.control(
+                            200,
+                            100,
+                            4000,
+                            10,
+                            asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+                        )
+                    )
+                );
                 loader.handleSnapshot(snapshotReader);
             }
             loader.waitForAllEventsToBeHandled();
@@ -329,8 +341,17 @@ public class MetadataLoaderTest {
         long offset
     ) throws Exception {
         MockSnapshotReader snapshotReader = new MockSnapshotReader(
-                new MetadataProvenance(offset, 100, 4000),
-                asList(Batch.control(200, 100, 4000, 10, 200)));
+            new MetadataProvenance(offset, 100, 4000),
+            asList(
+                Batch.control(
+                    200,
+                    100,
+                    4000,
+                    10,
+                    asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+                )
+            )
+        );
         if (loader.time() instanceof MockTime) {
             snapshotReader.setTime((MockTime) loader.time());
         }
@@ -409,16 +430,25 @@ public class MetadataLoaderTest {
             loader.installPublishers(publishers).get();
             loadTestSnapshot(loader, 200);
             publishers.get(0).firstPublish.get(10, TimeUnit.SECONDS);
-            MockBatchReader batchReader = new MockBatchReader(300, asList(
-                    Batch.control(300, 100, 4000, 10, 400))).
-                    setTime(time);
+            MockBatchReader batchReader = new MockBatchReader(
+                300,
+                asList(
+                    Batch.control(
+                        300,
+                        100,
+                        4000,
+                        10,
+                        asList(new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord()))
+                    )
+                )
+            ).setTime(time);
             loader.handleCommit(batchReader);
             loader.waitForAllEventsToBeHandled();
             assertTrue(batchReader.closed);
-            assertEquals(400L, loader.lastAppliedOffset());
+            assertEquals(300L, loader.lastAppliedOffset());
         }
         assertTrue(publishers.get(0).closed);
-        assertEquals(new LogDeltaManifest(new MetadataProvenance(400, 100, 4000), LeaderAndEpoch.UNKNOWN, 1,
+        assertEquals(new LogDeltaManifest(new MetadataProvenance(300, 100, 4000), LeaderAndEpoch.UNKNOWN, 1,
                         3000000L, 10),
             publishers.get(0).latestLogDeltaManifest);
         assertEquals(MetadataVersion.IBP_3_3_IV1,
diff --git a/raft/src/main/java/org/apache/kafka/raft/Batch.java b/raft/src/main/java/org/apache/kafka/raft/Batch.java
index 685a75821cb..353723166d9 100644
--- a/raft/src/main/java/org/apache/kafka/raft/Batch.java
+++ b/raft/src/main/java/org/apache/kafka/raft/Batch.java
@@ -33,6 +33,7 @@ public final class Batch<T> implements Iterable<T> {
     private final int sizeInBytes;
     private final long lastOffset;
     private final List<T> records;
+    private final List<ControlRecord> controlRecords;
 
     private Batch(
         long baseOffset,
@@ -40,7 +41,8 @@ public final class Batch<T> implements Iterable<T> {
         long appendTimestamp,
         int sizeInBytes,
         long lastOffset,
-        List<T> records
+        List<T> records,
+        List<ControlRecord> controlRecords
     ) {
         this.baseOffset = baseOffset;
         this.epoch = epoch;
@@ -48,6 +50,7 @@ public final class Batch<T> implements Iterable<T> {
         this.sizeInBytes = sizeInBytes;
         this.lastOffset = lastOffset;
         this.records = records;
+        this.controlRecords = controlRecords;
     }
 
     /**
@@ -78,6 +81,13 @@ public final class Batch<T> implements Iterable<T> {
         return records;
     }
 
+    /**
+     * The list of control records in the batch.
+     */
+    public List<ControlRecord> controlRecords() {
+        return controlRecords;
+    }
+
     /**
      * The epoch of the leader that appended the record batch.
      */
@@ -106,6 +116,7 @@ public final class Batch<T> implements Iterable<T> {
             ", sizeInBytes=" + sizeInBytes +
             ", lastOffset=" + lastOffset +
             ", records=" + records +
+            ", controlRecords=" + controlRecords +
             ')';
     }
 
@@ -119,7 +130,8 @@ public final class Batch<T> implements Iterable<T> {
             appendTimestamp == batch.appendTimestamp &&
             sizeInBytes == batch.sizeInBytes &&
             lastOffset == batch.lastOffset &&
-            Objects.equals(records, batch.records);
+            Objects.equals(records, batch.records) &&
+            Objects.equals(controlRecords, batch.controlRecords);
     }
 
     @Override
@@ -130,7 +142,8 @@ public final class Batch<T> implements Iterable<T> {
             appendTimestamp,
             sizeInBytes,
             lastOffset,
-            records
+            records,
+            controlRecords
         );
     }
 
@@ -150,15 +163,26 @@ public final class Batch<T> implements Iterable<T> {
         int epoch,
         long appendTimestamp,
         int sizeInBytes,
-        long lastOffset
+        List<ControlRecord> records
     ) {
+        if (records.isEmpty()) {
+            throw new IllegalArgumentException(
+                String.format(
+                    "Control batch must contain at least one record; baseOffset = %s; epoch = %s",
+                    baseOffset,
+                    epoch
+                )
+            );
+        }
+
         return new Batch<>(
             baseOffset,
             epoch,
             appendTimestamp,
             sizeInBytes,
-            lastOffset,
-            Collections.emptyList()
+            baseOffset + records.size() - 1,
+            Collections.emptyList(),
+            records
         );
     }
 
@@ -194,7 +218,8 @@ public final class Batch<T> implements Iterable<T> {
             appendTimestamp,
             sizeInBytes,
             baseOffset + records.size() - 1,
-            records
+            records,
+            Collections.emptyList()
         );
     }
 }
diff --git a/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
new file mode 100644
index 00000000000..bf685770720
--- /dev/null
+++ b/raft/src/main/java/org/apache/kafka/raft/ControlRecord.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import java.util.Objects;
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.record.ControlRecordType;
+
+public final class ControlRecord {
+    private final ControlRecordType recordType;
+    private final ApiMessage message;
+
+    private static void throwIllegalArgument(ControlRecordType recordType, ApiMessage message) {
+        throw new IllegalArgumentException(
+            String.format(
+                "Record type %s doesn't match message class %s",
+                recordType,
+                message.getClass()
+            )
+        );
+    }
+
+    public ControlRecord(ControlRecordType recordType, ApiMessage message) {
+        switch (recordType) {
+            case LEADER_CHANGE:
+                if (!(message instanceof LeaderChangeMessage)) {
+                    throwIllegalArgument(recordType, message);
+                }
+                break;
+            case SNAPSHOT_HEADER:
+                if (!(message instanceof SnapshotHeaderRecord)) {
+                    throwIllegalArgument(recordType, message);
+                }
+                break;
+            case SNAPSHOT_FOOTER:
+                if (!(message instanceof SnapshotFooterRecord)) {
+                    throwIllegalArgument(recordType, message);
+                }
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unknown control record type %s", recordType));
+        }
+
+        this.recordType = recordType;
+        this.message = message;
+    }
+
+    public ControlRecordType type() {
+        return recordType;
+    }
+
+    public short version() {
+        switch (recordType) {
+            case LEADER_CHANGE:
+                return ((LeaderChangeMessage) message).version();
+            case SNAPSHOT_HEADER:
+                return ((SnapshotHeaderRecord) message).version();
+            case SNAPSHOT_FOOTER:
+                return ((SnapshotFooterRecord) message).version();
+            default:
+                throw new IllegalStateException(String.format("Unknown control record type %s", recordType));
+        }
+    }
+
+    public ApiMessage message() {
+        return message;
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) return true;
+        if (other == null || getClass() != other.getClass()) return false;
+        ControlRecord that = (ControlRecord) other;
+        return Objects.equals(recordType, that.recordType) &&
+            Objects.equals(message, that.message);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(recordType, message);
+    }
+
+    @Override
+    public String toString() {
+        return String.format("ControlRecord(recordType=%s, message=%s)", recordType, message);
+    }
+}
diff --git a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
index efb1d69a34f..cb8f5762b9d 100644
--- a/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
+++ b/raft/src/main/java/org/apache/kafka/raft/internals/RecordsIterator.java
@@ -26,8 +26,11 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
 import java.util.Optional;
-
+import java.util.function.BiFunction;
+import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.apache.kafka.common.record.ControlRecordUtils;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
@@ -37,6 +40,7 @@ import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.common.utils.ByteUtils;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.ControlRecord;
 import org.apache.kafka.server.common.serialization.RecordSerde;
 
 public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseable {
@@ -199,45 +203,56 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
             batch.ensureValid();
         }
 
+        Integer numRecords = batch.countOrNull();
+        if (numRecords == null) {
+            throw new IllegalStateException("Expected a record count for the records batch");
+        }
+
+        DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
+
         final Batch<T> result;
-        if (batch.isControlBatch()) {
-            result = Batch.control(
-                batch.baseOffset(),
-                batch.partitionLeaderEpoch(),
-                batch.maxTimestamp(),
-                batch.sizeInBytes(),
-                batch.lastOffset()
-            );
-        } else {
-            Integer numRecords = batch.countOrNull();
-            if (numRecords == null) {
-                throw new IllegalStateException("Expected a record count for the records batch");
-            }
+        try {
+            if (batch.isControlBatch()) {
+                List<ControlRecord> records = new ArrayList<>(numRecords);
+                for (int i = 0; i < numRecords; i++) {
+                    ControlRecord record = readRecord(input, batch.sizeInBytes(), RecordsIterator::decodeControlRecord);
+                    records.add(record);
+                }
 
-            List<T> records = new ArrayList<>(numRecords);
-            DataInputStream input = new DataInputStream(batch.recordInputStream(bufferSupplier));
-            try {
+                result = Batch.control(
+                    batch.baseOffset(),
+                    batch.partitionLeaderEpoch(),
+                    batch.maxTimestamp(),
+                    batch.sizeInBytes(),
+                    records
+                );
+            } else {
+                List<T> records = new ArrayList<>(numRecords);
                 for (int i = 0; i < numRecords; i++) {
-                    T record = readRecord(input, batch.sizeInBytes());
+                    T record = readRecord(input, batch.sizeInBytes(), this::decodeDataRecord);
                     records.add(record);
                 }
-            } finally {
-                Utils.closeQuietly(input, "DataInputStream");
-            }
 
-            result = Batch.data(
-                batch.baseOffset(),
-                batch.partitionLeaderEpoch(),
-                batch.maxTimestamp(),
-                batch.sizeInBytes(),
-                records
-            );
+                result = Batch.data(
+                    batch.baseOffset(),
+                    batch.partitionLeaderEpoch(),
+                    batch.maxTimestamp(),
+                    batch.sizeInBytes(),
+                    records
+                );
+            }
+        } finally {
+            Utils.closeQuietly(input, "DataInputStream");
         }
 
         return result;
     }
 
-    private T readRecord(DataInputStream stream, int totalBatchSize) {
+    private <U> U readRecord(
+        DataInputStream stream,
+        int totalBatchSize,
+        BiFunction<Optional<ByteBuffer>, Optional<ByteBuffer>, U> decoder
+    ) {
         // Read size of body in bytes
         int size;
         try {
@@ -281,20 +296,22 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
             // Read offset delta
             input.readVarint();
 
+            // Read the key
             int keySize = input.readVarint();
-            if (keySize != -1) {
-                throw new IllegalArgumentException("Got key size of " + keySize + ", but this is invalid because it " +
-                        "is not -1 as expected.");
+            Optional<ByteBuffer> key = Optional.empty();
+            if (keySize >= 0) {
+                key = Optional.of(input.readByteBuffer(keySize));
             }
 
+            // Read the value
             int valueSize = input.readVarint();
-            if (valueSize < 1) {
-                throw new IllegalArgumentException("Got payload size of " + valueSize + ", but this is invalid because " +
-                        "it is less than 1.");
+            Optional<ByteBuffer> value = Optional.empty();
+            if (valueSize >= 0) {
+                value = Optional.of(input.readByteBuffer(valueSize));
             }
 
             // Read the metadata record body from the file input reader
-            T record = serde.read(input, valueSize);
+            U record = decoder.apply(key, value);
 
             // Read the number of headers. Currently, this must be a single byte set to 0.
             int numHeaders = buf.array()[size - 1];
@@ -302,9 +319,59 @@ public final class RecordsIterator<T> implements Iterator<Batch<T>>, AutoCloseab
                 throw new IllegalArgumentException("Got numHeaders of " + numHeaders + ", but this is invalid because " +
                         "it is not 0 as expected.");
             }
+
             return record;
         } finally {
             bufferSupplier.release(buf);
         }
     }
+
+    private T decodeDataRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
+        if (key.isPresent()) {
+            throw new IllegalArgumentException("Got key in the record when no key was expected");
+        }
+
+        if (!value.isPresent()) {
+            throw new IllegalArgumentException("Missing value in the record when a value was expected");
+        } else if (value.get().remaining() == 0) {
+            throw new IllegalArgumentException("Got an unexpected empty value in the record");
+        }
+
+        ByteBuffer valueBuffer = value.get();
+
+        return serde.read(new ByteBufferAccessor(valueBuffer), valueBuffer.remaining());
+    }
+
+    private static ControlRecord decodeControlRecord(Optional<ByteBuffer> key, Optional<ByteBuffer> value) {
+        if (!key.isPresent()) {
+            throw new IllegalArgumentException("Missing key in the record when a key was expected");
+        } else if (key.get().remaining() == 0) {
+            throw new IllegalArgumentException("Got an unexpected empty key in the record");
+        }
+
+        if (!value.isPresent()) {
+            throw new IllegalArgumentException("Missing value in the record when a value was expected");
+        } else if (value.get().remaining() == 0) {
+            throw new IllegalArgumentException("Got an unexpected empty value in the record");
+        }
+
+        ControlRecordType type = ControlRecordType.parse(key.get());
+
+        final ApiMessage message;
+        switch (type) {
+            case LEADER_CHANGE:
+                message = ControlRecordUtils.deserializeLeaderChangeMessage(value.get());
+                break;
+            case SNAPSHOT_HEADER:
+                message = ControlRecordUtils.deserializeSnapshotHeaderRecord(value.get());
+                break;
+            case SNAPSHOT_FOOTER:
+                message = ControlRecordUtils.deserializeSnapshotFooterRecord(value.get());
+                break;
+            default:
+                throw new IllegalArgumentException(String.format("Unknown control record type %s", type));
+        }
+
+        return new ControlRecord(type, message);
+    }
 }
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
index 71380038ad7..afd1928cfb6 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java
@@ -20,12 +20,13 @@ package org.apache.kafka.snapshot;
 import java.util.NoSuchElementException;
 import java.util.Optional;
 import java.util.OptionalLong;
-
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.utils.BufferSupplier;
 import org.apache.kafka.raft.Batch;
 import org.apache.kafka.raft.OffsetAndEpoch;
-import org.apache.kafka.server.common.serialization.RecordSerde;
 import org.apache.kafka.raft.internals.RecordsIterator;
+import org.apache.kafka.server.common.serialization.RecordSerde;
 
 public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
     private final OffsetAndEpoch snapshotId;
@@ -121,9 +122,22 @@ public final class RecordsSnapshotReader<T> implements SnapshotReader<T> {
             Batch<T> batch = iterator.next();
 
             if (!lastContainedLogTimestamp.isPresent()) {
-                // The Batch type doesn't support returning control batches. For now lets just use
-                // the append time of the first batch
-                lastContainedLogTimestamp = OptionalLong.of(batch.appendTimestamp());
+                // This must be the first batch which is expected to be a control batch with one record for
+                // the snapshot header.
+                if (batch.controlRecords().isEmpty()) {
+                    throw new IllegalStateException("First batch is not a control batch with at least one record");
+                } else if (!ControlRecordType.SNAPSHOT_HEADER.equals(batch.controlRecords().get(0).type())) {
+                    throw new IllegalStateException(
+                        String.format(
+                            "First control record is not a snapshot header (%s)",
+                            batch.controlRecords().get(0).type()
+                        )
+                    );
+                }
+
+                lastContainedLogTimestamp = OptionalLong.of(
+                    ((SnapshotHeaderRecord) batch.controlRecords().get(0).message()).lastContainedLogTimestamp()
+                );
             }
 
             if (!batch.records().isEmpty()) {
diff --git a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
index 05d3fde09d0..eeacf608a9f 100644
--- a/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
+++ b/raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java
@@ -131,7 +131,7 @@ final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {
         );
     }
 
-    public static <T> SnapshotWriter<T> createWithHeader(
+    public static <T> RecordsSnapshotWriter<T> createWithHeader(
         RawSnapshotWriter rawSnapshotWriter,
         int maxBatchSize,
         MemoryPool memoryPool,
diff --git a/raft/src/test/java/org/apache/kafka/ControlRecordTest.java b/raft/src/test/java/org/apache/kafka/ControlRecordTest.java
new file mode 100644
index 00000000000..5ed500995e2
--- /dev/null
+++ b/raft/src/test/java/org/apache/kafka/ControlRecordTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.raft;
+
+import org.apache.kafka.common.message.LeaderChangeMessage;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.record.ControlRecordType;
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public final class ControlRecordTest {
+    @Test
+    void testCtr() {
+        // Valid constructions
+        new ControlRecord(ControlRecordType.LEADER_CHANGE, new LeaderChangeMessage());
+        new ControlRecord(ControlRecordType.SNAPSHOT_HEADER, new SnapshotHeaderRecord());
+        new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, new SnapshotFooterRecord());
+
+        // Invalid constructions
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ControlRecord(ControlRecordType.ABORT, new SnapshotFooterRecord())
+        );
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ControlRecord(ControlRecordType.LEADER_CHANGE, new SnapshotHeaderRecord())
+        );
+        assertThrows(
+            IllegalArgumentException.class,
+            () -> new ControlRecord(ControlRecordType.SNAPSHOT_FOOTER, Mockito.mock(ApiMessage.class))
+        );
+    }
+
+    @Test
+    void testControlRecordTypeValues() {
+        // If this test fails then it means that ControlRecordType was changed. Please review the
+        // implementation for ControlRecord to see if it needs to be updated based on the changes
+        // to ControlRecordType.
+        assertEquals(6, ControlRecordType.values().length);
+    }
+}
diff --git a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
index 23fe3fd0694..4e9a377d5fd 100644
--- a/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/KafkaRaftClientSnapshotTest.java
@@ -1756,14 +1756,14 @@ final public class KafkaRaftClientSnapshotTest {
 
     private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext context, RawSnapshotWriter snapshot) {
         return RecordsSnapshotWriter.createWithHeader(
-            () -> Optional.of(snapshot),
+            snapshot,
             4 * 1024,
             MemoryPool.NONE,
             context.time,
             0,
             CompressionType.NONE,
             new StringSerde()
-        ).get();
+        );
     }
 
     private final static class MemorySnapshotWriter implements RawSnapshotWriter {
diff --git a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
index 9676831b30e..67f16c9ac8f 100644
--- a/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
+++ b/raft/src/test/java/org/apache/kafka/raft/internals/RecordsIteratorTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.raft.internals;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.IdentityHashMap;
 import java.util.List;
@@ -26,20 +27,30 @@ import java.util.NoSuchElementException;
 import java.util.Objects;
 import java.util.Random;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 import net.jqwik.api.ForAll;
 import net.jqwik.api.Property;
 import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.apache.kafka.common.message.SnapshotFooterRecord;
+import org.apache.kafka.common.message.SnapshotHeaderRecord;
 import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.ControlRecordType;
 import org.apache.kafka.common.record.DefaultRecordBatch;
 import org.apache.kafka.common.record.FileRecords;
 import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Records;
 import org.apache.kafka.common.utils.BufferSupplier;
+import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.raft.Batch;
+import org.apache.kafka.raft.OffsetAndEpoch;
 import org.apache.kafka.server.common.serialization.RecordSerde;
+import org.apache.kafka.snapshot.MockRawSnapshotWriter;
+import org.apache.kafka.snapshot.RecordsSnapshotWriter;
 import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -129,6 +140,62 @@ public final class RecordsIteratorTest {
         moreFileRecords.close();
     }
 
+    @Test
+    public void testControlRecordIteration() {
+        AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
+        try (RecordsSnapshotWriter<String> snapshot = RecordsSnapshotWriter.createWithHeader(
+                new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), snapshotBuf -> buffer.set(snapshotBuf)),
+                4 * 1024,
+                MemoryPool.NONE,
+                new MockTime(),
+                0,
+                CompressionType.NONE,
+                STRING_SERDE
+            )
+        ) {
+            snapshot.append(Arrays.asList("a", "b", "c"));
+            snapshot.append(Arrays.asList("d", "e", "f"));
+            snapshot.append(Arrays.asList("g", "h", "i"));
+            snapshot.freeze();
+        }
+
+        try (RecordsIterator<String> iterator = createIterator(
+                MemoryRecords.readableRecords(buffer.get()),
+                BufferSupplier.NO_CACHING,
+                true
+            )
+        ) {
+            // Check snapshot header control record
+            Batch<String> batch = iterator.next();
+
+            assertEquals(1, batch.controlRecords().size());
+            assertEquals(ControlRecordType.SNAPSHOT_HEADER, batch.controlRecords().get(0).type());
+            assertEquals(new SnapshotHeaderRecord(), batch.controlRecords().get(0).message());
+
+            // Consume the iterator until we find a control record
+            do {
+                batch = iterator.next();
+            }
+            while (batch.controlRecords().isEmpty());
+
+            // Check snapshot footer control record
+            assertEquals(1, batch.controlRecords().size());
+            assertEquals(ControlRecordType.SNAPSHOT_FOOTER, batch.controlRecords().get(0).type());
+            assertEquals(new SnapshotFooterRecord(), batch.controlRecords().get(0).message());
+
+            // Snapshot footer must be last record
+            assertFalse(iterator.hasNext());
+        }
+    }
+
+    @Test
+    void testControlRecordTypeValues() {
+        // If this test fails then it means that ControlRecordType was changed. Please review the
+        // implementation for RecordsIterator to see if it needs to be updated based on the changes
+        // to ControlRecordType.
+        assertEquals(6, ControlRecordType.values().length);
+    }
+
     private void testIterator(
         List<TestBatch<String>> expectedBatches,
         Records records,
@@ -136,21 +203,21 @@ public final class RecordsIteratorTest {
     ) {
         Set<ByteBuffer> allocatedBuffers = Collections.newSetFromMap(new IdentityHashMap<>());
 
-        RecordsIterator<String> iterator = createIterator(
-            records,
-            mockBufferSupplier(allocatedBuffers),
-            validateCrc
-        );
+        try (RecordsIterator<String> iterator = createIterator(
+                records,
+                mockBufferSupplier(allocatedBuffers),
+                validateCrc
+            )
+        ) {
+            for (TestBatch<String> batch : expectedBatches) {
+                assertTrue(iterator.hasNext());
+                assertEquals(batch, TestBatch.from(iterator.next()));
+            }
 
-        for (TestBatch<String> batch : expectedBatches) {
-            assertTrue(iterator.hasNext());
-            assertEquals(batch, TestBatch.from(iterator.next()));
+            assertFalse(iterator.hasNext());
+            assertThrows(NoSuchElementException.class, iterator::next);
         }
 
-        assertFalse(iterator.hasNext());
-        assertThrows(NoSuchElementException.class, iterator::next);
-
-        iterator.close();
         assertEquals(Collections.emptySet(), allocatedBuffers);
     }
 
diff --git a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
index bd8258cb708..9417e5d44cd 100644
--- a/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
+++ b/raft/src/test/java/org/apache/kafka/snapshot/SnapshotWriterReaderTest.java
@@ -71,6 +71,8 @@ final public class SnapshotWriterReaderTest {
 
         // Verify that an empty snapshot has only the Header and Footer
         try (SnapshotReader<String> reader = readSnapshot(context, id, Integer.MAX_VALUE)) {
+            assertEquals(magicTimestamp, reader.lastContainedLogTimestamp());
+
             RawSnapshotReader snapshot = context.log.readSnapshot(id).get();
             int recordCount = validateDelimiters(snapshot, magicTimestamp);
             assertEquals((recordsPerBatch * batches) + delimiterCount, recordCount);
@@ -218,7 +220,7 @@ final public class SnapshotWriterReaderTest {
         Record record = records.next();
         countRecords += 1;
 
-        SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializedSnapshotHeaderRecord(record);
+        SnapshotHeaderRecord headerRecord = ControlRecordUtils.deserializeSnapshotHeaderRecord(record);
         assertEquals(headerRecord.version(), ControlRecordUtils.SNAPSHOT_HEADER_CURRENT_VERSION);
         assertEquals(headerRecord.lastContainedLogTimestamp(), lastContainedLogTime);
 
@@ -238,7 +240,7 @@ final public class SnapshotWriterReaderTest {
         // Verify existence of the footer record in the end
         assertTrue(batch.isControlBatch());
 
-        SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializedSnapshotFooterRecord(record);
+        SnapshotFooterRecord footerRecord = ControlRecordUtils.deserializeSnapshotFooterRecord(record);
         assertEquals(footerRecord.version(), ControlRecordUtils.SNAPSHOT_FOOTER_CURRENT_VERSION);
 
         return countRecords;