Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/16 00:16:35 UTC

[GitHub] [kafka] junrao commented on a change in pull request #11058: KAFKA-12802 Added a file-based cache for consumed remote log metadata for each partition to avoid consuming again in case of broker restarts.

junrao commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r707813690



##########
File path: storage/src/main/resources/message/RemoteLogSegmentMetadataRecordSnapshot.json
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecordSnapshot",
+  "validVersions": "0",
+  "flexibleVersions": "none",

Review comment:
       Should we support flexible versions from the beginning so that we could potentially support downgrades during future format changes?
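
       For illustration, a minimal sketch of what that could look like here, assuming the usual
       Kafka message-schema convention where "0+" enables tagged fields from version 0 onwards
       (remaining fields elided):

       ```json
       {
         "apiKey": 0,
         "type": "data",
         "name": "RemoteLogSegmentMetadataRecordSnapshot",
         "validVersions": "0",
         "flexibleVersions": "0+",
         "fields": []
       }
       ```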

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * the remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file is to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        File newMetadataSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".new");
+        try (WritableByteChannel fileChannel = Channels.newChannel(new FileOutputStream(newMetadataSnapshotFile))) {
+
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version);
+
+            // Write topic-id
+            headerBuffer.putLong(snapshot.topicId.getMostSignificantBits());
+            headerBuffer.putLong(snapshot.topicId.getLeastSignificantBits());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition);
+            headerBuffer.putLong(snapshot.metadataPartitionOffset);
+            headerBuffer.flip();
+
+            // Write header
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+            for (RemoteLogSegmentMetadata remoteLogSegmentMetadata : snapshot.remoteLogMetadatas) {
+                final byte[] serializedBytes = serde.serialize(remoteLogSegmentMetadata);
+                // Write length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write data
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFile.toPath(), metadataStoreFile.toPath());
+    }
+
+    /**
+     * @return the Snapshot if it exists.
+     * @throws IOException if there is any error in reading the stored snapshot.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized Optional<Snapshot> read() throws IOException {
+
+        // Checking for empty files.
+        if (metadataStoreFile.length() == 0) {
+            return Optional.empty();
+        }
+
+        try (ReadableByteChannel channel = Channels.newChannel(new FileInputStream(metadataStoreFile))) {
+
+            // Read header
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+            channel.read(headerBuffer);
+            headerBuffer.rewind();
+            short version = headerBuffer.getShort();
+            Uuid topicId = new Uuid(headerBuffer.getLong(), headerBuffer.getLong());
+            int metadataPartition = headerBuffer.getInt();
+            long metadataPartitionOffset = headerBuffer.getLong();
+
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+            List<RemoteLogSegmentMetadata> result = new ArrayList<>();
+
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            while (channel.read(lenBuffer) > 0) {
+                lenBuffer.rewind();
+                // Read the length of each entry
+                final int len = lenBuffer.getInt();
+                lenBuffer.rewind();
+
+                // Read the entry
+                ByteBuffer data = ByteBuffer.allocate(len);
+                final int read = channel.read(data);
+                if (read != len) {

Review comment:
       Should we do the same check when reading the lenBuffer?
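
       For illustration, a hedged sketch of a small helper (hypothetical, not part of the PR) that
       validates both reads; it assumes the imports already present in this file. The outer loop's
       `channel.read(lenBuffer) > 0` check would still detect a clean end-of-entries, while the
       helper would catch a partially written length prefix or payload:

       ```java
       // Hypothetical helper: keep reading until the buffer is full, failing on premature EOF.
       private static void readExactly(ReadableByteChannel channel, ByteBuffer buffer, String what) throws IOException {
           while (buffer.hasRemaining()) {
               if (channel.read(buffer) < 0) {
                   throw new IOException("Unexpected end of stream while reading " + what +
                           "; the snapshot file may be corrupted.");
               }
           }
       }
       ```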

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
##########
@@ -42,21 +45,36 @@
 public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
+    private final Path logDir;
+
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
             new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> idToRemoteLogMetadataCache =
             new ConcurrentHashMap<>();
 
+    public RemotePartitionMetadataStore(Path logDir) {
+        this.logDir = logDir;
+    }
+
     @Override
     public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
         log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
 
-        RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
+
+        // This should have been already existing as it is loaded when the partitions are assigned.
+        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        if (remoteLogMetadataCache != null) {
+            remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);
+        } else {
+            log.error("No partition metadata found for : " + topicIdPartition);

Review comment:
       Should we throw an exception?
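
       For example, a hedged sketch of the fail-fast alternative (the exception type is an
       assumption):

       ```java
       // Hypothetical: fail fast instead of only logging, since a missing cache means
       // the partition was never initialized during assignment.
       RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
       if (remoteLogMetadataCache == null) {
           throw new IllegalStateException("No remote log metadata cache found for partition: " + topicIdPartition);
       }
       remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);
       ```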

##########
File path: storage/src/main/resources/message/RemoteLogSegmentMetadataRecordSnapshot.json
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecordSnapshot",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    {
+      "name": "SegmentId",
+      "type": "uuid",
+      "versions": "0+",
+      "about": "Unique identifier of the log segment"
+    },
+    {
+      "name": "StartOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Start offset  of the segment."
+    },
+    {
+      "name": "EndOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "End offset  of the segment."
+    },
+    {
+      "name": "BrokerId",
+      "type": "int32",
+      "versions": "0+",
+      "about": "Broker (controller or leader) id from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
+    },
+    {
+      "name": "MaxTimestamp",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Maximum timestamp with in this segment."
+    },
+    {
+      "name": "EventTimestamp",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Event timestamp of this segment."
+    },
+    {
+      "name": "SegmentLeaderEpochs",
+      "type": "[]SegmentLeaderEpochEntry",
+      "versions": "0+",
+      "about": "Event timestamp of this segment.",

Review comment:
       This description of the field seems incorrect.
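
       A hedged guess at the intended wording, based on the field name (the exact description is
       an assumption):

       ```json
       {
         "name": "SegmentLeaderEpochs",
         "type": "[]SegmentLeaderEpochEntry",
         "versions": "0+",
         "about": "Leader epoch to start-offset mappings for the records in this segment."
       }
       ```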

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +174,29 @@ public void run() {
         }
     }
 
+    private void syncCommittedDataAndOffsets(boolean forceSync) {
+        boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+        if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            // todo sync the snapshot file
+            for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+                int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
+                remotePartitionMetadataEventHandler.syncLogMetadataDataFile(topicIdPartition, metadataPartition,
+                                                                            partitionToConsumedOffsets.get(metadataPartition));
+            }
+
+            committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
+            committedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);

Review comment:
       Hmm, why do we want to reset committedPartitionToConsumedOffsets? This means if there is no new data consumed, we will still write the checkpoint file?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -85,21 +91,67 @@
     // Map of remote log metadata topic partition to consumed offsets.
     private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
 
+    private Map<Integer, Long> committedPartitionToConsumedOffsets = Collections.emptyMap();
+
+    private final long committedOffsetSyncIntervalMs;
+    private CommittedOffsetsFile committedOffsetsFile;
+    private long lastSyncedTimeMs;
+
     public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
                         RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
-                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
-        Objects.requireNonNull(consumer);
-        Objects.requireNonNull(remotePartitionMetadataEventHandler);
-        Objects.requireNonNull(topicPartitioner);
-
-        this.consumer = consumer;
-        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
-        this.topicPartitioner = topicPartitioner;
+                        RemoteLogMetadataTopicPartitioner topicPartitioner,
+                        Path committedOffsetsPath,
+                        Time time,
+                        long committedOffsetSyncIntervalMs) {
+        this.consumer = Objects.requireNonNull(consumer);
+        this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
+        this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+        this.time = Objects.requireNonNull(time);
+        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+        initializeConsumerAssignment(committedOffsetsPath);
+    }
+
+    private void initializeConsumerAssignment(Path committedOffsetsPath) {
+        try {
+            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        Map<Integer, Long> committedOffsets = Collections.emptyMap();
+        try {
+            // load committed offset and assign them in the consumer
+            committedOffsets = committedOffsetsFile.readEntries();

Review comment:
       Should we initialize committedPartitionToConsumedOffsets to the offsets in the checkpoint file?
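
       A minimal sketch of that initialization (placement inside initializeConsumerAssignment() is
       an assumption), so that the first sync pass does not rewrite an unchanged checkpoint:

       ```java
       // Hypothetical: seed the committed view from the checkpoint file so that
       // syncCommittedDataAndOffsets() sees no delta until new data is consumed.
       committedPartitionToConsumedOffsets = new HashMap<>(committedOffsets);
       ```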

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -66,6 +70,8 @@
     private final KafkaConsumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final Time time;
+

Review comment:
       extra newline.

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * the remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file is to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        File newMetadataSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".new");
+        try (WritableByteChannel fileChannel = Channels.newChannel(new FileOutputStream(newMetadataSnapshotFile))) {
+
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version);
+
+            // Write topic-id
+            headerBuffer.putLong(snapshot.topicId.getMostSignificantBits());
+            headerBuffer.putLong(snapshot.topicId.getLeastSignificantBits());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition);
+            headerBuffer.putLong(snapshot.metadataPartitionOffset);
+            headerBuffer.flip();
+
+            // Write header
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+            for (RemoteLogSegmentMetadata remoteLogSegmentMetadata : snapshot.remoteLogMetadatas) {
+                final byte[] serializedBytes = serde.serialize(remoteLogSegmentMetadata);
+                // Write length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write data
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFile.toPath(), metadataStoreFile.toPath());
+    }
+
+    /**
+     * @return the Snapshot if it exists.
+     * @throws IOException if there is any error in reading the stored snapshot.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized Optional<Snapshot> read() throws IOException {
+
+        // Checking for empty files.
+        if (metadataStoreFile.length() == 0) {
+            return Optional.empty();
+        }
+
+        try (ReadableByteChannel channel = Channels.newChannel(new FileInputStream(metadataStoreFile))) {
+
+            // Read header
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+            channel.read(headerBuffer);
+            headerBuffer.rewind();
+            short version = headerBuffer.getShort();
+            Uuid topicId = new Uuid(headerBuffer.getLong(), headerBuffer.getLong());
+            int metadataPartition = headerBuffer.getInt();
+            long metadataPartitionOffset = headerBuffer.getLong();
+
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+            List<RemoteLogSegmentMetadata> result = new ArrayList<>();
+
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            while (channel.read(lenBuffer) > 0) {
+                lenBuffer.rewind();
+                // Read the length of each entry
+                final int len = lenBuffer.getInt();
+                lenBuffer.rewind();
+
+                // Read the entry
+                ByteBuffer data = ByteBuffer.allocate(len);
+                final int read = channel.read(data);
+                if (read != len) {
+                    throw new IOException("Invalid amount of data read, file may have been corrupted.");
+                }
+
+                // We are always adding RemoteLogSegmentMetadata only as you can see in #write() method.
+                // Did not add a specific serde for RemoteLogSegmentMetadata and reusing RemoteLogMetadataSerde
+                final RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata) serde.deserialize(data.array());
+                result.add(remoteLogSegmentMetadata);
+            }
+
+            return Optional.of(new Snapshot(version, topicId, metadataPartition, metadataPartitionOffset, result));
+        }
+    }
+
+    /**
+     * This class represents the collection of remote log metadata for a specific topic partition.
+     */
+    public static final class Snapshot {
+        private static final short CURRENT_VERSION = 0;
+
+        private final short version;
+        private final Uuid topicId;
+        private final int metadataPartition;
+        private final long metadataPartitionOffset;
+        private final Collection<RemoteLogSegmentMetadata> remoteLogMetadatas;

Review comment:
       remoteLogMetadatas is weird since data is the plural form of datum.

##########
File path: storage/src/main/resources/message/RemotePartitionDleteMetadataSnapshot.json
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 1,
+  "type": "data",
+  "name": "RemotePartitionDeleteMetadataSnapshot",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    {
+      "name": "BrokerId",
+      "type": "int32",
+      "versions": "0+",
+      "about": "Broker (controller or leader) id from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."
+    },
+    {
+      "name": "EventTimestamp",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Event timestamp of this segment."

Review comment:
       This is not about a segment.

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * the remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file is to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        File newMetadataSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".new");

Review comment:
       Our convention is to create a .tmp file.
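
       A hedged sketch of that convention applied here, assuming the same atomic-rename flow:

       ```java
       // Hypothetical: follow the .tmp naming convention used by other Kafka checkpoint files.
       File tmpSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".tmp");
       // ... write the header and entries to tmpSnapshotFile as above ...
       Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath(), metadataStoreFile.toPath());
       ```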

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -177,8 +254,9 @@ private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
 
     private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
         Set<TopicPartition> assignedMetaTopicPartitions = assignedMetaPartitionsSnapshot.stream()
-                .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                .collect(Collectors.toSet());
+                                                                                        .map(partitionNum -> new TopicPartition(

Review comment:
       The line is quite long. Could we move assignedMetaPartitionsSnapshot to a new line?
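
       For instance, one possible wrapping (a formatting sketch only):

       ```java
       Set<TopicPartition> assignedMetaTopicPartitions =
               assignedMetaPartitionsSnapshot.stream()
                       .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
                       .collect(Collectors.toSet());
       ```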

##########
File path: storage/src/main/resources/message/RemoteLogSegmentMetadataRecordSnapshot.json
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecordSnapshot",
+  "validVersions": "0",
+  "flexibleVersions": "none",
+  "fields": [
+    {
+      "name": "SegmentId",
+      "type": "uuid",
+      "versions": "0+",
+      "about": "Unique identifier of the log segment"
+    },
+    {
+      "name": "StartOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Start offset  of the segment."
+    },
+    {
+      "name": "EndOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "End offset  of the segment."
+    },
+    {
+      "name": "BrokerId",
+      "type": "int32",
+      "versions": "0+",
+      "about": "Broker (controller or leader) id from which this event is created. DELETE_PARTITION_MARKED is sent by the controller. DELETE_PARTITION_STARTED and DELETE_PARTITION_FINISHED are sent by remote log metadata topic partition leader."

Review comment:
       The description seems to be intended for RemotePartitionDeleteMetadataSnapshot?
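
       A hedged guess at a description scoped to this record (the wording is an assumption):

       ```json
       {
         "name": "BrokerId",
         "type": "int32",
         "versions": "0+",
         "about": "Broker id of the leader that generated this remote log segment metadata event."
       }
       ```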

##########
File path: storage/src/main/resources/message/RemoteLogSegmentMetadataRecordSnapshot.json
##########
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 0,
+  "type": "data",
+  "name": "RemoteLogSegmentMetadataRecordSnapshot",

Review comment:
       How is this class used?

##########
File path: checkstyle/import-control.xml
##########
@@ -303,6 +303,12 @@
       <allow pkg="org.apache.kafka.server.common" />
       <allow pkg="org.apache.kafka.server.log" />
       <allow pkg="org.apache.kafka.test" />
+
+      <subpackage name="remote">
+        <allow pkg="scala.collection" />
+        <allow pkg="scala.jdk" />

Review comment:
       Is scala.jdk needed?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +174,29 @@ public void run() {
         }
     }
 
+    private void syncCommittedDataAndOffsets(boolean forceSync) {

Review comment:
       syncCommittedDataAndOffsets => maybeSyncCommittedDataAndOffsets ?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +174,29 @@ public void run() {
         }
     }
 
+    private void syncCommittedDataAndOffsets(boolean forceSync) {
+        boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+        if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            // todo sync the snapshot file
+            for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+                int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
+                remotePartitionMetadataEventHandler.syncLogMetadataDataFile(topicIdPartition, metadataPartition,
+                                                                            partitionToConsumedOffsets.get(metadataPartition));
+            }
+
+            committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
+            committedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);
+            lastSyncedTimeMs = time.milliseconds();
+        } catch (IOException e) {
+            log.error("Error encountered while writing committed offsets to a local file", e);

Review comment:
       Could we integrate this with logDirFailureChannel?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -120,6 +174,29 @@ public void run() {
         }
     }
 
+    private void syncCommittedDataAndOffsets(boolean forceSync) {
+        boolean noOffsetUpdates = committedPartitionToConsumedOffsets.equals(partitionToConsumedOffsets);
+        if (noOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noOffsetUpdates: {}, forceSync: {}", noOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            // todo sync the snapshot file

Review comment:
       Should this todo be addressed in this PR?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
##########
@@ -91,6 +109,23 @@ public void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata re
         }
     }
 
+    @Override
+    public void syncLogMetadataDataFile(TopicIdPartition topicIdPartition,
+                                        int metadataPartition,
+                                        Long metadataPartitionOffset) throws IOException {
+        //todo-tier write partitions

Review comment:
       Will the todo be addressed in this PR?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##########
@@ -85,21 +91,67 @@
     // Map of remote log metadata topic partition to consumed offsets.
     private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
 
+    private Map<Integer, Long> committedPartitionToConsumedOffsets = Collections.emptyMap();
+
+    private final long committedOffsetSyncIntervalMs;
+    private CommittedOffsetsFile committedOffsetsFile;
+    private long lastSyncedTimeMs;
+
     public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
                         RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
-                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
-        Objects.requireNonNull(consumer);
-        Objects.requireNonNull(remotePartitionMetadataEventHandler);
-        Objects.requireNonNull(topicPartitioner);
-
-        this.consumer = consumer;
-        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
-        this.topicPartitioner = topicPartitioner;
+                        RemoteLogMetadataTopicPartitioner topicPartitioner,
+                        Path committedOffsetsPath,
+                        Time time,
+                        long committedOffsetSyncIntervalMs) {
+        this.consumer = Objects.requireNonNull(consumer);
+        this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
+        this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+        this.time = Objects.requireNonNull(time);
+        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+        initializeConsumerAssignment(committedOffsetsPath);
+    }
+
+    private void initializeConsumerAssignment(Path committedOffsetsPath) {
+        try {
+            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        Map<Integer, Long> committedOffsets = Collections.emptyMap();
+        try {
+            // load committed offset and assign them in the consumer
+            committedOffsets = committedOffsetsFile.readEntries();
+        } catch (IOException e) {
+            // Ignore the error and consumer consumes from the earliest offset.
+            log.error("Encountered error while building committed offsets from the file", e);
+        }
+
+        final Set<Map.Entry<Integer, Long>> entries = committedOffsets.entrySet();
+
+        if (!entries.isEmpty()) {
+            // assign topic partitions from the earlier committed offsets file.
+            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
+            assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
+            Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
+                                                                                   .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+                                                                                   .collect(Collectors.toSet());
+            consumer.assign(metadataTopicPartitions);
+
+            // Seek to the committed offsets
+            for (Map.Entry<Integer, Long> entry : entries) {
+                partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
+                consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
+            }
+

Review comment:
       extra new line.

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##########
@@ -161,53 +162,46 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metada
                 throw new IllegalArgumentException("metadataUpdate: " + metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED +
                                                    " can not be updated");
             case COPY_SEGMENT_FINISHED:
-                handleSegmentWithCopySegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_STARTED:
-                handleSegmentWithDeleteSegmentStartedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_FINISHED:
-                handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             default:
                 throw new IllegalArgumentException("Metadata with the state " + targetState + " is not supported");
         }
     }
 
-    private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate metadataUpdate,
-                                                           RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Adding remote log segment metadata to leader epoch mappings with update: [{}]", metadataUpdate);
-
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
+    private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,

Review comment:
       So, the current code has a bug by using existingMetadata?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
##########
@@ -91,6 +109,23 @@ public void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata re
         }
     }
 
+    @Override
+    public void syncLogMetadataDataFile(TopicIdPartition topicIdPartition,
+                                        int metadataPartition,
+                                        Long metadataPartitionOffset) throws IOException {
+        //todo-tier write partitions
+        RemotePartitionDeleteMetadata partitionDeleteMetadata = idToPartitionDeleteMetadata.get(topicIdPartition);
+        if (partitionDeleteMetadata != null) {
+            log.info("Skipping syncing of metadata snapshot as remote partition [{}] is with state: [{}] ", topicIdPartition,
+                     partitionDeleteMetadata);
+        } else {

Review comment:
       Hmm, until the partition deletion completes, it seems that we still want to persist the segment list so that we could finish deleting them from the remote store?

##########
File path: storage/src/main/resources/message/RemotePartitionDleteMetadataSnapshot.json
##########
@@ -0,0 +1,42 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 1,
+  "type": "data",
+  "name": "RemotePartitionDeleteMetadataSnapshot",

Review comment:
       How is this class used?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
##########
@@ -81,12 +83,21 @@
     // requests calling different methods which use the resources like producer/consumer managers.
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private final RemotePartitionMetadataStore remotePartitionMetadataStore = new RemotePartitionMetadataStore();
+    private RemotePartitionMetadataStore remotePartitionMetadataStore;
     private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
     private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
     private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
     private volatile boolean initializationFailed;
 
+    public TopicBasedRemoteLogMetadataManager() {

Review comment:
       This seems unused?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##########
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * the remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // header: <version:short><topicId:2 longs><metadata-partition:int><metadata-partition-offset:long>
+    // size: 2 + (8+8) + 4 + 8 = 30
+    private static final int HEADER_SIZE = 30;
+
+    private final File metadataStoreFile;
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file is to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        File newMetadataSnapshotFile = new File(metadataStoreFile.getAbsolutePath() + ".new");
+        try (WritableByteChannel fileChannel = Channels.newChannel(new FileOutputStream(newMetadataSnapshotFile))) {
+
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version);
+
+            // Write topic-id
+            headerBuffer.putLong(snapshot.topicId.getMostSignificantBits());
+            headerBuffer.putLong(snapshot.topicId.getLeastSignificantBits());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition);
+            headerBuffer.putLong(snapshot.metadataPartitionOffset);
+            headerBuffer.flip();
+
+            // Write header
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+            for (RemoteLogSegmentMetadata remoteLogSegmentMetadata : snapshot.remoteLogMetadatas) {
+                final byte[] serializedBytes = serde.serialize(remoteLogSegmentMetadata);
+                // Write length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write data
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFile.toPath(), metadataStoreFile.toPath());
+    }
+
+    /**
+     * @return the Snapshot if it exists.
+     * @throws IOException if there is any error in reading the stored snapshot.
+     */
+    @SuppressWarnings("unchecked")
+    public synchronized Optional<Snapshot> read() throws IOException {
+
+        // Checking for empty files.
+        if (metadataStoreFile.length() == 0) {
+            return Optional.empty();
+        }
+
+        try (ReadableByteChannel channel = Channels.newChannel(new FileInputStream(metadataStoreFile))) {
+
+            // Read header
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+            channel.read(headerBuffer);
+            headerBuffer.rewind();
+            short version = headerBuffer.getShort();
+            Uuid topicId = new Uuid(headerBuffer.getLong(), headerBuffer.getLong());
+            int metadataPartition = headerBuffer.getInt();
+            long metadataPartitionOffset = headerBuffer.getLong();
+
+            RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+            List<RemoteLogSegmentMetadata> result = new ArrayList<>();
+
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            while (channel.read(lenBuffer) > 0) {
+                lenBuffer.rewind();
+                // Read the length of each entry
+                final int len = lenBuffer.getInt();
+                lenBuffer.rewind();
+
+                // Read the entry
+                ByteBuffer data = ByteBuffer.allocate(len);
+                final int read = channel.read(data);
+                if (read != len) {
+                    throw new IOException("Invalid amount of data read, file may have been corrupted.");
+                }
+
+                // We are always adding RemoteLogSegmentMetadata only as you can see in #write() method.
+                // Did not add a specific serde for RemoteLogSegmentMetadata and reusing RemoteLogMetadataSerde
+                final RemoteLogSegmentMetadata remoteLogSegmentMetadata = (RemoteLogSegmentMetadata) serde.deserialize(data.array());
+                result.add(remoteLogSegmentMetadata);
+            }
+
+            return Optional.of(new Snapshot(version, topicId, metadataPartition, metadataPartitionOffset, result));
+        }
+    }
+
+    /**
+     * This class represents the collection of remote log metadata for a specific topic partition.
+     */
+    public static final class Snapshot {
+        private static final short CURRENT_VERSION = 0;
+
+        private final short version;
+        private final Uuid topicId;
+        private final int metadataPartition;
+        private final long metadataPartitionOffset;
+        private final Collection<RemoteLogSegmentMetadata> remoteLogMetadatas;
+
+        public Snapshot(Uuid topicId,
+                        int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadata> remoteLogMetadatas) {
+            this(CURRENT_VERSION, topicId, metadataPartition, metadataPartitionOffset, remoteLogMetadatas);
+        }
+
+        public Snapshot(short version,
+                        Uuid topicId,
+                        int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadata> remoteLogMetadatas) {
+            this.version = version;
+            this.topicId = topicId;
+            this.metadataPartition = metadataPartition;
+            this.metadataPartitionOffset = metadataPartitionOffset;
+            this.remoteLogMetadatas = remoteLogMetadatas;
+        }
+
+        public short version() {
+            return version;
+        }
+
+        public Uuid topicId() {
+            return topicId;
+        }
+
+        public int metadataPartition() {
+            return metadataPartition;
+        }
+
+        public long metadataPartitionOffset() {

Review comment:
       This seems unused?

##########
File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
##########
@@ -161,53 +162,46 @@ public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate metada
                 throw new IllegalArgumentException("metadataUpdate: " + metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED +
                                                    " can not be updated");
             case COPY_SEGMENT_FINISHED:
-                handleSegmentWithCopySegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_STARTED:
-                handleSegmentWithDeleteSegmentStartedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_FINISHED:
-                handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             default:
                 throw new IllegalArgumentException("Metadata with the state " + targetState + " is not supported");
         }
     }
 
-    private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate metadataUpdate,
-                                                           RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Adding remote log segment metadata to leader epoch mappings with update: [{}]", metadataUpdate);
-
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
+    private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
                 RemoteLogLeaderEpochState::handleSegmentWithCopySegmentFinishedState);
 
         // Put the entry with the updated metadata.
-        idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
-                existingMetadata.createWithUpdates(metadataUpdate));
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata);

Review comment:
       This is an existing issue. In RemoteLogLeaderEpochState, it seems that methods like handleSegmentWithDeleteSegmentStartedState() and handleSegmentWithDeleteSegmentFinishedState() have unused input params?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org