Posted to commits@kafka.apache.org by ju...@apache.org on 2022/12/17 17:37:08 UTC

[kafka] branch trunk updated: [KAFKA-13369] Follower fetch protocol changes for tiered storage. (#11390)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7146ac57ba9 [KAFKA-13369] Follower fetch protocol changes for tiered storage. (#11390)
7146ac57ba9 is described below

commit 7146ac57ba9ddd035dac992b9f188a8e7677c08d
Author: Satish Duggana <sa...@apache.org>
AuthorDate: Sat Dec 17 23:06:44 2022 +0530

    [KAFKA-13369] Follower fetch protocol changes for tiered storage. (#11390)
    
    This PR implements the follower fetch protocol as mentioned in KIP-405.
    
    Added a new version of the ListOffsets protocol to retrieve the local log start offset from the leader replica. Follower replicas use this to find the local log start offset on the leader.
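
    For illustration only (not code from this commit): a minimal Java sketch of how a broker might resolve the ListOffsets target-timestamp sentinels. Only the -1/-2/-3/-4 constants mirror ListOffsetsRequest; the resolver class and its method are hypothetical.

        // Hypothetical sketch: only the sentinel constants mirror
        // org.apache.kafka.common.requests.ListOffsetsRequest; everything else is illustrative.
        final class ListOffsetsSentinelResolverSketch {
            static final long LATEST_TIMESTAMP = -1L;
            static final long EARLIEST_TIMESTAMP = -2L;
            static final long MAX_TIMESTAMP = -3L;
            static final long EARLIEST_LOCAL_TIMESTAMP = -4L; // new in ListOffsets v8 (KIP-405)

            static long resolve(long targetTimestamp, long logStartOffset,
                                long localLogStartOffset, long logEndOffset) {
                if (targetTimestamp == EARLIEST_TIMESTAMP)
                    return logStartOffset;        // start of the effective log, including remote segments
                if (targetTimestamp == EARLIEST_LOCAL_TIMESTAMP)
                    return localLogStartOffset;   // start of the segments still on local disk
                if (targetTimestamp == LATEST_TIMESTAMP)
                    return logEndOffset;
                throw new IllegalArgumentException("timestamp-based lookups are not modelled in this sketch");
            }
        }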
    
    Added a new version of the FetchRequest protocol that can return the OffsetMovedToTieredStorageException error. This is part of the enhanced fetch protocol described in KIP-405.
    
    Introduced a new field, localLogStartOffset, to track the log start offset of the local log. The existing logStartOffset continues to be the start offset of the effective log, which includes the segments in remote storage.
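
    As a rough picture of the resulting offset layout (illustrative helper, not broker code):

        // Illustrative invariant; the class and method names are assumptions.
        final class TieredOffsetLayoutSketch {
            // Remote segments (if any) cover [logStartOffset, localLogStartOffset);
            // local segments cover [localLogStartOffset, logEndOffset).
            static void validate(long logStartOffset, long localLogStartOffset, long logEndOffset) {
                if (logStartOffset > localLogStartOffset || localLogStartOffset > logEndOffset)
                    throw new IllegalStateException(
                        "expected logStartOffset <= localLogStartOffset <= logEndOffset");
            }
        }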
    
    When a follower receives OffsetMovedToTieredStorage, it builds the required state from the leader and remote storage so that it is ready to move to the fetching state.
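
    A hedged sketch of that follower-side reaction (the method names below are assumptions; the real logic lives in the fetcher threads):

        // Hedged model of handling OFFSET_MOVED_TO_TIERED_STORAGE; shows only the ordering of steps.
        abstract class TieredFetchRecoverySketch {
            void onOffsetMovedToTieredStorage(String topicPartition) {
                // 1. Ask the leader for its local log start offset
                //    (ListOffsets with EARLIEST_LOCAL_TIMESTAMP, i.e. -4L).
                long leaderLocalLogStartOffset = fetchLeaderLocalLogStartOffset(topicPartition);
                // 2. Rebuild auxiliary state (leader epochs, producer snapshot) from remote storage
                //    up to that offset.
                rebuildAuxiliaryStateFromRemoteStorage(topicPartition, leaderLocalLogStartOffset);
                // 3. Resume fetching from the leader at its local log start offset.
                resumeFetching(topicPartition, leaderLocalLogStartOffset);
            }

            abstract long fetchLeaderLocalLogStartOffset(String topicPartition);
            abstract void rebuildAuxiliaryStateFromRemoteStorage(String topicPartition, long upToOffset);
            abstract void resumeFetching(String topicPartition, long fromOffset);
        }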
    
    Introduced RemoteLogManager, which is responsible for:
    
    - initializing RemoteStorageManager and RemoteLogMetadataManager instances,
    - receiving leader and follower replica events and partition stop events and acting on them, and
    - providing APIs to fetch indexes and metadata about remote log segments (see the sketch below).
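
    Illustrative-only Java model of those responsibilities (interface and method names are assumptions, not the RemoteLogManager API):

        import java.io.Closeable;
        import java.util.Set;

        // Not the real API: a hedged summary of what the component is described as doing above.
        interface RemoteLogManagerResponsibilitiesSketch extends Closeable {
            // Instantiate and configure the RemoteStorageManager and RemoteLogMetadataManager plugins.
            void startup();
            // React to this broker becoming leader or follower for the given partitions.
            void onLeadershipChange(Set<String> leaderPartitions, Set<String> followerPartitions);
            // Handle partition stop events, optionally deleting remote data.
            void stopPartitions(Set<String> partitions, boolean deleteRemoteData);
            // Serve indexes and metadata about remote log segments (return type simplified for the sketch).
            byte[] fetchIndex(String topicPartition, long offset);
        }
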
    Follow-up PRs will add more functionality, such as copying segments to tiered storage and retention checks that clean local and remote log segments. These will change the local log start offset and make sure the follower fetch protocol works correctly in those cases.
    
    The detailed protocol changes are described in the KIP: https://cwiki.apache.org/confluence/display/KAFKA/KIP-405%3A+Kafka+Tiered+Storage#KIP405:KafkaTieredStorage-FollowerReplication
    
    Co-authors: satishd@apache.org, kamal.chandraprakash@gmail.com, yingz@uber.com
    
    Reviewers: Kowshik Prakasam <kp...@confluent.io>, Cong Ding <co...@ccding.com>, Tirtha Chatterjee <ti...@gmail.com>, Yaodong Yang <ya...@gmail.com>, Divij Vaidya <di...@amazon.com>, Luke Chen <sh...@gmail.com>, Jun Rao <ju...@gmail.com>
---
 .gitignore                                         |   2 +
 build.gradle                                       |   2 +
 .../OffsetMovedToTieredStorageException.java       |  31 +++
 .../org/apache/kafka/common/protocol/Errors.java   |   4 +-
 .../org/apache/kafka/common/record/Records.java    |   4 +-
 .../kafka/common/record/RemoteLogInputStream.java  |  79 ++++++
 .../kafka/common/requests/ListOffsetsRequest.java  |   5 +
 .../kafka/common/utils/ChildFirstClassLoader.java  | 131 +++++++++
 .../resources/common/message/FetchRequest.json     |   4 +-
 .../resources/common/message/FetchResponse.json    |   4 +-
 .../common/message/ListOffsetsRequest.json         |   4 +-
 .../common/message/ListOffsetsResponse.json        |   5 +-
 .../common/record/RemoteLogInputStreamTest.java    | 292 +++++++++++++++++++++
 .../server/builders/ReplicaManagerBuilder.java     |   8 +
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 core/src/main/scala/kafka/log/LogManager.scala     |   8 +-
 .../scala/kafka/log/ProducerStateManager.scala     |  14 +-
 .../main/scala/kafka/log/TransactionIndex.scala    |   4 +-
 core/src/main/scala/kafka/log/UnifiedLog.scala     |  84 +++++-
 .../ClassLoaderAwareRemoteStorageManager.scala     |  76 ++++++
 .../scala/kafka/log/remote/RemoteIndexCache.scala  | 286 ++++++++++++++++++++
 .../scala/kafka/log/remote/RemoteLogManager.scala  | 290 ++++++++++++++++++++
 .../scala/kafka/server/AbstractFetcherThread.scala | 245 ++++++++++++-----
 .../src/main/scala/kafka/server/BrokerServer.scala |  38 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala |  30 ++-
 .../main/scala/kafka/server/LeaderEndPoint.scala   |  22 +-
 .../scala/kafka/server/LocalLeaderEndPoint.scala   |  23 +-
 .../scala/kafka/server/RemoteLeaderEndPoint.scala  |  16 +-
 .../kafka/server/ReplicaAlterLogDirsThread.scala   |   9 +
 .../scala/kafka/server/ReplicaFetcherThread.scala  | 154 ++++++++++-
 .../main/scala/kafka/server/ReplicaManager.scala   |   2 +
 .../kafka/server/epoch/LeaderEpochFileCache.scala  |  44 +++-
 .../java/kafka/test/ClusterTestExtensionsTest.java |   2 +-
 .../java/kafka/test/annotation/ClusterTest.java    |   2 +-
 .../kafka/server/KRaftClusterTest.scala            |  27 ++
 .../server/MetadataVersionIntegrationTest.scala    |   7 +-
 .../ClassLoaderAwareRemoteStorageManagerTest.scala |  45 ++++
 .../kafka/server/LocalLeaderEndPointTest.scala     | 192 ++++++++++++++
 .../kafka/server/RemoteLeaderEndPointTest.scala    | 100 +++++++
 .../unit/kafka/admin/FeatureCommandTest.scala      |  10 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  30 ++-
 .../test/scala/unit/kafka/log/LogManagerTest.scala |  13 +
 .../test/scala/unit/kafka/log/LogTestUtils.scala   |  14 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  30 ++-
 .../unit/kafka/log/TransactionIndexTest.scala      |   8 +
 .../test/scala/unit/kafka/log/UnifiedLogTest.scala | 124 ++++++++-
 .../kafka/log/remote/RemoteIndexCacheTest.scala    | 237 +++++++++++++++++
 .../kafka/log/remote/RemoteLogManagerTest.scala    | 276 +++++++++++++++++++
 .../kafka/server/AbstractFetcherManagerTest.scala  |   8 +-
 .../kafka/server/AbstractFetcherThreadTest.scala   | 152 +++++++++--
 .../scala/unit/kafka/server/KafkaServerTest.scala  |  18 ++
 .../unit/kafka/server/ListOffsetsRequestTest.scala |  20 +-
 .../ListOffsetsRequestWithRemoteStoreTest.scala    |  38 +++
 .../scala/unit/kafka/server/LogOffsetTest.scala    |  19 +-
 .../server/epoch/LeaderEpochFileCacheTest.scala    |  55 ++++
 .../server/epoch/util/MockBlockingSender.scala     |  27 +-
 .../jmh/fetcher/ReplicaFetcherThreadBenchmark.java |   5 +-
 .../kafka/controller/QuorumControllerTest.java     |   8 +-
 .../apache/kafka/server/common/CheckpointFile.java |   6 +-
 .../kafka/server/common/MetadataVersion.java       |  13 +-
 .../kafka/server/common/MetadataVersionTest.java   |  50 +---
 .../storage/NoOpRemoteLogMetadataManager.java      |  84 ++++++
 .../remote/storage/NoOpRemoteStorageManager.java   |  59 +++++
 .../ClassLoaderAwareRemoteLogMetadataManager.java  | 147 +++++++++++
 ...assLoaderAwareRemoteLogMetadataManagerTest.java |  50 ++++
 65 files changed, 3564 insertions(+), 234 deletions(-)

diff --git a/.gitignore b/.gitignore
index dcb1531b498..095a3141d1b 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,3 +56,5 @@ jmh-benchmarks/src/main/generated
 **/.jqwik-database
 **/src/generated
 **/src/generated-test
+
+storage/kafka-tiered-storage/
diff --git a/build.gradle b/build.gradle
index 63d47e522fd..9bf33767f09 100644
--- a/build.gradle
+++ b/build.gradle
@@ -875,6 +875,7 @@ project(':core') {
     implementation project(':server-common')
     implementation project(':group-coordinator')
     implementation project(':metadata')
+    implementation project(':storage:api')
     implementation project(':raft')
     implementation project(':storage')
 
@@ -910,6 +911,7 @@ project(':core') {
     testImplementation project(':metadata').sourceSets.test.output
     testImplementation project(':raft').sourceSets.test.output
     testImplementation project(':server-common').sourceSets.test.output
+    testImplementation project(':storage:api').sourceSets.test.output
     testImplementation libs.bcpkix
     testImplementation libs.mockitoCore
     testImplementation(libs.apacheda) {
diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OffsetMovedToTieredStorageException.java b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMovedToTieredStorageException.java
new file mode 100644
index 00000000000..6ea48c604d1
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/errors/OffsetMovedToTieredStorageException.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.errors;
+
+public class OffsetMovedToTieredStorageException extends ApiException {
+
+    private static final long serialVersionUID = 1L;
+
+    public OffsetMovedToTieredStorageException(String message) {
+        super(message);
+    }
+
+    public OffsetMovedToTieredStorageException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index c220bbcde43..6fd3aa41ca0 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -88,6 +88,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.OffsetNotAvailableException;
 import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.errors.OffsetMovedToTieredStorageException;
 import org.apache.kafka.common.errors.OperationNotAttemptedException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.PolicyViolationException;
@@ -370,7 +371,8 @@ public enum Errors {
     TRANSACTIONAL_ID_NOT_FOUND(105, "The transactionalId could not be found", TransactionalIdNotFoundException::new),
     FETCH_SESSION_TOPIC_ID_ERROR(106, "The fetch session encountered inconsistent topic ID usage", FetchSessionTopicIdException::new),
     INELIGIBLE_REPLICA(107, "The new ISR contains at least one ineligible replica.", IneligibleReplicaException::new),
-    NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new);
+    NEW_LEADER_ELECTED(108, "The AlterPartition request successfully updated the partition state but the leader has changed.", NewLeaderElectedException::new),
+    OFFSET_MOVED_TO_TIERED_STORAGE(109, "The requested offset is moved to tiered storage.", OffsetMovedToTieredStorageException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 2179c7c1130..c18679afd88 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -48,9 +48,9 @@ public interface Records extends TransferableRecords {
     int SIZE_LENGTH = 4;
     int LOG_OVERHEAD = SIZE_OFFSET + SIZE_LENGTH;
 
-    // the magic offset is at the same offset for all current message formats, but the 4 bytes
+    // The magic offset is at the same offset for all current message formats, but the 4 bytes
     // between the size and the magic is dependent on the version.
-    int MAGIC_OFFSET = 16;
+    int MAGIC_OFFSET = LOG_OVERHEAD + 4;
     int MAGIC_LENGTH = 1;
     int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java
new file mode 100644
index 00000000000..9d01a0bd984
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/RemoteLogInputStream.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
+
+public class RemoteLogInputStream implements LogInputStream<RecordBatch> {
+    private final InputStream inputStream;
+    // LogHeader buffer up to magic.
+    private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
+
+    public RemoteLogInputStream(InputStream inputStream) {
+        this.inputStream = inputStream;
+    }
+
+    @Override
+    public RecordBatch nextBatch() throws IOException {
+        logHeaderBuffer.clear();
+        Utils.readFully(inputStream, logHeaderBuffer);
+
+        if (logHeaderBuffer.position() < HEADER_SIZE_UP_TO_MAGIC)
+            return null;
+
+        logHeaderBuffer.rewind();
+        int size = logHeaderBuffer.getInt(SIZE_OFFSET);
+
+        // V0 has the smallest overhead, stricter checking is done later
+        if (size < LegacyRecord.RECORD_OVERHEAD_V0)
+            throw new CorruptRecordException(String.format("Found record size %d smaller than minimum record " +
+                                                                   "overhead (%d).", size, LegacyRecord.RECORD_OVERHEAD_V0));
+
+        // Total size is: "LOG_OVERHEAD + the size of the rest of the content"
+        int bufferSize = LOG_OVERHEAD + size;
+        // buffer contains the complete payload including header and records.
+        ByteBuffer buffer = ByteBuffer.allocate(bufferSize);
+
+        // write log header into buffer
+        buffer.put(logHeaderBuffer);
+
+        // write the records payload into the buffer
+        Utils.readFully(inputStream, buffer);
+        if (buffer.position() != bufferSize)
+            return null;
+        buffer.rewind();
+
+        byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
+        MutableRecordBatch batch;
+        if (magic > RecordBatch.MAGIC_VALUE_V1)
+            batch = new DefaultRecordBatch(buffer);
+        else
+            batch = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(buffer);
+
+        return batch;
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
index 6b7734aca40..efdc7da2afe 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/ListOffsetsRequest.java
@@ -42,6 +42,11 @@ public class ListOffsetsRequest extends AbstractRequest {
     public static final long LATEST_TIMESTAMP = -1L;
     public static final long MAX_TIMESTAMP = -3L;
 
+    /**
+     * It is used to represent the earliest message stored in the local log which is also called the local-log-start-offset
+     */
+    public static final long EARLIEST_LOCAL_TIMESTAMP = -4L;
+
     public static final int CONSUMER_REPLICA_ID = -1;
     public static final int DEBUGGING_REPLICA_ID = -2;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
new file mode 100644
index 00000000000..85e865e0983
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/utils/ChildFirstClassLoader.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.apache.kafka.common.KafkaException;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLClassLoader;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+
+/**
+ * A class loader that looks for classes and resources in a specified class path first, before delegating to its parent
+ * class loader.
+ */
+public class ChildFirstClassLoader extends URLClassLoader {
+    static {
+        ClassLoader.registerAsParallelCapable();
+    }
+
+    /**
+     * @param classPath Class path string
+     * @param parent    The parent classloader. If the required class / resource cannot be found in the given classPath,
+     *                  this classloader will be used to find the class / resource.
+     */
+    public ChildFirstClassLoader(String classPath, ClassLoader parent) {
+        super(classpathToURLs(classPath), parent);
+    }
+
+    static private URL[] classpathToURLs(String classPath) {
+        ArrayList<URL> urls = new ArrayList<>();
+        for (String path : classPath.split(File.pathSeparator)) {
+            if (path == null || path.trim().isEmpty())
+                continue;
+            File file = new File(path);
+
+            try {
+                if (path.endsWith("/*")) {
+                    File parent = new File(new File(file.getCanonicalPath()).getParent());
+                    if (parent.isDirectory()) {
+                        File[] files = parent.listFiles((dir, name) -> {
+                            String lower = name.toLowerCase(Locale.ROOT);
+                            return lower.endsWith(".jar") || lower.endsWith(".zip");
+                        });
+                        if (files != null) {
+                            for (File jarFile : files) {
+                                urls.add(jarFile.getCanonicalFile().toURI().toURL());
+                            }
+                        }
+                    }
+                } else if (file.exists()) {
+                    urls.add(file.getCanonicalFile().toURI().toURL());
+                }
+            } catch (IOException e) {
+                throw new KafkaException(e);
+            }
+        }
+        return urls.toArray(new URL[0]);
+    }
+
+    @Override
+    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
+        synchronized (getClassLoadingLock(name)) {
+            Class<?> c = findLoadedClass(name);
+
+            if (c == null) {
+                try {
+                    c = findClass(name);
+                } catch (ClassNotFoundException e) {
+                    // Try parent
+                    c = super.loadClass(name, false);
+                }
+            }
+
+            if (resolve)
+                resolveClass(c);
+
+            return c;
+        }
+    }
+
+    @Override
+    public URL getResource(String name) {
+        URL url = findResource(name);
+        if (url == null) {
+            // try parent
+            url = super.getResource(name);
+        }
+        return url;
+    }
+
+    @Override
+    public Enumeration<URL> getResources(String name) throws IOException {
+        Enumeration<URL> urls1 = findResources(name);
+        Enumeration<URL> urls2 = getParent() != null ? getParent().getResources(name) : null;
+
+        return new Enumeration<URL>() {
+            @Override
+            public boolean hasMoreElements() {
+                return (urls1 != null && urls1.hasMoreElements()) || (urls2 != null && urls2.hasMoreElements());
+            }
+
+            @Override
+            public URL nextElement() {
+                if (urls1 != null && urls1.hasMoreElements())
+                    return urls1.nextElement();
+                if (urls2 != null && urls2.hasMoreElements())
+                    return urls2.nextElement();
+                throw new NoSuchElementException();
+            }
+        };
+    }
+}
diff --git a/clients/src/main/resources/common/message/FetchRequest.json b/clients/src/main/resources/common/message/FetchRequest.json
index 13ab712be3d..903ed48e185 100644
--- a/clients/src/main/resources/common/message/FetchRequest.json
+++ b/clients/src/main/resources/common/message/FetchRequest.json
@@ -48,7 +48,9 @@
   // the `LastFetchedEpoch` field
   //
   // Version 13 replaces topic names with topic IDs (KIP-516). May return UNKNOWN_TOPIC_ID error code.
-  "validVersions": "0-13",
+  //
+  // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException(KIP-405)
+  "validVersions": "0-14",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ClusterId", "type": "string", "versions": "12+", "nullableVersions": "12+", "default": "null",
diff --git a/clients/src/main/resources/common/message/FetchResponse.json b/clients/src/main/resources/common/message/FetchResponse.json
index 9ae28b7d616..7d144f01831 100644
--- a/clients/src/main/resources/common/message/FetchResponse.json
+++ b/clients/src/main/resources/common/message/FetchResponse.json
@@ -41,7 +41,9 @@
   // and leader discovery through the `CurrentLeader` field
   //
   // Version 13 replaces the topic name field with topic ID (KIP-516).
-  "validVersions": "0-13",
+  //
+  // Version 14 is the same as version 13 but it also receives a new error called OffsetMovedToTieredStorageException (KIP-405)
+  "validVersions": "0-14",
   "flexibleVersions": "12+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true,
diff --git a/clients/src/main/resources/common/message/ListOffsetsRequest.json b/clients/src/main/resources/common/message/ListOffsetsRequest.json
index 93c920ee2fe..4e4d07ed49f 100644
--- a/clients/src/main/resources/common/message/ListOffsetsRequest.json
+++ b/clients/src/main/resources/common/message/ListOffsetsRequest.json
@@ -32,7 +32,9 @@
   // Version 6 enables flexible versions.
   //
   // Version 7 enables listing offsets by max timestamp (KIP-734).
-  "validVersions": "0-7",
+  //
+  // Version 8 enables listing offsets by local log start offset (KIP-405).
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ReplicaId", "type": "int32", "versions": "0+", "entityType": "brokerId",
diff --git a/clients/src/main/resources/common/message/ListOffsetsResponse.json b/clients/src/main/resources/common/message/ListOffsetsResponse.json
index 6d6be0fdf4f..00a82866005 100644
--- a/clients/src/main/resources/common/message/ListOffsetsResponse.json
+++ b/clients/src/main/resources/common/message/ListOffsetsResponse.json
@@ -31,7 +31,10 @@
   // Version 6 enables flexible versions.
   //
   // Version 7 is the same as version 6 (KIP-734).
-  "validVersions": "0-7",
+  //
+  // Version 8 enables listing offsets by local log start offset.
+  // This is the earliest log start offset in the local log. (KIP-405).
+  "validVersions": "0-8",
   "flexibleVersions": "6+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "2+", "ignorable": true,
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RemoteLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/RemoteLogInputStreamTest.java
new file mode 100644
index 00000000000..0dcb3765553
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/RemoteLogInputStreamTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.header.Header;
+import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.test.TestUtils;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Stream;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
+import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
+import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
+import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;
+import static org.apache.kafka.test.TestUtils.tempFile;
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class RemoteLogInputStreamTest {
+
+    private static class Args {
+        private final byte magic;
+        private final CompressionType compression;
+
+        public Args(byte magic, CompressionType compression) {
+            this.magic = magic;
+            this.compression = compression;
+        }
+
+        @Override
+        public String toString() {
+            return "Args{magic=" + magic + ", compression=" + compression + "}";
+        }
+    }
+
+    private static class RemoteLogInputStreamArgsProvider implements ArgumentsProvider {
+
+        @Override
+        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
+            List<Arguments> values = new ArrayList<>();
+            for (byte magic : asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2)) {
+                for (CompressionType type : CompressionType.values()) {
+                    values.add(Arguments.of(new Args(magic, type)));
+                }
+            }
+            return values.stream();
+        }
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(RemoteLogInputStreamArgsProvider.class)
+    public void testSimpleBatchIteration(Args args) throws IOException {
+        byte magic = args.magic;
+        CompressionType compression = args.compression;
+        if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
+            return;
+
+        SimpleRecord firstBatchRecord = new SimpleRecord(3241324L, "a".getBytes(), "foo".getBytes());
+        SimpleRecord secondBatchRecord = new SimpleRecord(234280L, "b".getBytes(), "bar".getBytes());
+
+        File file = tempFile();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord));
+            fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
+            fileRecords.flush();
+        }
+
+        try (FileInputStream is = new FileInputStream(file)) {
+            RemoteLogInputStream logInputStream = new RemoteLogInputStream(is);
+
+            RecordBatch firstBatch = logInputStream.nextBatch();
+            assertGenericRecordBatchData(args, firstBatch, 0L, 3241324L, firstBatchRecord);
+            assertNoProducerData(firstBatch);
+
+            RecordBatch secondBatch = logInputStream.nextBatch();
+            assertGenericRecordBatchData(args, secondBatch, 1L, 234280L, secondBatchRecord);
+            assertNoProducerData(secondBatch);
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(RemoteLogInputStreamArgsProvider.class)
+    public void testBatchIterationWithMultipleRecordsPerBatch(Args args) throws IOException {
+        byte magic = args.magic;
+        CompressionType compression = args.compression;
+        if (magic < MAGIC_VALUE_V2 && compression == CompressionType.NONE)
+            return;
+
+        if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
+            return;
+
+        SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
+            new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
+            new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
+        };
+
+        SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
+            new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
+            new SimpleRecord(897839L, null, "4".getBytes()),
+            new SimpleRecord(8234020L, "e".getBytes(), null)
+        };
+
+        File file = tempFile();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecords));
+            fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecords));
+            fileRecords.flush();
+        }
+
+        try (FileInputStream is = new FileInputStream(file)) {
+            RemoteLogInputStream logInputStream = new RemoteLogInputStream(is);
+
+            RecordBatch firstBatch = logInputStream.nextBatch();
+            assertNoProducerData(firstBatch);
+            assertGenericRecordBatchData(args, firstBatch, 0L, 3241324L, firstBatchRecords);
+
+            RecordBatch secondBatch = logInputStream.nextBatch();
+            assertNoProducerData(secondBatch);
+            assertGenericRecordBatchData(args, secondBatch, 1L, 238423489L, secondBatchRecords);
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(RemoteLogInputStreamArgsProvider.class)
+    public void testBatchIterationV2(Args args) throws IOException {
+        byte magic = args.magic;
+        CompressionType compression = args.compression;
+        if (magic != MAGIC_VALUE_V2)
+            return;
+
+        long producerId = 83843L;
+        short producerEpoch = 15;
+        int baseSequence = 234;
+        int partitionLeaderEpoch = 9832;
+
+        Header[] headers = new Header[]{new RecordHeader("header-key",
+                "header-value".getBytes(StandardCharsets.UTF_8))};
+        SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
+            new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
+            // Add a record with headers.
+            new SimpleRecord(234280L, "b".getBytes(), "2".getBytes(), headers)
+        };
+
+        SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
+            new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
+            new SimpleRecord(897839L, null, "4".getBytes()),
+            new SimpleRecord(8234020L, "e".getBytes(), null)
+        };
+
+        File file = tempFile();
+        try (FileRecords fileRecords = FileRecords.open(file)) {
+            fileRecords.append(MemoryRecords.withIdempotentRecords(magic, 15L, compression, producerId,
+                                                                   producerEpoch, baseSequence, partitionLeaderEpoch, firstBatchRecords));
+            fileRecords.append(MemoryRecords.withTransactionalRecords(magic, 27L, compression, producerId,
+                                                                      producerEpoch, baseSequence + firstBatchRecords.length, partitionLeaderEpoch, secondBatchRecords));
+            fileRecords.flush();
+        }
+
+        try (FileInputStream is = new FileInputStream(file)) {
+            RemoteLogInputStream logInputStream = new RemoteLogInputStream(is);
+
+            RecordBatch firstBatch = logInputStream.nextBatch();
+            assertProducerData(firstBatch, producerId, producerEpoch, baseSequence, false, firstBatchRecords);
+            assertGenericRecordBatchData(args, firstBatch, 15L, 3241324L, firstBatchRecords);
+            assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch());
+
+            RecordBatch secondBatch = logInputStream.nextBatch();
+            assertProducerData(secondBatch, producerId, producerEpoch, baseSequence + firstBatchRecords.length,
+                               true, secondBatchRecords);
+            assertGenericRecordBatchData(args, secondBatch, 27L, 238423489L, secondBatchRecords);
+            assertEquals(partitionLeaderEpoch, secondBatch.partitionLeaderEpoch());
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    @ParameterizedTest
+    @ArgumentsSource(RemoteLogInputStreamArgsProvider.class)
+    public void testBatchIterationIncompleteBatch(Args args) throws IOException {
+        byte magic = args.magic;
+        CompressionType compression = args.compression;
+        if (compression == CompressionType.ZSTD && magic < MAGIC_VALUE_V2)
+            return;
+
+        try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+            SimpleRecord firstBatchRecord = new SimpleRecord(100L, "foo".getBytes());
+            SimpleRecord secondBatchRecord = new SimpleRecord(200L, "bar".getBytes());
+
+            fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord));
+            fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
+            fileRecords.flush();
+            fileRecords.truncateTo(fileRecords.sizeInBytes() - 13);
+
+            FileLogInputStream logInputStream = new FileLogInputStream(fileRecords, 0, fileRecords.sizeInBytes());
+
+            FileLogInputStream.FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+            assertNoProducerData(firstBatch);
+            assertGenericRecordBatchData(args, firstBatch, 0L, 100L, firstBatchRecord);
+
+            assertNull(logInputStream.nextBatch());
+        }
+    }
+
+    private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence,
+                                    boolean isTransactional, SimpleRecord... records) {
+        assertEquals(producerId, batch.producerId());
+        assertEquals(producerEpoch, batch.producerEpoch());
+        assertEquals(baseSequence, batch.baseSequence());
+        assertEquals(baseSequence + records.length - 1, batch.lastSequence());
+        assertEquals(isTransactional, batch.isTransactional());
+    }
+
+    private void assertNoProducerData(RecordBatch batch) {
+        assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+        assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+        assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+        assertEquals(RecordBatch.NO_SEQUENCE, batch.lastSequence());
+        assertFalse(batch.isTransactional());
+    }
+
+    private void assertGenericRecordBatchData(Args args,
+                                              RecordBatch batch,
+                                              long baseOffset,
+                                              long maxTimestamp,
+                                              SimpleRecord... records) {
+        byte magic = args.magic;
+        CompressionType compression = args.compression;
+        assertEquals(magic, batch.magic());
+        assertEquals(compression, batch.compressionType());
+
+        if (magic == MAGIC_VALUE_V0) {
+            assertEquals(NO_TIMESTAMP_TYPE, batch.timestampType());
+        } else {
+            assertEquals(CREATE_TIME, batch.timestampType());
+            assertEquals(maxTimestamp, batch.maxTimestamp());
+        }
+
+        assertEquals(baseOffset + records.length - 1, batch.lastOffset());
+        if (magic >= MAGIC_VALUE_V2)
+            assertEquals(Integer.valueOf(records.length), batch.countOrNull());
+
+        assertEquals(baseOffset, batch.baseOffset());
+        assertTrue(batch.isValid());
+
+        List<Record> batchRecords = TestUtils.toList(batch);
+        for (int i = 0; i < records.length; i++) {
+            assertEquals(baseOffset + i, batchRecords.get(i).offset());
+            assertEquals(records[i].key(), batchRecords.get(i).key());
+            assertEquals(records[i].value(), batchRecords.get(i).value());
+            assertArrayEquals(records[i].headers(), batchRecords.get(i).headers());
+            if (magic == MAGIC_VALUE_V0)
+                assertEquals(NO_TIMESTAMP, batchRecords.get(i).timestamp());
+            else
+                assertEquals(records[i].timestamp(), batchRecords.get(i).timestamp());
+        }
+    }
+}
diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index a1339264bf0..e6b46f41c7e 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -19,6 +19,7 @@ package kafka.server.builders;
 
 import kafka.log.LogManager;
 import kafka.server.AlterPartitionManager;
+import kafka.log.remote.RemoteLogManager;
 import kafka.server.BrokerTopicStats;
 import kafka.server.DelayedDeleteRecords;
 import kafka.server.DelayedElectLeader;
@@ -53,6 +54,7 @@ public class ReplicaManagerBuilder {
     private AlterPartitionManager alterPartitionManager = null;
     private BrokerTopicStats brokerTopicStats = new BrokerTopicStats();
     private AtomicBoolean isShuttingDown = new AtomicBoolean(false);
+    private Optional<RemoteLogManager> remoteLogManager = Optional.empty();
     private Optional<KafkaZkClient> zkClient = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedProduce>> delayedProducePurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
@@ -85,6 +87,11 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
+    public ReplicaManagerBuilder setRemoteLogManager(RemoteLogManager remoteLogManager) {
+        this.remoteLogManager = Optional.ofNullable(remoteLogManager);
+        return this;
+    }
+
     public ReplicaManagerBuilder setQuotaManagers(QuotaManagers quotaManagers) {
         this.quotaManagers = quotaManagers;
         return this;
@@ -157,6 +164,7 @@ public class ReplicaManagerBuilder {
                              time,
                              scheduler,
                              logManager,
+                             OptionConverters.toScala(remoteLogManager),
                              quotaManagers,
                              metadataCache,
                              logDirFailureChannel,
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 63f8f908859..0fe584fa6f6 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -1394,7 +1394,7 @@ class Partition(val topicPartition: TopicPartition,
       case ListOffsetsRequest.LATEST_TIMESTAMP =>
         maybeOffsetsError.map(e => throw e)
           .orElse(Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, lastFetchableOffset, Optional.of(leaderEpoch))))
-      case ListOffsetsRequest.EARLIEST_TIMESTAMP =>
+      case ListOffsetsRequest.EARLIEST_TIMESTAMP | ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
         getOffsetByTimestamp
       case _ =>
         getOffsetByTimestamp.filter(timestampAndOffset => timestampAndOffset.offset < lastFetchableOffset)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index ea5e33826c8..dc599b544ae 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -18,6 +18,8 @@
 package kafka.log
 
 import kafka.log.LogConfig.MessageFormatVersion
+import kafka.log.remote.RemoteIndexCache
+
 import java.io._
 import java.nio.file.Files
 import java.util.concurrent._
@@ -390,7 +392,11 @@ class LogManager(logDirs: Seq[File],
         }
 
         val logsToLoad = Option(dir.listFiles).getOrElse(Array.empty).filter(logDir =>
-          logDir.isDirectory && UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
+          logDir.isDirectory &&
+            // Ignore remote-log-index-cache directory as that is index cache maintained by tiered storage subsystem
+            // but not any topic-partition dir.
+            !logDir.getName.equals(RemoteIndexCache.DirName) &&
+            UnifiedLog.parseTopicPartitionName(logDir).topic != KafkaRaftServer.MetadataTopic)
         numTotalLogs += logsToLoad.length
         numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index 3ed1cef2878..835b74066b6 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -520,16 +520,22 @@ class ProducerStateManager(
     val lastTimestamp = oldestTxnLastTimestamp
     lastTimestamp > 0 && (currentTimeMs - lastTimestamp) > maxTransactionTimeoutMs + ProducerStateManager.LateTransactionBufferMs
   }
+  
+  def truncateFullyAndReloadSnapshots(): Unit = {
+    info("Reloading the producer state snapshots")
+    truncateFullyAndStartAt(0L)
+    snapshots = loadSnapshots()
+  }
 
   /**
    * Load producer state snapshots by scanning the _logDir.
    */
   private def loadSnapshots(): ConcurrentSkipListMap[java.lang.Long, SnapshotFile] = {
-    val tm = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
-    for (f <- listSnapshotFiles(_logDir)) {
-      tm.put(f.offset, f)
+    val offsetToSnapshots = new ConcurrentSkipListMap[java.lang.Long, SnapshotFile]()
+    for (snapshotFile <- listSnapshotFiles(_logDir)) {
+      offsetToSnapshots.put(snapshotFile.offset, snapshotFile)
     }
-    tm
+    offsetToSnapshots
   }
 
   /**
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index ca3d1bb3101..d7967e0213a 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -16,7 +16,7 @@
  */
 package kafka.log
 
-import java.io.{File, IOException}
+import java.io.{Closeable, File, IOException}
 import java.nio.ByteBuffer
 import java.nio.channels.FileChannel
 import java.nio.file.{Files, StandardOpenOption}
@@ -41,7 +41,7 @@ private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTx
  * order to find the start of the transactions.
  */
 @nonthreadsafe
-class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Logging {
+class TransactionIndex(val startOffset: Long, @volatile private var _file: File) extends Closeable with Logging {
 
   // note that the file is not created until we need it
   @volatile private var maybeChannel: Option[FileChannel] = None
diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index 5cb060dbeff..8830258c8fe 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -25,13 +25,15 @@ import java.util.Optional
 import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap, TimeUnit}
 import kafka.common.{LongRef, OffsetsOutOfOrderException, UnexpectedAppendOffsetException}
 import kafka.log.AppendOrigin.RaftLeader
+import kafka.log.remote.RemoteLogManager
 import kafka.message.{BrokerCompressionCodec, CompressionCodec, NoCompressionCodec}
 import kafka.metrics.KafkaMetricsGroup
+import kafka.server._
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.LeaderEpochFileCache
-import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, LogDirFailureChannel, LogOffsetMetadata, OffsetAndEpoch, PartitionMetadataFile, RequestLocal}
 import kafka.utils._
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.{DescribeProducersResponseData, FetchResponseData}
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record._
@@ -42,11 +44,12 @@ import org.apache.kafka.common.utils.{Time, Utils}
 import org.apache.kafka.common.{InvalidRecordException, KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.server.common.MetadataVersion
 import org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 
 import scala.annotation.nowarn
-import scala.jdk.CollectionConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, immutable, mutable}
+import scala.jdk.CollectionConverters._
 
 object LogAppendInfo {
   val UnknownLogAppendInfo = LogAppendInfo(None, -1, None, RecordBatch.NO_TIMESTAMP, -1L, RecordBatch.NO_TIMESTAMP, -1L,
@@ -249,6 +252,8 @@ case object SnapshotGenerated extends LogStartOffsetIncrementReason {
  *                                  is downgraded below 2.8, a topic ID may be lost and a new ID generated upon re-upgrade.
  *                                  If the inter-broker protocol version on a ZK cluster is below 2.8, partition.metadata
  *                                  will be deleted to avoid ID conflicts upon re-upgrade.
+ * @param remoteStorageSystemEnable flag to indicate whether the system level remote log storage is enabled or not.
+ * @param remoteLogManager          Optional RemoteLogManager instance if it exists.
  */
 @threadsafe
 class UnifiedLog(@volatile var logStartOffset: Long,
@@ -258,7 +263,9 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  @volatile var leaderEpochCache: Option[LeaderEpochFileCache],
                  val producerStateManager: ProducerStateManager,
                  @volatile private var _topicId: Option[Uuid],
-                 val keepPartitionMetadataFile: Boolean) extends Logging with KafkaMetricsGroup {
+                 val keepPartitionMetadataFile: Boolean,
+                 val remoteStorageSystemEnable: Boolean = false,
+                 remoteLogManager: Option[RemoteLogManager] = None) extends Logging with KafkaMetricsGroup {
 
   import kafka.log.UnifiedLog._
 
@@ -289,6 +296,10 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
   @volatile var partitionMetadataFile: Option[PartitionMetadataFile] = None
 
+  @volatile private[kafka] var _localLogStartOffset: Long = logStartOffset
+
+  def localLogStartOffset(): Long = _localLogStartOffset
+
   locally {
     initializePartitionMetadata()
     updateLogStartOffset(logStartOffset)
@@ -296,6 +307,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     initializeTopicId()
   }
 
+  def remoteLogEnabled(): Boolean = {
+    // Remote log is enabled only for non-compact and non-internal topics
+    remoteStorageSystemEnable &&
+      !(config.compact || Topic.isInternal(topicPartition.topic())
+        || TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME.equals(topicPartition.topic())
+        || Topic.CLUSTER_METADATA_TOPIC_NAME.equals(topicPartition.topic())) &&
+      config.remoteLogConfig.remoteStorageEnable
+  }
+
   /**
    * Initialize topic ID information for the log by maintaining the partition metadata file and setting the in-memory _topicId.
    * Delete partition metadata file if the version does not support topic IDs.
@@ -574,6 +594,12 @@ class UnifiedLog(@volatile var logStartOffset: Long,
     explicitMetricName(pkgStr, "Log", name, tags)
   }
 
+  def loadProducerState(lastOffset: Long): Unit = lock synchronized {
+    rebuildProducerState(lastOffset, producerStateManager)
+    maybeIncrementFirstUnstableOffset()
+    updateHighWatermark(localLog.logEndOffsetMetadata)
+  }
+
   private def recordVersion: RecordVersion = config.recordVersion
 
   private def initializePartitionMetadata(): Unit = lock synchronized {
@@ -1039,6 +1065,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         if (newLogStartOffset > logStartOffset) {
           updatedLogStartOffset = true
           updateLogStartOffset(newLogStartOffset)
+          _localLogStartOffset = newLogStartOffset
           info(s"Incremented log start offset to $newLogStartOffset due to $reason")
           leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
           producerStateManager.onLogStartOffsetIncremented(newLogStartOffset)
@@ -1264,13 +1291,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
 
       if (config.messageFormatVersion.isLessThan(IBP_0_10_0_IV0) &&
         targetTimestamp != ListOffsetsRequest.EARLIEST_TIMESTAMP &&
+        targetTimestamp != ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP &&
         targetTimestamp != ListOffsetsRequest.LATEST_TIMESTAMP)
         throw new UnsupportedForMessageFormatException(s"Cannot search offsets based on timestamp because message format version " +
           s"for partition $topicPartition is ${config.messageFormatVersion} which is earlier than the minimum " +
           s"required version $IBP_0_10_0_IV0")
 
       // For the earliest and latest, we do not need to return the timestamp.
-      if (targetTimestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP) {
+      if (targetTimestamp == ListOffsetsRequest.EARLIEST_TIMESTAMP ||
+        (!remoteLogEnabled() && targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) {
         // The first cached epoch usually corresponds to the log start offset, but we have to verify this since
         // it may not be true following a message format version bump as the epoch will not be available for
         // log entries written in the older format.
@@ -1280,6 +1309,15 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           case _ => Optional.empty[Integer]()
         }
         Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, logStartOffset, epochOpt))
+      } else if (targetTimestamp == ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP) {
+        val curLocalLogStartOffset = localLogStartOffset()
+        val earliestLocalLogEpochEntry = leaderEpochCache.flatMap(cache =>
+          cache.epochForOffset(curLocalLogStartOffset).flatMap(cache.epochEntry))
+        val epochOpt = earliestLocalLogEpochEntry match {
+          case Some(entry) if entry.startOffset <= curLocalLogStartOffset => Optional.of[Integer](entry.epoch)
+          case _ => Optional.empty[Integer]()
+        }
+        Some(new TimestampAndOffset(RecordBatch.NO_TIMESTAMP, curLocalLogStartOffset, epochOpt))
       } else if (targetTimestamp == ListOffsetsRequest.LATEST_TIMESTAMP) {
         val latestEpochOpt = leaderEpochCache.flatMap(_.latestEpoch).map(_.asInstanceOf[Integer])
         val epochOptional = Optional.ofNullable(latestEpochOpt.orNull)
@@ -1296,12 +1334,30 @@ class UnifiedLog(@volatile var logStartOffset: Long,
           latestTimestampAndOffset.offset,
           epochOptional))
       } else {
-        // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
-        // constant time access while being safe to use with concurrent collections unlike `toArray`.
-        val segmentsCopy = logSegments.toBuffer
         // We need to search the first segment whose largest timestamp is >= the target timestamp if there is one.
-        val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp)
-        targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, logStartOffset))
+        val remoteOffset = if (remoteLogEnabled()) {
+          if (remoteLogManager.isEmpty) {
+            throw new KafkaException("RemoteLogManager is empty even though the remote log storage is enabled.");
+          }
+          if (recordVersion.value < RecordVersion.V2.value) {
+            throw new KafkaException("Tiered storage is supported only with versions supporting leader epochs, that means RecordVersion must be >= 2.")
+          }
+
+          remoteLogManager.get.findOffsetByTimestamp(topicPartition, targetTimestamp, logStartOffset, leaderEpochCache.get)
+        } else None
+
+        if (remoteOffset.nonEmpty) {
+          remoteOffset
+        } else {
+          // If it is not found in remote storage, search in the local storage starting with local log start offset.
+
+          // Cache to avoid race conditions. `toBuffer` is faster than most alternatives and provides
+          // constant time access while being safe to use with concurrent collections unlike `toArray`.
+          val segmentsCopy = logSegments.toBuffer
+
+          val targetSeg = segmentsCopy.find(_.largestTimestamp >= targetTimestamp)
+          targetSeg.flatMap(_.findOffsetByTimestamp(targetTimestamp, _localLogStartOffset))
+        }
       }
     }
   }
@@ -1329,6 +1385,8 @@ class UnifiedLog(@volatile var logStartOffset: Long,
         startIndex = offsetTimeArray.length - 1
       case ListOffsetsRequest.EARLIEST_TIMESTAMP =>
         startIndex = 0
+      case ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP =>
+        startIndex = 0
       case _ =>
         var isFound = false
         debug("Offset time array = " + offsetTimeArray.foreach(o => "%d, %d".format(o._1, o._2)))
@@ -1824,7 +1882,9 @@ object UnifiedLog extends Logging {
             lastShutdownClean: Boolean = true,
             topicId: Option[Uuid],
             keepPartitionMetadataFile: Boolean,
-            numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]): UnifiedLog = {
+            numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
+            remoteStorageSystemEnable: Boolean = false,
+            remoteLogManager: Option[RemoteLogManager] = None): UnifiedLog = {
     // create the log directory if it doesn't exist
     Files.createDirectories(dir.toPath)
     val topicPartition = UnifiedLog.parseTopicPartitionName(dir)
@@ -1861,7 +1921,9 @@ object UnifiedLog extends Logging {
       leaderEpochCache,
       producerStateManager,
       topicId,
-      keepPartitionMetadataFile)
+      keepPartitionMetadataFile,
+      remoteStorageSystemEnable,
+      remoteLogManager)
   }
 
   def logFile(dir: File, offset: Long, suffix: String = ""): File = LocalLog.logFile(dir, offset, suffix)
diff --git a/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala b/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala
new file mode 100644
index 00000000000..d35c70ed85e
--- /dev/null
+++ b/core/src/main/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManager.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import org.apache.kafka.server.log.remote.storage.{LogSegmentData, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.InputStream
+import java.util
+
+/**
+ * A wrapper class of RemoteStorageManager that sets the context class loader when calling RSM methods.
+ */
+class ClassLoaderAwareRemoteStorageManager(val rsm: RemoteStorageManager,
+                                           val rsmClassLoader: ClassLoader) extends RemoteStorageManager {
+
+  def withClassLoader[T](fun: => T): T = {
+    val originalClassLoader = Thread.currentThread.getContextClassLoader
+    Thread.currentThread.setContextClassLoader(rsmClassLoader)
+    try {
+      fun
+    } finally {
+      Thread.currentThread.setContextClassLoader(originalClassLoader)
+    }
+  }
+
+  def delegate(): RemoteStorageManager = {
+    rsm
+  }
+
+  override def close(): Unit = withClassLoader {
+    rsm.close()
+  }
+
+  override def configure(configs: util.Map[String, _]): Unit = withClassLoader {
+    rsm.configure(configs)
+  }
+
+  override def copyLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata,
+                                  logSegmentData: LogSegmentData): Unit = withClassLoader {
+    rsm.copyLogSegmentData(remoteLogSegmentMetadata, logSegmentData)
+  }
+
+  override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata,
+                               startPosition: Int): InputStream = withClassLoader {
+    rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition)
+  }
+
+  override def fetchLogSegment(remoteLogSegmentMetadata: RemoteLogSegmentMetadata,
+                               startPosition: Int,
+                               endPosition: Int): InputStream = withClassLoader {
+    rsm.fetchLogSegment(remoteLogSegmentMetadata, startPosition, endPosition)
+  }
+
+  override def fetchIndex(remoteLogSegmentMetadata: RemoteLogSegmentMetadata,
+                          indexType: RemoteStorageManager.IndexType): InputStream = withClassLoader {
+    rsm.fetchIndex(remoteLogSegmentMetadata, indexType)
+  }
+
+  override def deleteLogSegmentData(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Unit = withClassLoader {
+    rsm.deleteLogSegmentData(remoteLogSegmentMetadata)
+  }
+}
\ No newline at end of file
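For illustration, a minimal sketch (not part of this patch) of how this wrapper is meant to be combined with the ChildFirstClassLoader added in this commit, so a plugin RSM resolves its own dependencies before the broker's. The plugin classpath and class name below are placeholders.

    import kafka.log.remote.ClassLoaderAwareRemoteStorageManager
    import org.apache.kafka.common.utils.ChildFirstClassLoader
    import org.apache.kafka.server.log.remote.storage.RemoteStorageManager

    object RsmLoadingSketch {
      // Placeholder plugin location and class name; mirrors RemoteLogManager.createRemoteStorageManager below.
      val pluginClassPath = "/opt/kafka/plugins/tiered-storage/*"
      val classLoader = new ChildFirstClassLoader(pluginClassPath, getClass.getClassLoader)
      val delegate = classLoader.loadClass("com.example.MyRemoteStorageManager")
        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
      // Every call made through the wrapper runs with the plugin class loader as the thread
      // context class loader and restores the original class loader afterwards.
      val rsm: RemoteStorageManager = new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
    }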
diff --git a/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
new file mode 100644
index 00000000000..564565bc5af
--- /dev/null
+++ b/core/src/main/scala/kafka/log/remote/RemoteIndexCache.scala
@@ -0,0 +1,286 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.log.{LazyIndex, _}
+import kafka.log.remote.RemoteIndexCache.DirName
+import kafka.utils.{CoreUtils, Logging, ShutdownableThread}
+import org.apache.kafka.common.Uuid
+import org.apache.kafka.common.errors.CorruptRecordException
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, File, InputStream}
+import java.nio.file.{Files, Path}
+import java.util
+import java.util.concurrent.LinkedBlockingQueue
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+object RemoteIndexCache {
+  val DirName = "remote-log-index-cache"
+  val TmpFileSuffix = ".tmp"
+}
+
+class Entry(val offsetIndex: LazyIndex[OffsetIndex], val timeIndex: LazyIndex[TimeIndex], val txnIndex: TransactionIndex) {
+  private var markedForCleanup: Boolean = false
+  private val lock: ReentrantReadWriteLock = new ReentrantReadWriteLock()
+
+  def lookupOffset(targetOffset: Long): OffsetPosition = {
+    CoreUtils.inLock(lock.readLock()) {
+      if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
+      else offsetIndex.get.lookup(targetOffset)
+    }
+  }
+
+  def lookupTimestamp(timestamp: Long, startingOffset: Long): OffsetPosition = {
+    CoreUtils.inLock(lock.readLock()) {
+      if (markedForCleanup) throw new IllegalStateException("This entry is marked for cleanup")
+
+      val timestampOffset = timeIndex.get.lookup(timestamp)
+      offsetIndex.get.lookup(math.max(startingOffset, timestampOffset.offset))
+    }
+  }
+
+  def markForCleanup(): Unit = {
+    CoreUtils.inLock(lock.writeLock()) {
+      if (!markedForCleanup) {
+        markedForCleanup = true
+        Array(offsetIndex, timeIndex).foreach(index =>
+          index.renameTo(new File(CoreUtils.replaceSuffix(index.file.getPath, "", UnifiedLog.DeletedFileSuffix))))
+        txnIndex.renameTo(new File(CoreUtils.replaceSuffix(txnIndex.file.getPath, "",
+          UnifiedLog.DeletedFileSuffix)))
+      }
+    }
+  }
+
+  def cleanup(): Unit = {
+    markForCleanup()
+    CoreUtils.tryAll(Seq(() => offsetIndex.deleteIfExists(), () => timeIndex.deleteIfExists(), () => txnIndex.deleteIfExists()))
+  }
+
+  def close(): Unit = {
+    Array(offsetIndex, timeIndex).foreach(index => try {
+      index.close()
+    } catch {
+      case _: Exception => // ignore error.
+    })
+    Utils.closeQuietly(txnIndex, "Closing the transaction index.")
+  }
+}
+
+/**
+ * This is an LRU cache of remote index files stored in `$logdir/remote-log-index-cache`. This is helpful to avoid
+ * re-fetching the index files like offset, time indexes from the remote storage for every fetch call.
+ *
+ * @param maxSize              maximum number of segment index entries to be cached.
+ * @param remoteStorageManager RemoteStorageManager instance, to be used in fetching indexes.
+ * @param logDir               log directory
+ */
+class RemoteIndexCache(maxSize: Int = 1024, remoteStorageManager: RemoteStorageManager, logDir: String)
+  extends Logging with Closeable {
+
+  val cacheDir = new File(logDir, DirName)
+  @volatile var closed = false
+
+  val expiredIndexes = new LinkedBlockingQueue[Entry]()
+  val lock = new Object()
+
+  val entries: util.Map[Uuid, Entry] = new java.util.LinkedHashMap[Uuid, Entry](maxSize / 2,
+    0.75f, true) {
+    override def removeEldestEntry(eldest: util.Map.Entry[Uuid, Entry]): Boolean = {
+      if (this.size() > maxSize) {
+        val entry = eldest.getValue
+        // Mark the entries for cleanup, background thread will clean them later.
+        entry.markForCleanup()
+        expiredIndexes.add(entry)
+        true
+      } else {
+        false
+      }
+    }
+  }
+
+  private def init(): Unit = {
+    if (cacheDir.mkdir())
+      info(s"Created $cacheDir successfully")
+
+    // Delete any .deleted files remaining from an earlier run of the broker.
+    Files.list(cacheDir.toPath).forEach((path: Path) => {
+      if (path.endsWith(UnifiedLog.DeletedFileSuffix)) {
+        Files.deleteIfExists(path)
+      }
+    })
+
+    Files.list(cacheDir.toPath).forEach((path:Path) => {
+      val pathStr = path.getFileName.toString
+      val name = pathStr.substring(0, pathStr.lastIndexOf("_") + 1)
+
+      // Create entries for each path if all the index files exist.
+      val firstIndex = name.indexOf('_')
+      val offset = name.substring(0, firstIndex).toInt
+      val uuid = Uuid.fromString(name.substring(firstIndex + 1, name.lastIndexOf('_')))
+
+      if(!entries.containsKey(uuid)) {
+        val offsetIndexFile = new File(cacheDir, name + UnifiedLog.IndexFileSuffix)
+        val timestampIndexFile = new File(cacheDir, name + UnifiedLog.TimeIndexFileSuffix)
+        val txnIndexFile = new File(cacheDir, name + UnifiedLog.TxnIndexFileSuffix)
+
+        if (offsetIndexFile.exists() && timestampIndexFile.exists() && txnIndexFile.exists()) {
+
+          val offsetIndex: LazyIndex[OffsetIndex] = {
+            val index = LazyIndex.forOffset(offsetIndexFile, offset, Int.MaxValue, writable = false)
+            index.get.sanityCheck()
+            index
+          }
+
+          val timeIndex: LazyIndex[TimeIndex] = {
+            val index = LazyIndex.forTime(timestampIndexFile, offset, Int.MaxValue, writable = false)
+            index.get.sanityCheck()
+            index
+          }
+
+          val txnIndex: TransactionIndex = {
+            val index = new TransactionIndex(offset, txnIndexFile)
+            index.sanityCheck()
+            index
+          }
+
+          val entry = new Entry(offsetIndex, timeIndex, txnIndex)
+          entries.put(uuid, entry)
+        } else {
+          // Delete all of them if any one of those indexes is not available for a specific segment id
+          Files.deleteIfExists(offsetIndexFile.toPath)
+          Files.deleteIfExists(timestampIndexFile.toPath)
+          Files.deleteIfExists(txnIndexFile.toPath)
+        }
+      }
+    })
+  }
+
+  init()
+
+  // Start cleaner thread that will clean the expired entries
+  val cleanerThread: ShutdownableThread = new ShutdownableThread("remote-log-index-cleaner") {
+    setDaemon(true)
+
+    override def doWork(): Unit = {
+      while (!closed) {
+        try {
+          val entry = expiredIndexes.take()
+          info(s"Cleaning up index entry $entry")
+          entry.cleanup()
+        } catch {
+          case ex: InterruptedException => info("Cleaner thread was interrupted", ex)
+          case ex: Exception => error("Error occurred while fetching/cleaning up expired entry", ex)
+        }
+      }
+    }
+  }
+  cleanerThread.start()
+
+  def getIndexEntry(remoteLogSegmentMetadata: RemoteLogSegmentMetadata): Entry = {
+    if(closed) throw new IllegalStateException("Instance is already closed.")
+
+    def loadIndexFile[T](fileName: String,
+                         suffix: String,
+                         fetchRemoteIndex: RemoteLogSegmentMetadata => InputStream,
+                         readIndex: File => T): T = {
+      val indexFile = new File(cacheDir, fileName + suffix)
+
+      def fetchAndCreateIndex(): T = {
+        val tmpIndexFile = new File(cacheDir, fileName + suffix + RemoteIndexCache.TmpFileSuffix)
+
+        val inputStream = fetchRemoteIndex(remoteLogSegmentMetadata)
+        try {
+          Files.copy(inputStream, tmpIndexFile.toPath)
+        } finally {
+          if (inputStream != null) {
+            inputStream.close()
+          }
+        }
+
+        Utils.atomicMoveWithFallback(tmpIndexFile.toPath, indexFile.toPath, false)
+        readIndex(indexFile)
+      }
+
+      if (indexFile.exists()) {
+        try {
+          readIndex(indexFile)
+        } catch {
+          case ex: CorruptRecordException =>
+            info("Error occurred while loading the stored index", ex)
+            fetchAndCreateIndex()
+        }
+      } else {
+        fetchAndCreateIndex()
+      }
+    }
+
+    lock synchronized {
+      entries.computeIfAbsent(remoteLogSegmentMetadata.remoteLogSegmentId().id(), (uuid: Uuid) => {
+        val startOffset = remoteLogSegmentMetadata.startOffset()
+        // uuid.toString uses URL encoding which is safe for filenames and URLs.
+        val fileName = startOffset.toString + "_" + uuid.toString + "_"
+
+        val offsetIndex: LazyIndex[OffsetIndex] = loadIndexFile(fileName, UnifiedLog.IndexFileSuffix,
+          rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.OFFSET),
+          file => {
+            val index = LazyIndex.forOffset(file, startOffset, Int.MaxValue, writable = false)
+            index.get.sanityCheck()
+            index
+          })
+
+        val timeIndex: LazyIndex[TimeIndex] = loadIndexFile(fileName, UnifiedLog.TimeIndexFileSuffix,
+          rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TIMESTAMP),
+          file => {
+            val index = LazyIndex.forTime(file, startOffset, Int.MaxValue, writable = false)
+            index.get.sanityCheck()
+            index
+          })
+
+        val txnIndex: TransactionIndex = loadIndexFile(fileName, UnifiedLog.TxnIndexFileSuffix,
+          rlsMetadata => remoteStorageManager.fetchIndex(rlsMetadata, IndexType.TRANSACTION),
+          file => {
+            val index = new TransactionIndex(startOffset, file)
+            index.sanityCheck()
+            index
+          })
+
+        new Entry(offsetIndex, timeIndex, txnIndex)
+      })
+    }
+  }
+
+  def lookupOffset(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, offset: Long): Int = {
+    getIndexEntry(remoteLogSegmentMetadata).lookupOffset(offset).position
+  }
+
+  def lookupTimestamp(remoteLogSegmentMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Int = {
+    getIndexEntry(remoteLogSegmentMetadata).lookupTimestamp(timestamp, startingOffset).position
+  }
+
+  def close(): Unit = {
+    closed = true
+    cleanerThread.shutdown()
+    // Close all the opened indexes.
+    lock synchronized {
+      entries.values().stream().forEach(entry => entry.close())
+    }
+  }
+
+}
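As a usage sketch (not part of this patch): given the RemoteLogSegmentMetadata of the remote segment that covers a fetch offset, the cache resolves that offset to a byte position inside the segment. The `indexCache` and `segmentMetadata` arguments are assumed to be supplied by the caller, as RemoteLogManager does below.

    import kafka.log.remote.RemoteIndexCache
    import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata

    object RemoteIndexCacheSketch {
      // Fetches (or reuses) the segment's offset index and returns the byte position of the
      // largest indexed offset <= fetchOffset, which is where the remote read should start.
      def startPositionFor(indexCache: RemoteIndexCache,
                           segmentMetadata: RemoteLogSegmentMetadata,
                           fetchOffset: Long): Int =
        indexCache.lookupOffset(segmentMetadata, fetchOffset)
    }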
diff --git a/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
new file mode 100644
index 00000000000..6558094842a
--- /dev/null
+++ b/core/src/main/scala/kafka/log/remote/RemoteLogManager.scala
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.server.KafkaConfig
+import kafka.server.epoch.LeaderEpochFileCache
+import kafka.utils.Logging
+import org.apache.kafka.common._
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{RecordBatch, RemoteLogInputStream}
+import org.apache.kafka.common.utils.{ChildFirstClassLoader, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.ClassLoaderAwareRemoteLogMetadataManager
+import org.apache.kafka.server.log.remote.storage.{RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentMetadata, RemoteStorageManager}
+
+import java.io.{Closeable, InputStream}
+import java.security.{AccessController, PrivilegedAction}
+import java.util
+import java.util.Optional
+import java.util.concurrent.{ConcurrentHashMap, ConcurrentMap}
+import scala.collection.Set
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class is responsible for
+ *  - initializing `RemoteStorageManager` and `RemoteLogMetadataManager` instances.
+ *  - receiving any leader and follower replica events and partition stop events, and acting on them.
+ *  - providing APIs to fetch indexes and metadata about remote log segments.
+ *
+ * @param rlmConfig Configuration required for the remote logging subsystem (tiered storage) at the broker level.
+ * @param brokerId  id of the current broker.
+ * @param logDir    directory of Kafka log segments.
+ */
+class RemoteLogManager(rlmConfig: RemoteLogManagerConfig,
+                       brokerId: Int,
+                       logDir: String) extends Logging with Closeable with KafkaMetricsGroup {
+
+  // topic ids received on leadership changes
+  private val topicPartitionIds: ConcurrentMap[TopicPartition, Uuid] = new ConcurrentHashMap[TopicPartition, Uuid]()
+
+  private val remoteLogStorageManager: RemoteStorageManager = createRemoteStorageManager()
+  private val remoteLogMetadataManager: RemoteLogMetadataManager = createRemoteLogMetadataManager()
+
+  private val indexCache = new RemoteIndexCache(remoteStorageManager = remoteLogStorageManager, logDir = logDir)
+
+  private var closed = false
+
+  private[remote] def createRemoteStorageManager(): RemoteStorageManager = {
+    def createDelegate(classLoader: ClassLoader): RemoteStorageManager = {
+      classLoader.loadClass(rlmConfig.remoteStorageManagerClassName())
+        .getDeclaredConstructor().newInstance().asInstanceOf[RemoteStorageManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteStorageManager] {
+      private val classPath = rlmConfig.remoteStorageManagerClassPath()
+
+      override def run(): RemoteStorageManager = {
+          if (classPath != null && classPath.trim.nonEmpty) {
+            val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+            val delegate = createDelegate(classLoader)
+            new ClassLoaderAwareRemoteStorageManager(delegate, classLoader)
+          } else {
+            createDelegate(this.getClass.getClassLoader)
+          }
+      }
+    })
+  }
+
+  private def configureRSM(): Unit = {
+    val rsmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteStorageManagerProps().asScala.foreach { case (k, v) => rsmProps.put(k, v) }
+    rsmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    remoteLogStorageManager.configure(rsmProps)
+  }
+
+  private[remote] def createRemoteLogMetadataManager(): RemoteLogMetadataManager = {
+    def createDelegate(classLoader: ClassLoader) = {
+      classLoader.loadClass(rlmConfig.remoteLogMetadataManagerClassName())
+        .getDeclaredConstructor()
+        .newInstance()
+        .asInstanceOf[RemoteLogMetadataManager]
+    }
+
+    AccessController.doPrivileged(new PrivilegedAction[RemoteLogMetadataManager] {
+      private val classPath = rlmConfig.remoteLogMetadataManagerClassPath
+
+      override def run(): RemoteLogMetadataManager = {
+        if (classPath != null && classPath.trim.nonEmpty) {
+          val classLoader = new ChildFirstClassLoader(classPath, this.getClass.getClassLoader)
+          val delegate = createDelegate(classLoader)
+          new ClassLoaderAwareRemoteLogMetadataManager(delegate, classLoader)
+        } else {
+          createDelegate(this.getClass.getClassLoader)
+        }
+      }
+    })
+  }
+
+  private def configureRLMM(): Unit = {
+    val rlmmProps = new util.HashMap[String, Any]()
+    rlmConfig.remoteLogMetadataManagerProps().asScala.foreach { case (k, v) => rlmmProps.put(k, v) }
+    rlmmProps.put(KafkaConfig.BrokerIdProp, brokerId)
+    rlmmProps.put(KafkaConfig.LogDirProp, logDir)
+    remoteLogMetadataManager.configure(rlmmProps)
+  }
+
+  def startup(): Unit = {
+    // Initialize and configure RSM and RLMM. This may start resources within RSM and RLMM that are needed for
+    // connecting to the brokers or to the remote storage.
+    configureRSM()
+    configureRLMM()
+  }
+
+  def storageManager(): RemoteStorageManager = {
+    remoteLogStorageManager
+  }
+
+  /**
+   * Callback to receive any leadership changes for the topic partitions assigned to this broker. If there are no
+   * existing tasks for a given topic partition then it will assign a new leader or follower task, else it will convert
+   * the task to the respective target state (leader or follower).
+   *
+   * @param partitionsBecomeLeader   partitions that have become leaders on this broker.
+   * @param partitionsBecomeFollower partitions that have become followers on this broker.
+   * @param topicIds                 topic name to topic id mappings.
+   */
+  def onLeadershipChange(partitionsBecomeLeader: Set[Partition],
+                         partitionsBecomeFollower: Set[Partition],
+                         topicIds: util.Map[String, Uuid]): Unit = {
+    debug(s"Received leadership changes for leaders: $partitionsBecomeLeader and followers: $partitionsBecomeFollower")
+
+    // Partition logs are available when this callback is invoked.
+    // Compact topics and internal topics are filtered here as they are not supported with tiered storage.
+    def filterPartitions(partitions: Set[Partition]): Set[TopicIdPartition] = {
+      // We are not specifically checking for internal topics etc here as `log.remoteLogEnabled()` already handles that.
+      partitions.filter(partition => partition.log.exists(log => log.remoteLogEnabled()))
+        .map(partition => new TopicIdPartition(topicIds.get(partition.topic), partition.topicPartition))
+    }
+
+    val followerTopicPartitions = filterPartitions(partitionsBecomeFollower)
+    val leaderTopicPartitions = filterPartitions(partitionsBecomeLeader)
+    debug(s"Effective topic partitions after filtering compact and internal topics, leaders: $leaderTopicPartitions " +
+      s"and followers: $followerTopicPartitions")
+
+    if (leaderTopicPartitions.nonEmpty || followerTopicPartitions.nonEmpty) {
+      leaderTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+      followerTopicPartitions.foreach(x => topicPartitionIds.put(x.topicPartition(), x.topicId()))
+
+      remoteLogMetadataManager.onPartitionLeadershipChanges(leaderTopicPartitions.asJava, followerTopicPartitions.asJava)
+    }
+  }
+
+  /**
+   * Deletes the internal topic partition info if the delete flag is set to true.
+   *
+   * @param topicPartition topic partition to be stopped.
+   * @param delete         flag to indicate whether the given topic partition is to be deleted or not.
+   */
+  def stopPartitions(topicPartition: TopicPartition, delete: Boolean): Unit = {
+    if (delete) {
+      // Delete from internal data structures only if the partition is to be deleted.
+      val topicIdPartition = topicPartitionIds.remove(topicPartition)
+      debug(s"Removed partition: $topicIdPartition from topicPartitionIds")
+    }
+  }
+
+  def fetchRemoteLogSegmentMetadata(topicPartition: TopicPartition,
+                                    epochForOffset: Int,
+                                    offset: Long): Optional[RemoteLogSegmentMetadata] = {
+    val topicId = topicPartitionIds.get(topicPartition)
+
+    if (topicId == null) {
+      throw new KafkaException("No topic id registered for topic partition: " + topicPartition)
+    }
+
+    remoteLogMetadataManager.remoteLogSegmentMetadata(new TopicIdPartition(topicId, topicPartition), epochForOffset, offset)
+  }
+
+  private def lookupTimestamp(rlsMetadata: RemoteLogSegmentMetadata, timestamp: Long, startingOffset: Long): Option[TimestampAndOffset] = {
+    val startPos = indexCache.lookupTimestamp(rlsMetadata, timestamp, startingOffset)
+
+    var remoteSegInputStream: InputStream = null
+    try {
+      // Search forward for the position of the last offset that is greater than or equal to the startingOffset
+      remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(rlsMetadata, startPos)
+      val remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream)
+      var batch: RecordBatch = null
+
+      def nextBatch(): RecordBatch = {
+        batch = remoteLogInputStream.nextBatch()
+        batch
+      }
+
+      while (nextBatch() != null) {
+        if (batch.maxTimestamp >= timestamp && batch.lastOffset >= startingOffset) {
+          batch.iterator.asScala.foreach(record => {
+            if (record.timestamp >= timestamp && record.offset >= startingOffset)
+              return Some(new TimestampAndOffset(record.timestamp, record.offset, maybeLeaderEpoch(batch.partitionLeaderEpoch)))
+          })
+        }
+      }
+      None
+    } finally {
+      Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream")
+    }
+  }
+
+  private def maybeLeaderEpoch(leaderEpoch: Int): Optional[Integer] = {
+    if (leaderEpoch == RecordBatch.NO_PARTITION_LEADER_EPOCH)
+      Optional.empty()
+    else
+      Optional.of(leaderEpoch)
+  }
+
+  /**
+   * Search the message offset in the remote storage based on timestamp and offset.
+   *
+   * This method returns an option of TimestampAndOffset. The returned value is determined using the following ordered list of rules:
+   *
+   * - If there are no messages in the remote storage, return None
+   * - If all the messages in the remote storage have smaller offsets, return None
+   * - If all the messages in the remote storage have smaller timestamps, return None
+   * - Otherwise, return an option of TimestampAndOffset. The offset is the offset of the first message whose timestamp
+   * is greater than or equal to the target timestamp and whose offset is greater than or equal to the startingOffset.
+   *
+   * @param tp               topic partition in which the offset to be found.
+   * @param timestamp        The timestamp to search for.
+   * @param startingOffset   The starting offset to search.
+   * @param leaderEpochCache LeaderEpochFileCache of the topic partition.
+   * @return the timestamp and offset of the first message that meets the requirements. None will be returned if there
+   *         is no such message.
+   */
+  def findOffsetByTimestamp(tp: TopicPartition,
+                            timestamp: Long,
+                            startingOffset: Long,
+                            leaderEpochCache: LeaderEpochFileCache): Option[TimestampAndOffset] = {
+    val topicId = topicPartitionIds.get(tp)
+    if (topicId == null) {
+      throw new KafkaException("Topic id does not exist for topic partition: " + tp)
+    }
+
+    // Get the respective epoch in which the starting-offset exists.
+    var maybeEpoch = leaderEpochCache.epochForOffset(startingOffset)
+    while (maybeEpoch.nonEmpty) {
+      val epoch = maybeEpoch.get
+      remoteLogMetadataManager.listRemoteLogSegments(new TopicIdPartition(topicId, tp), epoch).asScala
+        .foreach(rlsMetadata =>
+          if (rlsMetadata.maxTimestampMs() >= timestamp && rlsMetadata.endOffset() >= startingOffset) {
+            val timestampOffset = lookupTimestamp(rlsMetadata, timestamp, startingOffset)
+            if (timestampOffset.isDefined)
+              return timestampOffset
+          }
+        )
+
+      // Move to the next epoch if not found with the current epoch.
+      maybeEpoch = leaderEpochCache.nextEpoch(epoch)
+    }
+    None
+  }
+
+  /**
+   * Closes and releases all the resources like RemoteStorageManager and RemoteLogMetadataManager.
+   */
+  def close(): Unit = {
+    this synchronized {
+      if (!closed) {
+        Utils.closeQuietly(remoteLogStorageManager, "RemoteLogStorageManager")
+        Utils.closeQuietly(remoteLogMetadataManager, "RemoteLogMetadataManager")
+        Utils.closeQuietly(indexCache, "RemoteIndexCache")
+        closed = true
+      }
+    }
+  }
+
+}
\ No newline at end of file
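The sketch below (not part of this patch) shows the lifecycle the brokers wire up later in this commit: construct the manager, start it before serving requests, notify it on leadership changes, and close it on shutdown. The `config`, `leaders`, `followers` and `topicIds` values are assumed to exist in the caller.

    import kafka.cluster.Partition
    import kafka.log.remote.RemoteLogManager
    import org.apache.kafka.common.Uuid
    import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig

    import java.util
    import scala.collection.Set

    object RemoteLogManagerLifecycleSketch {
      def run(config: RemoteLogManagerConfig,
              brokerId: Int,
              logDir: String,
              leaders: Set[Partition],
              followers: Set[Partition],
              topicIds: util.Map[String, Uuid]): Unit = {
        val rlm = new RemoteLogManager(config, brokerId, logDir)
        rlm.startup()                                         // configures RSM and RLMM
        rlm.onLeadershipChange(leaders, followers, topicIds)  // registers partitions with the RLMM
        // ... serve fetch and list-offsets requests ...
        rlm.close()                                           // releases RSM, RLMM and the index cache
      }
    }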
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 9701f552647..dd8c6a83e58 100755
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -46,7 +46,7 @@ import scala.jdk.CollectionConverters._
 import scala.math._
 
 /**
- *  Abstract class for fetching data from multiple partitions from the same broker.
+ * Abstract class for fetching data from multiple partitions from the same broker.
  */
 abstract class AbstractFetcherThread(name: String,
                                      clientId: String,
@@ -89,6 +89,22 @@ abstract class AbstractFetcherThread(name: String,
 
   protected val isOffsetForLeaderEpochSupported: Boolean
 
+  /**
+   * Builds the required remote log auxiliary state for the given topic partition on this follower replica and returns
+   * the offset to be fetched from the leader replica.
+   *
+   * @param partition topic partition
+   * @param currentLeaderEpoch current leader epoch maintained by this follower replica.
+   * @param fetchOffset offset to be fetched from the leader.
+   * @param epochForFetchOffset respective leader epoch for the given fetch offset.
+   * @param leaderLogStartOffset log-start-offset on the leader.
+   */
+  protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                       currentLeaderEpoch: Int,
+                                       fetchOffset: Long,
+                                       epochForFetchOffset: Int,
+                                       leaderLogStartOffset: Long): Long
+
   override def shutdown(): Unit = {
     initiateShutdown()
     inLock(partitionMapLock) {
@@ -187,14 +203,14 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-    * - Build a leader epoch fetch based on partitions that are in the Truncating phase
-    * - Send OffsetsForLeaderEpochRequest, retrieving the latest offset for each partition's
-    *   leader epoch. This is the offset the follower should truncate to ensure
-    *   accurate log replication.
-    * - Finally truncate the logs for partitions in the truncating phase and mark the
-    *   truncation complete. Do this within a lock to ensure no leadership changes can
-    *   occur during truncation.
-    */
+   * - Build a leader epoch fetch based on partitions that are in the Truncating phase
+   * - Send OffsetsForLeaderEpochRequest, retrieving the latest offset for each partition's
+   * leader epoch. This is the offset the follower should truncate to ensure
+   * accurate log replication.
+   * - Finally truncate the logs for partitions in the truncating phase and mark the
+   * truncation complete. Do this within a lock to ensure no leadership changes can
+   * occur during truncation.
+   */
   private def truncateToEpochEndOffsets(latestEpochsForPartitions: Map[TopicPartition, EpochData]): Unit = {
     val endOffsets = leader.fetchEpochEndOffsets(latestEpochsForPartitions)
     //Ensure we hold a lock during truncation.
@@ -281,6 +297,7 @@ abstract class AbstractFetcherThread(name: String,
 
   /**
    * remove the partition if the partition state is NOT updated. Otherwise, keep the partition active.
+   *
    * @return true if the epoch in this thread is updated. otherwise, false
    */
   private def onPartitionFenced(tp: TopicPartition, requestEpoch: Optional[Integer]): Boolean = inLock(partitionMapLock) {
@@ -381,9 +398,14 @@ abstract class AbstractFetcherThread(name: String,
                       markPartitionFailed(topicPartition)
                   }
                 case Errors.OFFSET_OUT_OF_RANGE =>
-                  if (handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+                  if (!handleOutOfRangeError(topicPartition, currentFetchState, fetchPartitionData.currentLeaderEpoch))
+                    partitionsWithError += topicPartition
+                case Errors.OFFSET_MOVED_TO_TIERED_STORAGE =>
+                  debug(s"Received error ${Errors.OFFSET_MOVED_TO_TIERED_STORAGE}, " +
+                    s"at fetch offset: ${currentFetchState.fetchOffset}, " + s"topic-partition: $topicPartition")
+                  if (!handleOffsetsMovedToTieredStorage(topicPartition, currentFetchState,
+                    fetchPartitionData.currentLeaderEpoch, partitionData.logStartOffset()))
                     partitionsWithError += topicPartition
-
                 case Errors.UNKNOWN_LEADER_EPOCH =>
                   debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                     s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
@@ -475,7 +497,7 @@ abstract class AbstractFetcherThread(name: String,
       val lastFetchedEpoch = latestEpoch(tp)
       val state = if (lastFetchedEpoch.nonEmpty) Fetching else Truncating
       PartitionFetchState(initialFetchState.topicId, initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
-          state, lastFetchedEpoch)
+        state, lastFetchedEpoch)
     } else {
       PartitionFetchState(initialFetchState.topicId, initialFetchState.initOffset, None, initialFetchState.currentLeaderEpoch,
         state = Truncating, lastFetchedEpoch = None)
@@ -513,11 +535,11 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-    * Loop through all partitions, updating their fetch offset and maybe marking them as
-    * truncation completed if their offsetTruncationState indicates truncation completed
-    *
-    * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
-    */
+   * Loop through all partitions, updating their fetch offset and maybe marking them as
+   * truncation completed if their offsetTruncationState indicates truncation completed
+   *
+   * @param fetchOffsets the partitions to update fetch offset and maybe mark truncation complete
+   */
   private def updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets: Map[TopicPartition, OffsetTruncationState]): Unit = {
     val newStates: Map[TopicPartition, PartitionFetchState] = partitionStates.partitionStateMap.asScala
       .map { case (topicPartition, currentFetchState) =>
@@ -557,8 +579,8 @@ abstract class AbstractFetcherThread(name: String,
    *  -- Otherwise, truncate to min(leader's offset, end offset on the follower for epoch that
    *  leader replied with, follower's Log End Offset).
    *
-   * @param tp                    Topic partition
-   * @param leaderEpochOffset     Epoch end offset received from the leader for this topic partition
+   * @param tp                Topic partition
+   * @param leaderEpochOffset Epoch end offset received from the leader for this topic partition
    */
   private def getOffsetTruncationState(tp: TopicPartition,
                                        leaderEpochOffset: EpochEndOffset): OffsetTruncationState = inLock(partitionMapLock) {
@@ -568,13 +590,13 @@ abstract class AbstractFetcherThread(name: String,
       // replica's truncation offset (when the current replica truncates, it forces future
       // replica's partition state to 'truncating' and sets initial offset to its truncation offset)
       warn(s"Based on replica's leader epoch, leader replied with an unknown offset in $tp. " +
-           s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
+        s"The initial fetch offset ${partitionStates.stateValue(tp).fetchOffset} will be used for truncation.")
       OffsetTruncationState(partitionStates.stateValue(tp).fetchOffset, truncationCompleted = true)
     } else if (leaderEpochOffset.leaderEpoch == UNDEFINED_EPOCH) {
       // either leader or follower or both use inter-broker protocol version < IBP_2_0_IV0
       // (version 0 of OffsetForLeaderEpoch request/response)
       warn(s"Leader or replica is on protocol version where leader epoch is not considered in the OffsetsForLeaderEpoch response. " +
-           s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation in $tp.")
+        s"The leader's offset ${leaderEpochOffset.endOffset} will be used for truncation in $tp.")
       OffsetTruncationState(min(leaderEpochOffset.endOffset, logEndOffset(tp)), truncationCompleted = true)
     } else {
       val replicaEndOffset = logEndOffset(tp)
@@ -610,40 +632,25 @@ abstract class AbstractFetcherThread(name: String,
   }
 
   /**
-   * Handle the out of range error. Return false if
-   * 1) the request succeeded or
-   * 2) was fenced and this thread haven't received new epoch,
-   * which means we need not backoff and retry. True if there was a retriable error.
-   */
-  private def handleOutOfRangeError(topicPartition: TopicPartition,
-                                    fetchState: PartitionFetchState,
-                                    requestEpoch: Optional[Integer]): Boolean = {
-    try {
-      val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
-      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
-      info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
-        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
-      false
-    } catch {
-      case _: FencedLeaderEpochException =>
-        onPartitionFenced(topicPartition, requestEpoch)
-
-      case e @ (_ : UnknownTopicOrPartitionException |
-                _ : UnknownLeaderEpochException |
-                _ : NotLeaderOrFollowerException) =>
-        info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
-        true
-
-      case e: Throwable =>
-        error(s"Error getting offset for partition $topicPartition", e)
-        true
-    }
-  }
-
-  /**
-   * Handle a partition whose offset is out of range and return a new fetch offset.
+   * Returns the next fetch state. It fetches either the log-start-offset or the local-log-start-offset from the leader,
+   * based on the `fetchFromLocalLogStartOffset` flag. The fetched offset is used during truncation by passing it to the
+   * given `truncateAndBuild` function.
+   *
+   * @param topicPartition               topic partition
+   * @param topicId                      topic id
+   * @param currentLeaderEpoch           current leader epoch maintained by this follower replica.
+   * @param truncateAndBuild             Function to truncate for the given epoch and offset. It returns the next fetch offset value.
+   * @param fetchFromLocalLogStartOffset Whether to fetch from local-log-start-offset or log-start-offset. If true, it
+   *                                     requests the local-log-start-offset from the leader, else it requests
+   *                                     log-start-offset from the leader. This is used in sending the value to the
+   *                                     given `truncateAndBuild` function.
+   * @return next PartitionFetchState
    */
-  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+  private def fetchOffsetAndApplyTruncateAndBuild(topicPartition: TopicPartition,
+                                                  topicId: Option[Uuid],
+                                                  currentLeaderEpoch: Int,
+                                                  truncateAndBuild: => (Int, Long) => Long,
+                                                  fetchFromLocalLogStartOffset: Boolean = true): PartitionFetchState = {
     val replicaEndOffset = logEndOffset(topicPartition)
 
     /**
@@ -656,7 +663,7 @@ abstract class AbstractFetcherThread(name: String,
      *
      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
      */
-    val leaderEndOffset = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
+    val (_, leaderEndOffset) = leader.fetchLatestOffset(topicPartition, currentLeaderEpoch)
     if (leaderEndOffset < replicaEndOffset) {
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's latest offset $leaderEndOffset")
@@ -676,24 +683,33 @@ abstract class AbstractFetcherThread(name: String,
        * produced to the new leader. While the old leader is trying to handle the OffsetOutOfRangeException and query
        * the log end offset of the new leader, the new leader's log end offset becomes higher than the follower's log end offset.
        *
-       * In the first case, the follower's current log end offset is smaller than the leader's log start offset. So the
-       * follower should truncate all its logs, roll out a new segment and start to fetch from the current leader's log
-       * start offset.
+       * In the first case, the follower's current log end offset is smaller than the leader's log start offset
+       * (or leader's local log start offset).
+       * So the follower should truncate all its logs, roll out a new segment and start to fetch from the current
+       * leader's log start offset (or leader's local log start offset).
        * In the second case, the follower should just keep the current log segments and retry the fetch. In the second
        * case, there will be some inconsistency of data between old and new leader. We are not solving it here.
        * If users want to have strong consistency guarantees, appropriate configurations needs to be set for both
        * brokers and producers.
        *
        * Putting the two cases together, the follower should fetch from the higher one of its replica log end offset
-       * and the current leader's log start offset.
+       * and the current leader's (local-log-start-offset or) log start offset.
        */
-      val leaderStartOffset = leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+      val (epoch, leaderStartOffset) = if (fetchFromLocalLogStartOffset)
+        leader.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch) else
+        leader.fetchEarliestOffset(topicPartition, currentLeaderEpoch)
+
       warn(s"Reset fetch offset for partition $topicPartition from $replicaEndOffset to current " +
         s"leader's start offset $leaderStartOffset")
-      val offsetToFetch = Math.max(leaderStartOffset, replicaEndOffset)
-      // Only truncate log when current leader's log start offset is greater than follower's log end offset.
-      if (leaderStartOffset > replicaEndOffset)
-        truncateFullyAndStartAt(topicPartition, leaderStartOffset)
+      val offsetToFetch =
+        if (leaderStartOffset > replicaEndOffset) {
+          // Only truncate the log when the current leader's log start offset (or local log start offset, for brokers on
+          // version >= 3.4 in case of an OffsetMovedToTieredStorage error) is greater than the follower's log end offset.
+          // truncateAndBuild returns offset value from which it needs to start fetching.
+          truncateAndBuild(epoch, leaderStartOffset)
+        } else {
+          replicaEndOffset
+        }
 
       val initialLag = leaderEndOffset - offsetToFetch
       fetcherLagStats.getAndMaybePut(topicPartition).lag = initialLag
@@ -702,6 +718,102 @@ abstract class AbstractFetcherThread(name: String,
     }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  private def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyTruncateAndBuild(topicPartition, topicId, currentLeaderEpoch,
+      (_, leaderLogStartOffset) => {
+        truncateFullyAndStartAt(topicPartition, leaderLogStartOffset)
+        leaderLogStartOffset
+      },
+      // In this case, it will fetch from the leader's log-start-offset as before, instead of fetching from
+      // local-log-start-offset. This handles both the scenarios of whether tiered storage is enabled or not.
+      // If tiered storage is enabled, the next fetch result of fetching from log-start-offset may result in
+      // OffsetMovedToTieredStorage error and it will handle building the remote log state.
+      fetchFromLocalLogStartOffset = false)
+  }
+
+  /**
+   * Handles the out of range error for the given topic partition.
+   *
+   * Returns true if
+   *    - the request succeeded or
+   *    - it was fenced and this thread hasn't received a new epoch, which means we need not back off and retry as the
+   *    partition is moved to failed state.
+   *
+   * Returns false if there was a retriable error.
+   *
+   * @param topicPartition topic partition
+   * @param fetchState current fetch state
+   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
+   */
+  private def handleOutOfRangeError(topicPartition: TopicPartition,
+                                    fetchState: PartitionFetchState,
+                                    leaderEpochInRequest: Optional[Integer]): Boolean = {
+    try {
+      val newFetchState = fetchOffsetAndTruncate(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch)
+      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
+      info(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
+        s"out of range, which typically implies a leader change. Reset fetch offset to ${newFetchState.fetchOffset}")
+      true
+    } catch {
+      case _: FencedLeaderEpochException =>
+        onPartitionFenced(topicPartition, leaderEpochInRequest)
+
+      case e@(_: UnknownTopicOrPartitionException |
+              _: UnknownLeaderEpochException |
+              _: NotLeaderOrFollowerException) =>
+        info(s"Could not fetch offset for $topicPartition due to error: ${e.getMessage}")
+        false
+
+      case e: Throwable =>
+        error(s"Error getting offset for partition $topicPartition", e)
+        false
+    }
+  }
+
+  /**
+   * Handles the offset moved to tiered storage error for the given topic partition.
+   *
+   * Returns true if
+   *    - the request succeeded or
+   *    - it was fenced and this thread hasn't received a new epoch, which means we need not back off and retry as the
+   *    partition is moved to failed state.
+   *
+   * Returns false if there was a retriable error.
+   *
+   * @param topicPartition topic partition
+   * @param fetchState current partition fetch state.
+   * @param leaderEpochInRequest current leader epoch sent in the fetch request.
+   * @param leaderLogStartOffset log-start-offset in the leader replica.
+   */
+  private def handleOffsetsMovedToTieredStorage(topicPartition: TopicPartition,
+                                                fetchState: PartitionFetchState,
+                                                leaderEpochInRequest: Optional[Integer],
+                                                leaderLogStartOffset: Long): Boolean = {
+    try {
+      val newFetchState = fetchOffsetAndApplyTruncateAndBuild(topicPartition, fetchState.topicId, fetchState.currentLeaderEpoch,
+        (offsetEpoch, leaderLocalLogStartOffset) => buildRemoteLogAuxState(topicPartition, fetchState.currentLeaderEpoch, leaderLocalLogStartOffset, offsetEpoch, leaderLogStartOffset))
+
+      partitionStates.updateAndMoveToEnd(topicPartition, newFetchState)
+      debug(s"Current offset ${fetchState.fetchOffset} for partition $topicPartition is " +
+        s"out of range or moved to remote tier. Reset fetch offset to ${newFetchState.fetchOffset}")
+      true
+    } catch {
+      case _: FencedLeaderEpochException =>
+        onPartitionFenced(topicPartition, leaderEpochInRequest)
+      case e@(_: UnknownTopicOrPartitionException |
+              _: UnknownLeaderEpochException |
+              _: NotLeaderOrFollowerException) =>
+        info(s"Could not build remote log auxiliary state for $topicPartition due to error: ${e.getMessage}")
+        false
+      case e: Throwable =>
+        error(s"Error building remote log auxiliary state for $topicPartition", e)
+        false
+    }
+  }
+
   def delayPartitions(partitions: Iterable[TopicPartition], delay: Long): Unit = {
     partitionMapLock.lockInterruptibly()
     try {
@@ -773,6 +885,7 @@ abstract class AbstractFetcherThread(name: String,
 object AbstractFetcherThread {
 
   case class ReplicaFetch(partitionData: util.Map[TopicPartition, FetchRequest.PartitionData], fetchRequest: FetchRequest.Builder)
+
   case class ResultWithPartitions[R](result: R, partitionsWithError: Set[TopicPartition])
 
 }
@@ -845,7 +958,9 @@ case class ClientIdTopicPartition(clientId: String, topicPartition: TopicPartiti
 }
 
 sealed trait ReplicaState
+
 case object Truncating extends ReplicaState
+
 case object Fetching extends ReplicaState
 
 object PartitionFetchState {
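To make the new follower path concrete, a rough sketch (not this patch's ReplicaFetcherThread implementation) of what a concrete fetcher thread is expected to do in the buildRemoteLogAuxState hook declared above. The helper rebuildAuxStateFromRemoteStorage is a hypothetical placeholder for rebuilding the leader epoch cache and producer state from remote storage.

    // Sketch only: fetchOffset is the leader's local-log-start-offset obtained via
    // fetchEarliestLocalOffset, and the returned value is where the follower resumes fetching.
    protected def buildRemoteLogAuxState(partition: TopicPartition,
                                         currentLeaderEpoch: Int,
                                         fetchOffset: Long,
                                         epochForFetchOffset: Int,
                                         leaderLogStartOffset: Long): Long = {
      // 1. Discard local data that is now available only in remote storage.
      truncateFullyAndStartAt(partition, fetchOffset)
      // 2. Rebuild the auxiliary state (leader epoch cache, producer snapshots) needed before
      //    fetching from fetchOffset (hypothetical helper, not defined in this diff).
      rebuildAuxStateFromRemoteStorage(partition, epochForFetchOffset, fetchOffset, leaderLogStartOffset)
      // 3. Resume fetching from the leader's local log start offset.
      fetchOffset
    }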
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 623338fd2f1..2b7561c31f3 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -17,15 +17,11 @@
 
 package kafka.server
 
-import java.net.InetAddress
-import java.util
-import java.util.concurrent.atomic.AtomicBoolean
-import java.util.concurrent.locks.ReentrantLock
-import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
 import kafka.cluster.Broker.ServerInfo
 import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
 import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
 import kafka.log.LogManager
+import kafka.log.remote.RemoteLogManager
 import kafka.network.{DataPlaneAcceptor, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
@@ -40,16 +36,22 @@ import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.utils.{LogContext, Time, Utils}
-import org.apache.kafka.common.{ClusterResource, Endpoint}
+import org.apache.kafka.common.{ClusterResource, Endpoint, KafkaException}
 import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
 import org.apache.kafka.metadata.{BrokerState, VersionRange}
 import org.apache.kafka.raft
 import org.apache.kafka.raft.{RaftClient, RaftConfig}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.ApiMessageAndVersion
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.apache.kafka.snapshot.SnapshotWriter
 
+import java.net.InetAddress
+import java.util
+import java.util.concurrent.atomic.AtomicBoolean
+import java.util.concurrent.locks.ReentrantLock
+import java.util.concurrent.{CompletableFuture, ExecutionException, TimeUnit, TimeoutException}
 import scala.collection.{Map, Seq}
 import scala.compat.java8.OptionConverters._
 import scala.jdk.CollectionConverters._
@@ -106,6 +108,7 @@ class BrokerServer(
 
   var logDirFailureChannel: LogDirFailureChannel = _
   var logManager: LogManager = _
+  var remoteLogManager: Option[RemoteLogManager] = None
 
   var tokenManager: DelegationTokenManager = _
 
@@ -202,6 +205,8 @@ class BrokerServer(
       logManager = LogManager(config, initialOfflineDirs, metadataCache, kafkaScheduler, time,
         brokerTopicStats, logDirFailureChannel, keepPartitionMetadataFile = true)
 
+      remoteLogManager = createRemoteLogManager(config)
+
       // Enable delegation token cache for all SCRAM mechanisms to simplify dynamic update.
       // This keeps the cache up-to-date if new SCRAM mechanisms are enabled dynamically.
       tokenCache = new DelegationTokenCache(ScramMechanism.mechanismNames)
@@ -256,6 +261,7 @@ class BrokerServer(
         time = time,
         scheduler = kafkaScheduler,
         logManager = logManager,
+        remoteLogManager = remoteLogManager,
         quotaManagers = quotaManagers,
         metadataCache = metadataCache,
         logDirFailureChannel = logDirFailureChannel,
@@ -461,6 +467,9 @@ class BrokerServer(
       // Log static broker configurations.
       new KafkaConfig(config.originals(), true)
 
+      // Start RemoteLogManager before broker start serving the requests.
+      remoteLogManager.foreach(_.startup())
+
       // Enable inbound TCP connections. Each endpoint will be started only once its matching
       // authorizer future is completed.
       socketServer.enableRequestProcessing(authorizerFutures)
@@ -492,6 +501,19 @@ class BrokerServer(
     }
   }
 
+  protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = {
+    val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+    if (remoteLogManagerConfig.enableRemoteStorageSystem()) {
+      if (config.logDirs.size > 1) {
+        throw new KafkaException("Tiered storage is not supported with multiple log dirs.");
+      }
+
+      Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head))
+    } else {
+      None
+    }
+  }
+
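For reference, a hedged sketch of the broker settings that make enableRemoteStorageSystem() return true. The property keys are the KIP-405 names defined by RemoteLogManagerConfig (they are not shown in this diff), and the RSM class name is a placeholder.

    import java.util.Properties

    object TieredStorageConfigSketch {
      val props = new Properties()
      // Enables the tiered storage subsystem so that createRemoteLogManager returns Some(...).
      props.put("remote.log.storage.system.enable", "true")
      // Plugin class names; the RSM implementation is a placeholder.
      props.put("remote.log.storage.manager.class.name", "com.example.MyRemoteStorageManager")
      props.put("remote.log.metadata.manager.class.name",
        "org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManager")
      // Tiered storage requires a single log directory (see the check above).
      props.put("log.dirs", "/var/lib/kafka/data")
    }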
   override def shutdown(): Unit = {
     if (!maybeChangeStatus(STARTED, SHUTTING_DOWN)) return
     try {
@@ -567,6 +589,10 @@ class BrokerServer(
       if (logManager != null)
         CoreUtils.swallow(logManager.shutdown(), this)
 
+      // Close remote log manager to give a chance to any of its underlying clients
+      // (especially in RemoteStorageManager and RemoteLogMetadataManager) to close gracefully.
+      CoreUtils.swallow(remoteLogManager.foreach(_.close()), this)
+
       if (quotaManagers != null)
         CoreUtils.swallow(quotaManagers.shutdown(), this)
 
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 2f880a118e1..f8e449d8d1e 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -21,6 +21,7 @@ import java.io.{File, IOException}
 import java.net.{InetAddress, SocketTimeoutException}
 import java.util.concurrent._
 import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger}
+
 import kafka.cluster.{Broker, EndPoint}
 import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException, InconsistentClusterIdException}
 import kafka.controller.KafkaController
@@ -28,6 +29,7 @@ import kafka.coordinator.group.{GroupCoordinator, GroupCoordinatorAdapter}
 import kafka.coordinator.transaction.{ProducerIdManager, TransactionCoordinator}
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsReporter
+import kafka.log.remote.RemoteLogManager
 import kafka.network.{ControlPlaneAcceptor, DataPlaneAcceptor, RequestChannel, SocketServer}
 import kafka.raft.KafkaRaftManager
 import kafka.security.CredentialProvider
@@ -47,13 +49,14 @@ import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.security.{JaasContext, JaasUtils}
 import org.apache.kafka.common.utils.{AppInfoParser, LogContext, Time, Utils}
-import org.apache.kafka.common.{Endpoint, Node}
+import org.apache.kafka.common.{Endpoint, KafkaException, Node}
 import org.apache.kafka.metadata.{BrokerState, MetadataRecordSerde, VersionRange}
 import org.apache.kafka.raft.RaftConfig
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, MetadataVersion}
 import org.apache.kafka.server.common.MetadataVersion._
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.zookeeper.client.ZKClientConfig
 
 import scala.collection.{Map, Seq}
@@ -121,6 +124,7 @@ class KafkaServer(
 
   var logDirFailureChannel: LogDirFailureChannel = _
   @volatile private var _logManager: LogManager = _
+  var remoteLogManager: Option[RemoteLogManager] = None
 
   @volatile private var _replicaManager: ReplicaManager = _
   var adminManager: ZkAdminManager = _
@@ -273,6 +277,8 @@ class KafkaServer(
         _brokerState = BrokerState.RECOVERY
         logManager.startup(zkClient.getAllTopicsInCluster())
 
+        remoteLogManager = createRemoteLogManager(config)
+
         if (config.migrationEnabled) {
           kraftControllerNodes = RaftConfig.voterConnectionsToNodes(
             RaftConfig.parseVoterConnections(config.quorumVoters)).asScala
@@ -477,6 +483,9 @@ class KafkaServer(
           new FetchSessionCache(config.maxIncrementalFetchSessionCacheSlots,
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
+        // Start RemoteLogManager before the broker starts serving requests.
+        remoteLogManager.foreach(_.startup())
+
         /* start processing requests */
         val zkSupport = ZkSupport(adminManager, kafkaController, zkClient, forwardingManager, metadataCache)
 
@@ -560,6 +569,19 @@ class KafkaServer(
     }
   }
 
+  protected def createRemoteLogManager(config: KafkaConfig): Option[RemoteLogManager] = {
+    val remoteLogManagerConfig = new RemoteLogManagerConfig(config)
+    if (remoteLogManagerConfig.enableRemoteStorageSystem()) {
+      if (config.logDirs.size > 1) {
+        throw new KafkaException("Tiered storage is not supported with multiple log dirs.");
+      }
+
+      Some(new RemoteLogManager(remoteLogManagerConfig, config.brokerId, config.logDirs.head))
+    } else {
+      None
+    }
+  }
+
   protected def createReplicaManager(isShuttingDown: AtomicBoolean): ReplicaManager = {
     new ReplicaManager(
       metrics = metrics,
@@ -567,6 +589,7 @@ class KafkaServer(
       time = time,
       scheduler = kafkaScheduler,
       logManager = logManager,
+      remoteLogManager = remoteLogManager,
       quotaManagers = quotaManagers,
       metadataCache = metadataCache,
       logDirFailureChannel = logDirFailureChannel,
@@ -865,6 +888,11 @@ class KafkaServer(
         if (kafkaController != null)
           CoreUtils.swallow(kafkaController.shutdown(), this)
 
+        // Close the remote log manager before we stop processing requests, so that its underlying
+        // clients (especially in RemoteStorageManager and RemoteLogMetadataManager) get a chance
+        // to close gracefully.
+        CoreUtils.swallow(remoteLogManager.foreach(_.close()), this)
+
         if (featureChangeListener != null)
           CoreUtils.swallow(featureChangeListener.close(), this)
 
diff --git a/core/src/main/scala/kafka/server/LeaderEndPoint.scala b/core/src/main/scala/kafka/server/LeaderEndPoint.scala
index 70d2149dabc..3deff7d7b79 100644
--- a/core/src/main/scala/kafka/server/LeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LeaderEndPoint.scala
@@ -66,24 +66,24 @@ trait LeaderEndPoint {
   def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData]
 
   /**
-   * Fetches the log start offset of the given topic partition from the leader.
+   * Fetches the epoch and log start offset of the given topic partition from the leader.
    *
    * @param topicPartition The topic partition that we want to fetch from
    * @param currentLeaderEpoch An int representing the current leader epoch of the requester
    *
-   * @return A long representing the earliest offset in the leader's topic partition.
+   * @return A tuple representing the (epoch, earliest_offset) in the leader's topic partition.
    */
-  def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long
+  def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
 
   /**
-   * Fetches the log end offset of the given topic partition from the leader.
+   * Fetches the epoch and log end offset of the given topic partition from the leader.
    *
    * @param topicPartition The topic partition that we want to fetch from
    * @param currentLeaderEpoch An int representing the current leader epoch of the requester
    *
-   * @return A long representing the latest offset in the leader's topic partition.
+   * @return A tuple representing the (epoch, latest_offset) in the leader's topic partition.
    */
-  def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long
+  def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
 
   /**
    * Fetches offset for leader epoch from the leader for each given topic partition
@@ -94,6 +94,16 @@ trait LeaderEndPoint {
    */
   def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset]
 
+  /**
+   * Fetches the epoch and local log start offset from the leader for the given partition and the current leader epoch.
+   *
+   * @param topicPartition  The topic partition that we want to fetch from
+   * @param currentLeaderEpoch An int representing the current leader epoch of the requester
+   *
+   * @return A tuple representing the (epoch, earliest_local_offset) in the leader's topic partition.
+   */
+  def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long)
+
   /**
    * Builds a fetch request, given a partition map.
    *
diff --git a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
index 1080c8e0739..109fdd73847 100644
--- a/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/LocalLeaderEndPoint.scala
@@ -23,12 +23,12 @@ import kafka.server.AbstractFetcherThread.{ReplicaFetch, ResultWithPartitions}
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.KafkaStorageException
-import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 import org.apache.kafka.common.message.FetchResponseData
-import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.UNDEFINED_EPOCH
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, RequestUtils}
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
 
 import java.util
 import java.util.Optional
@@ -113,14 +113,25 @@ class LocalLeaderEndPoint(sourceBroker: BrokerEndPoint,
     partitionData.toMap
   }
 
-  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = {
+  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+    val partition = replicaManager.getPartitionOrException(topicPartition)
+    val logStartOffset = partition.localLogOrException.logStartOffset
+    val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logStartOffset)
+    (epoch.getOrElse(0), logStartOffset)
+  }
+
+  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
     val partition = replicaManager.getPartitionOrException(topicPartition)
-    partition.localLogOrException.logStartOffset
+    val logEndOffset = partition.localLogOrException.logEndOffset
+    val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(logEndOffset)
+    (epoch.getOrElse(0), logEndOffset)
   }
 
-  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = {
+  override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
     val partition = replicaManager.getPartitionOrException(topicPartition)
-    partition.localLogOrException.logEndOffset
+    val localLogStartOffset = partition.localLogOrException.localLogStartOffset()
+    val epoch = partition.localLogOrException.leaderEpochCache.get.epochForOffset(localLogStartOffset)
+    (epoch.getOrElse(0), localLogStartOffset)
   }
 
   override def fetchEpochEndOffsets(partitions: collection.Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
diff --git a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
index 826643a0f5e..9c455324a17 100644
--- a/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
+++ b/core/src/main/scala/kafka/server/RemoteLeaderEndPoint.scala
@@ -94,22 +94,26 @@ class RemoteLeaderEndPoint(logPrefix: String,
     }
   }
 
-  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = {
+  override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
     fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_TIMESTAMP)
   }
 
-  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = {
+  override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
     fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.LATEST_TIMESTAMP)
   }
 
-  private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, earliestOrLatest: Long): Long = {
+  override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = {
+    fetchOffset(topicPartition, currentLeaderEpoch, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
+  }
+
+  private def fetchOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int, timestamp: Long): (Int, Long) = {
     val topic = new ListOffsetsTopic()
       .setName(topicPartition.topic)
       .setPartitions(Collections.singletonList(
         new ListOffsetsPartition()
           .setPartitionIndex(topicPartition.partition)
           .setCurrentLeaderEpoch(currentLeaderEpoch)
-          .setTimestamp(earliestOrLatest)))
+          .setTimestamp(timestamp)))
     val metadataVersion = metadataVersionSupplier()
     val requestBuilder = ListOffsetsRequest.Builder.forReplica(metadataVersion.listOffsetRequestVersion, brokerConfig.brokerId)
       .setTargetTimes(Collections.singletonList(topic))
@@ -122,9 +126,9 @@ class RemoteLeaderEndPoint(logPrefix: String,
     Errors.forCode(responsePartition.errorCode) match {
       case Errors.NONE =>
         if (metadataVersion.isAtLeast(IBP_0_10_1_IV2))
-          responsePartition.offset
+          (responsePartition.leaderEpoch, responsePartition.offset)
         else
-          responsePartition.oldStyleOffsets.get(0)
+          (responsePartition.leaderEpoch, responsePartition.oldStyleOffsets.get(0))
       case error => throw error.exception
     }
   }
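
For reference, a minimal usage sketch of the revised LeaderEndPoint contract above (implemented by both LocalLeaderEndPoint and RemoteLeaderEndPoint). The helper name and the `leader`/`tp` arguments are hypothetical; the sketch only illustrates the new (epoch, offset) return shape and the new fetchEarliestLocalOffset, and is not code from this patch.

    import kafka.server.LeaderEndPoint
    import org.apache.kafka.common.TopicPartition

    // Hypothetical helper: ask the leader for the three offsets a follower cares about.
    // With tiered storage, logStartOffset <= localLogStartOffset <= logEndOffset, and offsets
    // in [logStartOffset, localLogStartOffset) are only available from remote storage.
    def describeLeaderOffsets(leader: LeaderEndPoint, tp: TopicPartition, currentLeaderEpoch: Int): Unit = {
      val (startEpoch, logStartOffset) = leader.fetchEarliestOffset(tp, currentLeaderEpoch)
      val (localStartEpoch, localLogStartOffset) = leader.fetchEarliestLocalOffset(tp, currentLeaderEpoch)
      val (endEpoch, logEndOffset) = leader.fetchLatestOffset(tp, currentLeaderEpoch)
      println(s"$tp: logStart=($startEpoch, $logStartOffset), " +
        s"localLogStart=($localStartEpoch, $localLogStartOffset), logEnd=($endEpoch, $logEndOffset)")
    }
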
diff --git a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
index 10eae83b99f..133bcd2136a 100644
--- a/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaAlterLogDirsThread.scala
@@ -121,4 +121,13 @@ class ReplicaAlterLogDirsThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = true)
   }
 
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                fetchOffset: Long,
+                                                epochForFetchOffset: Int,
+                                                leaderLogStartOffset: Long): Long = {
+    // JBOD is not supported with tiered storage.
+    throw new UnsupportedOperationException();
+  }
+
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index ea94aaa1378..fdaef89cf44 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    http://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -17,13 +17,25 @@
 
 package kafka.server
 
-import kafka.log.{LeaderOffsetIncremented, LogAppendInfo}
+import kafka.log.remote.RemoteLogManager
+import kafka.log.{LeaderOffsetIncremented, LogAppendInfo, UnifiedLog}
+import kafka.server.checkpoints.LeaderEpochCheckpointFile
+import kafka.server.epoch.EpochEntry
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.record.MemoryRecords
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.Utils
+import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.server.common.CheckpointFile.CheckpointReadBuffer
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentMetadata, RemoteStorageException, RemoteStorageManager}
 
+import java.io.{BufferedReader, File, InputStreamReader}
+import java.nio.charset.StandardCharsets
+import java.nio.file.{Files, StandardCopyOption}
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 
 class ReplicaFetcherThread(name: String,
                            leader: LeaderEndPoint,
@@ -192,4 +204,140 @@ class ReplicaFetcherThread(name: String,
     partition.truncateFullyAndStartAt(offset, isFuture = false)
   }
 
+  private def buildProducerSnapshotFile(snapshotFile: File, remoteLogSegmentMetadata: RemoteLogSegmentMetadata, rlm: RemoteLogManager): Unit = {
+    val tmpSnapshotFile = new File(snapshotFile.getAbsolutePath + ".tmp")
+    // Copy it to the snapshot file atomically (copy to a temp file, then do an atomic move).
+    Files.copy(rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.PRODUCER_SNAPSHOT),
+      tmpSnapshotFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    Utils.atomicMoveWithFallback(tmpSnapshotFile.toPath, snapshotFile.toPath, false)
+  }
+
+  /**
+   * Tries to build the required state for this partition from the leader and remote storage so that the fetcher
+   * can start fetching records from the leader. Returns the offset from which the next fetch should happen.
+   */
+  override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                currentLeaderEpoch: Int,
+                                                leaderLocalLogStartOffset: Long,
+                                                epochForLeaderLocalLogStartOffset: Int,
+                                                leaderLogStartOffset: Long): Long = {
+
+    def fetchEarlierEpochEndOffset(epoch: Int): EpochEndOffset = {
+      val previousEpoch = epoch - 1
+      // Find the end-offset for the epoch earlier to the given epoch from the leader
+      val partitionsWithEpochs = Map(partition -> new EpochData().setPartition(partition.partition())
+        .setCurrentLeaderEpoch(currentLeaderEpoch)
+        .setLeaderEpoch(previousEpoch))
+      val maybeEpochEndOffset = leader.fetchEpochEndOffsets(partitionsWithEpochs).get(partition)
+      if (maybeEpochEndOffset.isEmpty) {
+        throw new KafkaException("No response received for partition: " + partition);
+      }
+
+      val epochEndOffset = maybeEpochEndOffset.get
+      if (epochEndOffset.errorCode() != Errors.NONE.code()) {
+        throw Errors.forCode(epochEndOffset.errorCode()).exception()
+      }
+
+      epochEndOffset
+    }
+
+    val log = replicaMgr.localLogOrException(partition)
+    val nextOffset = {
+      if (log.remoteStorageSystemEnable && log.config.remoteLogConfig.remoteStorageEnable) {
+        if (replicaMgr.remoteLogManager.isEmpty) throw new IllegalStateException("RemoteLogManager is not yet instantiated")
+
+        val rlm = replicaMgr.remoteLogManager.get
+
+        // Find the respective leader epoch for (leaderLocalLogStartOffset - 1). We need to build the leader epoch cache
+        // until that offset.
+        val previousOffsetToLeaderLocalLogStartOffset = leaderLocalLogStartOffset - 1
+        val targetEpoch: Int = {
+          // If the existing epoch is 0, there is no need to fetch from an earlier epoch, as the desired offset
+          // (leaderLocalLogStartOffset - 1) will have the same epoch.
+          if (epochForLeaderLocalLogStartOffset == 0) {
+            epochForLeaderLocalLogStartOffset
+          } else {
+            // Fetch the earlier epoch/end-offset(exclusive) from the leader.
+            val earlierEpochEndOffset = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
+            // Check if the target offset lies within the range of the earlier epoch. Here, the epoch's end-offset is exclusive.
+            if (earlierEpochEndOffset.endOffset > previousOffsetToLeaderLocalLogStartOffset) {
+              // Always use the leader epoch from returned earlierEpochEndOffset.
+              // This gives the respective leader epoch, that will handle any gaps in epochs.
+              // For example, the leader epoch cache contains:
+              // leader-epoch   start-offset
+              //  0              20
+              //  1              85
+              //  <2> - gap, no messages were appended in this leader epoch.
+              //  3              90
+              //  4              98
+              // There is a gap in the leader epochs. For leaderLocalLogStartOffset 90, the leader epoch is 3.
+              // fetchEarlierEpochEndOffset(2) will return leader-epoch 1 with end-offset 90.
+              // So, for offset 89, we should return leader epoch 1, as done below.
+              earlierEpochEndOffset.leaderEpoch()
+            } else epochForLeaderLocalLogStartOffset
+          }
+        }
+
+        val maybeRlsm = rlm.fetchRemoteLogSegmentMetadata(partition, targetEpoch, previousOffsetToLeaderLocalLogStartOffset)
+
+        if (maybeRlsm.isPresent) {
+          val remoteLogSegmentMetadata = maybeRlsm.get()
+          // Build the leader epoch cache and producer snapshots up to remoteLogSegmentMetadata.endOffset(), and start
+          // local segments from (remoteLogSegmentMetadata.endOffset() + 1).
+          val nextOffset = remoteLogSegmentMetadata.endOffset() + 1
+
+          // Truncate the existing local log before restoring the leader epoch cache and producer snapshots.
+          truncateFullyAndStartAt(partition, nextOffset)
+
+          // Build leader epoch cache.
+          log.maybeIncrementLogStartOffset(leaderLogStartOffset, LeaderOffsetIncremented)
+          val epochs = readLeaderEpochCheckpoint(rlm, remoteLogSegmentMetadata)
+          log.leaderEpochCache.foreach { cache =>
+            cache.assign(epochs)
+          }
+
+          debug(s"Updated the epoch cache from remote tier till offset: $leaderLocalLogStartOffset " +
+            s"with size: ${epochs.size} for $partition")
+
+          // Restore producer snapshot
+          val snapshotFile = UnifiedLog.producerSnapshotFile(log.dir, nextOffset)
+          buildProducerSnapshotFile(snapshotFile, remoteLogSegmentMetadata, rlm)
+
+          // Reload producer snapshots.
+          log.producerStateManager.truncateFullyAndReloadSnapshots()
+          log.loadProducerState(nextOffset)
+          debug(s"Built the leader epoch cache and producer snapshots from remote tier for $partition, with " +
+            s"active producers size: ${log.producerStateManager.activeProducers.size}, " +
+            s"leaderLogStartOffset: $leaderLogStartOffset, and logEndOffset: $nextOffset")
+
+          // Return the offset from which the next fetch should happen.
+          nextOffset
+        } else {
+          throw new RemoteStorageException(s"Couldn't build the state from remote store for partition: $partition, " +
+            s"currentLeaderEpoch: $currentLeaderEpoch, leaderLocalLogStartOffset: $leaderLocalLogStartOffset, " +
+            s"leaderLogStartOffset: $leaderLogStartOffset, epoch: $targetEpoch as the previous remote log segment " +
+            s"metadata was not found")
+        }
+      } else {
+        // If tiered storage is not enabled, throw an exception back so that the fetcher will retry until tiered
+        // storage is set up as expected.
+        throw new RemoteStorageException(s"Couldn't build the state from remote store for partition $partition, as " +
+          s"remote log storage is not yet enabled")
+      }
+    }
+
+    nextOffset
+  }
+
+  private def readLeaderEpochCheckpoint(rlm: RemoteLogManager, remoteLogSegmentMetadata: RemoteLogSegmentMetadata): collection.Seq[EpochEntry] = {
+    val inputStream = rlm.storageManager().fetchIndex(remoteLogSegmentMetadata, RemoteStorageManager.IndexType.LEADER_EPOCH)
+    val bufferedReader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
+    try {
+      val readBuffer = new CheckpointReadBuffer[EpochEntry]("", bufferedReader, 0, LeaderEpochCheckpointFile.Formatter)
+      readBuffer.read().asScala.toSeq
+    } finally {
+      bufferedReader.close()
+    }
+  }
+
 }
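
The epoch-selection comments inside buildRemoteLogAuxState above are easier to follow as a distilled, standalone function. The sketch below is illustrative only: `fetchEarlierEpochEndOffset` is assumed to return the (leaderEpoch, endOffset) pair the leader reports for the epoch just before the given one, mirroring the helper of the same name in the method above.

    // Distilled sketch of the targetEpoch decision; not the broker code itself.
    def targetEpochFor(epochForLeaderLocalLogStartOffset: Int,
                       leaderLocalLogStartOffset: Long,
                       fetchEarlierEpochEndOffset: Int => (Int, Long)): Int = {
      val previousOffset = leaderLocalLogStartOffset - 1
      if (epochForLeaderLocalLogStartOffset == 0) {
        // Offset (leaderLocalLogStartOffset - 1) must also be in epoch 0.
        epochForLeaderLocalLogStartOffset
      } else {
        val (earlierEpoch, earlierEpochEndOffset) = fetchEarlierEpochEndOffset(epochForLeaderLocalLogStartOffset)
        // End offsets are exclusive: if the earlier epoch ends after previousOffset, previousOffset belongs to it.
        if (earlierEpochEndOffset > previousOffset) earlierEpoch else epochForLeaderLocalLogStartOffset
      }
    }

    // Using the epoch lineage from the comment above (0 -> 20, 1 -> 85, gap at 2, 3 -> 90, 4 -> 98):
    // for leaderLocalLogStartOffset = 90 in epoch 3, the leader reports that the epoch before 3
    // effectively ends at offset 90 with leader epoch 1, so offset 89 resolves to epoch 1.
    val chosen = targetEpochFor(3, 90L, _ => (1, 90L))  // chosen == 1
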
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b2a37479bae..7b56b6d6732 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -27,6 +27,7 @@ import kafka.cluster.{BrokerEndPoint, Partition}
 import kafka.common.RecordValidationException
 import kafka.controller.{KafkaController, StateChangeLogger}
 import kafka.log._
+import kafka.log.remote.RemoteLogManager
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
@@ -190,6 +191,7 @@ class ReplicaManager(val config: KafkaConfig,
                      time: Time,
                      scheduler: Scheduler,
                      val logManager: LogManager,
+                     val remoteLogManager: Option[RemoteLogManager] = None,
                      quotaManagers: QuotaManagers,
                      val metadataCache: MetadataCache,
                      logDirFailureChannel: LogDirFailureChannel,
diff --git a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
index e6e45fd1374..2053d8c50dd 100644
--- a/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
+++ b/core/src/main/scala/kafka/server/epoch/LeaderEpochFileCache.scala
@@ -60,6 +60,14 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def assign(entries: Seq[EpochEntry]): Unit = {
+    entries.foreach(entry =>
+      if (assign(entry)) {
+        debug(s"Appended new epoch entry $entry. Cache now contains ${epochs.size} entries.")
+      })
+    flush()
+  }
+
   private def assign(entry: EpochEntry): Boolean = {
     if (entry.epoch < 0 || entry.startOffset < 0) {
       throw new IllegalArgumentException(s"Received invalid partition leader epoch entry $entry")
@@ -169,6 +177,24 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def previousEpoch(epoch: Int): Option[Int] = {
+    inReadLock(lock) {
+      Option(epochs.lowerKey(epoch))
+    }
+  }
+
+  def nextEpoch(epoch: Int): Option[Int] = {
+    inReadLock(lock) {
+      Option(epochs.higherKey(epoch))
+    }
+  }
+
+  def epochEntry(epoch: Int): Option[EpochEntry] = {
+    inReadLock(lock) {
+      Option.apply(epochs.get(epoch))
+    }
+  }
+
   /**
     * Returns the Leader Epoch and the End Offset for a requested Leader Epoch.
     *
@@ -268,6 +294,22 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
     }
   }
 
+  def epochForOffset(offset: Long): Option[Int] = {
+    inReadLock(lock) {
+      var previousEpoch: Option[Int] = None
+      epochs.values().asScala.foreach {
+        case EpochEntry(epoch, startOffset) =>
+          if (startOffset == offset)
+            return Some(epoch)
+          if (startOffset > offset)
+            return previousEpoch
+
+          previousEpoch = Some(epoch)
+      }
+      previousEpoch
+    }
+  }
+
   /**
     * Delete all entries.
     */
@@ -287,7 +329,7 @@ class LeaderEpochFileCache(topicPartition: TopicPartition,
   // Visible for testing
   def epochEntries: Seq[EpochEntry] = epochs.values.asScala.toSeq
 
-  private def flush(): Unit = {
+  def flush(): Unit = {
     checkpoint.write(epochs.values.asScala)
   }
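
The new epochForOffset above scans the epoch entries in ascending order and returns the epoch whose start offset covers the requested offset, or None if the offset precedes the first known epoch. The standalone sketch below re-implements that scan over a plain sorted map purely for illustration, reusing the gap example from the follower-fetch comments; it is not the LeaderEpochFileCache class itself.

    import java.util.TreeMap
    import scala.jdk.CollectionConverters._

    case class EpochEntry(epoch: Int, startOffset: Long)

    // Same lookup rule as LeaderEpochFileCache.epochForOffset, shown over a plain TreeMap.
    def epochForOffset(epochs: TreeMap[Int, EpochEntry], offset: Long): Option[Int] = {
      var previousEpoch: Option[Int] = None
      epochs.values().asScala.foreach { case EpochEntry(epoch, startOffset) =>
        if (startOffset == offset) return Some(epoch)
        if (startOffset > offset) return previousEpoch
        previousEpoch = Some(epoch)
      }
      previousEpoch
    }

    val epochs = new TreeMap[Int, EpochEntry]()
    Seq(EpochEntry(0, 20L), EpochEntry(1, 85L), EpochEntry(3, 90L), EpochEntry(4, 98L))
      .foreach(e => epochs.put(e.epoch, e))

    assert(epochForOffset(epochs, 10L).isEmpty)      // before the first known epoch
    assert(epochForOffset(epochs, 89L).contains(1))  // epoch 2 is a gap, so 89 still maps to epoch 1
    assert(epochForOffset(epochs, 90L).contains(3))  // exact start offset of epoch 3
    assert(epochForOffset(epochs, 200L).contains(4)) // past the last entry: latest known epoch
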
 
diff --git a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
index 36cbbc82c9a..b368a7aa50c 100644
--- a/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
+++ b/core/src/test/java/kafka/test/ClusterTestExtensionsTest.java
@@ -117,6 +117,6 @@ public class ClusterTestExtensionsTest {
 
     @ClusterTest
     public void testDefaults(ClusterConfig config) {
-        Assertions.assertEquals(MetadataVersion.IBP_3_4_IV0, config.metadataVersion());
+        Assertions.assertEquals(MetadataVersion.IBP_3_4_IV1, config.metadataVersion());
     }
 }
diff --git a/core/src/test/java/kafka/test/annotation/ClusterTest.java b/core/src/test/java/kafka/test/annotation/ClusterTest.java
index d841996ec18..9dea747ab1d 100644
--- a/core/src/test/java/kafka/test/annotation/ClusterTest.java
+++ b/core/src/test/java/kafka/test/annotation/ClusterTest.java
@@ -41,6 +41,6 @@ public @interface ClusterTest {
     String name() default "";
     SecurityProtocol securityProtocol() default SecurityProtocol.PLAINTEXT;
     String listener() default "";
-    MetadataVersion metadataVersion() default MetadataVersion.IBP_3_4_IV0;
+    MetadataVersion metadataVersion() default MetadataVersion.IBP_3_4_IV1;
     ClusterConfigProperty[] serverProperties() default {};
 }
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index aa327f153b1..987bfea2748 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -39,6 +39,7 @@ import org.apache.kafka.common.config.ConfigResource.Type
 import org.apache.kafka.common.protocol.Errors._
 import org.apache.kafka.image.ClusterImage
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.slf4j.LoggerFactory
 
 import scala.annotation.nowarn
@@ -890,4 +891,30 @@ class KRaftClusterTest {
       cluster.close()
     }
   }
+
+  @Test
+  def testRemoteLogManagerInstantiation(): Unit = {
+    val cluster = new KafkaClusterTestKit.Builder(
+      new TestKitNodes.Builder().
+        setNumBrokerNodes(1).
+        setNumControllerNodes(1).build())
+      .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
+      .setConfigProp(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+        "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+      .setConfigProp(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+        "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+      .build()
+    try {
+      cluster.format()
+      cluster.startup()
+      cluster.brokers().forEach((_, server) => {
+        server.remoteLogManager match {
+          case Some(_) =>
+          case None => fail("RemoteLogManager should be initialized")
+        }
+      })
+    } finally {
+      cluster.close()
+    }
+  }
 }
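
The test above relies on the same gate as createRemoteLogManager in BrokerServer/KafkaServer: remote storage must be enabled system-wide and plugin class names supplied. A minimal sketch of the broker properties involved is shown below; the NoOp* class names are the test doubles used above, and a real deployment would substitute its own RemoteStorageManager and RemoteLogMetadataManager implementations.

    import java.util.Properties
    import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig

    // Sketch only: the minimal properties that make createRemoteLogManager return Some(...).
    val props = new Properties()
    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
      "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
      "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
    // Tiered storage requires a single log dir; configuring more than one fails broker startup.
    props.put("log.dirs", "/tmp/kafka-logs")
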
diff --git a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
index 81810c61dac..10fca95d346 100644
--- a/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MetadataVersionIntegrationTest.scala
@@ -34,7 +34,10 @@ class MetadataVersionIntegrationTest {
   @ClusterTests(value = Array(
       new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV0),
       new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV1),
-      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV2)
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV2),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_3_IV3),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0),
+      new ClusterTest(clusterType = Type.KRAFT, metadataVersion = MetadataVersion.IBP_3_4_IV0)
   ))
   def testBasicMetadataVersionUpgrade(clusterInstance: ClusterInstance): Unit = {
     val admin = clusterInstance.createAdminClient()
@@ -44,7 +47,7 @@ class MetadataVersionIntegrationTest {
     assertEquals(ff.maxVersionLevel(), clusterInstance.config().metadataVersion().featureLevel())
 
     // Update to new version
-    val updateVersion = MetadataVersion.IBP_3_3_IV3.featureLevel.shortValue
+    val updateVersion = MetadataVersion.IBP_3_4_IV1.featureLevel.shortValue
     val updateResult = admin.updateFeatures(
       Map("metadata.version" -> new FeatureUpdate(updateVersion, UpgradeType.UPGRADE)).asJava, new UpdateFeaturesOptions())
     updateResult.all().get()
diff --git a/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala b/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala
new file mode 100644
index 00000000000..3cb1516a38b
--- /dev/null
+++ b/core/src/test/scala/kafka/log/remote/ClassLoaderAwareRemoteStorageManagerTest.scala
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.mock
+import org.mockito.Mockito.when
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Assertions.assertNotEquals
+import org.mockito.ArgumentMatchers.any
+
+import java.util.Collections
+
+class ClassLoaderAwareRemoteStorageManagerTest {
+
+  @Test
+  def testWithClassLoader(): Unit = {
+    val dummyClassLoader = new DummyClassLoader()
+    val delegate = mock(classOf[RemoteStorageManager])
+    val rsm = new ClassLoaderAwareRemoteStorageManager(delegate, dummyClassLoader)
+    when(delegate.configure(any())).thenAnswer(_ =>
+      assertEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader))
+
+    assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader)
+    rsm.configure(Collections.emptyMap())
+    assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader)
+  }
+
+  private class DummyClassLoader extends ClassLoader
+}
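
The test above only checks an observable effect: while the delegate's configure runs, the thread context class loader is the plugin's loader, and it is restored afterwards. A minimal sketch of that swap-and-restore pattern is shown below; it illustrates the idea and is not the ClassLoaderAwareRemoteStorageManager source.

    // Sketch of the context-class-loader swap that the test asserts.
    def withClassLoader[T](loader: ClassLoader)(block: => T): T = {
      val original = Thread.currentThread().getContextClassLoader
      Thread.currentThread().setContextClassLoader(loader)
      try block
      finally Thread.currentThread().setContextClassLoader(original)
    }

    // e.g. a delegating call would be wrapped as:
    // withClassLoader(pluginClassLoader) { delegate.configure(configs) }
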
diff --git a/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
new file mode 100644
index 00000000000..ba2d2c43c21
--- /dev/null
+++ b/core/src/test/scala/kafka/server/LocalLeaderEndPointTest.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.BrokerEndPoint
+import kafka.log.AppendOrigin
+import kafka.server.checkpoints.LazyOffsetCheckpoints
+import kafka.utils.{MockScheduler, MockTime, TestUtils}
+import org.apache.kafka.common.{Node, TopicPartition, Uuid}
+import org.apache.kafka.common.message.LeaderAndIsrRequestData.LeaderAndIsrPartitionState
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.LeaderAndIsrRequest
+import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.junit.jupiter.api.Assertions._
+import org.mockito.Mockito.mock
+
+import java.io.File
+import java.util.Collections
+import scala.collection.{Map, Seq}
+import scala.jdk.CollectionConverters._
+
+class LocalLeaderEndPointTest {
+
+  val time = new MockTime
+  val topicId: Uuid = Uuid.randomUuid()
+  val topic = "test"
+  val topicPartition = new TopicPartition(topic, 5)
+  val sourceBroker: BrokerEndPoint = BrokerEndPoint(0, "localhost", 9092)
+  var replicaManager: ReplicaManager = _
+  var endPoint: LeaderEndPoint = _
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = TestUtils.createBrokerConfig(sourceBroker.id, TestUtils.MockZkConnect, port = sourceBroker.port)
+    val config = KafkaConfig.fromProps(props)
+    val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)))
+    val alterPartitionManager = mock(classOf[AlterPartitionManager])
+    val metrics = new Metrics
+    val quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
+    replicaManager = new ReplicaManager(
+      metrics = metrics,
+      config = config,
+      time = time,
+      scheduler = new MockScheduler(time),
+      logManager = mockLogMgr,
+      quotaManagers = quotaManager,
+      metadataCache = MetadataCache.zkMetadataCache(config.brokerId, config.interBrokerProtocolVersion),
+      logDirFailureChannel = new LogDirFailureChannel(config.logDirs.size),
+      alterPartitionManager = alterPartitionManager)
+    val partition = replicaManager.createPartition(topicPartition)
+    partition.createLogIfNotExists(isNew = false, isFutureReplica = false,
+      new LazyOffsetCheckpoints(replicaManager.highWatermarkCheckpoints), None)
+    // Make this replica the leader.
+    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 0)
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    replicaManager.getPartitionOrException(topicPartition)
+      .localLogOrException
+    endPoint = new LocalLeaderEndPoint(sourceBroker, config, replicaManager, QuotaFactory.UnboundedQuota)
+  }
+
+  @Test
+  def testFetchLatestOffset(): Unit = {
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+    assertEquals((0, 3L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 0))
+    val leaderAndIsrRequest =  buildLeaderAndIsrRequest(leaderEpoch = 4)
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+    assertEquals((4, 6L), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch = 7))
+  }
+
+  @Test
+  def testFetchEarliestOffset(): Unit = {
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+    assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 0))
+
+    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+    replicaManager.deleteRecords(timeout = 1000L, Map(topicPartition -> 3), _ => ())
+    assertEquals((4, 3L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+  }
+
+  @Test
+  def testFetchEarliestLocalOffset(): Unit = {
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+    assertEquals((0, 0L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 0))
+
+    val leaderAndIsrRequest = buildLeaderAndIsrRequest(leaderEpoch = 4)
+    replicaManager.becomeLeaderOrFollower(0, leaderAndIsrRequest, (_, _) => ())
+    appendRecords(replicaManager, topicPartition, records)
+      .onFire(response => assertEquals(Errors.NONE, response.error))
+    replicaManager.logManager.getLog(topicPartition).foreach(log => log._localLogStartOffset = 3)
+    assertEquals((0, 0L), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch = 7))
+    assertEquals((4, 3L), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch = 7))
+  }
+
+  private class CallbackResult[T] {
+    private var value: Option[T] = None
+    private var fun: Option[T => Unit] = None
+
+    def hasFired: Boolean = {
+      value.isDefined
+    }
+
+    def fire(value: T): Unit = {
+      this.value = Some(value)
+      fun.foreach(f => f(value))
+    }
+
+    def onFire(fun: T => Unit): CallbackResult[T] = {
+      this.fun = Some(fun)
+      if (this.hasFired) fire(value.get)
+      this
+    }
+  }
+
+  private def buildLeaderAndIsrRequest(leaderEpoch: Int): LeaderAndIsrRequest = {
+    val brokerList = Seq[Integer](sourceBroker.id).asJava
+    val topicIds = Collections.singletonMap(topic, topicId)
+    new LeaderAndIsrRequest.Builder(ApiKeys.LEADER_AND_ISR.latestVersion, 0, 0, 0,
+      Seq(new LeaderAndIsrPartitionState()
+        .setTopicName(topic)
+        .setPartitionIndex(topicPartition.partition())
+        .setControllerEpoch(0)
+        .setLeader(sourceBroker.id)
+        .setLeaderEpoch(leaderEpoch)
+        .setIsr(brokerList)
+        .setPartitionEpoch(0)
+        .setReplicas(brokerList)
+        .setIsNew(false)).asJava,
+      topicIds,
+      Set(node(sourceBroker)).asJava).build()
+  }
+
+  private def appendRecords(replicaManager: ReplicaManager,
+                            partition: TopicPartition,
+                            records: MemoryRecords,
+                            origin: AppendOrigin = AppendOrigin.Client,
+                            requiredAcks: Short = -1): CallbackResult[PartitionResponse] = {
+    val result = new CallbackResult[PartitionResponse]()
+    def appendCallback(responses: Map[TopicPartition, PartitionResponse]): Unit = {
+      val response = responses.get(partition)
+      assertTrue(response.isDefined)
+      result.fire(response.get)
+    }
+
+    replicaManager.appendRecords(
+      timeout = 1000,
+      requiredAcks = requiredAcks,
+      internalTopicsAllowed = false,
+      origin = origin,
+      entriesPerPartition = Map(partition -> records),
+      responseCallback = appendCallback)
+
+    result
+  }
+
+  private def node(endPoint: BrokerEndPoint): Node = {
+    new Node(endPoint.id, endPoint.host, endPoint.port)
+  }
+
+  private def records: MemoryRecords = {
+    MemoryRecords.withRecords(CompressionType.NONE,
+      new SimpleRecord("first message".getBytes()),
+      new SimpleRecord("second message".getBytes()),
+      new SimpleRecord("third message".getBytes()),
+    )
+  }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
new file mode 100644
index 00000000000..462a9a980a4
--- /dev/null
+++ b/core/src/test/scala/kafka/server/RemoteLeaderEndPointTest.scala
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.cluster.BrokerEndPoint
+import kafka.server.epoch.util.MockBlockingSender
+import kafka.utils.{MockTime, TestUtils}
+import org.apache.kafka.clients.FetchSessionHandler
+import org.apache.kafka.common.errors.{FencedLeaderEpochException, UnknownLeaderEpochException}
+import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
+import org.apache.kafka.common.utils.LogContext
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.server.common.MetadataVersion
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.Mockito.mock
+
+import java.util
+
+class RemoteLeaderEndPointTest {
+
+    val topicPartition = new TopicPartition("test", 0)
+    val currentLeaderEpoch = 10
+    val logStartOffset = 20
+    val localLogStartOffset = 100
+    val logEndOffset = 300
+    var blockingSend: MockBlockingSender = _
+    var endPoint: LeaderEndPoint = _
+
+    @BeforeEach
+    def setUp(): Unit = {
+        val time = new MockTime
+        val logPrefix = "remote-leader-endpoint"
+        val sourceBroker: BrokerEndPoint = BrokerEndPoint(0, "localhost", 9092)
+        val props = TestUtils.createBrokerConfig(sourceBroker.id, TestUtils.MockZkConnect, port = sourceBroker.port)
+        val fetchSessionHandler = new FetchSessionHandler(new LogContext(logPrefix), sourceBroker.id)
+        val config = KafkaConfig.fromProps(props)
+        val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+        blockingSend = new MockBlockingSender(offsets = new util.HashMap[TopicPartition, EpochEndOffset](),
+            sourceBroker = sourceBroker, time = time)
+        endPoint = new RemoteLeaderEndPoint(logPrefix, blockingSend, fetchSessionHandler,
+            config, replicaManager, QuotaFactory.UnboundedQuota, () => MetadataVersion.MINIMUM_KRAFT_VERSION)
+    }
+
+    @Test
+    def testFetchLatestOffset(): Unit = {
+        blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
+          new ListOffsetsPartitionResponse().setLeaderEpoch(7).setOffset(logEndOffset)))
+        assertEquals((7, logEndOffset), endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch))
+    }
+
+    @Test
+    def testFetchEarliestOffset(): Unit = {
+        blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
+          new ListOffsetsPartitionResponse().setLeaderEpoch(5).setOffset(logStartOffset)))
+        assertEquals((5, logStartOffset), endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch))
+    }
+
+    @Test
+    def testFetchEarliestLocalOffset(): Unit = {
+        blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
+          new ListOffsetsPartitionResponse().setLeaderEpoch(6).setOffset(localLogStartOffset)))
+        assertEquals((6, localLogStartOffset), endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch))
+    }
+
+    @Test
+    def testThrowsFencedLeaderEpochException(): Unit = {
+        blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
+          new ListOffsetsPartitionResponse().setErrorCode(Errors.FENCED_LEADER_EPOCH.code())))
+        assertThrows(classOf[FencedLeaderEpochException], () => endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch - 1))
+        assertThrows(classOf[FencedLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch - 1))
+        assertThrows(classOf[FencedLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch - 1))
+    }
+
+    @Test
+    def testThrowsUnknownLeaderEpochException(): Unit = {
+        blockingSend.setListOffsetsDataForNextResponse(Map(topicPartition ->
+          new ListOffsetsPartitionResponse().setErrorCode(Errors.UNKNOWN_LEADER_EPOCH.code())))
+        assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestLocalOffset(topicPartition, currentLeaderEpoch + 1))
+        assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchEarliestOffset(topicPartition, currentLeaderEpoch + 1))
+        assertThrows(classOf[UnknownLeaderEpochException], () => endPoint.fetchLatestOffset(topicPartition, currentLeaderEpoch + 1))
+    }
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
index a8f74af23bf..f335c6de69c 100644
--- a/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/FeatureCommandTest.scala
@@ -26,7 +26,7 @@ import org.apache.kafka.clients.admin.FeatureUpdate.UpgradeType.{SAFE_DOWNGRADE,
 import org.apache.kafka.clients.admin.MockAdminClient
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3}
+import org.apache.kafka.server.common.MetadataVersion.{IBP_3_3_IV0, IBP_3_3_IV1, IBP_3_3_IV2, IBP_3_3_IV3, IBP_3_4_IV1}
 import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.api.Test
 import org.junit.jupiter.params.ParameterizedTest
@@ -84,7 +84,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
         Array("--bootstrap-server", bootstrapServers(), "describe"), env.out))
       assertEquals(String.format(
         "Feature: metadata.version\tSupportedMinVersion: 3.0-IV1\t" +
-          "SupportedMaxVersion: 3.4-IV0\tFinalizedVersionLevel: 3.3-IV1\t"),
+          "SupportedMaxVersion: 3.4-IV1\tFinalizedVersionLevel: 3.3-IV1\t"),
             env.outputWithoutEpoch())
     }
   }
@@ -145,7 +145,7 @@ class FeatureCommandTest extends IntegrationTestHarness {
       assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
         "disable", "--feature", "metadata.version"), env.out))
       assertEquals("Could not disable metadata.version. Invalid update version 0 for feature " +
-        "metadata.version. Local controller 1000 only supports versions 1-8", env.outputWithoutEpoch())
+        "metadata.version. Local controller 1000 only supports versions 1-9", env.outputWithoutEpoch())
     }
     TestUtils.resource(FeatureCommandTestEnv()) { env =>
       assertEquals(1, FeatureCommand.mainNoExit(Array("--bootstrap-server", bootstrapServers(),
@@ -173,8 +173,8 @@ class FeatureCommandUnitTest {
 
   @Test
   def testMetadataVersionsToString(): Unit = {
-    assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3",
-      FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_3_IV3))
+    assertEquals("3.3-IV0, 3.3-IV1, 3.3-IV2, 3.3-IV3, 3.4-IV0, 3.4-IV1",
+      FeatureCommand.metadataVersionsToString(IBP_3_3_IV0, IBP_3_4_IV1))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index 0c5453b54d3..82ffee42d03 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -834,6 +834,13 @@ class PartitionTest extends AbstractPartitionTest {
       case Left(e: ApiException) => fail(s"Got ApiException $e")
     }
 
+    // If we request the earliest local timestamp, we skip the check
+    fetchOffsetsForTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, Some(IsolationLevel.READ_UNCOMMITTED)) match {
+      case Right(Some(offsetAndTimestamp)) => assertEquals(0, offsetAndTimestamp.offset)
+      case Right(None) => fail("Should have seen some offsets")
+      case Left(e: ApiException) => fail(s"Got ApiException $e")
+    }
+
     // If we request an offset by timestamp earlier than the HW, we are ok
     fetchOffsetsForTimestamp(11, Some(IsolationLevel.READ_UNCOMMITTED)) match {
       case Right(Some(offsetAndTimestamp)) =>
@@ -944,8 +951,8 @@ class PartitionTest extends AbstractPartitionTest {
       baseOffset = 0L)
     partition.appendRecordsToLeader(records, origin = AppendOrigin.Client, requiredAcks = 0, RequestLocal.withThreadConfinedCaching)
 
-    def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
-      val res = partition.fetchOffsetForTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP,
+    def fetchOffset(isolationLevel: Option[IsolationLevel], timestamp: Long): TimestampAndOffset = {
+      val res = partition.fetchOffsetForTimestamp(timestamp,
         isolationLevel = isolationLevel,
         currentLeaderEpoch = Optional.empty(),
         fetchOnlyFromLeader = true)
@@ -953,13 +960,16 @@ class PartitionTest extends AbstractPartitionTest {
       res.get
     }
 
+    def fetchLatestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
+      fetchOffset(isolationLevel, ListOffsetsRequest.LATEST_TIMESTAMP)
+    }
+
     def fetchEarliestOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
-      val res = partition.fetchOffsetForTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP,
-        isolationLevel = isolationLevel,
-        currentLeaderEpoch = Optional.empty(),
-        fetchOnlyFromLeader = true)
-      assertTrue(res.isDefined)
-      res.get
+      fetchOffset(isolationLevel, ListOffsetsRequest.EARLIEST_TIMESTAMP)
+    }
+
+    def fetchEarliestLocalOffset(isolationLevel: Option[IsolationLevel]): TimestampAndOffset = {
+      fetchOffset(isolationLevel, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)
     }
 
     assertEquals(3L, fetchLatestOffset(isolationLevel = None).offset)
@@ -975,6 +985,10 @@ class PartitionTest extends AbstractPartitionTest {
     assertEquals(0L, fetchEarliestOffset(isolationLevel = None).offset)
     assertEquals(0L, fetchEarliestOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
     assertEquals(0L, fetchEarliestOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
+
+    assertEquals(0L, fetchEarliestLocalOffset(isolationLevel = None).offset)
+    assertEquals(0L, fetchEarliestLocalOffset(isolationLevel = Some(IsolationLevel.READ_UNCOMMITTED)).offset)
+    assertEquals(0L, fetchEarliestLocalOffset(isolationLevel = Some(IsolationLevel.READ_COMMITTED)).offset)
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index a7798418ed8..2d70eba7c43 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -18,6 +18,7 @@
 package kafka.log
 
 import com.yammer.metrics.core.{Gauge, MetricName}
+import kafka.log.remote.RemoteIndexCache
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.server.metadata.{ConfigRepository, MockConfigRepository}
 import kafka.server.{BrokerTopicStats, FetchDataInfo, FetchLogEnd, LogDirFailureChannel}
@@ -366,6 +367,18 @@ class LogManagerTest {
     }
   }
 
+  /**
+   * Tests that the log manager skips the remote-log-index-cache directory when loading the logs from disk.
+   */
+  @Test
+  def testLoadLogsSkipRemoteIndexCache(): Unit = {
+    val logDir = TestUtils.tempDir()
+    val remoteIndexCache = new File(logDir, RemoteIndexCache.DirName)
+    remoteIndexCache.mkdir()
+    logManager = createLogManager(Seq(logDir))
+    logManager.loadLogs(logConfig, Map.empty)
+  }
+
   /**
    * Test that it is not possible to open two log managers using the same data directory
    */
diff --git a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
index 67ce51c5ca4..8a73c8bb943 100644
--- a/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTestUtils.scala
@@ -17,6 +17,8 @@
 
 package kafka.log
 
+import kafka.log.remote.RemoteLogManager
+
 import java.io.File
 import java.util.Properties
 
@@ -60,7 +62,8 @@ object LogTestUtils {
                       maxMessageBytes: Int = Defaults.MaxMessageSize,
                       indexIntervalBytes: Int = Defaults.IndexInterval,
                       segmentIndexBytes: Int = Defaults.MaxIndexSize,
-                      fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs): LogConfig = {
+                      fileDeleteDelayMs: Long = Defaults.FileDeleteDelayMs,
+                      remoteLogStorageEnable: Boolean = Defaults.RemoteLogStorageEnable): LogConfig = {
     val logProps = new Properties()
     logProps.put(LogConfig.SegmentMsProp, segmentMs: java.lang.Long)
     logProps.put(LogConfig.SegmentBytesProp, segmentBytes: Integer)
@@ -72,6 +75,7 @@ object LogTestUtils {
     logProps.put(LogConfig.IndexIntervalBytesProp, indexIntervalBytes: Integer)
     logProps.put(LogConfig.SegmentIndexBytesProp, segmentIndexBytes: Integer)
     logProps.put(LogConfig.FileDeleteDelayMsProp, fileDeleteDelayMs: java.lang.Long)
+    logProps.put(LogConfig.RemoteLogStorageEnableProp, remoteLogStorageEnable: java.lang.Boolean)
     LogConfig(logProps)
   }
 
@@ -88,7 +92,9 @@ object LogTestUtils {
                 lastShutdownClean: Boolean = true,
                 topicId: Option[Uuid] = None,
                 keepPartitionMetadataFile: Boolean = true,
-                numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int]): UnifiedLog = {
+                numRemainingSegments: ConcurrentMap[String, Int] = new ConcurrentHashMap[String, Int],
+                remoteStorageSystemEnable: Boolean = false,
+                remoteLogManager: Option[RemoteLogManager] = None): UnifiedLog = {
     UnifiedLog(
       dir = dir,
       config = config,
@@ -104,7 +110,9 @@ object LogTestUtils {
       lastShutdownClean = lastShutdownClean,
       topicId = topicId,
       keepPartitionMetadataFile = keepPartitionMetadataFile,
-      numRemainingSegments = numRemainingSegments
+      numRemainingSegments = numRemainingSegments,
+      remoteStorageSystemEnable = remoteStorageSystemEnable,
+      remoteLogManager = remoteLogManager
     )
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 3c954b70180..b631b642ef4 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -23,7 +23,6 @@ import java.nio.channels.FileChannel
 import java.nio.file.{Files, StandardOpenOption}
 import java.util.Collections
 import java.util.concurrent.atomic.AtomicInteger
-
 import kafka.server.LogOffsetMetadata
 import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
@@ -741,6 +740,35 @@ class ProducerStateManagerTest {
     assertEquals(Set(1), currentSnapshotOffsets)
   }
 
+  @Test
+  def testReloadSnapshots(): Unit = {
+    val epoch = 0.toShort
+    append(stateManager, producerId, epoch, 1, 1L)
+    append(stateManager, producerId, epoch, 2, 2L)
+    stateManager.takeSnapshot()
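+    // Capture the bytes of the first snapshot file so its contents can be written back later.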
+    val pathAndDataList = logDir.listFiles().map(file => (file.toPath, Files.readAllBytes(file.toPath)))
+
+    append(stateManager, producerId, epoch, 3, 3L)
+    append(stateManager, producerId, epoch, 4, 4L)
+    stateManager.takeSnapshot()
+    assertEquals(2, logDir.listFiles().length)
+    assertEquals(Set(3, 5), currentSnapshotOffsets)
+
+    // Truncate to the range (3, 5); this deletes the earlier snapshot that covers offsets up to 3.
+    stateManager.truncateAndReload(3, 5, time.milliseconds())
+    assertEquals(1, logDir.listFiles().length)
+    assertEquals(Set(5), currentSnapshotOffsets)
+
+    // Write the earlier snapshot files (up to offset 3) back to the log dir.
+    pathAndDataList.foreach { case (path, data) => Files.write(path, data) }
+    // Clear the in-memory snapshots and reload them from the log dir,
+    // which picks up the snapshot files written back above.
+    stateManager.truncateFullyAndReloadSnapshots()
+
+    assertEquals(Some(3), stateManager.latestSnapshotOffset)
+    assertEquals(Set(3), currentSnapshotOffsets)
+  }
+
   @Test
   def testFirstUnstableOffsetAfterTruncation(): Unit = {
     val epoch = 0.toShort
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 790bcd88a9a..1f0f91aa2ad 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -168,4 +168,12 @@ class TransactionIndexTest {
     assertEquals(5, abortedTxns(1).firstOffset)
   }
 
+  @Test
+  def testUpdateParentDir(): Unit = {
+    val tmpParentDir = new File(TestUtils.tempDir(), "parent")
+    tmpParentDir.mkdir()
+    assertNotEquals(tmpParentDir, index.file.getParentFile)
+    index.updateParentDir(tmpParentDir)
+    assertEquals(tmpParentDir, index.file.getParentFile)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 8f42456123d..42fdafae206 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -20,25 +20,30 @@ package kafka.log
 import java.io._
 import java.nio.ByteBuffer
 import java.nio.file.Files
-import java.util.concurrent.{Callable, Executors}
+import java.util.concurrent.{Callable, ConcurrentHashMap, Executors}
 import java.util.{Optional, Properties}
-
 import kafka.common.{OffsetsOutOfOrderException, RecordValidationException, UnexpectedAppendOffsetException}
+import kafka.log.remote.RemoteLogManager
 import kafka.server.checkpoints.LeaderEpochCheckpointFile
 import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
 import kafka.server.{BrokerTopicStats, FetchHighWatermark, FetchIsolation, FetchLogEnd, FetchTxnCommitted, KafkaConfig, LogOffsetMetadata, PartitionMetadataFile}
 import kafka.utils._
 import org.apache.kafka.common.{InvalidRecordException, TopicPartition, Uuid}
 import org.apache.kafka.common.errors._
+import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
 import org.apache.kafka.common.record.MemoryRecords.RecordFilter
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.requests.{ListOffsetsRequest, ListOffsetsResponse}
 import org.apache.kafka.common.utils.{BufferSupplier, Time, Utils}
+import org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers.anyLong
+import org.mockito.Mockito.{mock, when}
 
 import scala.annotation.nowarn
 import scala.collection.Map
@@ -1980,6 +1985,8 @@ class UnifiedLogTest {
 
     assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
       log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP))
+    assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP))
     assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
       log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
 
@@ -2020,6 +2027,60 @@ class UnifiedLogTest {
       log.fetchOffsetByTimestamp(ListOffsetsRequest.MAX_TIMESTAMP))
   }
 
+  @Test
+  def testFetchOffsetByTimestampFromRemoteStorage(): Unit = {
+    val remoteLogManager = mock(classOf[RemoteLogManager])
+    val logConfig = LogTestUtils.createLogConfig(segmentBytes = 200, indexIntervalBytes = 1,
+      remoteLogStorageEnable = true)
+    val log = createLog(logDir, logConfig, remoteStorageSystemEnable = true, remoteLogManager = Some(remoteLogManager))
+    when(remoteLogManager.findOffsetByTimestamp(log.topicPartition, 0, 0, log.leaderEpochCache.get))
+      .thenReturn(None)
+    assertEquals(None, log.fetchOffsetByTimestamp(0L))
+
+    val firstTimestamp = mockTime.milliseconds
+    val firstLeaderEpoch = 0
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = firstTimestamp),
+      leaderEpoch = firstLeaderEpoch)
+
+    val secondTimestamp = firstTimestamp + 1
+    val secondLeaderEpoch = 1
+    log.appendAsLeader(TestUtils.singletonRecords(
+      value = TestUtils.randomBytes(10),
+      timestamp = secondTimestamp),
+      leaderEpoch = secondLeaderEpoch)
+
+    when(remoteLogManager.findOffsetByTimestamp(ArgumentMatchers.eq(log.topicPartition),
+      anyLong(), anyLong(), ArgumentMatchers.eq(log.leaderEpochCache.get)))
+      .thenAnswer(ans => {
+        val timestamp = ans.getArgument(1).asInstanceOf[Long]
+        Option(timestamp)
+          .filter(_ == firstTimestamp)
+          .map(new TimestampAndOffset(_, 0L, Optional.of(firstLeaderEpoch)))
+      })
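+    // Simulate the first segment having been moved to remote storage by advancing only the local log start offset.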
+    log._localLogStartOffset = 1
+
+    assertEquals(Some(new TimestampAndOffset(firstTimestamp, 0L, Optional.of(firstLeaderEpoch))),
+      log.fetchOffsetByTimestamp(firstTimestamp))
+    assertEquals(Some(new TimestampAndOffset(secondTimestamp, 1L, Optional.of(secondLeaderEpoch))),
+      log.fetchOffsetByTimestamp(secondTimestamp))
+
+    assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 0L, Optional.of(firstLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_TIMESTAMP))
+    assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 1L, Optional.of(secondLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP))
+    assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(secondLeaderEpoch))),
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
+
+    // The cache can be updated directly after a leader change.
+    // The new latest offset should reflect the updated epoch.
+    log.maybeAssignEpochStartOffset(2, 2L)
+
+    assertEquals(Some(new TimestampAndOffset(ListOffsetsResponse.UNKNOWN_TIMESTAMP, 2L, Optional.of(2))),
+      log.fetchOffsetByTimestamp(ListOffsetsRequest.LATEST_TIMESTAMP))
+  }
+
   /**
    * Test the Log truncate operations
    */
@@ -3475,6 +3536,58 @@ class UnifiedLogTest {
     assertEquals(None, log.maybeUpdateHighWatermark(101L))
   }
 
+  @Test
+  def testEnableRemoteLogStorageOnCompactedTopics(): Unit = {
+    var logConfig = LogTestUtils.createLogConfig()
+    var log = createLog(logDir, logConfig)
+    assertFalse(log.remoteLogEnabled())
+
+    log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    assertFalse(log.remoteLogEnabled())
+
+    logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
+    log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    assertTrue(log.remoteLogEnabled())
+
+    logConfig = LogTestUtils.createLogConfig(cleanupPolicy = LogConfig.Compact, remoteLogStorageEnable = true)
+    log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    assertFalse(log.remoteLogEnabled())
+
+    logConfig = LogTestUtils.createLogConfig(cleanupPolicy = LogConfig.Compact + "," + LogConfig.Delete,
+      remoteLogStorageEnable = true)
+    log = createLog(logDir, logConfig, remoteStorageSystemEnable = true)
+    assertFalse(log.remoteLogEnabled())
+  }
+
+  @Test
+  def testRemoteLogStorageIsDisabledOnInternalAndRemoteLogMetadataTopic(): Unit = {
+    val partitions = Seq(TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME,
+      Topic.TRANSACTION_STATE_TOPIC_NAME, Topic.GROUP_METADATA_TOPIC_NAME)
+      .map(topic => new TopicPartition(topic, 0))
+    for (partition <- partitions) {
+      val logConfig = LogTestUtils.createLogConfig(remoteLogStorageEnable = true)
+      val internalLogDir = new File(TestUtils.tempDir(), partition.toString)
+      internalLogDir.mkdir()
+      val log = createLog(internalLogDir, logConfig, remoteStorageSystemEnable = true)
+      assertFalse(log.remoteLogEnabled())
+    }
+  }
+
+  @Test
+  def testNoOpWhenRemoteLogStorageIsDisabled(): Unit = {
+    val logConfig = LogTestUtils.createLogConfig()
+    val log = createLog(logDir, logConfig)
+
+    for (i <- 0 until 100) {
+      val records = TestUtils.singletonRecords(value = s"test$i".getBytes)
+      log.appendAsLeader(records, leaderEpoch = 0)
+    }
+
+    log.updateHighWatermark(90L)
+    log.maybeIncrementLogStartOffset(20L, SegmentDeletion)
+    assertEquals(20, log.logStartOffset)
+    assertEquals(log.logStartOffset, log.localLogStartOffset())
+  }
+
   private def appendTransactionalToBuffer(buffer: ByteBuffer,
                                           producerId: Long,
                                           producerEpoch: Short,
@@ -3529,10 +3642,13 @@ class UnifiedLogTest {
                         producerIdExpirationCheckIntervalMs: Int = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true,
                         topicId: Option[Uuid] = None,
-                        keepPartitionMetadataFile: Boolean = true): UnifiedLog = {
+                        keepPartitionMetadataFile: Boolean = true,
+                        remoteStorageSystemEnable: Boolean = false,
+                        remoteLogManager: Option[RemoteLogManager] = None): UnifiedLog = {
     LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
       maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs,
-      lastShutdownClean, topicId, keepPartitionMetadataFile)
+      lastShutdownClean, topicId, keepPartitionMetadataFile, new ConcurrentHashMap[String, Int],
+      remoteStorageSystemEnable, remoteLogManager)
   }
 
   private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, LogSegment) = {
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
new file mode 100644
index 00000000000..e785f09fb7d
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteIndexCacheTest.scala
@@ -0,0 +1,237 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.log.{OffsetIndex, OffsetPosition, TimeIndex, UnifiedLog}
+import kafka.utils.MockTime
+import org.apache.kafka.common.{TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.mockito.ArgumentMatchers
+import org.mockito.ArgumentMatchers.any
+import org.mockito.Mockito._
+
+import java.io.{File, FileInputStream}
+import java.nio.file.Files
+import java.util.Collections
+import scala.collection.mutable
+
+class RemoteIndexCacheTest {
+
+  val time = new MockTime()
+  val partition = new TopicPartition("foo", 0)
+  val idPartition = new TopicIdPartition(Uuid.randomUuid(), partition)
+  val logDir: File = TestUtils.tempDirectory("kafka-logs")
+  val tpDir: File = new File(logDir, partition.toString)
+  val brokerId = 1
+  val baseOffset = 45L
+  val lastOffset = 75L
+  val segmentSize = 1024
+
+  val rsm: RemoteStorageManager = mock(classOf[RemoteStorageManager])
+  val cache: RemoteIndexCache = new RemoteIndexCache(remoteStorageManager = rsm, logDir = logDir.toString)
+  val remoteLogSegmentId = new RemoteLogSegmentId(idPartition, Uuid.randomUuid())
+  val rlsMetadata: RemoteLogSegmentMetadata = new RemoteLogSegmentMetadata(remoteLogSegmentId, baseOffset, lastOffset,
+    time.milliseconds(), brokerId, time.milliseconds(), segmentSize, Collections.singletonMap(0, 0L))
+
+  @BeforeEach
+  def setup(): Unit = {
+    Files.createDirectory(tpDir.toPath)
+    val txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix)
+    txnIdxFile.createNewFile()
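+    // Serve freshly built offset/time indexes (and the empty transaction index) whenever the cache fetches from the RemoteStorageManager.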
+    when(rsm.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer(ans => {
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
+        val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 8)
+        val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 12)
+        maybeAppendIndexEntries(offsetIdx, timeIdx)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdxFile)
+          case IndexType.LEADER_EPOCH => // leader-epoch-cache is not accessed.
+          case IndexType.PRODUCER_SNAPSHOT => // producer-snapshot is not accessed.
+        }
+      })
+  }
+
+  @AfterEach
+  def cleanup(): Unit = {
+    reset(rsm)
+    cache.entries.forEach((_, v) => v.cleanup())
+    cache.close()
+  }
+
+  @Test
+  def testFetchIndexFromRemoteStorage(): Unit = {
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val offsetPosition1 = offsetIndex.entry(1)
+    // this call should have invoked fetchOffsetIndex, fetchTimestampIndex once
+    val resultPosition = cache.lookupOffset(rlsMetadata, offsetPosition1.offset)
+    assertEquals(offsetPosition1.position, resultPosition)
+    verifyFetchIndexInvocation(count = 1, Seq(IndexType.OFFSET, IndexType.TIMESTAMP))
+
+    // this should not fetch the index from RemoteStorageManager again as it was already fetched earlier
+    reset(rsm)
+    val offsetPosition2 = offsetIndex.entry(2)
+    val resultPosition2 = cache.lookupOffset(rlsMetadata, offsetPosition2.offset)
+    assertEquals(offsetPosition2.position, resultPosition2)
+    assertNotNull(cache.getIndexEntry(rlsMetadata))
+    verifyNoInteractions(rsm)
+  }
+
+  @Test
+  def testPositionForNonExistingIndexFromRemoteStorage(): Unit = {
+    val offsetIndex = cache.getIndexEntry(rlsMetadata).offsetIndex.get
+    val lastOffsetPosition = cache.lookupOffset(rlsMetadata, offsetIndex.lastOffset)
+    val greaterOffsetThanLastOffset = offsetIndex.lastOffset + 1
+    assertEquals(lastOffsetPosition, cache.lookupOffset(rlsMetadata, greaterOffsetThanLastOffset))
+
+    // offsetIndex.lookup() returns OffsetPosition(baseOffset, 0) for offsets smaller than the least entry in the offset index.
+    val nonExistentOffsetPosition = OffsetPosition(baseOffset, 0)
+    val lowerOffsetThanBaseOffset = offsetIndex.baseOffset - 1
+    assertEquals(nonExistentOffsetPosition.position, cache.lookupOffset(rlsMetadata, lowerOffsetThanBaseOffset))
+  }
+
+  @Test
+  def testCacheEntryExpiry(): Unit = {
+    val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
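+    // With maxSize = 2, fetching a third distinct segment evicts the least recently used entry.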
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    // getIndex for first time will call rsm#fetchIndex
+    cache.getIndexEntry(metadataList.head)
+    // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+    cache.getIndexEntry(metadataList.head)
+    assertEquals(1, cache.entries.size())
+    verifyFetchIndexInvocation(count = 1)
+
+    // Fetching a new key, metadataList(1), should call rsm#fetchIndex again, bringing the invocation count to 2
+    cache.getIndexEntry(metadataList.head)
+    cache.getIndexEntry(metadataList(1))
+    assertEquals(2, cache.entries.size())
+    verifyFetchIndexInvocation(count = 2)
+
+    // getting index for metadataList.last should call rsm#fetchIndex, but metadataList(1) is already in cache.
+    cache.getIndexEntry(metadataList.last)
+    cache.getIndexEntry(metadataList(1))
+    assertEquals(2, cache.entries.size())
+    assertTrue(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
+    assertTrue(cache.entries.containsKey(metadataList(1).remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(count = 3)
+
+    // getting the index for metadataList.head should call rsm#fetchIndex as that entry was evicted earlier,
+    // but metadataList(1) is still in the cache.
+    cache.getIndexEntry(metadataList(1))
+    cache.getIndexEntry(metadataList.head)
+    assertEquals(2, cache.entries.size())
+    assertFalse(cache.entries.containsKey(metadataList.last.remoteLogSegmentId().id()))
+    verifyFetchIndexInvocation(count = 4)
+  }
+
+  @Test
+  def testGetIndexAfterCacheClose(): Unit = {
+    val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    cache.getIndexEntry(metadataList.head)
+    assertEquals(1, cache.entries.size())
+    verifyFetchIndexInvocation(count = 1)
+
+    cache.close()
+
+    // Check IllegalStateException is thrown when index is accessed after it is closed.
+    assertThrows(classOf[IllegalStateException], () => cache.getIndexEntry(metadataList.head))
+  }
+
+  @Test
+  def testReloadCacheAfterClose(): Unit = {
+    val cache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
+    val tpId = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("foo", 0))
+    val metadataList = generateRemoteLogSegmentMetadata(size = 3, tpId)
+
+    // getIndex for first time will call rsm#fetchIndex
+    cache.getIndexEntry(metadataList.head)
+    // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+    cache.getIndexEntry(metadataList.head)
+    assertEquals(1, cache.entries.size())
+    verifyFetchIndexInvocation(count = 1)
+
+    // Fetching a new key, metadataList(1), should call rsm#fetchIndex again, bringing the invocation count to 2
+    cache.getIndexEntry(metadataList(1))
+    // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+    cache.getIndexEntry(metadataList(1))
+    assertEquals(2, cache.entries.size())
+    verifyFetchIndexInvocation(count = 2)
+
+    // Fetching a new key, metadataList(2), should call rsm#fetchIndex again, bringing the invocation count to 3
+    cache.getIndexEntry(metadataList(2))
+    // Calling getIndex on the same entry should not call rsm#fetchIndex again, but it should retrieve from cache
+    cache.getIndexEntry(metadataList(2))
+    assertEquals(2, cache.entries.size())
+    verifyFetchIndexInvocation(count = 3)
+
+    // Close the cache
+    cache.close()
+
+    // Reload the cache from disk and check that the cache size is the same as before
+    val reloadedCache = new RemoteIndexCache(maxSize = 2, rsm, logDir = logDir.toString)
+    assertEquals(2, reloadedCache.entries.size())
+    reloadedCache.close()
+  }
+
+  private def verifyFetchIndexInvocation(count: Int,
+                                         indexTypes: Seq[IndexType] =
+                                         Seq(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION)): Unit = {
+    for (indexType <- indexTypes) {
+      verify(rsm, times(count)).fetchIndex(any(classOf[RemoteLogSegmentMetadata]), ArgumentMatchers.eq(indexType))
+    }
+  }
+
+  private def generateRemoteLogSegmentMetadata(size: Int,
+                                               tpId: TopicIdPartition): List[RemoteLogSegmentMetadata] = {
+    val metadataList = mutable.Buffer.empty[RemoteLogSegmentMetadata]
+    for (i <- 0 until size) {
+      metadataList.append(new RemoteLogSegmentMetadata(new RemoteLogSegmentId(tpId, Uuid.randomUuid()), baseOffset * i,
+        baseOffset * i + 10, time.milliseconds(), brokerId, time.milliseconds(), segmentSize,
+        Collections.singletonMap(0, 0L)))
+    }
+    metadataList.toList
+  }
+
+  private def maybeAppendIndexEntries(offsetIndex: OffsetIndex,
+                                      timeIndex: TimeIndex): Unit = {
+    if (!offsetIndex.isFull) {
+      val curTime = time.milliseconds()
+      for (i <- 0 until offsetIndex.maxEntries) {
+        val offset = offsetIndex.baseOffset + i
+        offsetIndex.append(offset, i)
+        timeIndex.maybeAppend(curTime + i, offset, skipFullCheck = true)
+      }
+      offsetIndex.flush()
+      timeIndex.flush()
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala
new file mode 100644
index 00000000000..92128c1e7d9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/log/remote/RemoteLogManagerTest.scala
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote
+
+import kafka.cluster.Partition
+import kafka.log.{OffsetIndex, TimeIndex, UnifiedLog}
+import kafka.server.KafkaConfig
+import kafka.server.checkpoints.LeaderEpochCheckpoint
+import kafka.server.epoch.{EpochEntry, LeaderEpochFileCache}
+import kafka.utils.MockTime
+import org.apache.kafka.common.{KafkaException, TopicIdPartition, TopicPartition, Uuid}
+import org.apache.kafka.common.config.AbstractConfig
+import org.apache.kafka.common.record.FileRecords.TimestampAndOffset
+import org.apache.kafka.common.record.{CompressionType, MemoryRecords, SimpleRecord}
+import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType
+import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig, RemoteLogMetadataManager, RemoteLogSegmentId, RemoteLogSegmentMetadata, RemoteStorageManager}
+import org.apache.kafka.test.TestUtils
+import org.junit.jupiter.api.{BeforeEach, Test}
+import org.mockito.Mockito._
+import org.junit.jupiter.api.Assertions._
+import org.mockito.{ArgumentCaptor, ArgumentMatchers}
+import org.mockito.ArgumentMatchers.{any, anyInt, anyLong}
+
+import java.io.{ByteArrayInputStream, File, FileInputStream}
+import java.nio.file.Files
+import java.util.{Optional, Properties}
+import java.util
+import scala.collection.Seq
+import scala.jdk.CollectionConverters._
+
+class RemoteLogManagerTest {
+
+  val time = new MockTime()
+  val brokerId = 0
+  val logDir: String = TestUtils.tempDirectory("kafka-").toString
+
+  val remoteStorageManager: RemoteStorageManager = mock(classOf[RemoteStorageManager])
+  val remoteLogMetadataManager: RemoteLogMetadataManager = mock(classOf[RemoteLogMetadataManager])
+  var remoteLogManagerConfig: RemoteLogManagerConfig = _
+  var remoteLogManager: RemoteLogManager = _
+
+  val leaderTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Leader", 0))
+  val followerTopicIdPartition = new TopicIdPartition(Uuid.randomUuid(), new TopicPartition("Follower", 0))
+  val topicIds: util.Map[String, Uuid] = Map(
+    leaderTopicIdPartition.topicPartition().topic() -> leaderTopicIdPartition.topicId(),
+    followerTopicIdPartition.topicPartition().topic() -> followerTopicIdPartition.topicId()
+  ).asJava
+
+  val checkpoint: LeaderEpochCheckpoint = new LeaderEpochCheckpoint {
+    var epochs: Seq[EpochEntry] = Seq()
+    override def write(epochs: Iterable[EpochEntry]): Unit = this.epochs = epochs.toSeq
+    override def read(): Seq[EpochEntry] = this.epochs
+  }
+
+  @BeforeEach
+  def setUp(): Unit = {
+    val props = new Properties()
+    remoteLogManagerConfig = createRLMConfig(props)
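+    // Inject the mocked storage and metadata managers instead of instantiating the configured no-op classes.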
+    remoteLogManager = new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir) {
+      override private[remote] def createRemoteStorageManager() = remoteStorageManager
+      override private[remote] def createRemoteLogMetadataManager() = remoteLogMetadataManager
+    }
+  }
+
+  @Test
+  def testRemoteLogMetadataManagerWithUserDefinedConfigs(): Unit = {
+    val key = "key"
+    val configPrefix = "config.prefix"
+    val props: Properties = new Properties()
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CONFIG_PREFIX_PROP, configPrefix)
+    props.put(configPrefix + key, "world")
+    props.put("remote.log.metadata.y", "z")
+
+    val metadataManagerConfig = createRLMConfig(props).remoteLogMetadataManagerProps()
+    assertEquals(props.get(configPrefix + key), metadataManagerConfig.get(key))
+    assertFalse(metadataManagerConfig.containsKey("remote.log.metadata.y"))
+  }
+
+  @Test
+  def testStartup(): Unit = {
+    remoteLogManager.startup()
+    val capture: ArgumentCaptor[util.Map[String, _]] = ArgumentCaptor.forClass(classOf[util.Map[String, _]])
+    verify(remoteStorageManager, times(1)).configure(capture.capture())
+    assertEquals(brokerId, capture.getValue.get(KafkaConfig.BrokerIdProp))
+
+    verify(remoteLogMetadataManager, times(1)).configure(capture.capture())
+    assertEquals(brokerId, capture.getValue.get(KafkaConfig.BrokerIdProp))
+    assertEquals(logDir, capture.getValue.get(KafkaConfig.LogDirProp))
+  }
+
+  @Test
+  def testGetClassLoaderAwareRemoteStorageManager(): Unit = {
+    val rsmManager: ClassLoaderAwareRemoteStorageManager = mock(classOf[ClassLoaderAwareRemoteStorageManager])
+    val remoteLogManager =
+      new RemoteLogManager(remoteLogManagerConfig, brokerId, logDir) {
+        override private[remote] def createRemoteStorageManager(): ClassLoaderAwareRemoteStorageManager = rsmManager
+      }
+    assertEquals(rsmManager, remoteLogManager.storageManager())
+  }
+
+  @Test
+  def testTopicIdCacheUpdates(): Unit = {
+    def verifyInCache(topicIdPartitions: TopicIdPartition*): Unit = {
+      topicIdPartitions.foreach { topicIdPartition =>
+        assertDoesNotThrow(() =>
+          remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), epochForOffset = 0, offset = 0L))
+      }
+    }
+
+    def verifyNotInCache(topicIdPartitions: TopicIdPartition*): Unit = {
+      topicIdPartitions.foreach { topicIdPartition =>
+        assertThrows(classOf[KafkaException], () =>
+          remoteLogManager.fetchRemoteLogSegmentMetadata(topicIdPartition.topicPartition(), epochForOffset = 0, offset = 0L))
+      }
+    }
+
+    val mockLeaderPartition = mockPartition(leaderTopicIdPartition)
+    val mockFollowerPartition = mockPartition(followerTopicIdPartition)
+
+    when(remoteLogMetadataManager.remoteLogSegmentMetadata(any(classOf[TopicIdPartition]), anyInt(), anyLong()))
+      .thenReturn(Optional.empty[RemoteLogSegmentMetadata]())
+    verifyNotInCache(followerTopicIdPartition, leaderTopicIdPartition)
+    // Load topicId cache
+    remoteLogManager.onLeadershipChange(Set(mockLeaderPartition), Set(mockFollowerPartition), topicIds)
+    verify(remoteLogMetadataManager, times(1))
+      .onPartitionLeadershipChanges(Set(leaderTopicIdPartition).asJava, Set(followerTopicIdPartition).asJava)
+    verifyInCache(followerTopicIdPartition, leaderTopicIdPartition)
+
+    // Evicts from topicId cache
+    remoteLogManager.stopPartitions(leaderTopicIdPartition.topicPartition(), delete = true)
+    verifyNotInCache(leaderTopicIdPartition)
+    verifyInCache(followerTopicIdPartition)
+
+    // Evicts from topicId cache
+    remoteLogManager.stopPartitions(followerTopicIdPartition.topicPartition(), delete = true)
+    verifyNotInCache(leaderTopicIdPartition, followerTopicIdPartition)
+  }
+
+  @Test
+  def testFetchRemoteLogSegmentMetadata(): Unit = {
+    remoteLogManager.onLeadershipChange(
+      Set(mockPartition(leaderTopicIdPartition)), Set(mockPartition(followerTopicIdPartition)), topicIds)
+    remoteLogManager.fetchRemoteLogSegmentMetadata(leaderTopicIdPartition.topicPartition(), 10, 100L)
+    remoteLogManager.fetchRemoteLogSegmentMetadata(followerTopicIdPartition.topicPartition(), 20, 200L)
+
+    verify(remoteLogMetadataManager)
+      .remoteLogSegmentMetadata(ArgumentMatchers.eq(leaderTopicIdPartition), anyInt(), anyLong())
+    verify(remoteLogMetadataManager)
+      .remoteLogSegmentMetadata(ArgumentMatchers.eq(followerTopicIdPartition), anyInt(), anyLong())
+  }
+
+  @Test
+  def testFindOffsetByTimestamp(): Unit = {
+    val tp = leaderTopicIdPartition.topicPartition()
+    val remoteLogSegmentId = new RemoteLogSegmentId(leaderTopicIdPartition, Uuid.randomUuid())
+    val ts = time.milliseconds()
+    val startOffset = 120
+    val targetLeaderEpoch = 10
+
+    val segmentMetadata = mock(classOf[RemoteLogSegmentMetadata])
+    when(segmentMetadata.remoteLogSegmentId()).thenReturn(remoteLogSegmentId)
+    when(segmentMetadata.maxTimestampMs()).thenReturn(ts + 2)
+    when(segmentMetadata.startOffset()).thenReturn(startOffset)
+    when(segmentMetadata.endOffset()).thenReturn(startOffset + 2)
+
+    val tpDir: File = new File(logDir, tp.toString)
+    Files.createDirectory(tpDir.toPath)
+    val txnIdxFile = new File(tpDir, "txn-index" + UnifiedLog.TxnIndexFileSuffix)
+    txnIdxFile.createNewFile()
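+    // The mocked indexes below are empty, so the timestamp lookup scans the fetched segment records from position 0.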
+    when(remoteStorageManager.fetchIndex(any(classOf[RemoteLogSegmentMetadata]), any(classOf[IndexType])))
+      .thenAnswer { ans =>
+        val metadata = ans.getArgument[RemoteLogSegmentMetadata](0)
+        val indexType = ans.getArgument[IndexType](1)
+        val maxEntries = (metadata.endOffset() - metadata.startOffset()).asInstanceOf[Int]
+        val offsetIdx = new OffsetIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.IndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 8)
+        val timeIdx = new TimeIndex(new File(tpDir, String.valueOf(metadata.startOffset()) + UnifiedLog.TimeIndexFileSuffix),
+          metadata.startOffset(), maxIndexSize = maxEntries * 12)
+        indexType match {
+          case IndexType.OFFSET => new FileInputStream(offsetIdx.file)
+          case IndexType.TIMESTAMP => new FileInputStream(timeIdx.file)
+          case IndexType.TRANSACTION => new FileInputStream(txnIdxFile)
+          case IndexType.LEADER_EPOCH =>
+          case IndexType.PRODUCER_SNAPSHOT =>
+        }
+      }
+
+    when(remoteLogMetadataManager.listRemoteLogSegments(ArgumentMatchers.eq(leaderTopicIdPartition), anyInt()))
+      .thenAnswer(ans => {
+        val leaderEpoch = ans.getArgument[Int](1)
+        if (leaderEpoch == targetLeaderEpoch)
+          List(segmentMetadata).asJava.iterator()
+        else
+          List().asJava.iterator()
+      })
+
+    def records(timestamp: Long,
+                initialOffset: Long,
+                partitionLeaderEpoch: Int): MemoryRecords = {
+      MemoryRecords.withRecords(initialOffset, CompressionType.NONE, partitionLeaderEpoch,
+        new SimpleRecord(timestamp - 1, "first message".getBytes()),
+        new SimpleRecord(timestamp + 1, "second message".getBytes()),
+        new SimpleRecord(timestamp + 2, "third message".getBytes()),
+      )
+    }
+
+    // 3 messages are added with offsets and timestamps as below:
+    // startOffset   , ts-1
+    // startOffset+1 , ts+1
+    // startOffset+2 , ts+2
+    when(remoteStorageManager.fetchLogSegment(segmentMetadata, 0))
+      .thenAnswer(_ => new ByteArrayInputStream(records(ts, startOffset, targetLeaderEpoch).buffer().array()))
+
+    val leaderEpochFileCache = new LeaderEpochFileCache(tp, checkpoint)
+    leaderEpochFileCache.assign(epoch = 5, startOffset = 99L)
+    leaderEpochFileCache.assign(epoch = targetLeaderEpoch, startOffset = startOffset)
+    leaderEpochFileCache.assign(epoch = 12, startOffset = 500L)
+
+    remoteLogManager.onLeadershipChange(Set(mockPartition(leaderTopicIdPartition)), Set(), topicIds)
+    // Fetching by timestamp `ts` returns the message at startOffset+1 with timestamp `ts+1`, since no message has
+    // timestamp exactly `ts` and that is the earliest message with timestamp >= `ts`.
+    val maybeTimestampAndOffset1 = remoteLogManager.findOffsetByTimestamp(tp, ts, startOffset, leaderEpochFileCache)
+    assertEquals(Some(new TimestampAndOffset(ts + 1, startOffset + 1, Optional.of(targetLeaderEpoch))), maybeTimestampAndOffset1)
+
+    // Fetching by `ts+2` returns the message at startOffset+2, whose timestamp is `ts+2`.
+    val maybeTimestampAndOffset2 = remoteLogManager.findOffsetByTimestamp(tp, ts + 2, startOffset, leaderEpochFileCache)
+    assertEquals(Some(new TimestampAndOffset(ts + 2, startOffset + 2, Optional.of(targetLeaderEpoch))), maybeTimestampAndOffset2)
+
+    // Fetching by `ts+3` returns None as there are no records with timestamp >= `ts+3`.
+    val maybeTimestampAndOffset3 = remoteLogManager.findOffsetByTimestamp(tp, ts + 3, startOffset, leaderEpochFileCache)
+    assertEquals(None, maybeTimestampAndOffset3)
+  }
+
+  @Test
+  def testIdempotentClose(): Unit = {
+    remoteLogManager.close()
+    remoteLogManager.close()
+    val inorder = inOrder(remoteStorageManager, remoteLogMetadataManager)
+    inorder.verify(remoteStorageManager, times(1)).close()
+    inorder.verify(remoteLogMetadataManager, times(1)).close()
+  }
+
+  private def mockPartition(topicIdPartition: TopicIdPartition) = {
+    val tp = topicIdPartition.topicPartition()
+    val partition: Partition = mock(classOf[Partition])
+    val log = mock(classOf[UnifiedLog])
+    when(partition.topicPartition).thenReturn(tp)
+    when(partition.topic).thenReturn(tp.topic())
+    when(log.remoteLogEnabled()).thenReturn(true)
+    when(partition.log).thenReturn(Some(log))
+    partition
+  }
+
+  private def createRLMConfig(props: Properties): RemoteLogManagerConfig = {
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
+    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
+    val config = new AbstractConfig(RemoteLogManagerConfig.CONFIG_DEF, props)
+    new RemoteLogManagerConfig(config)
+  }
+
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index cb60384a6b0..260efed809a 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -297,15 +297,17 @@ class AbstractFetcherManagerTest {
 
     override def fetch(fetchRequest: FetchRequest.Builder): Map[TopicPartition, FetchData] = Map.empty
 
-    override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = 1
+    override def fetchEarliestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
 
-    override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): Long = 1
+    override def fetchLatestOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
 
     override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = Map.empty
 
     override def buildFetch(partitionMap: Map[TopicPartition, PartitionFetchState]): ResultWithPartitions[Option[ReplicaFetch]] = ResultWithPartitions(None, Set.empty)
 
     override val isTruncationOnFetchSupported: Boolean = false
+
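+    // Tiered storage support: the leader end point also exposes the earliest local offset together with its leader epoch.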
+    override def fetchEarliestLocalOffset(topicPartition: TopicPartition, currentLeaderEpoch: Int): (Int, Long) = (0, 1)
   }
 
   private class TestResizeFetcherThread(sourceBroker: BrokerEndPoint, failedPartitions: FailedPartitions)
@@ -334,6 +336,8 @@ class AbstractFetcherManagerTest {
     override protected def endOffsetForEpoch(topicPartition: TopicPartition, epoch: Int): Option[OffsetAndEpoch] = Some(OffsetAndEpoch(1, 0))
 
     override protected val isOffsetForLeaderEpochSupported: Boolean = false
+
+    override protected def buildRemoteLogAuxState(partition: TopicPartition, currentLeaderEpoch: Int, fetchOffset: Long, epochForFetchOffset: Int, leaderLogStartOffset: Long): Long = 1
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
index cdd17b1af2c..711b50724b8 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherThreadTest.scala
@@ -17,10 +17,6 @@
 
 package kafka.server
 
-import java.nio.ByteBuffer
-import java.util.Optional
-import java.util.concurrent.atomic.AtomicInteger
-
 import kafka.cluster.BrokerEndPoint
 import kafka.log.LogAppendInfo
 import kafka.message.NoCompressionCodec
@@ -28,7 +24,6 @@ import kafka.server.AbstractFetcherThread.ReplicaFetch
 import kafka.server.AbstractFetcherThread.ResultWithPartitions
 import kafka.utils.Implicits.MapExtensionMethods
 import kafka.utils.TestUtils
-import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.apache.kafka.common.errors.{FencedLeaderEpochException, UnknownLeaderEpochException, UnknownTopicIdException}
 import org.apache.kafka.common.message.FetchResponseData
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.EpochEndOffset
@@ -38,15 +33,19 @@ import org.apache.kafka.common.requests.OffsetsForLeaderEpochResponse.{UNDEFINED
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.metrics.KafkaYammerMetrics
+import org.apache.kafka.common.{KafkaException, TopicPartition, Uuid}
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Assumptions.assumeTrue
 import org.junit.jupiter.api.{BeforeEach, Test}
 
-import scala.jdk.CollectionConverters._
-import scala.collection.{Map, Set, mutable}
-import scala.util.Random
+import java.nio.ByteBuffer
+import java.util.Optional
+import java.util.concurrent.atomic.AtomicInteger
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.{Map, Set, mutable}
 import scala.compat.java8.OptionConverters._
+import scala.jdk.CollectionConverters._
+import scala.util.Random
 
 class AbstractFetcherThreadTest {
 
@@ -614,12 +613,103 @@ class AbstractFetcherThreadTest {
     assertEquals(0L, replicaState.highWatermark)
   }
 
+  @Test
+  def testFollowerFetchMovedToTieredStore(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint)
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 0, leaderEpoch = 0, new SimpleRecord("a".getBytes)),
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 0L, rlmEnabled = true)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), 3L, leaderEpoch = 5)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("f".getBytes)),
+      mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("g".getBytes)),
+      mkBatch(baseOffset = 7, leaderEpoch = 5, new SimpleRecord("h".getBytes)),
+      mkBatch(baseOffset = 8, leaderEpoch = 5, new SimpleRecord("i".getBytes)))
+
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 8L, rlmEnabled = true)
+    // Override the log start offset to zero to mock the scenario where segments 0-4 have moved to the remote store.
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    assertEquals(3L, replicaState.logEndOffset)
+    val expectedState = if (truncateOnFetch) Option(Fetching) else Option(Truncating)
+    assertEquals(expectedState, fetcher.fetchState(partition).map(_.state))
+
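+    // The fetch offset 3 is below the leader's local log start offset (5), so the leader responds with OFFSET_MOVED_TO_TIERED_STORAGE.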
+    fetcher.doWork()
+    // Verify that the OFFSET_MOVED_TO_TIERED_STORAGE error was handled: the replica truncated fully to the leader's
+    // local log start offset and adopted the leader's log start offset.
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(5L, replicaState.localLogStartOffset)
+    assertEquals(5L, replicaState.highWatermark)
+    assertEquals(5L, replicaState.logEndOffset)
+
+    // Only one record batch is returned per poll, so call doWork() several times to catch up with the leader.
+    for (_ <- 1 to 5) fetcher.doWork()
+    assertEquals(4, replicaState.log.size)
+    assertEquals(0L, replicaState.logStartOffset)
+    assertEquals(5L, replicaState.localLogStartOffset)
+    assertEquals(8L, replicaState.highWatermark)
+    assertEquals(9L, replicaState.logEndOffset)
+  }
+
+  @Test
+  def testFencedOffsetResetAfterMovedToRemoteTier(): Unit = {
+    val partition = new TopicPartition("topic", 0)
+    var isErrorHandled = false
+    val fetcher = new MockFetcherThread(new MockLeaderEndPoint) {
+      override protected def buildRemoteLogAuxState(partition: TopicPartition,
+                                                    currentLeaderEpoch: Int,
+                                                    fetchOffset: Long,
+                                                    epochForFetchOffset: Int,
+                                                    leaderLogStartOffset: Long): Long = {
+        isErrorHandled = true
+        throw new FencedLeaderEpochException(s"Epoch $currentLeaderEpoch is fenced")
+      }
+    }
+
+    val replicaLog = Seq(
+      mkBatch(baseOffset = 1, leaderEpoch = 2, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 2, leaderEpoch = 4, new SimpleRecord("c".getBytes)))
+    val replicaState = PartitionState(replicaLog, leaderEpoch = 5, highWatermark = 2L, rlmEnabled = true)
+    fetcher.setReplicaState(partition, replicaState)
+    fetcher.addPartitions(Map(partition -> initialFetchState(topicIds.get(partition.topic), fetchOffset = 0L, leaderEpoch = 5)))
+
+    val leaderLog = Seq(
+      mkBatch(baseOffset = 5, leaderEpoch = 5, new SimpleRecord("b".getBytes)),
+      mkBatch(baseOffset = 6, leaderEpoch = 5, new SimpleRecord("c".getBytes)))
+    val leaderState = PartitionState(leaderLog, leaderEpoch = 5, highWatermark = 6L, rlmEnabled = true)
+    // Override the log start offset to zero to mock the scenario where segments 0-4 have moved to the remote store.
+    leaderState.logStartOffset = 0
+    fetcher.mockLeader.setLeaderState(partition, leaderState)
+    fetcher.mockLeader.setReplicaPartitionStateCallback(fetcher.replicaPartitionState)
+
+    // After the offset-moved-to-tiered-storage error, building the remote log aux state throws a fenced-epoch error,
+    // so the partition is removed from the fetcher and marked as failed.
+    fetcher.doWork()
+    assertEquals(3, replicaState.logEndOffset)
+    assertTrue(isErrorHandled)
+    assertTrue(fetcher.fetchState(partition).isEmpty)
+    assertTrue(failedPartitions.contains(partition))
+  }
+
   @Test
   def testFencedOffsetResetAfterOutOfRange(): Unit = {
     val partition = new TopicPartition("topic", 0)
     var fetchedEarliestOffset = false
+
     val fetcher = new MockFetcherThread(new MockLeaderEndPoint {
-      override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
+      override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+        fetchedEarliestOffset = true
+        throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
+      }
+      override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
         fetchedEarliestOffset = true
         throw new FencedLeaderEpochException(s"Epoch $leaderEpoch is fenced")
       }
@@ -691,7 +781,7 @@ class AbstractFetcherThreadTest {
     val partition = new TopicPartition("topic", 0)
     val fetcher: MockFetcherThread = new MockFetcherThread(new MockLeaderEndPoint {
       val tries = new AtomicInteger(0)
-      override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
+      override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
         if (tries.getAndIncrement() == 0)
           throw new UnknownLeaderEpochException("Unexpected leader epoch")
         super.fetchLatestOffset(topicPartition, leaderEpoch)
@@ -1075,6 +1165,8 @@ class AbstractFetcherThreadTest {
           (Errors.OFFSET_OUT_OF_RANGE, MemoryRecords.EMPTY)
         } else if (divergingEpoch.nonEmpty) {
           (Errors.NONE, MemoryRecords.EMPTY)
+        } else if (leaderState.rlmEnabled && fetchData.fetchOffset < leaderState.localLogStartOffset) {
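+          // The requested offset is only available in remote storage; mimic the leader returning OFFSET_MOVED_TO_TIERED_STORAGE.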
+          (Errors.OFFSET_MOVED_TO_TIERED_STORAGE, MemoryRecords.EMPTY)
         } else {
           // for simplicity, we fetch only one batch at a time
           val records = leaderState.log.find(_.baseOffset >= fetchData.fetchOffset) match {
@@ -1103,16 +1195,22 @@ class AbstractFetcherThreadTest {
       }.toMap
     }
 
-    override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
+    override def fetchEarliestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
       val leaderState = leaderPartitionState(topicPartition)
       checkLeaderEpochAndThrow(leaderEpoch, leaderState)
-      leaderState.logStartOffset
+      (leaderState.leaderEpoch, leaderState.logStartOffset)
     }
 
-    override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): Long = {
+    override def fetchLatestOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
       val leaderState = leaderPartitionState(topicPartition)
       checkLeaderEpochAndThrow(leaderEpoch, leaderState)
-      leaderState.logEndOffset
+      (leaderState.leaderEpoch, leaderState.logEndOffset)
+    }
+
+    override def fetchEarliestLocalOffset(topicPartition: TopicPartition, leaderEpoch: Int): (Int, Long) = {
+      val leaderState = leaderPartitionState(topicPartition)
+      checkLeaderEpochAndThrow(leaderEpoch, leaderState)
+      (leaderState.leaderEpoch, leaderState.localLogStartOffset)
     }
 
     override def fetchEpochEndOffsets(partitions: Map[TopicPartition, EpochData]): Map[TopicPartition, EpochEndOffset] = {
@@ -1240,13 +1338,15 @@ class AbstractFetcherThreadTest {
                        var leaderEpoch: Int,
                        var logStartOffset: Long,
                        var logEndOffset: Long,
-                       var highWatermark: Long)
+                       var highWatermark: Long,
+                       var rlmEnabled: Boolean = false,
+                       var localLogStartOffset: Long)
 
   object PartitionState {
-    def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long): PartitionState = {
+    def apply(log: Seq[RecordBatch], leaderEpoch: Int, highWatermark: Long, rlmEnabled: Boolean = false): PartitionState = {
       val logStartOffset = log.headOption.map(_.baseOffset).getOrElse(0L)
       val logEndOffset = log.lastOption.map(_.nextOffset).getOrElse(0L)
-      new PartitionState(log.toBuffer, leaderEpoch, logStartOffset, logEndOffset, highWatermark)
+      new PartitionState(log.toBuffer, leaderEpoch, logStartOffset, logEndOffset, highWatermark, rlmEnabled, logStartOffset)
     }
 
     def apply(leaderEpoch: Int): PartitionState = {
@@ -1351,7 +1451,11 @@ class AbstractFetcherThreadTest {
     override def truncateFullyAndStartAt(topicPartition: TopicPartition, offset: Long): Unit = {
       val state = replicaPartitionState(topicPartition)
       state.log.clear()
-      state.logStartOffset = offset
+      if (state.rlmEnabled) {
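+        // With tiered storage, a full local truncation resets only the local log start offset, not the overall log start offset.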
+        state.localLogStartOffset = offset
+      } else {
+        state.logStartOffset = offset
+      }
       state.logEndOffset = offset
       state.highWatermark = offset
     }
@@ -1384,6 +1488,18 @@ class AbstractFetcherThreadTest {
     }
 
     override protected val isOffsetForLeaderEpochSupported: Boolean = true
+
+    override protected def buildRemoteLogAuxState(topicPartition: TopicPartition,
+                                                  currentLeaderEpoch: Int,
+                                                  fetchOffset: Long,
+                                                  epochForFetchOffset: Int,
+                                                  leaderLogStartOffset: Long): Long = {
+      truncateFullyAndStartAt(topicPartition, fetchOffset)
+      replicaPartitionState(topicPartition).logStartOffset = leaderLogStartOffset
+      // Building the leader epoch cache and producer snapshots is skipped as they are not verified in these tests.
+      leaderLogStartOffset
+    }
+
   }
 
 }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
index 5a84820bf4b..d5d4990d5a5 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaServerTest.scala
@@ -24,6 +24,7 @@ import org.junit.jupiter.api.Test
 import java.util.Properties
 
 import org.apache.kafka.server.common.MetadataVersion
+import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 
 class KafkaServerTest extends QuorumTestHarness {
 
@@ -136,6 +137,23 @@ class KafkaServerTest extends QuorumTestHarness {
     server.shutdown()
   }
 
+  @Test
+  def testRemoteLogManagerInstantiation(): Unit = {
+    val props = TestUtils.createBrokerConfigs(1, zkConnect).head
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, true.toString)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP,
+      "org.apache.kafka.server.log.remote.storage.NoOpRemoteLogMetadataManager")
+    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP,
+      "org.apache.kafka.server.log.remote.storage.NoOpRemoteStorageManager")
+
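+    // With remote storage enabled and the no-op plugins configured, the broker should instantiate a RemoteLogManager on startup.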
+    val server = TestUtils.createServer(KafkaConfig.fromProps(props))
+    server.remoteLogManager match {
+      case Some(_) =>
+      case None => fail("RemoteLogManager should be initialized")
+    }
+    server.shutdown()
+  }
+
   def createServer(nodeId: Int, hostName: String, port: Int): KafkaServer = {
     val props = TestUtils.createBrokerConfig(nodeId, zkConnect)
     props.put(KafkaConfig.AdvertisedListenersProp, s"PLAINTEXT://$hostName:$port")
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 1988ad6afca..654ffb5ae8d 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -63,7 +63,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, replicaRequest)
     assertResponseError(Errors.UNKNOWN_TOPIC_OR_PARTITION, randomBrokerId, debugReplicaRequest)
 
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 2, servers)
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 2)
     val replicas = zkClient.getReplicasForPartition(partition).toSet
     val leader = partitionToLeader(partition.partition)
     val follower = replicas.find(_ != leader).get
@@ -112,7 +112,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   def testCurrentEpochValidation(): Unit = {
     val topic = "topic"
     val topicPartition = new TopicPartition(topic, 0)
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3)
     val firstLeaderId = partitionToLeader(topicPartition.partition)
 
     // We need a leader change in order to check epoch fencing since the first epoch is 0 and
@@ -171,7 +171,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
   @Test
   def testResponseIncludesLeaderEpoch(): Unit = {
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3)
     val firstLeaderId = partitionToLeader(partition.partition)
 
     TestUtils.generateAndProduceMessages(servers, topic, 9)
@@ -179,6 +179,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
     assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, -1))
     assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, -1))
     assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
     assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
 
@@ -191,6 +192,9 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     val secondLeaderEpoch = TestUtils.findLeaderEpoch(secondLeaderId, partition, servers)
 
     // No changes to written data
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1))
+    assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, -1))
+
     assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, 0L, -1))
     assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))
 
@@ -204,7 +208,7 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
   @Test
   def testResponseDefaultOffsetAndLeaderEpochForAllVersions(): Unit = {
-    val partitionToLeader = TestUtils.createTopic(zkClient, topic, numPartitions = 1, replicationFactor = 3, servers)
+    val partitionToLeader = createTopic(numPartitions = 1, replicationFactor = 3)
     val firstLeaderId = partitionToLeader(partition.partition)
 
     TestUtils.generateAndProduceMessages(servers, topic, 9)
@@ -214,18 +218,22 @@ class ListOffsetsRequestTest extends BaseRequestTest {
       if (version == 0) {
         assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
       } else if (version >= 1 && version <= 3) {
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
         assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
       } else if (version >= 4  && version <= 6) {
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
       } else if (version >= 7) {
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, version.toShort))
         assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, version.toShort))
+        assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP, version.toShort))
         assertEquals((10L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, version.toShort))
         assertEquals((9L, 0), fetchOffsetAndEpoch(firstLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, version.toShort))
       }
@@ -245,4 +253,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   private def sendRequest(leaderId: Int, request: ListOffsetsRequest): ListOffsetsResponse = {
     connectAndReceive[ListOffsetsResponse](request, destination = brokerSocketServer(leaderId))
   }
+
+  def createTopic(numPartitions: Int, replicationFactor: Int): Map[Int, Int] = {
+    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers)
+  }
 }
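
EARLIEST_LOCAL_TIMESTAMP is the new sentinel that resolves to the leader's local log start offset. Below is a minimal sketch of building such a request on the replica path, mirroring the forReplica/setTargetTimes usage in the test above; the generated-message setter names are assumed from the clients module rather than shown in this patch.

    import java.util.Collections;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsPartition;
    import org.apache.kafka.common.message.ListOffsetsRequestData.ListOffsetsTopic;
    import org.apache.kafka.common.protocol.ApiKeys;
    import org.apache.kafka.common.requests.ListOffsetsRequest;

    public class EarliestLocalOffsetRequest {

        // Builds a replica ListOffsets request that resolves to the leader's
        // local log start offset for a single partition.
        public static ListOffsetsRequest build(TopicPartition tp, int currentLeaderEpoch, int replicaId) {
            ListOffsetsTopic topic = new ListOffsetsTopic()
                .setName(tp.topic())
                .setPartitions(Collections.singletonList(
                    new ListOffsetsPartition()
                        .setPartitionIndex(tp.partition())
                        .setCurrentLeaderEpoch(currentLeaderEpoch)
                        .setTimestamp(ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)));
            return ListOffsetsRequest.Builder
                .forReplica(ApiKeys.LIST_OFFSETS.latestVersion(), replicaId)
                .setTargetTimes(Collections.singletonList(topic))
                .build();
        }
    }
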
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestWithRemoteStoreTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestWithRemoteStoreTest.scala
new file mode 100644
index 00000000000..fa7656932ad
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestWithRemoteStoreTest.scala
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.server.log.remote.storage.{NoOpRemoteLogMetadataManager, NoOpRemoteStorageManager, RemoteLogManagerConfig}
+
+import java.util.Properties
+
+class ListOffsetsRequestWithRemoteStoreTest extends ListOffsetsRequestTest {
+
+  override def brokerPropertyOverrides(props: Properties): Unit = {
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_STORAGE_SYSTEM_ENABLE_PROP, "true")
+    props.put(RemoteLogManagerConfig.REMOTE_STORAGE_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteStorageManager].getName)
+    props.put(RemoteLogManagerConfig.REMOTE_LOG_METADATA_MANAGER_CLASS_NAME_PROP, classOf[NoOpRemoteLogMetadataManager].getName)
+  }
+
+  override def createTopic(numPartitions: Int, replicationFactor: Int): Map[Int, Int] = {
+    val props = new Properties()
+    props.put(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true")
+    TestUtils.createTopic(zkClient, topic, numPartitions, replicationFactor, servers, props)
+  }
+}
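
The subclass enables tiered storage per topic via TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG. Here is a small sketch of the same topic-level switch expressed with the admin client's NewTopic; everything except the config key is illustrative.

    import java.util.Collections;

    import org.apache.kafka.clients.admin.NewTopic;
    import org.apache.kafka.common.config.TopicConfig;

    public class TieredTopicSpec {

        // Describes a topic with tiered storage enabled at the topic level, the
        // same switch the test subclass above sets when creating its topic.
        public static NewTopic tieredTopic(String name, int partitions, short replicationFactor) {
            return new NewTopic(name, partitions, replicationFactor)
                .configs(Collections.singletonMap(TopicConfig.REMOTE_LOG_STORAGE_ENABLE_CONFIG, "true"));
        }
    }
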
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 78f85d8f546..1364e7e5e5e 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -251,16 +251,17 @@ class LogOffsetTest extends BaseRequestTest {
       log.appendAsLeader(TestUtils.singletonRecords(value = Integer.toString(42).getBytes()), leaderEpoch = 0)
     log.flush(false)
 
-    val offsets = log.legacyFetchOffsetsBefore(ListOffsetsRequest.EARLIEST_TIMESTAMP, 10)
+    for (timestamp <- Seq(ListOffsetsRequest.EARLIEST_TIMESTAMP, ListOffsetsRequest.EARLIEST_LOCAL_TIMESTAMP)) {
+      val offsets = log.legacyFetchOffsetsBefore(timestamp, 10)
+      assertEquals(Seq(0L), offsets)
 
-    assertEquals(Seq(0L), offsets)
-
-    TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
-      "Leader should be elected")
-    val request = ListOffsetsRequest.Builder.forReplica(0, 0)
-      .setTargetTimes(buildTargetTimes(topicPartition, ListOffsetsRequest.EARLIEST_TIMESTAMP, 10).asJava).build()
-    val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala
-    assertEquals(Seq(0L), consumerOffsets)
+      TestUtils.waitUntilTrue(() => TestUtils.isLeaderLocalOnBroker(topic, topicPartition.partition, broker),
+        "Leader should be elected")
+      val request = ListOffsetsRequest.Builder.forReplica(0, 0)
+        .setTargetTimes(buildTargetTimes(topicPartition, timestamp, 10).asJava).build()
+      val consumerOffsets = findPartition(sendListOffsetsRequest(request).topics.asScala, topicPartition).oldStyleOffsets.asScala
+      assertEquals(Seq(0L), consumerOffsets)
+    }
   }
 
   /* We test that `fetchOffsetsBefore` works correctly if `LogSegment.size` changes after each invocation (simulating
diff --git a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
index 1a4a82f6fef..63568fdb3e7 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/LeaderEpochFileCacheTest.scala
@@ -578,4 +578,59 @@ class LeaderEpochFileCacheTest {
     //Then
     cache.truncateFromEnd(7)
   }
+
+  @Test
+  def testFindPreviousEpoch(): Unit = {
+    assertEquals(None, cache.previousEpoch(epoch = 2))
+
+    cache.assign(epoch = 2, startOffset = 10)
+    assertEquals(None, cache.previousEpoch(epoch = 2))
+
+    cache.assign(epoch = 4, startOffset = 15)
+    assertEquals(Some(2), cache.previousEpoch(epoch = 4))
+
+    cache.assign(epoch = 10, startOffset = 20)
+    assertEquals(Some(4), cache.previousEpoch(epoch = 10))
+
+    cache.truncateFromEnd(18)
+    assertEquals(Some(2), cache.previousEpoch(cache.latestEpoch.get))
+  }
+
+  @Test
+  def testFindNextEpoch(): Unit = {
+    cache.assign(epoch = 0, startOffset = 0)
+    cache.assign(epoch = 1, startOffset = 100)
+    cache.assign(epoch = 2, startOffset = 200)
+
+    assertEquals(Some(0), cache.nextEpoch(epoch = -1))
+    assertEquals(Some(1), cache.nextEpoch(epoch = 0))
+    assertEquals(Some(2), cache.nextEpoch(epoch = 1))
+    assertEquals(None, cache.nextEpoch(epoch = 2))
+    assertEquals(None, cache.nextEpoch(epoch = 100))
+  }
+
+  @Test
+  def testGetEpochEntry(): Unit = {
+    cache.assign(epoch = 2, startOffset = 100)
+    cache.assign(epoch = 3, startOffset = 500)
+    cache.assign(epoch = 5, startOffset = 1000)
+
+    assertEquals(EpochEntry(2, 100), cache.epochEntry(2).get)
+    assertEquals(EpochEntry(3, 500), cache.epochEntry(3).get)
+    assertEquals(EpochEntry(5, 1000), cache.epochEntry(5).get)
+  }
+
+  @Test
+  def shouldFetchEpochForGivenOffset(): Unit = {
+    cache.assign(epoch = 0, startOffset = 10)
+    cache.assign(epoch = 1, startOffset = 20)
+    cache.assign(epoch = 5, startOffset = 30)
+
+    assertEquals(Some(1), cache.epochForOffset(offset = 25))
+    assertEquals(Some(1), cache.epochForOffset(offset = 20))
+    assertEquals(Some(5), cache.epochForOffset(offset = 30))
+    assertEquals(Some(5), cache.epochForOffset(offset = 50))
+    assertEquals(None, cache.epochForOffset(offset = 5))
+  }
+
 }
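
The new cache queries (previousEpoch, nextEpoch, epochEntry, epochForOffset) support the follower's reconstruction of leader-epoch state described in the commit message. Below is a self-contained sketch of the lookup semantics these tests pin down, using a plain TreeMap instead of the real LeaderEpochFileCache.

    import java.util.Map;
    import java.util.Optional;
    import java.util.TreeMap;

    public class EpochLookupSketch {

        // epoch -> start offset, kept sorted by epoch (start offsets are monotonic too).
        private final TreeMap<Integer, Long> entries = new TreeMap<>();

        public void assign(int epoch, long startOffset) {
            entries.put(epoch, startOffset);
        }

        // Largest assigned epoch whose start offset is <= the given offset,
        // matching shouldFetchEpochForGivenOffset above.
        public Optional<Integer> epochForOffset(long offset) {
            Integer result = null;
            for (Map.Entry<Integer, Long> entry : entries.entrySet()) {
                if (entry.getValue() <= offset) {
                    result = entry.getKey();
                } else {
                    break;
                }
            }
            return Optional.ofNullable(result);
        }

        public Optional<Integer> previousEpoch(int epoch) {
            return Optional.ofNullable(entries.lowerKey(epoch));
        }

        public Optional<Integer> nextEpoch(int epoch) {
            return Optional.ofNullable(entries.higherKey(epoch));
        }
    }
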
diff --git a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
index ac1d8b57547..4dc598fdce8 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/util/MockBlockingSender.scala
@@ -20,16 +20,18 @@ import kafka.cluster.BrokerEndPoint
 import kafka.server.BlockingSend
 import org.apache.kafka.clients.{ClientRequest, ClientResponse, MockClient, NetworkClientUtils}
 import org.apache.kafka.common.message.OffsetForLeaderEpochResponseData.{EpochEndOffset, OffsetForLeaderTopicResult}
-import org.apache.kafka.common.message.{FetchResponseData, OffsetForLeaderEpochResponseData}
+import org.apache.kafka.common.message.{FetchResponseData, ListOffsetsResponseData, OffsetForLeaderEpochResponseData}
+import org.apache.kafka.common.message.ListOffsetsResponseData.{ListOffsetsPartitionResponse, ListOffsetsTopicResponse}
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
 import org.apache.kafka.common.requests.AbstractRequest.Builder
-import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, OffsetsForLeaderEpochResponse, FetchMetadata => JFetchMetadata}
+import org.apache.kafka.common.requests.{AbstractRequest, FetchResponse, ListOffsetsResponse, OffsetsForLeaderEpochResponse, FetchMetadata => JFetchMetadata}
 import org.apache.kafka.common.utils.{SystemTime, Time}
 import org.apache.kafka.common.{Node, TopicIdPartition, TopicPartition, Uuid}
 
 import java.net.SocketTimeoutException
 import java.util
 import scala.collection.Map
+import scala.jdk.CollectionConverters._
 
 /**
   * Stub network client used for testing the ReplicaFetcher, wraps the MockClient used for consumer testing
@@ -47,10 +49,12 @@ class MockBlockingSender(offsets: java.util.Map[TopicPartition, EpochEndOffset],
   private val client = new MockClient(new SystemTime)
   var fetchCount = 0
   var epochFetchCount = 0
+  var listOffsetsCount = 0
   var lastUsedOffsetForLeaderEpochVersion = -1
   var callback: Option[() => Unit] = None
   var currentOffsets: util.Map[TopicPartition, EpochEndOffset] = offsets
   var fetchPartitionData: Map[TopicPartition, FetchResponseData.PartitionData] = Map.empty
+  var listOffsets: Map[TopicPartition, ListOffsetsPartitionResponse] = Map.empty
   var topicIds: Map[String, Uuid] = Map.empty
   private val sourceNode = new Node(sourceBroker.id, sourceBroker.host, sourceBroker.port)
 
@@ -70,6 +74,10 @@ class MockBlockingSender(offsets: java.util.Map[TopicPartition, EpochEndOffset],
     this.topicIds = topicIds
   }
 
+  def setListOffsetsDataForNextResponse(listOffsets: Map[TopicPartition, ListOffsetsPartitionResponse]): Unit = {
+    this.listOffsets = listOffsets
+  }
+
   override def brokerEndPoint(): BrokerEndPoint = sourceBroker
 
   override def sendRequest(requestBuilder: Builder[_ <: AbstractRequest]): ClientResponse = {
@@ -110,6 +118,21 @@ class MockBlockingSender(offsets: java.util.Map[TopicPartition, EpochEndOffset],
           if (partitionData.isEmpty) JFetchMetadata.INVALID_SESSION_ID else 1,
           partitionData)
 
+      case ApiKeys.LIST_OFFSETS =>
+        listOffsetsCount += 1
+        val data = new ListOffsetsResponseData()
+        listOffsets.foreach {
+          case (tp, partitionResponse) =>
+            val topicResponse = data.topics().asScala.find(x => x.name().equals(tp.topic()))
+              .getOrElse {
+                val topicResponse = new ListOffsetsTopicResponse().setName(tp.topic())
+                data.topics().add(topicResponse)
+                topicResponse
+              }
+            topicResponse.partitions.add(partitionResponse)
+        }
+        new ListOffsetsResponse(data)
+
       case _ =>
         throw new UnsupportedOperationException
     }
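
The mock now answers LIST_OFFSETS from a map of per-partition responses supplied via setListOffsetsDataForNextResponse. A sketch of building one such entry follows; the generated-message setter names are assumed, not part of this patch.

    import org.apache.kafka.common.message.ListOffsetsResponseData.ListOffsetsPartitionResponse;
    import org.apache.kafka.common.protocol.Errors;

    public class MockListOffsetsData {

        // Per-partition payload a test would hand to setListOffsetsDataForNextResponse(...)
        // so the stubbed LIST_OFFSETS reply carries the leader's (offset, epoch) pair.
        public static ListOffsetsPartitionResponse partitionResponse(int partition, long offset, int leaderEpoch) {
            return new ListOffsetsPartitionResponse()
                .setPartitionIndex(partition)
                .setErrorCode(Errors.NONE.code())
                .setTimestamp(-1L)
                .setOffset(offset)
                .setLeaderEpoch(leaderEpoch);
        }
    }
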
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
index 6cbf2438c8c..30b93b4b1c0 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/fetcher/ReplicaFetcherThreadBenchmark.java
@@ -100,6 +100,7 @@ import java.util.Properties;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import scala.Option;
+import scala.Tuple2;
 import scala.collection.Iterator;
 import scala.collection.Map;
 
@@ -330,8 +331,8 @@ public class ReplicaFetcherThreadBenchmark {
                             config::interBrokerProtocolVersion
                     ) {
                         @Override
-                        public long fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
-                            return 0;
+                        public Tuple2<Object, Object> fetchEarliestOffset(TopicPartition topicPartition, int currentLeaderEpoch) {
+                            return Tuple2.apply(0, 0);
                         }
 
                         @Override
diff --git a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
index 9d0508d876a..27c362e5438 100644
--- a/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/controller/QuorumControllerTest.java
@@ -164,7 +164,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0)).
+                setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV1)).
                 setBrokerId(0).
                 setClusterId(logEnv.clusterId())).get();
             testConfigurationOperations(controlEnv.activeController());
@@ -205,7 +205,7 @@ public class QuorumControllerTest {
         ) {
             controlEnv.activeController().registerBroker(ANONYMOUS_CONTEXT,
                 new BrokerRegistrationRequestData().
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV1)).
                     setBrokerId(0).
                     setClusterId(logEnv.clusterId())).get();
             testDelayedConfigurationOperations(logEnv, controlEnv.activeController());
@@ -536,7 +536,7 @@ public class QuorumControllerTest {
                     setBrokerId(0).
                     setClusterId(active.clusterId()).
                     setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwBA")).
-                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0)).
+                    setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV1)).
                     setListeners(listeners));
             assertEquals(2L, reply.get().epoch());
             CreateTopicsRequestData createTopicsRequestData =
@@ -957,7 +957,7 @@ public class QuorumControllerTest {
                     .setBrokerId(brokerId)
                     .setRack(null)
                     .setClusterId(controller.clusterId())
-                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV0))
+                    .setFeatures(brokerFeatures(MetadataVersion.IBP_3_0_IV1, MetadataVersion.IBP_3_4_IV1))
                     .setIncarnationId(Uuid.fromString("kxAT73dKQsitIedpiPtwB" + brokerId))
                     .setListeners(
                         new ListenerCollection(
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
index a1f708e3d67..aaa28a77f6e 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/CheckpointFile.java
@@ -108,14 +108,14 @@ public class CheckpointFile<T> {
         }
     }
 
-    private static class CheckpointReadBuffer<T> {
+    public static class CheckpointReadBuffer<T> {
 
         private final String location;
         private final BufferedReader reader;
         private final int version;
         private final EntryFormatter<T> formatter;
 
-        CheckpointReadBuffer(String location,
+        public CheckpointReadBuffer(String location,
                              BufferedReader reader,
                              int version,
                              EntryFormatter<T> formatter) {
@@ -125,7 +125,7 @@ public class CheckpointFile<T> {
             this.formatter = formatter;
         }
 
-        List<T> read() throws IOException {
+        public List<T> read() throws IOException {
             String line = reader.readLine();
             if (line == null)
                 return Collections.emptyList();
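
CheckpointReadBuffer is made public so checkpoint parsing can be reused outside CheckpointFile (for example by remote-log components). Here is a minimal usage sketch under that assumption; only the constructor and read() come from the class itself, the surrounding helper is illustrative.

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    import org.apache.kafka.server.common.CheckpointFile;

    public class CheckpointReadExample {

        // Reads checkpoint entries with the now-public CheckpointReadBuffer; the
        // entry formatter is supplied by the caller.
        public static <T> List<T> readEntries(Path file, int expectedVersion,
                                              CheckpointFile.EntryFormatter<T> formatter) throws IOException {
            try (BufferedReader reader = Files.newBufferedReader(file)) {
                return new CheckpointFile.CheckpointReadBuffer<>(file.toString(), reader, expectedVersion, formatter)
                    .read();
            }
        }
    }
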
diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
index 15f8fee9bde..27f691cc3fd 100644
--- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
+++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java
@@ -165,7 +165,10 @@ public enum MetadataVersion {
 
     // Adds ZK to KRaft migration support (KIP-866). This includes ZkMigrationRecord, a new version of RegisterBrokerRecord,
     // and updates to a handful of RPCs.
-    IBP_3_4_IV0(8, "3.4", "IV0", true);
+    IBP_3_4_IV0(8, "3.4", "IV0", true),
+
+    // Support for tiered storage (KIP-405)
+    IBP_3_4_IV1(9, "3.4", "IV1", true);
 
     // NOTE: update the default version in @ClusterTest annotation to point to the latest version
     public static final String FEATURE_NAME = "metadata.version";
@@ -285,7 +288,9 @@ public enum MetadataVersion {
     }
 
     public short fetchRequestVersion() {
-        if (this.isAtLeast(IBP_3_1_IV0)) {
+        if (this.isAtLeast(IBP_3_4_IV1)) {
+            return 14;
+        } else if (this.isAtLeast(IBP_3_1_IV0)) {
             return 13;
         } else if (this.isAtLeast(IBP_2_7_IV1)) {
             return 12;
@@ -327,7 +332,9 @@ public enum MetadataVersion {
     }
 
     public short listOffsetRequestVersion() {
-        if (this.isAtLeast(IBP_3_0_IV1)) {
+        if (this.isAtLeast(IBP_3_4_IV1)) {
+            return 8;
+        } else if (this.isAtLeast(IBP_3_0_IV1)) {
             return 7;
         } else if (this.isAtLeast(IBP_2_8_IV0)) {
             return 6;
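
IBP_3_4_IV1 gates the bumped RPC versions. A short sketch showing the mapping introduced above; the expected values follow directly from fetchRequestVersion() and listOffsetRequestVersion().

    import org.apache.kafka.server.common.MetadataVersion;

    public class RpcVersionGate {

        public static void main(String[] args) {
            MetadataVersion ibp = MetadataVersion.fromVersionString("3.4-IV1");
            // Per the changes above: Fetch v14 and ListOffsets v8 from IBP_3_4_IV1 onwards.
            System.out.println("fetchRequestVersion      = " + ibp.fetchRequestVersion());
            System.out.println("listOffsetRequestVersion = " + ibp.listOffsetRequestVersion());
        }
    }
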
diff --git a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
index 6c76a82bfa6..a4a08f0cad3 100644
--- a/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/common/MetadataVersionTest.java
@@ -23,47 +23,7 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_0_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_1_IV2;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_10_2_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_11_0_IV2;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_8_2;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_0_9_0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_1_0_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_1_1_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_0_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_1_IV2;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_2_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_3_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_4_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_5_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_6_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_7_IV2;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_2_8_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_0_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_1_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_2_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV0;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV1;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV2;
-import static org.apache.kafka.server.common.MetadataVersion.IBP_3_3_IV3;
+import static org.apache.kafka.server.common.MetadataVersion.*;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -190,11 +150,13 @@ class MetadataVersionTest {
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2"));
         assertEquals(IBP_3_2_IV0, MetadataVersion.fromVersionString("3.2-IV0"));
 
-        assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3"));
         assertEquals(IBP_3_3_IV0, MetadataVersion.fromVersionString("3.3-IV0"));
         assertEquals(IBP_3_3_IV1, MetadataVersion.fromVersionString("3.3-IV1"));
         assertEquals(IBP_3_3_IV2, MetadataVersion.fromVersionString("3.3-IV2"));
         assertEquals(IBP_3_3_IV3, MetadataVersion.fromVersionString("3.3-IV3"));
+
+        assertEquals(IBP_3_4_IV0, MetadataVersion.fromVersionString("3.4-IV0"));
+        assertEquals(IBP_3_4_IV1, MetadataVersion.fromVersionString("3.4-IV1"));
     }
 
     @Test
@@ -242,6 +204,8 @@ class MetadataVersionTest {
         assertEquals("3.3", IBP_3_3_IV1.shortVersion());
         assertEquals("3.3", IBP_3_3_IV2.shortVersion());
         assertEquals("3.3", IBP_3_3_IV3.shortVersion());
+        assertEquals("3.4", IBP_3_4_IV0.shortVersion());
+        assertEquals("3.4", IBP_3_4_IV1.shortVersion());
     }
 
     @Test
@@ -278,6 +242,8 @@ class MetadataVersionTest {
         assertEquals("3.3-IV1", IBP_3_3_IV1.version());
         assertEquals("3.3-IV2", IBP_3_3_IV2.version());
         assertEquals("3.3-IV3", IBP_3_3_IV3.version());
+        assertEquals("3.4-IV0", IBP_3_4_IV0.version());
+        assertEquals("3.4-IV1", IBP_3_4_IV1.version());
     }
 
     @Test
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
new file mode 100644
index 00000000000..900d5bd5c69
--- /dev/null
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteLogMetadataManager.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+public class NoOpRemoteLogMetadataManager implements RemoteLogMetadataManager {
+    @Override
+    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+        return null;
+    }
+
+    @Override
+    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) {
+        return null;
+    }
+
+    @Override
+    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+                                                                       int epochForOffset,
+                                                                       long offset) {
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition, int leaderEpoch) {
+        return Optional.empty();
+    }
+
+    @Override
+    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) {
+        return null;
+    }
+
+    @Override
+    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) {
+        return Collections.emptyIterator();
+    }
+
+    @Override
+    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition,
+                                                                    int leaderEpoch) {
+        return Collections.emptyIterator();
+    }
+
+    @Override
+    public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
+                                             Set<TopicIdPartition> followerPartitions) {
+    }
+
+    @Override
+    public void onStopPartitions(Set<TopicIdPartition> partitions) {
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+}
diff --git a/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
new file mode 100644
index 00000000000..8a83033aa04
--- /dev/null
+++ b/storage/api/src/test/java/org/apache/kafka/server/log/remote/storage/NoOpRemoteStorageManager.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.util.Map;
+
+public class NoOpRemoteStorageManager implements RemoteStorageManager {
+    @Override
+    public void copyLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                   LogSegmentData logSegmentData) {
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                       int startPosition) {
+        return new ByteArrayInputStream(new byte[0]);
+    }
+
+    @Override
+    public InputStream fetchLogSegment(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                       int startPosition,
+                                       int endPosition) {
+        return new ByteArrayInputStream(new byte[0]);
+    }
+
+    @Override
+    public InputStream fetchIndex(RemoteLogSegmentMetadata remoteLogSegmentMetadata,
+                                  IndexType indexType) {
+        return new ByteArrayInputStream(new byte[0]);
+    }
+
+    @Override
+    public void deleteLogSegmentData(RemoteLogSegmentMetadata remoteLogSegmentMetadata) {
+    }
+
+    @Override
+    public void close() {
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
new file mode 100644
index 00000000000..a40e34031d3
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManager.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A wrapper around {@link RemoteLogMetadataManager} that sets the thread context class loader to the given loader
+ * before delegating each call, and restores the original class loader afterwards.
+ */
+public class ClassLoaderAwareRemoteLogMetadataManager implements RemoteLogMetadataManager {
+    private final RemoteLogMetadataManager delegate;
+    private final ClassLoader loader;
+
+    public ClassLoaderAwareRemoteLogMetadataManager(RemoteLogMetadataManager delegate,
+                                                    ClassLoader loader) {
+        this.delegate = delegate;
+        this.loader = loader;
+    }
+
+    @Override
+    public CompletableFuture<Void> addRemoteLogSegmentMetadata(RemoteLogSegmentMetadata remoteLogSegmentMetadata) throws RemoteStorageException {
+        return withClassLoader(() -> delegate.addRemoteLogSegmentMetadata(remoteLogSegmentMetadata));
+    }
+
+    @Override
+    public CompletableFuture<Void> updateRemoteLogSegmentMetadata(RemoteLogSegmentMetadataUpdate remoteLogSegmentMetadataUpdate) throws RemoteStorageException {
+        return withClassLoader(() -> delegate.updateRemoteLogSegmentMetadata(remoteLogSegmentMetadataUpdate));
+    }
+
+    @Override
+    public Optional<RemoteLogSegmentMetadata> remoteLogSegmentMetadata(TopicIdPartition topicIdPartition,
+                                                                       int epochForOffset,
+                                                                       long offset) throws RemoteStorageException {
+        return withClassLoader(() -> delegate.remoteLogSegmentMetadata(topicIdPartition, epochForOffset, offset));
+    }
+
+    @Override
+    public Optional<Long> highestOffsetForEpoch(TopicIdPartition topicIdPartition,
+                                                int leaderEpoch) throws RemoteStorageException {
+        return withClassLoader(() -> delegate.highestOffsetForEpoch(topicIdPartition, leaderEpoch));
+    }
+
+    @Override
+    public CompletableFuture<Void> putRemotePartitionDeleteMetadata(RemotePartitionDeleteMetadata remotePartitionDeleteMetadata) throws RemoteStorageException {
+        return withClassLoader(() ->
+            delegate.putRemotePartitionDeleteMetadata(remotePartitionDeleteMetadata));
+    }
+
+    @Override
+    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition) throws RemoteStorageException {
+        return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition));
+    }
+
+    @Override
+    public Iterator<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicIdPartition topicIdPartition,
+                                                                    int leaderEpoch) throws RemoteStorageException {
+        return withClassLoader(() -> delegate.listRemoteLogSegments(topicIdPartition, leaderEpoch));
+    }
+
+    @Override
+    public void onPartitionLeadershipChanges(Set<TopicIdPartition> leaderPartitions,
+                                             Set<TopicIdPartition> followerPartitions) {
+        withTryCatchClassLoader(() -> {
+            delegate.onPartitionLeadershipChanges(leaderPartitions, followerPartitions);
+            return null;
+        });
+    }
+
+    @Override
+    public void onStopPartitions(Set<TopicIdPartition> partitions) {
+        withTryCatchClassLoader(() -> {
+            delegate.onStopPartitions(partitions);
+            return null;
+        });
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        withTryCatchClassLoader(() -> {
+            delegate.configure(configs);
+            return null;
+        });
+    }
+
+    @Override
+    public void close() throws IOException {
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(loader);
+        try {
+            delegate.close();
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
+    @SuppressWarnings("UnusedReturnValue")
+    private <T> T withTryCatchClassLoader(Worker<T> worker) {
+        try {
+            return withClassLoader(worker);
+        } catch (final RemoteStorageException ex) {
+            // ignore, this exception is not thrown by the method.
+        }
+        return null;
+    }
+
+    private <T> T withClassLoader(Worker<T> worker) throws RemoteStorageException {
+        ClassLoader originalClassLoader = Thread.currentThread().getContextClassLoader();
+        Thread.currentThread().setContextClassLoader(loader);
+        try {
+            return worker.doWork();
+        } finally {
+            Thread.currentThread().setContextClassLoader(originalClassLoader);
+        }
+    }
+
+    @FunctionalInterface
+    public interface Worker<T> {
+        T doWork() throws RemoteStorageException;
+    }
+}
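
The withClassLoader helper follows the usual swap-and-restore idiom for plugin class loaders. Below is a generic, self-contained sketch of that idiom (plain Java, not a Kafka API).

    public final class ContextClassLoaderSwap {

        private ContextClassLoaderSwap() { }

        // Install the plugin's class loader for the duration of the call and
        // always restore the original one, even if the call throws.
        public static <T, E extends Exception> T call(ClassLoader loader, ThrowingSupplier<T, E> body) throws E {
            Thread current = Thread.currentThread();
            ClassLoader original = current.getContextClassLoader();
            current.setContextClassLoader(loader);
            try {
                return body.get();
            } finally {
                current.setContextClassLoader(original);
            }
        }

        @FunctionalInterface
        public interface ThrowingSupplier<T, E extends Exception> {
            T get() throws E;
        }
    }
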
diff --git a/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManagerTest.java b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManagerTest.java
new file mode 100644
index 00000000000..82b174da3b7
--- /dev/null
+++ b/storage/src/test/java/org/apache/kafka/server/log/remote/metadata/storage/ClassLoaderAwareRemoteLogMetadataManagerTest.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadataManager;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.junit.jupiter.api.Test;
+
+import java.util.concurrent.CompletableFuture;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+class ClassLoaderAwareRemoteLogMetadataManagerTest {
+
+    @Test
+    public void testWithClassLoader() throws RemoteStorageException {
+        DummyClassLoader dummyClassLoader = new DummyClassLoader();
+        RemoteLogMetadataManager delegate = mock(RemoteLogMetadataManager.class);
+        ClassLoaderAwareRemoteLogMetadataManager rlmm = new ClassLoaderAwareRemoteLogMetadataManager(delegate, dummyClassLoader);
+        when(delegate.addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class))).thenAnswer(invocation -> {
+            assertEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader());
+            return CompletableFuture.completedFuture(null);
+        });
+        assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader());
+        rlmm.addRemoteLogSegmentMetadata(mock(RemoteLogSegmentMetadata.class));
+        assertNotEquals(dummyClassLoader, Thread.currentThread().getContextClassLoader());
+    }
+
+    private static class DummyClassLoader extends ClassLoader {
+    }
+}
\ No newline at end of file