You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/10/11 17:26:20 UTC

[kafka] branch trunk updated: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts. (#11058)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 34d56dc  KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts. (#11058)
34d56dc is described below

commit 34d56dc8d00bd27955eb9bb6ac01d5ae7f134dbd
Author: Satish Duggana <sa...@apache.org>
AuthorDate: Mon Oct 11 22:54:55 2021 +0530

    KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts. (#11058)
    
    Added snapshots for consumed remote log metadata for each partition to avoid consuming again in case of broker restarts. These snapshots are stored in the respective topic partition log directories.
    
    Reviewers: Kowshik Prakasam <kp...@confluent.io>, Cong Ding <co...@ccding.com>, Jun Rao <ju...@gmail.com>
---
 checkstyle/import-control.xml                      |   5 +
 .../remote/storage/RemoteLogMetadataManager.java   |   2 +-
 .../remote/storage/RemoteLogSegmentMetadata.java   |  18 +-
 .../log/remote/storage/RemoteLogSegmentState.java  |   2 +-
 .../metadata/storage/CommittedOffsetsFile.java     |  86 +++++++
 .../remote/metadata/storage/ConsumerManager.java   |  10 +-
 .../log/remote/metadata/storage/ConsumerTask.java  | 157 ++++++++++--
 .../storage/FileBasedRemoteLogMetadataCache.java   | 109 +++++++++
 .../storage/RemoteLogLeaderEpochState.java         |  24 +-
 .../metadata/storage/RemoteLogMetadataCache.java   |  74 +++---
 .../storage/RemoteLogMetadataSnapshotFile.java     | 267 +++++++++++++++++++++
 .../storage/RemoteLogSegmentMetadataSnapshot.java  | 209 ++++++++++++++++
 .../RemotePartitionMetadataEventHandler.java       |  10 +
 .../storage/RemotePartitionMetadataStore.java      |  63 ++++-
 .../TopicBasedRemoteLogMetadataManager.java        |  48 +++-
 .../TopicBasedRemoteLogMetadataManagerConfig.java  |  11 +
 .../serialization/RemoteLogMetadataSerde.java      |   5 +
 .../RemoteLogSegmentMetadataSnapshotTransform.java |  73 ++++++
 .../RemoteLogSegmentMetadataSnapshotRecord.json    |  92 +++++++
 .../FileBasedRemoteLogMetadataCacheTest.java       |  86 +++++++
 .../storage/RemoteLogMetadataSnapshotFileTest.java |  82 +++++++
 .../storage/RemoteLogSegmentLifecycleTest.java     |  16 +-
 ...picBasedRemoteLogMetadataManagerConfigTest.java |   6 +-
 .../TopicBasedRemoteLogMetadataManagerHarness.java |  48 +++-
 ...icBasedRemoteLogMetadataManagerRestartTest.java | 183 ++++++++++++++
 .../TopicBasedRemoteLogMetadataManagerTest.java    |  56 +++--
 ...RemoteLogMetadataManagerWrapperWithHarness.java |  25 +-
 .../storage/RemoteLogMetadataManagerTest.java      |   1 -
 28 files changed, 1626 insertions(+), 142 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index ba3087b..89d3484 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -305,6 +305,11 @@
       <allow pkg="org.apache.kafka.server.common" />
       <allow pkg="org.apache.kafka.server.log" />
       <allow pkg="org.apache.kafka.test" />
+
+      <subpackage name="remote">
+        <allow pkg="scala.collection" />
+      </subpackage>
+
     </subpackage>
   </subpackage>
 
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
index 1bf04b3..9a29746 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManager.java
@@ -73,7 +73,7 @@ public interface RemoteLogMetadataManager extends Configurable, Closeable {
      * <p>
      * <pre>
      * +---------------------+            +----------------------+
-     * |COPY_SEGMENT_STARTED |----------->|COPY_SEGMENT_FINISHED |
+     * |COPY_SEGMENT_STARTED |-----------&gt;|COPY_SEGMENT_FINISHED |
      * +-------------------+-+            +--+-------------------+
      *                     |                 |
      *                     |                 |
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
index 2290fd9..e0cbb79 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentMetadata.java
@@ -87,15 +87,15 @@ public class RemoteLogSegmentMetadata extends RemoteLogMetadata {
      * @param state               State of the respective segment of remoteLogSegmentId.
      * @param segmentLeaderEpochs leader epochs occurred within this segment.
      */
-    private RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
-                                     long startOffset,
-                                     long endOffset,
-                                     long maxTimestampMs,
-                                     int brokerId,
-                                     long eventTimestampMs,
-                                     int segmentSizeInBytes,
-                                     RemoteLogSegmentState state,
-                                     Map<Integer, Long> segmentLeaderEpochs) {
+    public RemoteLogSegmentMetadata(RemoteLogSegmentId remoteLogSegmentId,
+                                    long startOffset,
+                                    long endOffset,
+                                    long maxTimestampMs,
+                                    int brokerId,
+                                    long eventTimestampMs,
+                                    int segmentSizeInBytes,
+                                    RemoteLogSegmentState state,
+                                    Map<Integer, Long> segmentLeaderEpochs) {
         super(brokerId, eventTimestampMs);
         this.remoteLogSegmentId = Objects.requireNonNull(remoteLogSegmentId, "remoteLogSegmentId can not be null");
         this.state = Objects.requireNonNull(state, "state can not be null");
diff --git a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
index bf0befe..c618321 100644
--- a/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
+++ b/storage/api/src/main/java/org/apache/kafka/server/log/remote/storage/RemoteLogSegmentState.java
@@ -34,7 +34,7 @@ import java.util.stream.Collectors;
  * <p>
  * <pre>
  * +---------------------+            +----------------------+
- * |COPY_SEGMENT_STARTED |----------> |COPY_SEGMENT_FINISHED |
+ * |COPY_SEGMENT_STARTED |----------&gt; |COPY_SEGMENT_FINISHED |
  * +-------------------+-+            +--+-------------------+
  *                     |                 |
  *                     |                 |
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
new file mode 100644
index 0000000..1eddc0b
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * This class represents a file containing the committed offsets of remote log metadata partitions.
+ */
+public class CommittedOffsetsFile {
+    private static final int CURRENT_VERSION = 0;
+    private static final String SEPARATOR = " ";
+
+    private static final Pattern MINIMUM_ONE_WHITESPACE = Pattern.compile("\\s+");
+    private final CheckpointFile<Map.Entry<Integer, Long>> checkpointFile;
+
+    CommittedOffsetsFile(File offsetsFile) throws IOException {
+        CheckpointFile.EntryFormatter<Map.Entry<Integer, Long>> formatter = new EntryFormatter();
+        checkpointFile = new CheckpointFile<>(offsetsFile, CURRENT_VERSION, formatter);
+    }
+
+    private static class EntryFormatter implements CheckpointFile.EntryFormatter<Map.Entry<Integer, Long>> {
+
+        @Override
+        public String toString(Map.Entry<Integer, Long> entry) {
+            // Each entry is stored in a new line as <partition-num offset>
+            return entry.getKey() + SEPARATOR + entry.getValue();
+        }
+
+        @Override
+        public Optional<Map.Entry<Integer, Long>> fromString(String line) {
+            String[] strings = MINIMUM_ONE_WHITESPACE.split(line);
+            if (strings.length != 2) {
+                return Optional.empty();
+            }
+
+            try {
+                return Optional.of(Utils.mkEntry(Integer.parseInt(strings[0]), Long.parseLong(strings[1])));
+            } catch (NumberFormatException e) {
+                return Optional.empty();
+            }
+
+        }
+    }
+
+    public synchronized void writeEntries(Map<Integer, Long> committedOffsets) throws IOException {
+        checkpointFile.write(committedOffsets.entrySet());
+    }
+
+    public synchronized Map<Integer, Long> readEntries() throws IOException {
+        List<Map.Entry<Integer, Long>> entries = checkpointFile.read();
+        Map<Integer, Long> partitionToOffsets = new HashMap<>(entries.size());
+        for (Map.Entry<Integer, Long> entry : entries) {
+            Long existingValue = partitionToOffsets.put(entry.getKey(), entry.getValue());
+            if (existingValue != null) {
+                throw new IOException("Multiple entries exist for key: " + entry.getKey());
+            }
+        }
+
+        return partitionToOffsets;
+    }
+}
\ No newline at end of file
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
index 4f29c87..77f83fb 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerManager.java
@@ -27,7 +27,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
@@ -39,6 +41,8 @@ import java.util.concurrent.TimeoutException;
  */
 public class ConsumerManager implements Closeable {
 
+    public static final String COMMITTED_OFFSETS_FILE_NAME = "_rlmm_committed_offsets";
+
     private static final Logger log = LoggerFactory.getLogger(ConsumerManager.class);
     private static final long CONSUME_RECHECK_INTERVAL_MS = 50L;
 
@@ -49,14 +53,15 @@ public class ConsumerManager implements Closeable {
 
     public ConsumerManager(TopicBasedRemoteLogMetadataManagerConfig rlmmConfig,
                            RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
-                           RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner,
+                           RemoteLogMetadataTopicPartitioner topicPartitioner,
                            Time time) {
         this.rlmmConfig = rlmmConfig;
         this.time = time;
 
         //Create a task to consume messages and submit the respective events to RemotePartitionMetadataEventHandler.
         KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(rlmmConfig.consumerProperties());
-        consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, rlmmTopicPartitioner);
+        Path committedOffsetsPath = new File(rlmmConfig.logDir(), COMMITTED_OFFSETS_FILE_NAME).toPath();
+        consumerTask = new ConsumerTask(consumer, remotePartitionMetadataEventHandler, topicPartitioner, committedOffsetsPath, time, 60_000L);
         consumerTaskThread = KafkaThread.nonDaemon("RLMMConsumerTask", consumerTask);
     }
 
@@ -64,6 +69,7 @@ public class ConsumerManager implements Closeable {
         try {
             // Start a thread to continuously consume records from topic partitions.
             consumerTaskThread.start();
+            log.info("RLMM Consumer task thread is started");
         } catch (Exception e) {
             throw new KafkaException("Error encountered while initializing and scheduling ConsumerTask thread", e);
         }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
index 7691c6f..2509a44 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
@@ -29,8 +30,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.IOException;
+import java.nio.file.Path;
 import java.time.Duration;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Objects;
@@ -66,14 +70,17 @@ class ConsumerTask implements Runnable, Closeable {
     private final KafkaConsumer<byte[], byte[]> consumer;
     private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
     private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+    private final Time time;
 
     // It indicates whether the closing process has been started or not. If it is set as true,
     // consumer will stop consuming messages and it will not allow partition assignments to be updated.
     private volatile boolean closing = false;
+
     // It indicates whether the consumer needs to assign the partitions or not. This is set when it is
     // determined that the consumer needs to be assigned with the updated partitions.
     private volatile boolean assignPartitions = false;
 
+    // It represents a lock for any operations related to the assignedTopicPartitions.
     private final Object assignPartitionsLock = new Object();
 
     // Remote log metadata topic partitions that consumer is assigned to.
@@ -82,44 +89,143 @@ class ConsumerTask implements Runnable, Closeable {
     // User topic partitions that this broker is a leader/follower for.
     private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
 
-    // Map of remote log metadata topic partition to consumed offsets.
+    // Map of remote log metadata topic partition to consumed offsets. Received consumer records
+    // may or may not have been processed based on the assigned topic partitions.
     private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
 
+    // Map of remote log metadata topic partition to processed offsets that were synced in committedOffsetsFile.
+    private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+
+    private final long committedOffsetSyncIntervalMs;
+    private CommittedOffsetsFile committedOffsetsFile;
+    private long lastSyncedTimeMs;
+
     public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
                         RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
-                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
-        Objects.requireNonNull(consumer);
-        Objects.requireNonNull(remotePartitionMetadataEventHandler);
-        Objects.requireNonNull(topicPartitioner);
-
-        this.consumer = consumer;
-        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
-        this.topicPartitioner = topicPartitioner;
+                        RemoteLogMetadataTopicPartitioner topicPartitioner,
+                        Path committedOffsetsPath,
+                        Time time,
+                        long committedOffsetSyncIntervalMs) {
+        this.consumer = Objects.requireNonNull(consumer);
+        this.remotePartitionMetadataEventHandler = Objects.requireNonNull(remotePartitionMetadataEventHandler);
+        this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+        this.time = Objects.requireNonNull(time);
+        this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+        initializeConsumerAssignment(committedOffsetsPath);
+    }
+
+    private void initializeConsumerAssignment(Path committedOffsetsPath) {
+        try {
+            committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+
+        Map<Integer, Long> committedOffsets = Collections.emptyMap();
+        try {
+            // Load committed offset and assign them in the consumer.
+            committedOffsets = committedOffsetsFile.readEntries();
+        } catch (IOException e) {
+            // Ignore the error and consumer consumes from the earliest offset.
+            log.error("Encountered error while building committed offsets from the file. " +
+                              "Consumer will consume from the earliest offset for the assigned partitions.", e);
+        }
+
+        if (!committedOffsets.isEmpty()) {
+            // Assign topic partitions from the earlier committed offsets file.
+            Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
+            assignedMetaPartitions = Collections.unmodifiableSet(earlierAssignedPartitions);
+            Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
+                                                                                   .map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+                                                                                   .collect(Collectors.toSet());
+            consumer.assign(metadataTopicPartitions);
+
+            // Seek to the committed offsets
+            for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
+                partitionToConsumedOffsets.put(entry.getKey(), entry.getValue());
+                consumer.seek(new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), entry.getValue());
+            }
+
+            lastSyncedPartitionToConsumedOffsets = Collections.unmodifiableMap(committedOffsets);
+        }
     }
 
     @Override
     public void run() {
         log.info("Started Consumer task thread.");
+        lastSyncedTimeMs = time.milliseconds();
         try {
             while (!closing) {
                 maybeWaitForPartitionsAssignment();
 
                 log.info("Polling consumer to receive remote log metadata topic records");
-                ConsumerRecords<byte[], byte[]> consumerRecords
-                        = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
                 for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
-                    handleRemoteLogMetadata(serde.deserialize(record.value()));
-                    partitionToConsumedOffsets.put(record.partition(), record.offset());
+                    processConsumerRecord(record);
                 }
+
+                maybeSyncCommittedDataAndOffsets(false);
             }
         } catch (Exception e) {
             log.error("Error occurred in consumer task, close:[{}]", closing, e);
         } finally {
+            maybeSyncCommittedDataAndOffsets(true);
             closeConsumer();
             log.info("Exiting from consumer task thread");
         }
     }
 
+    private void processConsumerRecord(ConsumerRecord<byte[], byte[]> record) {
+        // Taking assignPartitionsLock here as updateAssignmentsForPartitions changes assignedTopicPartitions
+        // and also calls remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition) for the removed
+        // partitions.
+        RemoteLogMetadata remoteLogMetadata = serde.deserialize(record.value());
+        synchronized (assignPartitionsLock) {
+            if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
+                remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
+            } else {
+                log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
+            }
+            partitionToConsumedOffsets.put(record.partition(), record.offset());
+        }
+    }
+
+    private void maybeSyncCommittedDataAndOffsets(boolean forceSync) {
+        // Return immediately if there is no consumption from last time.
+        boolean noConsumedOffsetUpdates = partitionToConsumedOffsets.equals(lastSyncedPartitionToConsumedOffsets);
+        if (noConsumedOffsetUpdates || !forceSync && time.milliseconds() - lastSyncedTimeMs < committedOffsetSyncIntervalMs) {
+            log.debug("Skip syncing committed offsets, noConsumedOffsetUpdates: {}, forceSync: {}", noConsumedOffsetUpdates, forceSync);
+            return;
+        }
+
+        try {
+            // Need to take lock on assignPartitionsLock as assignedTopicPartitions might
+            // get updated by other threads.
+            synchronized (assignPartitionsLock) {
+                for (TopicIdPartition topicIdPartition : assignedTopicPartitions) {
+                    int metadataPartition = topicPartitioner.metadataPartition(topicIdPartition);
+                    Long offset = partitionToConsumedOffsets.get(metadataPartition);
+                    if (offset != null) {
+                        remotePartitionMetadataEventHandler.syncLogMetadataSnapshot(topicIdPartition, metadataPartition, offset);
+                    } else {
+                        log.debug("Skipping syncup of the remote-log-metadata-file for partition:{} , with remote log metadata partition{}, and no offset",
+                                topicIdPartition, metadataPartition);
+                    }
+                }
+
+                // Write partitionToConsumedOffsets into committed offsets file as we do not want to process them again
+                // in case of restarts.
+                committedOffsetsFile.writeEntries(partitionToConsumedOffsets);
+                lastSyncedPartitionToConsumedOffsets = new HashMap<>(partitionToConsumedOffsets);
+            }
+
+            lastSyncedTimeMs = time.milliseconds();
+        } catch (IOException e) {
+            throw new KafkaException("Error encountered while writing committed offsets to a local file", e);
+        }
+    }
+
     private void closeConsumer() {
         log.info("Closing the consumer instance");
         try {
@@ -158,6 +264,9 @@ class ConsumerTask implements Runnable, Closeable {
 
             if (assignPartitions) {
                 assignedMetaPartitionsSnapshot = new HashSet<>(assignedMetaPartitions);
+                // Removing unassigned meta partitions from partitionToConsumedOffsets and partitionToCommittedOffsets
+                partitionToConsumedOffsets.entrySet().removeIf(entry -> !assignedMetaPartitions.contains(entry.getKey()));
+
                 assignPartitions = false;
             }
         }
@@ -167,18 +276,11 @@ class ConsumerTask implements Runnable, Closeable {
         }
     }
 
-    private void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
-        if (assignedTopicPartitions.contains(remoteLogMetadata.topicIdPartition())) {
-            remotePartitionMetadataEventHandler.handleRemoteLogMetadata(remoteLogMetadata);
-        } else {
-            log.debug("This event {} is skipped as the topic partition is not assigned for this instance.", remoteLogMetadata);
-        }
-    }
-
     private void executeReassignment(Set<Integer> assignedMetaPartitionsSnapshot) {
-        Set<TopicPartition> assignedMetaTopicPartitions = assignedMetaPartitionsSnapshot.stream()
-                .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
-                .collect(Collectors.toSet());
+        Set<TopicPartition> assignedMetaTopicPartitions =
+                assignedMetaPartitionsSnapshot.stream()
+                                              .map(partitionNum -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, partitionNum))
+                                              .collect(Collectors.toSet());
         log.info("Reassigning partitions to consumer task [{}]", assignedMetaTopicPartitions);
         consumer.assign(assignedMetaTopicPartitions);
     }
@@ -210,12 +312,19 @@ class ConsumerTask implements Runnable, Closeable {
             for (TopicIdPartition tp : updatedReassignedPartitions) {
                 updatedAssignedMetaPartitions.add(topicPartitioner.metadataPartition(tp));
             }
+
+            // Clear removed topic partitions from inmemory cache.
+            for (TopicIdPartition removedPartition : removedPartitions) {
+                remotePartitionMetadataEventHandler.clearTopicPartition(removedPartition);
+            }
+
             assignedTopicPartitions = Collections.unmodifiableSet(updatedReassignedPartitions);
             log.debug("Assigned topic partitions: {}", assignedTopicPartitions);
 
             if (!updatedAssignedMetaPartitions.equals(assignedMetaPartitions)) {
                 assignedMetaPartitions = Collections.unmodifiableSet(updatedAssignedMetaPartitions);
                 log.debug("Assigned metadata topic partitions: {}", assignedMetaPartitions);
+
                 assignPartitions = true;
                 assignPartitionsLock.notifyAll();
             } else {
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
new file mode 100644
index 0000000..15e4562
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCache.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * This is a wrapper around {@link RemoteLogMetadataCache} providing a file based snapshot of
+ * {@link RemoteLogMetadataCache} for the given {@code topicIdPartition}. Snapshot is stored in the given
+ * {@code partitionDir}.
+ */
+public class FileBasedRemoteLogMetadataCache extends RemoteLogMetadataCache {
+    private static final Logger log = LoggerFactory.getLogger(FileBasedRemoteLogMetadataCache.class);
+    private final RemoteLogMetadataSnapshotFile snapshotFile;
+    private final TopicIdPartition topicIdPartition;
+
+    public FileBasedRemoteLogMetadataCache(TopicIdPartition topicIdPartition,
+                                           Path partitionDir) {
+        if (!partitionDir.toFile().exists() || !partitionDir.toFile().isDirectory()) {
+            throw new KafkaException("Given partition directory:" + partitionDir + " must be an existing directory.");
+        }
+
+        this.topicIdPartition = topicIdPartition;
+        snapshotFile = new RemoteLogMetadataSnapshotFile(partitionDir);
+
+        try {
+            snapshotFile.read().ifPresent(snapshot -> loadRemoteLogSegmentMetadata(snapshot));
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    protected void loadRemoteLogSegmentMetadata(RemoteLogMetadataSnapshotFile.Snapshot snapshot) {
+        log.info("Loading snapshot for partition {} is: {}", topicIdPartition, snapshot);
+        for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : snapshot.remoteLogSegmentMetadataSnapshots()) {
+            switch (metadataSnapshot.state()) {
+                case COPY_SEGMENT_STARTED:
+                    addCopyInProgressSegment(createRemoteLogSegmentMetadata(metadataSnapshot));
+                    break;
+                case COPY_SEGMENT_FINISHED:
+                    handleSegmentWithCopySegmentFinishedState(createRemoteLogSegmentMetadata(metadataSnapshot));
+                    break;
+                case DELETE_SEGMENT_STARTED:
+                    handleSegmentWithDeleteSegmentStartedState(createRemoteLogSegmentMetadata(metadataSnapshot));
+                    break;
+                case DELETE_SEGMENT_FINISHED:
+                default:
+                    throw new IllegalArgumentException("Given remoteLogSegmentMetadata has invalid state: " + metadataSnapshot);
+            }
+        }
+    }
+
+    private RemoteLogSegmentMetadata createRemoteLogSegmentMetadata(RemoteLogSegmentMetadataSnapshot snapshot) {
+        return new RemoteLogSegmentMetadata(new RemoteLogSegmentId(topicIdPartition, snapshot.segmentId()), snapshot.startOffset(),
+                                            snapshot.endOffset(), snapshot.maxTimestampMs(), snapshot.brokerId(), snapshot.eventTimestampMs(),
+                                            snapshot.segmentSizeInBytes(), snapshot.state(), snapshot.segmentLeaderEpochs());
+    }
+
+    /**
+     * Flushes the in-memory state to the snapshot file.
+     *
+     * @param metadataPartition       remote log metadata partition from which the messages have been consumed for the given
+     *                                user topic partition.
+     * @param metadataPartitionOffset remote log metadata partition offset up to which the messages have been consumed.
+     * @throws IOException if any errors occurred while writing the snapshot to the file.
+     */
+    public void flushToFile(int metadataPartition,
+                            Long metadataPartitionOffset) throws IOException {
+        List<RemoteLogSegmentMetadataSnapshot> snapshots = new ArrayList<>(idToSegmentMetadata.size());
+        for (RemoteLogLeaderEpochState state : leaderEpochEntries.values()) {
+            // Add unreferenced segments first, as to maintain the order when these segments are again read from
+            // the snapshot to build RemoteLogMetadataCache.
+            for (RemoteLogSegmentId id : state.unreferencedSegmentIds()) {
+                snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
+            }
+            
+            // Add referenced segments.
+            for (RemoteLogSegmentId id : state.referencedSegmentIds()) {
+                snapshots.add(RemoteLogSegmentMetadataSnapshot.create(idToSegmentMetadata.get(id)));
+            }
+        }
+
+        snapshotFile.write(new RemoteLogMetadataSnapshotFile.Snapshot(metadataPartition, metadataPartitionOffset, snapshots));
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
index 7535aff..d5787dd 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogLeaderEpochState.java
@@ -117,8 +117,7 @@ class RemoteLogLeaderEpochState {
         }
     }
 
-    void handleSegmentWithDeleteSegmentStartedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId,
-                                                    Long leaderEpochEndOffset) {
+    void handleSegmentWithDeleteSegmentStartedState(Long startOffset, RemoteLogSegmentId remoteLogSegmentId) {
         // Remove the offset mappings as this segment is getting deleted.
         offsetToId.remove(startOffset, remoteLogSegmentId);
 
@@ -127,8 +126,7 @@ class RemoteLogLeaderEpochState {
         unreferencedSegmentIds.add(remoteLogSegmentId);
     }
 
-    void handleSegmentWithDeleteSegmentFinishedState(long startOffset, RemoteLogSegmentId remoteLogSegmentId,
-                                                     Long leaderEpochEndOffset) {
+    void handleSegmentWithDeleteSegmentFinishedState(RemoteLogSegmentId remoteLogSegmentId) {
         // It completely removes the tracking of this segment as it is considered as deleted.
         unreferencedSegmentIds.remove(remoteLogSegmentId);
     }
@@ -149,6 +147,14 @@ class RemoteLogLeaderEpochState {
         return entry == null ? null : entry.getValue();
     }
 
+    Collection<RemoteLogSegmentId> unreferencedSegmentIds() {
+        return Collections.unmodifiableCollection(unreferencedSegmentIds);
+    }
+
+    Collection<RemoteLogSegmentId> referencedSegmentIds() {
+        return Collections.unmodifiableCollection(offsetToId.values());
+    }
+
     /**
      * Action interface to act on remote log segment transition for the given {@link RemoteLogLeaderEpochState}.
      */
@@ -158,15 +164,15 @@ class RemoteLogLeaderEpochState {
         /**
          * Performs this operation with the given {@code remoteLogLeaderEpochState}.
          *
+         * @param leaderEpoch               leader epoch value
          * @param remoteLogLeaderEpochState In-memory state of the segments for a leader epoch.
          * @param startOffset               start offset of the segment.
          * @param segmentId                 segment id.
-         * @param leaderEpochEndOffset      end offset for the given leader epoch.
          */
-        void accept(RemoteLogLeaderEpochState remoteLogLeaderEpochState,
-                    Long startOffset,
-                    RemoteLogSegmentId segmentId,
-                    Long leaderEpochEndOffset);
+        void accept(int leaderEpoch,
+                    RemoteLogLeaderEpochState remoteLogLeaderEpochState,
+                    long startOffset,
+                    RemoteLogSegmentId segmentId);
     }
 
 }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
index 5853d7f..ed88bc4 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataCache.java
@@ -95,14 +95,14 @@ public class RemoteLogMetadataCache {
     private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataCache.class);
 
     // It contains all the segment-id to metadata mappings which did not reach the terminal state viz DELETE_SEGMENT_FINISHED.
-    private final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata
+    protected final ConcurrentMap<RemoteLogSegmentId, RemoteLogSegmentMetadata> idToSegmentMetadata
             = new ConcurrentHashMap<>();
 
     // It contains leader epoch to the respective entry containing the state.
     // TODO We are not clearing the entry for epoch when RemoteLogLeaderEpochState becomes empty. This will be addressed
     // later. We will look into it when we integrate these APIs along with RemoteLogManager changes.
     // https://issues.apache.org/jira/browse/KAFKA-12641
-    private final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>();
+    protected final ConcurrentMap<Integer, RemoteLogLeaderEpochState> leaderEpochEntries = new ConcurrentHashMap<>();
 
     /**
      * Returns {@link RemoteLogSegmentMetadata} if it exists for the given leader-epoch containing the offset and with
@@ -161,77 +161,73 @@ public class RemoteLogMetadataCache {
                 throw new IllegalArgumentException("metadataUpdate: " + metadataUpdate + " with state " + RemoteLogSegmentState.COPY_SEGMENT_STARTED +
                                                    " can not be updated");
             case COPY_SEGMENT_FINISHED:
-                handleSegmentWithCopySegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithCopySegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_STARTED:
-                handleSegmentWithDeleteSegmentStartedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentStartedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             case DELETE_SEGMENT_FINISHED:
-                handleSegmentWithDeleteSegmentFinishedState(metadataUpdate, existingMetadata);
+                handleSegmentWithDeleteSegmentFinishedState(existingMetadata.createWithUpdates(metadataUpdate));
                 break;
             default:
                 throw new IllegalArgumentException("Metadata with the state " + targetState + " is not supported");
         }
     }
 
-    private void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadataUpdate metadataUpdate,
-                                                           RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Adding remote log segment metadata to leader epoch mappings with update: [{}]", metadataUpdate);
-
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                RemoteLogLeaderEpochState::handleSegmentWithCopySegmentFinishedState);
+    protected final void handleSegmentWithCopySegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) -> {
+                                                          long leaderEpochEndOffset = highestOffsetForEpoch(leaderEpoch,
+                                                                                                            remoteLogSegmentMetadata);
+                                                          remoteLogLeaderEpochState.handleSegmentWithCopySegmentFinishedState(startOffset,
+                                                                                                                              segmentId,
+                                                                                                                              leaderEpochEndOffset);
+                                                      });
 
         // Put the entry with the updated metadata.
-        idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
-                existingMetadata.createWithUpdates(metadataUpdate));
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata);
     }
 
-    private void handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadataUpdate metadataUpdate,
-                                                            RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Cleaning up the state for : [{}]", metadataUpdate);
+    protected final void handleSegmentWithDeleteSegmentStartedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        log.debug("Cleaning up the state for : [{}]", remoteLogSegmentMetadata);
 
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                RemoteLogLeaderEpochState::handleSegmentWithDeleteSegmentStartedState);
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) ->
+                                                              remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentStartedState(startOffset, segmentId));
 
         // Put the entry with the updated metadata.
-        idToSegmentMetadata.put(existingMetadata.remoteLogSegmentId(),
-                existingMetadata.createWithUpdates(metadataUpdate));
+        idToSegmentMetadata.put(remoteLogSegmentMetadata.remoteLogSegmentId(), remoteLogSegmentMetadata);
     }
 
-    private void handleSegmentWithDeleteSegmentFinishedState(RemoteLogSegmentMetadataUpdate metadataUpdate,
-                                                             RemoteLogSegmentMetadata existingMetadata) {
-        log.debug("Removing the entry as it reached the terminal state: [{}]", metadataUpdate);
+    private void handleSegmentWithDeleteSegmentFinishedState(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        log.debug("Removing the entry as it reached the terminal state: [{}]", remoteLogSegmentMetadata);
 
-        doHandleSegmentStateTransitionForLeaderEpochs(existingMetadata,
-                RemoteLogLeaderEpochState::handleSegmentWithDeleteSegmentFinishedState);
+        doHandleSegmentStateTransitionForLeaderEpochs(remoteLogSegmentMetadata,
+                                                      (leaderEpoch, remoteLogLeaderEpochState, startOffset, segmentId) ->
+                                                              remoteLogLeaderEpochState.handleSegmentWithDeleteSegmentFinishedState(segmentId));
 
         // Remove the segment's id to metadata mapping because this segment is considered as deleted and it cleared all
         // the state of this segment in the cache.
-        idToSegmentMetadata.remove(existingMetadata.remoteLogSegmentId());
+        idToSegmentMetadata.remove(remoteLogSegmentMetadata.remoteLogSegmentId());
     }
 
-    private void doHandleSegmentStateTransitionForLeaderEpochs(RemoteLogSegmentMetadata existingMetadata,
+    private void doHandleSegmentStateTransitionForLeaderEpochs(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
                                                                RemoteLogLeaderEpochState.Action action) {
-        RemoteLogSegmentId remoteLogSegmentId = existingMetadata.remoteLogSegmentId();
-        Map<Integer, Long> leaderEpochToOffset = existingMetadata.segmentLeaderEpochs();
+        RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        Map<Integer, Long> leaderEpochToOffset = remoteLogSegmentMetadata.segmentLeaderEpochs();
 
         // Go through all the leader epochs and apply the given action.
         for (Map.Entry<Integer, Long> entry : leaderEpochToOffset.entrySet()) {
             Integer leaderEpoch = entry.getKey();
             Long startOffset = entry.getValue();
-            RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.get(leaderEpoch);
-
-            if (remoteLogLeaderEpochState == null) {
-                throw new IllegalStateException("RemoteLogLeaderEpochState does not exist for the leader epoch: "
-                                                + leaderEpoch);
-            } else {
-                long leaderEpochEndOffset = highestOffsetForEpoch(leaderEpoch, existingMetadata);
-                action.accept(remoteLogLeaderEpochState, startOffset, remoteLogSegmentId, leaderEpochEndOffset);
-            }
+            // leaderEpochEntries will be empty when resorting the metadata from snapshot.
+            RemoteLogLeaderEpochState remoteLogLeaderEpochState = leaderEpochEntries.computeIfAbsent(
+                    leaderEpoch, x -> new RemoteLogLeaderEpochState());
+            action.accept(leaderEpoch, remoteLogLeaderEpochState, startOffset, remoteLogSegmentId);
         }
     }
 
-    private long highestOffsetForEpoch(Integer leaderEpoch, RemoteLogSegmentMetadata segmentMetadata) {
+    private static long highestOffsetForEpoch(Integer leaderEpoch, RemoteLogSegmentMetadata segmentMetadata) {
         // Compute the highest offset for the leader epoch with in the segment
         NavigableMap<Integer, Long> epochToOffset = segmentMetadata.segmentLeaderEpochs();
         Map.Entry<Integer, Long> nextEntry = epochToOffset.higherEntry(leaderEpoch);
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
new file mode 100644
index 0000000..cee77ee
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
@@ -0,0 +1,267 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+    private static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+    public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = "remote_log_snapshot";
+
+    // File format:
+    // <header>[<entry>...]
+    // header: <version:short><metadata-partition:int><metadata-partition-offset:long><entries-size:int>
+    // entry: <entry-length><entry-bytes>
+
+    // header size: 2 (version) + 4 (partition num) + 8 (offset) + 4 (entries size) = 18
+    private static final int HEADER_SIZE = 18;
+
+    private final File metadataStoreFile;
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+    /**
+     * Creates a CommittedLogMetadataSnapshotFile instance backed by a file with the name `remote_log_snapshot` in
+     * the given {@code metadataStoreDir}. It creates the file if it does not exist.
+     *
+     * @param metadataStoreDir directory in which the snapshot file to be created.
+     */
+    RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+        this.metadataStoreFile = new File(metadataStoreDir.toFile(), COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+        // Create an empty file if it does not exist.
+        try {
+            boolean newFileCreated = metadataStoreFile.createNewFile();
+            log.info("Remote log metadata snapshot file: [{}], newFileCreated: [{}]", metadataStoreFile, newFileCreated);
+        } catch (IOException e) {
+            throw new KafkaException(e);
+        }
+    }
+
+    /**
+     * Writes the given snapshot replacing the earlier snapshot data.
+     *
+     * @param snapshot Snapshot to be stored.
+     * @throws IOException if there4 is any error in writing the given snapshot to the file.
+     */
+    public synchronized void write(Snapshot snapshot) throws IOException {
+        Path newMetadataSnapshotFilePath = new File(metadataStoreFile.getAbsolutePath() + ".tmp").toPath();
+        try (FileChannel fileChannel = FileChannel.open(newMetadataSnapshotFilePath,
+                                                        StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+
+            // header: <version:short><metadata-partition:int><metadata-partition-offset:long>
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+            // Write version
+            headerBuffer.putShort(snapshot.version());
+
+            // Write metadata partition and metadata partition offset
+            headerBuffer.putInt(snapshot.metadataPartition());
+
+            // Write metadata partition offset
+            headerBuffer.putLong(snapshot.metadataPartitionOffset());
+
+            // Write entries size
+            Collection<RemoteLogSegmentMetadataSnapshot> metadataSnapshots = snapshot.remoteLogSegmentMetadataSnapshots();
+            headerBuffer.putInt(metadataSnapshots.size());
+
+            // Write header
+            headerBuffer.flip();
+            fileChannel.write(headerBuffer);
+
+            // Write each entry
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            for (RemoteLogSegmentMetadataSnapshot metadataSnapshot : metadataSnapshots) {
+                final byte[] serializedBytes = serde.serialize(metadataSnapshot);
+                // entry format: <entry-length><entry-bytes>
+
+                // Write entry length
+                lenBuffer.putInt(serializedBytes.length);
+                lenBuffer.flip();
+                fileChannel.write(lenBuffer);
+                lenBuffer.rewind();
+
+                // Write entry bytes
+                fileChannel.write(ByteBuffer.wrap(serializedBytes));
+            }
+
+            fileChannel.force(true);
+        }
+
+        Utils.atomicMoveWithFallback(newMetadataSnapshotFilePath, metadataStoreFile.toPath());
+    }
+
+    /**
+     * @return the Snapshot if it exists.
+     * @throws IOException if there is any error in reading the stored snapshot.
+     */
+    public synchronized Optional<Snapshot> read() throws IOException {
+
+        // Checking for empty files.
+        if (metadataStoreFile.length() == 0) {
+            return Optional.empty();
+        }
+
+        try (ReadableByteChannel channel = Channels.newChannel(new FileInputStream(metadataStoreFile))) {
+
+            // header: <version:short><metadata-partition:int><metadata-partition-offset:long>
+            // Read header
+            ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+            channel.read(headerBuffer);
+            headerBuffer.rewind();
+            short version = headerBuffer.getShort();
+            int metadataPartition = headerBuffer.getInt();
+            long metadataPartitionOffset = headerBuffer.getLong();
+            int metadataSnapshotsSize = headerBuffer.getInt();
+
+            List<RemoteLogSegmentMetadataSnapshot> result = new ArrayList<>(metadataSnapshotsSize);
+            ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+            int lenBufferReadCt;
+            while ((lenBufferReadCt = channel.read(lenBuffer)) > 0) {
+                lenBuffer.rewind();
+
+                if (lenBufferReadCt != lenBuffer.capacity()) {
+                    throw new IOException("Invalid amount of data read for the length of an entry, file may have been corrupted.");
+                }
+
+                // entry format: <entry-length><entry-bytes>
+
+                // Read the length of each entry
+                final int len = lenBuffer.getInt();
+                lenBuffer.rewind();
+
+                // Read the entry
+                ByteBuffer data = ByteBuffer.allocate(len);
+                final int read = channel.read(data);
+                if (read != len) {
+                    throw new IOException("Invalid amount of data read, file may have been corrupted.");
+                }
+
+                // We are always adding RemoteLogSegmentMetadata only as you can see in #write() method.
+                // Did not add a specific serde for RemoteLogSegmentMetadata and reusing RemoteLogMetadataSerde
+                final RemoteLogSegmentMetadataSnapshot remoteLogSegmentMetadata =
+                        (RemoteLogSegmentMetadataSnapshot) serde.deserialize(data.array());
+                result.add(remoteLogSegmentMetadata);
+            }
+
+            if (metadataSnapshotsSize != result.size()) {
+                throw new IOException("Unexpected entries in the snapshot file. Expected size: " + metadataSnapshotsSize
+                                              + ", but found: " + result.size());
+            }
+
+            return Optional.of(new Snapshot(version, metadataPartition, metadataPartitionOffset, result));
+        }
+    }
+
+    /**
+     * This class represents the collection of remote log metadata for a specific topic partition.
+     */
+    public static final class Snapshot {
+        private static final short CURRENT_VERSION = 0;
+
+        private final short version;
+        private final int metadataPartition;
+        private final long metadataPartitionOffset;
+        private final Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots;
+
+        public Snapshot(int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots) {
+            this(CURRENT_VERSION, metadataPartition, metadataPartitionOffset, remoteLogSegmentMetadataSnapshots);
+        }
+
+        public Snapshot(short version,
+                        int metadataPartition,
+                        long metadataPartitionOffset,
+                        Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots) {
+            // We will add multiple version support in future if needed. For now, the only supported version is CURRENT_VERSION viz 0.
+            if (version != CURRENT_VERSION) {
+                throw new IllegalArgumentException("Unexpected version received: " + version);
+            }
+            this.version = version;
+            this.metadataPartition = metadataPartition;
+            this.metadataPartitionOffset = metadataPartitionOffset;
+            this.remoteLogSegmentMetadataSnapshots = remoteLogSegmentMetadataSnapshots;
+        }
+
+        public short version() {
+            return version;
+        }
+
+        public int metadataPartition() {
+            return metadataPartition;
+        }
+
+        public long metadataPartitionOffset() {
+            return metadataPartitionOffset;
+        }
+
+        public Collection<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadataSnapshots() {
+            return remoteLogSegmentMetadataSnapshots;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (!(o instanceof Snapshot)) return false;
+            Snapshot snapshot = (Snapshot) o;
+            return version == snapshot.version && metadataPartition == snapshot.metadataPartition
+                    && metadataPartitionOffset == snapshot.metadataPartitionOffset
+                    && Objects.equals(remoteLogSegmentMetadataSnapshots, snapshot.remoteLogSegmentMetadataSnapshots);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(version, metadataPartition, metadataPartitionOffset, remoteLogSegmentMetadataSnapshots);
+        }
+
+        @Override
+        public String toString() {
+            return "Snapshot{" +
+                    "version=" + version +
+                    ", metadataPartition=" + metadataPartition +
+                    ", metadataPartitionOffset=" + metadataPartitionOffset +
+                    ", remoteLogSegmentMetadataSnapshotsSize" + remoteLogSegmentMetadataSnapshots.size() +
+                    '}';
+        }
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
new file mode 100644
index 0000000..e7292c2
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentMetadataSnapshot.java
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.Objects;
+import java.util.TreeMap;
+
+/**
+ * This class represents the entry containing the metadata about a remote log segment. This is similar to
+ * {@link RemoteLogSegmentMetadata} but it does not contain topic partition information. This class keeps
+ * only remote log segment ID but not the topic partition.
+ *
+ * This class is used in storing the snapshot of remote log metadata for a specific topic partition as mentioned
+ * in {@link RemoteLogMetadataSnapshotFile.Snapshot}.
+ */
+public class RemoteLogSegmentMetadataSnapshot extends RemoteLogMetadata {
+
+    /**
+     * Universally unique remote log segment id.
+     */
+    private final Uuid segmentId;
+
+    /**
+     * Start offset of this segment.
+     */
+    private final long startOffset;
+
+    /**
+     * End offset of this segment.
+     */
+    private final long endOffset;
+
+    /**
+     * Maximum timestamp in milli seconds in the segment
+     */
+    private final long maxTimestampMs;
+
+    /**
+     * LeaderEpoch vs offset for messages within this segment.
+     */
+    private final NavigableMap<Integer, Long> segmentLeaderEpochs;
+
+    /**
+     * Size of the segment in bytes.
+     */
+    private final int segmentSizeInBytes;
+
+    /**
+     * It indicates the state in which the action is executed on this segment.
+     */
+    private final RemoteLogSegmentState state;
+
+    /**
+     * Creates an instance with the given metadata of remote log segment.
+     * <p>
+     * {@code segmentLeaderEpochs} can not be empty. If all the records in this segment belong to the same leader epoch
+     * then it should have an entry with epoch mapping to start-offset of this segment.
+     *
+     * @param segmentId                  Universally unique remote log segment id.
+     * @param startOffset         Start offset of this segment (inclusive).
+     * @param endOffset           End offset of this segment (inclusive).
+     * @param maxTimestampMs      Maximum timestamp in milli seconds in this segment.
+     * @param brokerId            Broker id from which this event is generated.
+     * @param eventTimestampMs    Epoch time in milli seconds at which the remote log segment is copied to the remote tier storage.
+     * @param segmentSizeInBytes  Size of this segment in bytes.
+     * @param state               State of the respective segment of remoteLogSegmentId.
+     * @param segmentLeaderEpochs leader epochs occurred within this segment.
+     */
+    public RemoteLogSegmentMetadataSnapshot(Uuid segmentId,
+                                            long startOffset,
+                                            long endOffset,
+                                            long maxTimestampMs,
+                                            int brokerId,
+                                            long eventTimestampMs,
+                                            int segmentSizeInBytes,
+                                            RemoteLogSegmentState state,
+                                            Map<Integer, Long> segmentLeaderEpochs) {
+        super(brokerId, eventTimestampMs);
+        this.segmentId = Objects.requireNonNull(segmentId, "remoteLogSegmentId can not be null");
+        this.state = Objects.requireNonNull(state, "state can not be null");
+
+        this.startOffset = startOffset;
+        this.endOffset = endOffset;
+        this.maxTimestampMs = maxTimestampMs;
+        this.segmentSizeInBytes = segmentSizeInBytes;
+
+        if (segmentLeaderEpochs == null || segmentLeaderEpochs.isEmpty()) {
+            throw new IllegalArgumentException("segmentLeaderEpochs can not be null or empty");
+        }
+
+        this.segmentLeaderEpochs = Collections.unmodifiableNavigableMap(new TreeMap<>(segmentLeaderEpochs));
+    }
+
+    public static RemoteLogSegmentMetadataSnapshot create(RemoteLogSegmentMetadata metadata) {
+        return new RemoteLogSegmentMetadataSnapshot(metadata.remoteLogSegmentId().id(), metadata.startOffset(), metadata.endOffset(),
+                                                    metadata.maxTimestampMs(), metadata.brokerId(), metadata.eventTimestampMs(),
+                                                    metadata.segmentSizeInBytes(), metadata.state(), metadata.segmentLeaderEpochs());
+    }
+
+    /**
+     * @return unique id of this segment.
+     */
+    public Uuid segmentId() {
+        return segmentId;
+    }
+
+    /**
+     * @return Start offset of this segment (inclusive).
+     */
+    public long startOffset() {
+        return startOffset;
+    }
+
+    /**
+     * @return End offset of this segment (inclusive).
+     */
+    public long endOffset() {
+        return endOffset;
+    }
+
+    /**
+     * @return Total size of this segment in bytes.
+     */
+    public int segmentSizeInBytes() {
+        return segmentSizeInBytes;
+    }
+
+    /**
+     * @return Maximum timestamp in milli seconds of a record within this segment.
+     */
+    public long maxTimestampMs() {
+        return maxTimestampMs;
+    }
+
+    /**
+     * @return Map of leader epoch vs offset for the records available in this segment.
+     */
+    public NavigableMap<Integer, Long> segmentLeaderEpochs() {
+        return segmentLeaderEpochs;
+    }
+
+    /**
+     * Returns the current state of this remote log segment. It can be any of the below
+     * <ul>
+     *     {@link RemoteLogSegmentState#COPY_SEGMENT_STARTED}
+     *     {@link RemoteLogSegmentState#COPY_SEGMENT_FINISHED}
+     *     {@link RemoteLogSegmentState#DELETE_SEGMENT_STARTED}
+     *     {@link RemoteLogSegmentState#DELETE_SEGMENT_FINISHED}
+     * </ul>
+     */
+    public RemoteLogSegmentState state() {
+        return state;
+    }
+
+    @Override
+    public TopicIdPartition topicIdPartition() {
+        throw new UnsupportedOperationException("This metadata does not have topic partition with it.");
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (!(o instanceof RemoteLogSegmentMetadataSnapshot)) return false;
+        RemoteLogSegmentMetadataSnapshot that = (RemoteLogSegmentMetadataSnapshot) o;
+        return startOffset == that.startOffset && endOffset == that.endOffset && maxTimestampMs == that.maxTimestampMs && segmentSizeInBytes == that.segmentSizeInBytes && Objects.equals(
+                segmentId, that.segmentId) && Objects.equals(segmentLeaderEpochs, that.segmentLeaderEpochs) && state == that.state;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(segmentId, startOffset, endOffset, maxTimestampMs, segmentLeaderEpochs, segmentSizeInBytes, state);
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteLogSegmentMetadataSnapshot{" +
+                "segmentId=" + segmentId +
+                ", startOffset=" + startOffset +
+                ", endOffset=" + endOffset +
+                ", maxTimestampMs=" + maxTimestampMs +
+                ", segmentLeaderEpochs=" + segmentLeaderEpochs +
+                ", segmentSizeInBytes=" + segmentSizeInBytes +
+                ", state=" + state +
+                '}';
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
index 862defe..c92a51e 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataEventHandler.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.server.log.remote.metadata.storage;
 
+import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
 import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
 
+import java.io.IOException;
+
 public abstract class RemotePartitionMetadataEventHandler {
 
     public void handleRemoteLogMetadata(RemoteLogMetadata remoteLogMetadata) {
@@ -40,4 +43,11 @@ public abstract class RemotePartitionMetadataEventHandler {
     protected abstract void handleRemoteLogSegmentMetadataUpdate(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate);
 
     protected abstract void handleRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata);
+
+    public abstract void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
+                                                 int metadataPartition,
+                                                 Long metadataPartitionOffset) throws IOException;
+
+    public abstract void clearTopicPartition(TopicIdPartition topicIdPartition);
+
 }
\ No newline at end of file
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
index aebef84..7051d18 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.server.log.remote.metadata.storage;
 
 import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
 import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
@@ -28,7 +29,9 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
+import java.nio.file.Path;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.Map;
@@ -42,21 +45,36 @@ import java.util.concurrent.ConcurrentHashMap;
 public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
     private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
 
+    private final Path logDir;
+
     private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =
             new ConcurrentHashMap<>();
 
-    private Map<TopicIdPartition, RemoteLogMetadataCache> idToRemoteLogMetadataCache =
+    private Map<TopicIdPartition, FileBasedRemoteLogMetadataCache> idToRemoteLogMetadataCache =
             new ConcurrentHashMap<>();
 
+    public RemotePartitionMetadataStore(Path logDir) {
+        this.logDir = logDir;
+    }
+
     @Override
     public void handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
         log.debug("Adding remote log segment : [{}]", remoteLogSegmentMetadata);
 
-        RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        final RemoteLogSegmentId remoteLogSegmentId = remoteLogSegmentMetadata.remoteLogSegmentId();
+        TopicIdPartition topicIdPartition = remoteLogSegmentId.topicIdPartition();
+
+        // This should have been already existing as it is loaded when the partitions are assigned.
+        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        if (remoteLogMetadataCache != null) {
+            remoteLogMetadataCache.addCopyInProgressSegment(remoteLogSegmentMetadata);
+        } else {
+            throw new IllegalStateException("No partition metadata found for : " + topicIdPartition);
+        }
+    }
 
-        idToRemoteLogMetadataCache
-                .computeIfAbsent(remoteLogSegmentId.topicIdPartition(), id -> new RemoteLogMetadataCache())
-                .addCopyInProgressSegment(remoteLogSegmentMetadata);
+    private Path partitionLogDirectory(TopicPartition topicPartition) {
+        return new File(logDir.toFile(), topicPartition.topic() + "-" + topicPartition.partition()).toPath();
     }
 
     @Override
@@ -69,10 +87,10 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
             try {
                 remoteLogMetadataCache.updateRemoteLogSegmentMetadata(rlsmUpdate);
             } catch (RemoteResourceNotFoundException e) {
-                log.error("Error occurred while updating the remote log segment.");
+                log.warn("Error occurred while updating the remote log segment.", e);
             }
         } else {
-            log.error("No partition metadata found for : " + topicIdPartition);
+            throw new IllegalStateException("No partition metadata found for : " + topicIdPartition);
         }
     }
 
@@ -91,6 +109,27 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         }
     }
 
+    @Override
+    public void syncLogMetadataSnapshot(TopicIdPartition topicIdPartition,
+                                        int metadataPartition,
+                                        Long metadataPartitionOffset) throws IOException {
+        RemotePartitionDeleteMetadata partitionDeleteMetadata = idToPartitionDeleteMetadata.get(topicIdPartition);
+        if (partitionDeleteMetadata != null) {
+            log.info("Skipping syncing of metadata snapshot as remote partition [{}] is with state: [{}] ", topicIdPartition,
+                     partitionDeleteMetadata);
+        } else {
+            FileBasedRemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+            if (remoteLogMetadataCache != null) {
+                remoteLogMetadataCache.flushToFile(metadataPartition, metadataPartitionOffset);
+            }
+        }
+    }
+
+    @Override
+    public void clearTopicPartition(TopicIdPartition topicIdPartition) {
+        idToRemoteLogMetadataCache.remove(topicIdPartition);
+    }
+
     public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition)
             throws RemoteStorageException {
         Objects.requireNonNull(topicIdPartition, "topicIdPartition can not be null");
@@ -105,9 +144,9 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         return getRemoteLogMetadataCache(topicIdPartition).listRemoteLogSegments(leaderEpoch);
     }
 
-    private RemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
+    private FileBasedRemoteLogMetadataCache getRemoteLogMetadataCache(TopicIdPartition topicIdPartition)
             throws RemoteResourceNotFoundException {
-        RemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
+        FileBasedRemoteLogMetadataCache remoteLogMetadataCache = idToRemoteLogMetadataCache.get(topicIdPartition);
         if (remoteLogMetadataCache == null) {
             throw new RemoteResourceNotFoundException("No resource found for partition: " + topicIdPartition);
         }
@@ -140,4 +179,10 @@ public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHa
         idToPartitionDeleteMetadata = Collections.emptyMap();
         idToRemoteLogMetadataCache = Collections.emptyMap();
     }
+
+    public void maybeLoadPartition(TopicIdPartition partition) {
+        idToRemoteLogMetadataCache.computeIfAbsent(partition,
+            topicIdPartition -> new FileBasedRemoteLogMetadataCache(topicIdPartition, partitionLogDirectory(topicIdPartition.topicPartition())));
+    }
+
 }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
index b6bfd64..0271780 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManager.java
@@ -39,6 +39,7 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.time.Duration;
 import java.util.Collections;
@@ -72,6 +73,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final AtomicBoolean initialized = new AtomicBoolean(false);
     private final Time time = Time.SYSTEM;
+    private final boolean startConsumerThread;
 
     private Thread initializationThread;
     private volatile ProducerManager producerManager;
@@ -81,12 +83,21 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
     // requests calling different methods which use the resources like producer/consumer managers.
     private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
 
-    private final RemotePartitionMetadataStore remotePartitionMetadataStore = new RemotePartitionMetadataStore();
+    private RemotePartitionMetadataStore remotePartitionMetadataStore;
     private volatile TopicBasedRemoteLogMetadataManagerConfig rlmmConfig;
     private volatile RemoteLogMetadataTopicPartitioner rlmmTopicPartitioner;
     private final Set<TopicIdPartition> pendingAssignPartitions = Collections.synchronizedSet(new HashSet<>());
     private volatile boolean initializationFailed;
 
+    public TopicBasedRemoteLogMetadataManager() {
+        this(true);
+    }
+
+    // Visible for testing.
+    public TopicBasedRemoteLogMetadataManager(boolean startConsumerThread) {
+        this.startConsumerThread = startConsumerThread;
+    }
+
     @Override
     public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata)
             throws RemoteStorageException {
@@ -267,26 +278,34 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
         log.info("Received leadership notifications with leader partitions {} and follower partitions {}",
                  leaderPartitions, followerPartitions);
 
-        HashSet<TopicIdPartition> allPartitions = new HashSet<>(leaderPartitions);
-        allPartitions.addAll(followerPartitions);
         lock.readLock().lock();
         try {
             if (closing.get()) {
                 throw new IllegalStateException("This instance is in closing state");
             }
 
+            HashSet<TopicIdPartition> allPartitions = new HashSet<>(leaderPartitions);
+            allPartitions.addAll(followerPartitions);
             if (!initialized.get()) {
                 // If it is not yet initialized, then keep them as pending partitions and assign them
                 // when it is initialized successfully in initializeResources().
                 this.pendingAssignPartitions.addAll(allPartitions);
             } else {
-                consumerManager.addAssignmentsForPartitions(allPartitions);
+                assignPartitions(allPartitions);
             }
         } finally {
             lock.readLock().unlock();
         }
     }
 
+    private void assignPartitions(Set<TopicIdPartition> allPartitions) {
+        for (TopicIdPartition partition : allPartitions) {
+            remotePartitionMetadataStore.maybeLoadPartition(partition);
+        }
+
+        consumerManager.addAssignmentsForPartitions(allPartitions);
+    }
+
     @Override
     public void onStopPartitions(Set<TopicIdPartition> partitions) {
         lock.readLock().lock();
@@ -323,6 +342,7 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
 
             rlmmConfig = new TopicBasedRemoteLogMetadataManagerConfig(configs);
             rlmmTopicPartitioner = new RemoteLogMetadataTopicPartitioner(rlmmConfig.metadataTopicPartitionsCount());
+            remotePartitionMetadataStore = new RemotePartitionMetadataStore(new File(rlmmConfig.logDir()).toPath());
             configured = true;
             log.info("Successfully initialized with rlmmConfig: {}", rlmmConfig);
 
@@ -385,10 +405,14 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
                 try {
                     producerManager = new ProducerManager(rlmmConfig, rlmmTopicPartitioner);
                     consumerManager = new ConsumerManager(rlmmConfig, remotePartitionMetadataStore, rlmmTopicPartitioner, time);
-                    consumerManager.startConsumerThread();
+                    if (startConsumerThread) {
+                        consumerManager.startConsumerThread();
+                    } else {
+                        log.info("RLMM Consumer task thread is not configured to be started.");
+                    }
 
                     if (!pendingAssignPartitions.isEmpty()) {
-                        consumerManager.addAssignmentsForPartitions(pendingAssignPartitions);
+                        assignPartitions(pendingAssignPartitions);
                         pendingAssignPartitions.clear();
                     }
 
@@ -476,6 +500,18 @@ public class TopicBasedRemoteLogMetadataManager implements RemoteLogMetadataMana
         }
     }
 
+    // Visible for testing.
+    public TopicBasedRemoteLogMetadataManagerConfig config() {
+        return rlmmConfig;
+    }
+
+    // Visible for testing.
+    public void startConsumerThread() {
+        if (consumerManager != null) {
+            consumerManager.startConsumerThread();
+        }
+    }
+
     @Override
     public void close() throws IOException {
         // Close all the resources.
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
index 3fd6b2b..7e52519 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfig.java
@@ -74,6 +74,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
     public static final String REMOTE_LOG_METADATA_PRODUCER_PREFIX = "remote.log.metadata.producer.";
     public static final String REMOTE_LOG_METADATA_CONSUMER_PREFIX = "remote.log.metadata.consumer.";
     public static final String BROKER_ID = "broker.id";
+    public static final String LOG_DIR = "log.dir";
 
     private static final String REMOTE_LOG_METADATA_CLIENT_PREFIX = "__remote_log_metadata_client";
 
@@ -97,6 +98,7 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
 
     private final String clientIdPrefix;
     private final int metadataTopicPartitionsCount;
+    private final String logDir;
     private final long consumeWaitMs;
     private final long metadataTopicRetentionMs;
     private final short metadataTopicReplicationFactor;
@@ -111,6 +113,11 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
 
         Map<String, Object> parsedConfigs = CONFIG.parse(props);
 
+        logDir = (String) props.get(LOG_DIR);
+        if (logDir == null || logDir.isEmpty()) {
+            throw new IllegalArgumentException(LOG_DIR + " config must not be null or empty.");
+        }
+
         metadataTopicPartitionsCount = (int) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP);
         metadataTopicReplicationFactor = (short) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP);
         metadataTopicRetentionMs = (long) parsedConfigs.get(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP);
@@ -179,6 +186,10 @@ public final class TopicBasedRemoteLogMetadataManagerConfig {
         return initializationRetryIntervalMs;
     }
 
+    public String logDir() {
+        return logDir;
+    }
+
     public Map<String, Object> consumerProperties() {
         return consumerProps;
     }
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
index 94975b9..4a63b56 100644
--- a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogMetadataSerde.java
@@ -19,8 +19,10 @@ package org.apache.kafka.server.log.remote.metadata.storage.serialization;
 import org.apache.kafka.common.protocol.ApiMessage;
 import org.apache.kafka.server.common.ApiMessageAndVersion;
 import org.apache.kafka.server.common.serialization.BytesApiMessageSerde;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
 import org.apache.kafka.server.log.remote.metadata.storage.generated.MetadataRecordType;
 import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
 import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataUpdateRecord;
 import org.apache.kafka.server.log.remote.metadata.storage.generated.RemotePartitionDeleteMetadataRecord;
 import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
@@ -39,6 +41,7 @@ public class RemoteLogMetadataSerde {
     private static final short REMOTE_LOG_SEGMENT_METADATA_API_KEY = new RemoteLogSegmentMetadataRecord().apiKey();
     private static final short REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY = new RemoteLogSegmentMetadataUpdateRecord().apiKey();
     private static final short REMOTE_PARTITION_DELETE_API_KEY = new RemotePartitionDeleteMetadataRecord().apiKey();
+    private static final short REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY = new RemoteLogSegmentMetadataSnapshotRecord().apiKey();
 
     private final Map<String, Short> remoteLogStorageClassToApiKey;
     private final Map<Short, RemoteLogMetadataTransform> keyToTransform;
@@ -64,6 +67,7 @@ public class RemoteLogMetadataSerde {
         map.put(REMOTE_LOG_SEGMENT_METADATA_API_KEY, new RemoteLogSegmentMetadataTransform());
         map.put(REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY, new RemoteLogSegmentMetadataUpdateTransform());
         map.put(REMOTE_PARTITION_DELETE_API_KEY, new RemotePartitionDeleteMetadataTransform());
+        map.put(REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY, new RemoteLogSegmentMetadataSnapshotTransform());
         return map;
     }
 
@@ -72,6 +76,7 @@ public class RemoteLogMetadataSerde {
         map.put(RemoteLogSegmentMetadata.class.getName(), REMOTE_LOG_SEGMENT_METADATA_API_KEY);
         map.put(RemoteLogSegmentMetadataUpdate.class.getName(), REMOTE_LOG_SEGMENT_METADATA_UPDATE_API_KEY);
         map.put(RemotePartitionDeleteMetadata.class.getName(), REMOTE_PARTITION_DELETE_API_KEY);
+        map.put(RemoteLogSegmentMetadataSnapshot.class.getName(), REMOTE_LOG_SEGMENT_METADATA_SNAPSHOT_API_KEY);
         return map;
     }
 
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
new file mode 100644
index 0000000..bd613f8
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataSnapshotTransform.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.log.remote.metadata.storage.RemoteLogSegmentMetadataSnapshot;
+import org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataSnapshotRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RemoteLogSegmentMetadataSnapshotTransform implements RemoteLogMetadataTransform<RemoteLogSegmentMetadataSnapshot> {
+
+    public ApiMessageAndVersion toApiMessageAndVersion(RemoteLogSegmentMetadataSnapshot segmentMetadata) {
+        RemoteLogSegmentMetadataSnapshotRecord record = new RemoteLogSegmentMetadataSnapshotRecord()
+                .setSegmentId(segmentMetadata.segmentId())
+                .setStartOffset(segmentMetadata.startOffset())
+                .setEndOffset(segmentMetadata.endOffset())
+                .setBrokerId(segmentMetadata.brokerId())
+                .setEventTimestampMs(segmentMetadata.eventTimestampMs())
+                .setMaxTimestampMs(segmentMetadata.maxTimestampMs())
+                .setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
+                .setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata.segmentLeaderEpochs()))
+                .setRemoteLogSegmentState(segmentMetadata.state().id());
+
+        return new ApiMessageAndVersion(record, record.highestSupportedVersion());
+    }
+
+    private List<RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry> createSegmentLeaderEpochsEntry(Map<Integer, Long> leaderEpochs) {
+        return leaderEpochs.entrySet().stream()
+                           .map(entry -> new RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry()
+                           .setLeaderEpoch(entry.getKey())
+                           .setOffset(entry.getValue()))
+                           .collect(Collectors.toList());
+    }
+
+    @Override
+    public RemoteLogSegmentMetadataSnapshot fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion) {
+        RemoteLogSegmentMetadataSnapshotRecord record = (RemoteLogSegmentMetadataSnapshotRecord) apiMessageAndVersion.message();
+        Map<Integer, Long> segmentLeaderEpochs = new HashMap<>();
+        for (RemoteLogSegmentMetadataSnapshotRecord.SegmentLeaderEpochEntry segmentLeaderEpoch : record.segmentLeaderEpochs()) {
+            segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), segmentLeaderEpoch.offset());
+        }
+
+        return new RemoteLogSegmentMetadataSnapshot(record.segmentId(),
+                                                    record.startOffset(),
+                                                    record.endOffset(),
+                                                    record.maxTimestampMs(),
+                                                    record.brokerId(),
+                                                    record.eventTimestampMs(),
+                                                    record.segmentSizeInBytes(),
+                                                    RemoteLogSegmentState.forId(record.remoteLogSegmentState()),
+                                                    segmentLeaderEpochs);
+    }
+
+}
diff --git a/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
new file mode 100644
index 0000000..dbb2913
--- /dev/null
+++ b/storage/src/main/resources/message/RemoteLogSegmentMetadataSnapshotRecord.json
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//    http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 3,
+  "type": "metadata",
+  "name": "RemoteLogSegmentMetadataSnapshotRecord",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+    {
+      "name": "SegmentId",
+      "type": "uuid",
+      "versions": "0+",
+      "about": "Unique identifier of the log segment"
+    },
+    {
+      "name": "StartOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Start offset  of the segment."
+    },
+    {
+      "name": "EndOffset",
+      "type": "int64",
+      "versions": "0+",
+      "about": "End offset  of the segment."
+    },
+    {
+      "name": "BrokerId",
+      "type": "int32",
+      "versions": "0+",
+      "about": "Broker (controller or leader) id from which this event is created or updated."
+    },
+    {
+      "name": "MaxTimestampMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Maximum timestamp with in this segment."
+    },
+    {
+      "name": "EventTimestampMs",
+      "type": "int64",
+      "versions": "0+",
+      "about": "Event timestamp of this segment."
+    },
+    {
+      "name": "SegmentLeaderEpochs",
+      "type": "[]SegmentLeaderEpochEntry",
+      "versions": "0+",
+      "about": "Leader epochs of this segment.",
+      "fields": [
+        {
+          "name": "LeaderEpoch",
+          "type": "int32",
+          "versions": "0+",
+          "about": "Leader epoch"
+        },
+        {
+          "name": "Offset",
+          "type": "int64",
+          "versions": "0+",
+          "about": "Start offset for the leader epoch"
+        }
+      ]
+    },
+    {
+      "name": "SegmentSizeInBytes",
+      "type": "int32",
+      "versions": "0+",
+      "about": "Segment size in bytes"
+    },
+    {
+      "name": "RemoteLogSegmentState",
+      "type": "int8",
+      "versions": "0+",
+      "about": "State of the remote log segment"
+    }
+  ]
+}
\ No newline at end of file
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
new file mode 100644
index 0000000..5f77417
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/FileBasedRemoteLogMetadataCacheTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Test;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Optional;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FileBasedRemoteLogMetadataCacheTest {
+
+    @Test
+    public void testFileBasedRemoteLogMetadataCacheWithUnreferencedSegments() throws Exception {
+        TopicIdPartition partition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("test", 0));
+        int brokerId = 0;
+        Path path = TestUtils.tempDirectory().toPath();
+
+        // Create file based metadata cache.
+        FileBasedRemoteLogMetadataCache cache = new FileBasedRemoteLogMetadataCache(partition, path);
+
+        // Add a segment with start offset as 0 for leader epoch 0.
+        RemoteLogSegmentId segmentId1 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
+        RemoteLogSegmentMetadata metadata1 = new RemoteLogSegmentMetadata(segmentId1,
+                                                                          0, 100, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
+                                                                          1024 * 1024, Collections.singletonMap(0, 0L));
+        cache.addCopyInProgressSegment(metadata1);
+        RemoteLogSegmentMetadataUpdate metadataUpdate1 = new RemoteLogSegmentMetadataUpdate(segmentId1, System.currentTimeMillis(),
+                                                                                           RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        cache.updateRemoteLogSegmentMetadata(metadataUpdate1);
+        Optional<RemoteLogSegmentMetadata> receivedMetadata = cache.remoteLogSegmentMetadata(0, 0L);
+        assertTrue(receivedMetadata.isPresent());
+        assertEquals(metadata1.createWithUpdates(metadataUpdate1), receivedMetadata.get());
+
+        // Add a new segment with start offset as 0 for leader epoch 0, which should replace the earlier segment.
+        RemoteLogSegmentId segmentId2 = new RemoteLogSegmentId(partition, Uuid.randomUuid());
+        RemoteLogSegmentMetadata metadata2 = new RemoteLogSegmentMetadata(segmentId2,
+                                                                          0, 900, System.currentTimeMillis(), brokerId, System.currentTimeMillis(),
+                                                                          1024 * 1024, Collections.singletonMap(0, 0L));
+        cache.addCopyInProgressSegment(metadata2);
+        RemoteLogSegmentMetadataUpdate metadataUpdate2 = new RemoteLogSegmentMetadataUpdate(segmentId2, System.currentTimeMillis(),
+                                                                                           RemoteLogSegmentState.COPY_SEGMENT_FINISHED, brokerId);
+        cache.updateRemoteLogSegmentMetadata(metadataUpdate2);
+
+        // Fetch segment for leader epoch:0 and start offset:0, it should be the newly added segment.
+        Optional<RemoteLogSegmentMetadata> receivedMetadata2 = cache.remoteLogSegmentMetadata(0, 0L);
+        assertTrue(receivedMetadata2.isPresent());
+        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadata2.get());
+        // Flush the cache to the file.
+        cache.flushToFile(0, 0L);
+
+        // Create a new cache with loading from the stored path.
+        FileBasedRemoteLogMetadataCache loadedCache = new FileBasedRemoteLogMetadataCache(partition, path);
+
+        // Fetch segment for leader epoch:0 and start offset:0, it should be metadata2.
+        // This ensures that the ordering of metadata is taken care after loading from the stored snapshots.
+        Optional<RemoteLogSegmentMetadata> receivedMetadataAfterLoad = loadedCache.remoteLogSegmentMetadata(0, 0L);
+        assertTrue(receivedMetadataAfterLoad.isPresent());
+        assertEquals(metadata2.createWithUpdates(metadataUpdate2), receivedMetadataAfterLoad.get());
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
new file mode 100644
index 0000000..1b46028
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFileTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+
+public class RemoteLogMetadataSnapshotFileTest {
+
+    @Test
+    public void testEmptyCommittedLogMetadataFile() throws Exception {
+        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
+        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
+
+        // There should be an empty snapshot as nothing is written into it.
+        Assertions.assertFalse(snapshotFile.read().isPresent());
+    }
+
+    @Test
+    public void testEmptySnapshotWithCommittedLogMetadataFile() throws Exception {
+        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
+        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
+
+        snapshotFile.write(new RemoteLogMetadataSnapshotFile.Snapshot(0, 0L, Collections.emptyList()));
+
+        // There should be an empty snapshot as the written snapshot did not have any remote log segment metadata.
+        Assertions.assertTrue(snapshotFile.read().isPresent());
+        Assertions.assertTrue(snapshotFile.read().get().remoteLogSegmentMetadataSnapshots().isEmpty());
+    }
+
+    @Test
+    public void testWriteReadCommittedLogMetadataFile() throws Exception {
+        File metadataStoreDir = TestUtils.tempDirectory("_rlmm_committed");
+        RemoteLogMetadataSnapshotFile snapshotFile = new RemoteLogMetadataSnapshotFile(metadataStoreDir.toPath());
+
+        List<RemoteLogSegmentMetadataSnapshot> remoteLogSegmentMetadatas = new ArrayList<>();
+        long startOffset = 0;
+        for (int i = 0; i < 100; i++) {
+            long endOffset = startOffset + 100L;
+            remoteLogSegmentMetadatas.add(
+                    new RemoteLogSegmentMetadataSnapshot(Uuid.randomUuid(), startOffset, endOffset,
+                                                         System.currentTimeMillis(), 1, 100, 1024,
+                                                         RemoteLogSegmentState.COPY_SEGMENT_FINISHED, Collections.singletonMap(i, startOffset)));
+            startOffset = endOffset + 1;
+        }
+
+        RemoteLogMetadataSnapshotFile.Snapshot snapshot = new RemoteLogMetadataSnapshotFile.Snapshot(0, 120,
+                                                                                                     remoteLogSegmentMetadatas);
+        snapshotFile.write(snapshot);
+
+        Optional<RemoteLogMetadataSnapshotFile.Snapshot> maybeReadSnapshot = snapshotFile.read();
+        Assertions.assertTrue(maybeReadSnapshot.isPresent());
+
+        Assertions.assertEquals(snapshot, maybeReadSnapshot.get());
+        Assertions.assertEquals(new HashSet<>(snapshot.remoteLogSegmentMetadataSnapshots()),
+                                new HashSet<>(maybeReadSnapshot.get().remoteLogSegmentMetadataSnapshots()));
+    }
+}
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
index 53d79a0..8272b8d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogSegmentLifecycleTest.java
@@ -417,7 +417,7 @@ public class RemoteLogSegmentLifecycleTest {
      * This is passed to {@link #testRemoteLogSegmentLifeCycle(RemoteLogSegmentLifecycleManager)} to test
      * {@code RemoteLogMetadataCache} for several lifecycle operations.
      * <p>
-     * This starts a Kafka cluster with {@link #initialize(Set)} with {@link #brokerCount()} no of servers. It also
+     * This starts a Kafka cluster with {@link #initialize(Set, boolean)} )} with {@link #brokerCount()} no of servers. It also
      * creates the remote log metadata topic required for {@code TopicBasedRemoteLogMetadataManager}. This cluster will
      * be stopped by invoking {@link #close()}.
      */
@@ -428,14 +428,14 @@ public class RemoteLogSegmentLifecycleTest {
         @Override
         public synchronized void initialize(TopicIdPartition topicIdPartition) {
             this.topicIdPartition = topicIdPartition;
-            super.initialize(Collections.singleton(topicIdPartition));
+            super.initialize(Collections.singleton(topicIdPartition), true);
         }
 
         @Override
         public void addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata) throws RemoteStorageException {
             try {
                 // Wait until the segment is added successfully.
-                topicBasedRlmm().addRemoteLogSegmentMetadata(segmentMetadata).get();
+                remoteLogMetadataManager().addRemoteLogSegmentMetadata(segmentMetadata).get();
             } catch (Exception e) {
                 throw new RemoteStorageException(e);
             }
@@ -445,7 +445,7 @@ public class RemoteLogSegmentLifecycleTest {
         public void updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate segmentMetadataUpdate) throws RemoteStorageException {
             try {
                 // Wait until the segment is updated successfully.
-                topicBasedRlmm().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
+                remoteLogMetadataManager().updateRemoteLogSegmentMetadata(segmentMetadataUpdate).get();
             } catch (Exception e) {
                 throw new RemoteStorageException(e);
             }
@@ -453,23 +453,23 @@ public class RemoteLogSegmentLifecycleTest {
 
         @Override
         public Optional<Long> highestOffsetForEpoch(int leaderEpoch) throws RemoteStorageException {
-            return topicBasedRlmm().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
+            return remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
         }
 
         @Override
         public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(int leaderEpoch,
                                                                            long offset) throws RemoteStorageException {
-            return topicBasedRlmm().remoteLogSegmentMetadata(topicIdPartition, leaderEpoch, offset);
+            return remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition, leaderEpoch, offset);
         }
 
         @Override
         public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(int leaderEpoch) throws RemoteStorageException {
-            return topicBasedRlmm().listRemoteLogSegments(topicIdPartition, leaderEpoch);
+            return remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition, leaderEpoch);
         }
 
         @Override
         public Iterator<RemoteLogSegmentMetadata> listAllRemoteLogSegments() throws RemoteStorageException {
-            return topicBasedRlmm().listRemoteLogSegments(topicIdPartition);
+            return remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition);
         }
 
         @Override
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
index 7e1930d..3785c8d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerConfigTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.test.TestUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
@@ -28,6 +29,8 @@ import java.util.AbstractMap;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUMER_PREFIX;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_PRODUCER_PREFIX;
@@ -116,7 +119,8 @@ public class TopicBasedRemoteLogMetadataManagerConfigTest {
                                                        Map<String, Object> consumerConfig) {
         Map<String, Object> props = new HashMap<>();
         props.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
-        props.put("broker.id", 1);
+        props.put(BROKER_ID, 1);
+        props.put(LOG_DIR, TestUtils.tempDirectory().getAbsolutePath());
 
         props.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, (short) 3);
         props.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, 10);
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
index 6bc56b5..31dd43f 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerHarness.java
@@ -18,24 +18,32 @@ package org.apache.kafka.server.log.remote.metadata.storage;
 
 import kafka.api.IntegrationTestHarness;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.BROKER_ID;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP;
 import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP;
 
+/**
+ * A test harness class that brings up 3 brokers and registers {@link TopicBasedRemoteLogMetadataManager} on broker with id as 0.
+ */
 public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHarness {
     private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerHarness.class);
 
@@ -49,16 +57,42 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
         return Collections.emptyMap();
     }
 
-    public void initialize(Set<TopicIdPartition> topicIdPartitions) {
+    public void initialize(Set<TopicIdPartition> topicIdPartitions,
+                           boolean startConsumerThread) {
         // Call setup to start the cluster.
         super.setUp();
 
-        topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager();
+        initializeRemoteLogMetadataManager(topicIdPartitions, startConsumerThread);
+    }
+
+    public void initializeRemoteLogMetadataManager(Set<TopicIdPartition> topicIdPartitions,
+                                                   boolean startConsumerThread) {
+        String logDir = TestUtils.tempDirectory("rlmm_segs_").getAbsolutePath();
+        topicBasedRemoteLogMetadataManager = new TopicBasedRemoteLogMetadataManager(startConsumerThread) {
+            @Override
+            public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
+                                                     Set<TopicIdPartition> followerPartitions) {
+                Set<TopicIdPartition> allReplicas = new HashSet<>(leaderPartitions);
+                allReplicas.addAll(followerPartitions);
+                // Make sure the topic partition dirs exist as the topics might not have been created on this broker.
+                for (TopicIdPartition topicIdPartition : allReplicas) {
+                    // Create partition directory in the log directory created by topicBasedRemoteLogMetadataManager.
+                    File partitionDir = new File(new File(config().logDir()), topicIdPartition.topicPartition().topic() + "-" + topicIdPartition.topicPartition().partition());
+                    partitionDir.mkdirs();
+                    if (!partitionDir.exists()) {
+                        throw new KafkaException("Partition directory:[" + partitionDir + "] could not be created successfully.");
+                    }
+                }
+
+                super.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
+            }
+        };
 
         // Initialize TopicBasedRemoteLogMetadataManager.
         Map<String, Object> configs = new HashMap<>();
         configs.put(REMOTE_LOG_METADATA_COMMON_CLIENT_PREFIX + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, brokerList());
         configs.put(BROKER_ID, 0);
+        configs.put(LOG_DIR, logDir);
         configs.put(REMOTE_LOG_METADATA_TOPIC_PARTITIONS_PROP, METADATA_TOPIC_PARTITIONS_COUNT);
         configs.put(REMOTE_LOG_METADATA_TOPIC_REPLICATION_FACTOR_PROP, METADATA_TOPIC_REPLICATION_FACTOR);
         configs.put(REMOTE_LOG_METADATA_TOPIC_RETENTION_MS_PROP, METADATA_TOPIC_RETENTION_MS);
@@ -72,7 +106,7 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
         try {
             waitUntilInitialized(60_000);
         } catch (TimeoutException e) {
-            throw new RuntimeException(e);
+            throw new KafkaException(e);
         }
 
         topicBasedRemoteLogMetadataManager.onPartitionLeadershipChanges(topicIdPartitions, Collections.emptySet());
@@ -96,14 +130,18 @@ public class TopicBasedRemoteLogMetadataManagerHarness extends IntegrationTestHa
         return 3;
     }
 
-    protected TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
+    protected TopicBasedRemoteLogMetadataManager remoteLogMetadataManager() {
         return topicBasedRemoteLogMetadataManager;
     }
 
     public void close() throws IOException {
-        Utils.closeQuietly(topicBasedRemoteLogMetadataManager, "TopicBasedRemoteLogMetadataManager");
+        closeRemoteLogMetadataManager();
 
         // Stop the servers and zookeeper.
         tearDown();
     }
+
+    public void closeRemoteLogMetadataManager() {
+        Utils.closeQuietly(topicBasedRemoteLogMetadataManager, "TopicBasedRemoteLogMetadataManager");
+    }
 }
\ No newline at end of file
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
new file mode 100644
index 0000000..2c7baf8
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerRestartTest.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.ConsumerManager.COMMITTED_OFFSETS_FILE_NAME;
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.LOG_DIR;
+
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
+public class TopicBasedRemoteLogMetadataManagerRestartTest {
+
+    private static final int SEG_SIZE = 1024 * 1024;
+
+    private final Time time = new MockTime(1);
+    private final String logDir = TestUtils.tempDirectory("_rlmm_segs_").getAbsolutePath();
+
+    private TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness;
+
+    @BeforeEach
+    public void setup() {
+        // Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
+        remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness() {
+            protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
+                Map<String, Object> props = new HashMap<>();
+                props.put(LOG_DIR, logDir);
+                return props;
+            }
+        };
+        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true);
+    }
+
+    private void startTopicBasedRemoteLogMetadataManagerHarness(boolean startConsumerThread) {
+        remoteLogMetadataManagerHarness.initializeRemoteLogMetadataManager(Collections.emptySet(), startConsumerThread);
+    }
+
+    @AfterEach
+    public void teardown() throws IOException {
+        if (remoteLogMetadataManagerHarness != null) {
+            remoteLogMetadataManagerHarness.close();
+        }
+    }
+
+    private void stopTopicBasedRemoteLogMetadataManagerHarness() throws IOException {
+        remoteLogMetadataManagerHarness.closeRemoteLogMetadataManager();
+    }
+
+    private TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @Test
+    public void testRLMMAPIsAfterRestart() throws Exception {
+        // Create topics.
+        String leaderTopic = "new-leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas));
+
+        String followerTopic = "new-follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas));
+
+        final TopicIdPartition leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
+
+        // Register these partitions to RLMM.
+        topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
+
+        // Add segments for these partitions but they are not available as they have not yet been subscribed.
+        RemoteLogSegmentMetadata leaderSegmentMetadata = new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                0, 100, -1L, 0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata).get();
+
+        RemoteLogSegmentMetadata followerSegmentMetadata = new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(followerTopicIdPartition, Uuid.randomUuid()),
+                0, 100, -1L, 0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 0L));
+        topicBasedRlmm().addRemoteLogSegmentMetadata(followerSegmentMetadata).get();
+
+        // Stop TopicBasedRemoteLogMetadataManager only.
+        stopTopicBasedRemoteLogMetadataManagerHarness();
+
+        // Start TopicBasedRemoteLogMetadataManager but do not start consumer thread to check whether the stored metadata is
+        // loaded successfully or not.
+        startTopicBasedRemoteLogMetadataManagerHarness(false);
+
+        // Register these partitions to RLMM, which loads the respective metadata snapshots.
+        topicBasedRlmm().onPartitionLeadershipChanges(Collections.singleton(leaderTopicIdPartition), Collections.singleton(followerTopicIdPartition));
+
+        // Check for the stored entries from the earlier run.
+        Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(leaderSegmentMetadata).iterator(),
+                                                                 topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
+        Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Collections.singleton(followerSegmentMetadata).iterator(),
+                                                                 topicBasedRlmm().listRemoteLogSegments(followerTopicIdPartition)));
+        // Check whether the check-pointed consumer offsets are stored or not.
+        Path committedOffsetsPath = new File(logDir, COMMITTED_OFFSETS_FILE_NAME).toPath();
+        Assertions.assertTrue(committedOffsetsPath.toFile().exists());
+        CommittedOffsetsFile committedOffsetsFile = new CommittedOffsetsFile(committedOffsetsPath.toFile());
+
+        int metadataPartition1 = topicBasedRlmm().metadataPartition(leaderTopicIdPartition);
+        int metadataPartition2 = topicBasedRlmm().metadataPartition(followerTopicIdPartition);
+        Optional<Long> receivedOffsetForPartition1 = topicBasedRlmm().receivedOffsetForPartition(metadataPartition1);
+        Optional<Long> receivedOffsetForPartition2 = topicBasedRlmm().receivedOffsetForPartition(metadataPartition2);
+        Assertions.assertTrue(receivedOffsetForPartition1.isPresent());
+        Assertions.assertTrue(receivedOffsetForPartition2.isPresent());
+
+        // Make sure these offsets are at least 0.
+        Assertions.assertTrue(receivedOffsetForPartition1.get() >= 0);
+        Assertions.assertTrue(receivedOffsetForPartition2.get() >= 0);
+
+        // Check the stored entries and the offsets that were set on consumer are the same.
+        Map<Integer, Long> partitionToOffset = committedOffsetsFile.readEntries();
+        Assertions.assertEquals(partitionToOffset.get(metadataPartition1), receivedOffsetForPartition1.get());
+        Assertions.assertEquals(partitionToOffset.get(metadataPartition2), receivedOffsetForPartition2.get());
+
+        // Start Consumer thread
+        topicBasedRlmm().startConsumerThread();
+
+        // Add one more segment
+        RemoteLogSegmentMetadata leaderSegmentMetadata2 = new RemoteLogSegmentMetadata(
+                new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid()),
+                101, 200, -1L, 0,
+                time.milliseconds(), SEG_SIZE, Collections.singletonMap(0, 101L));
+        topicBasedRlmm().addRemoteLogSegmentMetadata(leaderSegmentMetadata2).get();
+
+        // Check that both the stored segment and recently added segment are available.
+        Assertions.assertTrue(TestUtils.sameElementsWithoutOrder(Arrays.asList(leaderSegmentMetadata, leaderSegmentMetadata2).iterator(),
+                                                                 topicBasedRlmm().listRemoteLogSegments(leaderTopicIdPartition)));
+    }
+
+}
\ No newline at end of file
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
index dce2c26..89b25c6 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerTest.java
@@ -31,31 +31,29 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import scala.collection.JavaConverters;
+import scala.collection.Seq;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
-import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
 import java.util.concurrent.TimeoutException;
 
-import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP;
-
+@SuppressWarnings("deprecation") // Added for Scala 2.12 compatibility for usages of JavaConverters
 public class TopicBasedRemoteLogMetadataManagerTest {
     private static final Logger log = LoggerFactory.getLogger(TopicBasedRemoteLogMetadataManagerTest.class);
 
     private static final int SEG_SIZE = 1024 * 1024;
 
     private final Time time = new MockTime(1);
-    private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness() {
-        @Override
-        protected Map<String, Object> overrideRemoteLogMetadataManagerProps() {
-            return Collections.singletonMap(REMOTE_LOG_METADATA_CONSUME_WAIT_MS_PROP, 5000L);
-        }
-    };
+    private final TopicBasedRemoteLogMetadataManagerHarness remoteLogMetadataManagerHarness = new TopicBasedRemoteLogMetadataManagerHarness();
 
     @BeforeEach
     public void setup() {
         // Start the cluster and initialize TopicBasedRemoteLogMetadataManager.
-        remoteLogMetadataManagerHarness.initialize(Collections.emptySet());
+        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true);
     }
 
     @AfterEach
@@ -64,13 +62,41 @@ public class TopicBasedRemoteLogMetadataManagerTest {
     }
 
     public TopicBasedRemoteLogMetadataManager topicBasedRlmm() {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm();
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager();
+    }
+
+    @Test
+    public void testWithNoAssignedPartitions() throws Exception {
+        // This test checks simple lifecycle of TopicBasedRemoteLogMetadataManager with out assigning any leader/follower partitions.
+        // This should close successfully releasing the resources.
+        log.info("Not assigning any partitions on TopicBasedRemoteLogMetadataManager");
     }
 
     @Test
     public void testNewPartitionUpdates() throws Exception {
-        final TopicIdPartition newLeaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("new-leader", 0));
-        final TopicIdPartition newFollowerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("new-follower", 0));
+        // Create topics.
+        String leaderTopic = "new-leader";
+        HashMap<Object, Seq<Object>> assignedLeaderTopicReplicas = new HashMap<>();
+        List<Object> leaderTopicReplicas = new ArrayList<>();
+        // Set broker id 0 as the first entry which is taken as the leader.
+        leaderTopicReplicas.add(0);
+        leaderTopicReplicas.add(1);
+        leaderTopicReplicas.add(2);
+        assignedLeaderTopicReplicas.put(0, JavaConverters.asScalaBuffer(leaderTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(leaderTopic, JavaConverters.mapAsScalaMap(assignedLeaderTopicReplicas));
+
+        String followerTopic = "new-follower";
+        HashMap<Object, Seq<Object>> assignedFollowerTopicReplicas = new HashMap<>();
+        List<Object> followerTopicReplicas = new ArrayList<>();
+        // Set broker id 1 as the first entry which is taken as the leader.
+        followerTopicReplicas.add(1);
+        followerTopicReplicas.add(2);
+        followerTopicReplicas.add(0);
+        assignedFollowerTopicReplicas.put(0, JavaConverters.asScalaBuffer(followerTopicReplicas));
+        remoteLogMetadataManagerHarness.createTopic(followerTopic, JavaConverters.mapAsScalaMap(assignedFollowerTopicReplicas));
+
+        final TopicIdPartition newLeaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(leaderTopic, 0));
+        final TopicIdPartition newFollowerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition(followerTopic, 0));
 
         // Add segments for these partitions but an exception is received as they have not yet been subscribed.
         // These messages would have been published to the respective metadata topic partitions but the ConsumerManager
@@ -101,8 +127,8 @@ public class TopicBasedRemoteLogMetadataManagerTest {
     }
 
     private void waitUntilConsumerCatchesup(TopicIdPartition newLeaderTopicIdPartition,
-                                            TopicIdPartition newFollowerTopicIdPartition,
-                                            long timeoutMs) throws TimeoutException {
+                                          TopicIdPartition newFollowerTopicIdPartition,
+                                          long timeoutMs) throws TimeoutException {
         int leaderMetadataPartition = topicBasedRlmm().metadataPartition(newLeaderTopicIdPartition);
         int followerMetadataPartition = topicBasedRlmm().metadataPartition(newFollowerTopicIdPartition);
 
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
index 9c25fb0..ef9287d 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/TopicBasedRemoteLogMetadataManagerWrapperWithHarness.java
@@ -37,63 +37,64 @@ public class TopicBasedRemoteLogMetadataManagerWrapperWithHarness implements Rem
 
     @Override
     public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().addRemoteLogSegmentMetadata(remoteLogSegmentMetadata);
     }
 
     @Override
     public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate);
     }
 
     @Override
     public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
                                                                        int epochForOffset,
                                                                        long offset) throws RemoteStorageException {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm().remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset);
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset);
     }
 
     @Override
     public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
                                                 int leaderEpoch) throws RemoteStorageException {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().highestOffsetForEpoch(topicIdPartition, leaderEpoch);
     }
 
     @Override
     public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata);
     }
 
     @Override
     public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm().listRemoteLogSegments(topicIdPartition);
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition);
     }
 
     @Override
     public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition,
                                                                     int leaderEpoch) throws RemoteStorageException {
-        return remoteLogMetadataManagerHarness.topicBasedRlmm().listRemoteLogSegments(topicIdPartition, leaderEpoch);
+        return remoteLogMetadataManagerHarness.remoteLogMetadataManager().listRemoteLogSegments(topicIdPartition, leaderEpoch);
     }
 
     @Override
     public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
                                              Set<TopicIdPartition> followerPartitions) {
-        remoteLogMetadataManagerHarness.topicBasedRlmm().onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
+
+        remoteLogMetadataManagerHarness.remoteLogMetadataManager().onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
     }
 
     @Override
     public void onStopPartitions(Set<TopicIdPartition> partitions) {
-        remoteLogMetadataManagerHarness.topicBasedRlmm().onStopPartitions(partitions);
+        remoteLogMetadataManagerHarness.remoteLogMetadataManager().onStopPartitions(partitions);
     }
 
     @Override
     public void close() throws IOException {
-        remoteLogMetadataManagerHarness.topicBasedRlmm().close();
+        remoteLogMetadataManagerHarness.remoteLogMetadataManager().close();
     }
 
     @Override
     public void configure(Map<String, ?> configs) {
         // This will make sure the cluster is up and TopicBasedRemoteLogMetadataManager is initialized.
-        remoteLogMetadataManagerHarness.initialize(Collections.emptySet());
-        remoteLogMetadataManagerHarness.topicBasedRlmm().configure(configs);
+        remoteLogMetadataManagerHarness.initialize(Collections.emptySet(), true);
+        remoteLogMetadataManagerHarness.remoteLogMetadataManager().configure(configs);
     }
 }
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
index 4bec4ed..95521c4 100644
--- a/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/storage/RemoteLogMetadataManagerTest.java
@@ -90,7 +90,6 @@ public class RemoteLogMetadataManagerTest {
             remoteLogMetadataManager.configure(Collections.emptyMap());
             remoteLogMetadataManager.onPartitionLeadershipChanges(Collections.singleton(TP0), Collections.emptySet());
 
-
             // Create remote log segment metadata and add them to RLMM.
 
             // segment 0