Posted to commits@kafka.apache.org by sa...@apache.org on 2023/05/18 01:07:46 UTC

[kafka] branch trunk updated: KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory. (#13535)

This is an automated email from the ASF dual-hosted git repository.

satishd pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6f197301646 KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory. (#13535)
6f197301646 is described below

commit 6f197301646135e0bb39a461ca0a07c09c3185fb
Author: Satish Duggana <sa...@apache.org>
AuthorDate: Thu May 18 06:37:37 2023 +0530

    KAFKA-9579 Fetch implementation for records in the remote storage through a specific purgatory. (#13535)
    
    This change includes
    - Recognize fetch requests with out-of-range local log offsets (see the sketch at the end of this message)
    - Add a fetch implementation for data lying in the range [logStartOffset, localLogStartOffset]
    - Add a new purgatory for async remote read requests, which are served through a dedicated thread pool
    
    We have an extended version of remote fetch that can fetch from multiple remote partitions in parallel; we will raise it as a follow-up PR.
    
    A few tests for the newly introduced changes are added in this PR. Some tests for these scenarios already exist in 2.8.x; they will be refactored against the trunk changes and added in follow-up PRs.
    
    Other contributors:
    Kamal Chandraprakash <kc...@uber.com> - Further improvements and a few additional tests
    Luke Chen <sh...@gmail.com> - Added a few test cases for these changes.
    
    PS: This functionality is pulled out of internal branches that carry other functionality related to this feature in 2.8.x. We did not pull in all of those changes because doing so would make this PR huge and burdensome to review; they also need other metrics, minor enhancements (including performance), and minor test changes. We will raise follow-up PRs to cover all of that.
    
    Reviewers: Jun Rao <ju...@gmail.com>, Alexandre Dupriez <al...@gmail.com>, Divij Vaidya <di...@amazon.com>,  Jorge Esteban Quilcate Otoya <qu...@gmail.com>
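    
    For illustration only (not part of the commit): a minimal Java sketch, assuming hypothetical names,
    of the routing decision described in the change list above. Offsets at or above localLogStartOffset
    are served from the local log; offsets in [logStartOffset, localLogStartOffset) are served from the
    remote tier; anything lower is out of range. The helper below is not a method in this commit; the
    real logic lives in ReplicaManager/UnifiedLog in the diff that follows.
    
        import org.apache.kafka.common.errors.OffsetOutOfRangeException;
    
        final class RemoteFetchRouting {
            // Hypothetical helper (not part of this commit): decide where a fetch offset is served from.
            static boolean servedFromRemoteStorage(long fetchOffset, long logStartOffset, long localLogStartOffset) {
                if (fetchOffset >= localLogStartOffset) {
                    return false;   // still available in the local log segments
                }
                if (fetchOffset >= logStartOffset) {
                    return true;    // already tiered away: [logStartOffset, localLogStartOffset)
                }
                // Below logStartOffset the data exists neither locally nor in remote storage.
                throw new OffsetOutOfRangeException("Offset " + fetchOffset
                        + " is below logStartOffset " + logStartOffset);
            }
        }
    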
---
 checkstyle/suppressions.xml                        |   1 +
 .../java/kafka/log/remote/RemoteLogManager.java    | 298 ++++++++++++++++++---
 .../java/kafka/log/remote/RemoteLogReader.java     |  70 +++++
 .../server/builders/ReplicaManagerBuilder.java     |   8 +
 core/src/main/scala/kafka/cluster/Partition.scala  |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   1 +
 .../src/main/scala/kafka/server/DelayedFetch.scala |   2 +-
 .../scala/kafka/server/DelayedRemoteFetch.scala    | 116 ++++++++
 core/src/main/scala/kafka/server/KafkaServer.scala |   1 +
 .../main/scala/kafka/server/ReplicaManager.scala   | 259 ++++++++++++++----
 .../kafka/log/remote/RemoteLogManagerTest.java     |   8 +-
 .../java/kafka/log/remote/RemoteLogReaderTest.java |  82 ++++++
 .../kafka/server/DelayedFetchTest.scala            |   2 +-
 .../kafka/server/DelayedRemoteFetchTest.scala      | 175 ++++++++++++
 .../unit/kafka/server/ListOffsetsRequestTest.scala |  15 +-
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  10 +-
 .../kafka/storage/internals/log/FetchDataInfo.java |  10 +
 .../storage/internals/log/RemoteLogReadResult.java |  30 +++
 .../internals/log/RemoteStorageFetchInfo.java      |  53 ++++
 .../internals/log/RemoteStorageThreadPool.java     |  73 +++++
 20 files changed, 1107 insertions(+), 109 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 9cbb54b5cf3..2084490d187 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -39,6 +39,7 @@
     <suppress checks="(NPathComplexity|ClassFanOutComplexity|CyclomaticComplexity|ClassDataAbstractionCoupling|FinalLocalVariable|LocalVariableName|MemberName|ParameterName|MethodLength|JavaNCSS|AvoidStarImport)"
               files="core[\\/]src[\\/](generated|generated-test)[\\/].+.java$"/>
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
+    <suppress checks="NPathComplexity|ClassFanOutComplexity" files="RemoteLogManager.java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
 
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogManager.java b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
index 1048cd5020f..4c07451236f 100644
--- a/core/src/main/java/kafka/log/remote/RemoteLogManager.java
+++ b/core/src/main/java/kafka/log/remote/RemoteLogManager.java
@@ -24,10 +24,14 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.TopicIdPartition;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.message.FetchResponseData;
 import org.apache.kafka.common.record.FileRecords;
+import org.apache.kafka.common.record.MemoryRecords;
 import org.apache.kafka.common.record.Record;
 import org.apache.kafka.common.record.RecordBatch;
 import org.apache.kafka.common.record.RemoteLogInputStream;
+import org.apache.kafka.common.requests.FetchRequest;
 import org.apache.kafka.common.utils.ChildFirstClassLoader;
 import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
@@ -46,7 +50,18 @@ import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
 import org.apache.kafka.server.log.remote.storage.RemoteStorageManager;
 import org.apache.kafka.storage.internals.checkpoint.InMemoryLeaderEpochCheckpoint;
 import org.apache.kafka.storage.internals.epoch.LeaderEpochFileCache;
+import org.apache.kafka.storage.internals.log.AbortedTxn;
 import org.apache.kafka.storage.internals.log.EpochEntry;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.FetchIsolation;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.OffsetIndex;
+import org.apache.kafka.storage.internals.log.OffsetPosition;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;
+import org.apache.kafka.storage.internals.log.TransactionIndex;
+import org.apache.kafka.storage.internals.log.TxnIndexSearchResult;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
@@ -65,6 +80,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ListIterator;
@@ -76,6 +92,7 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -97,6 +114,7 @@ import java.util.stream.Stream;
 public class RemoteLogManager implements Closeable {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RemoteLogManager.class);
+    private static final String REMOTE_LOG_READER_THREAD_NAME_PREFIX = "remote-log-reader";
 
     private final RemoteLogManagerConfig rlmConfig;
     private final int brokerId;
@@ -109,7 +127,7 @@ public class RemoteLogManager implements Closeable {
     private final RemoteLogMetadataManager remoteLogMetadataManager;
 
     private final RemoteIndexCache indexCache;
-
+    private final RemoteStorageThreadPool remoteStorageReaderThreadPool;
     private final RLMScheduledThreadPool rlmScheduledThreadPool;
 
     private final long delayInMs;
@@ -147,6 +165,11 @@ public class RemoteLogManager implements Closeable {
         indexCache = new RemoteIndexCache(1024, remoteLogStorageManager, logDir);
         delayInMs = rlmConfig.remoteLogManagerTaskIntervalMs();
         rlmScheduledThreadPool = new RLMScheduledThreadPool(rlmConfig.remoteLogManagerThreadPoolSize());
+        remoteStorageReaderThreadPool = new RemoteStorageThreadPool(
+                REMOTE_LOG_READER_THREAD_NAME_PREFIX,
+                rlmConfig.remoteLogReaderThreads(),
+                rlmConfig.remoteLogReaderMaxPendingTasks()
+        );
     }
 
     private <T> T createDelegate(ClassLoader classLoader, String className) {
@@ -447,7 +470,7 @@ public class RemoteLogManager implements Closeable {
             leaderEpoch = -1;
         }
 
-        private void maybeUpdateReadOffset() throws RemoteStorageException {
+        private void maybeUpdateReadOffset(UnifiedLog log) throws RemoteStorageException {
             if (!copiedOffsetOption.isPresent()) {
                 logger.info("Find the highest remote offset for partition: {} after becoming leader, leaderEpoch: {}", topicIdPartition, leaderEpoch);
 
@@ -455,23 +478,17 @@ public class RemoteLogManager implements Closeable {
                 // of a segment with that epoch copied into remote storage. If it can not find an entry then it checks for the
                 // previous leader epoch till it finds an entry, If there are no entries till the earliest leader epoch in leader
                 // epoch cache then it starts copying the segments from the earliest epoch entry's offset.
-                copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition));
+                copiedOffsetOption = OptionalLong.of(findHighestRemoteOffset(topicIdPartition, log));
             }
         }
 
-        public void copyLogSegmentsToRemote() throws InterruptedException {
+        public void copyLogSegmentsToRemote(UnifiedLog log) throws InterruptedException {
             if (isCancelled())
                 return;
 
             try {
-                maybeUpdateReadOffset();
+                maybeUpdateReadOffset(log);
                 long copiedOffset = copiedOffsetOption.getAsLong();
-                Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-                if (!maybeLog.isPresent()) {
-                    return;
-                }
-
-                UnifiedLog log = maybeLog.get();
 
                 // LSO indicates the offset below are ready to be consumed (high-watermark or committed)
                 long lso = log.lastStableOffset();
@@ -578,9 +595,15 @@ public class RemoteLogManager implements Closeable {
                 return;
 
             try {
+                Optional<UnifiedLog> unifiedLogOptional = fetchLog.apply(topicIdPartition.topicPartition());
+
+                if (!unifiedLogOptional.isPresent()) {
+                    return;
+                }
+
                 if (isLeader()) {
                     // Copy log segments to remote storage
-                    copyLogSegmentsToRemote();
+                    copyLogSegmentsToRemote(unifiedLogOptional.get());
                 }
             } catch (InterruptedException ex) {
                 if (!isCancelled()) {
@@ -600,25 +623,212 @@ public class RemoteLogManager implements Closeable {
         }
     }
 
-    long findHighestRemoteOffset(TopicIdPartition topicIdPartition) throws RemoteStorageException {
-        Optional<Long> offset = Optional.empty();
-        Optional<UnifiedLog> maybeLog = fetchLog.apply(topicIdPartition.topicPartition());
-        if (maybeLog.isPresent()) {
-            UnifiedLog log = maybeLog.get();
-            Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
-            if (maybeLeaderEpochFileCache.isDefined()) {
-                LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
-                OptionalInt epoch = cache.latestEpoch();
-                while (!offset.isPresent() && epoch.isPresent()) {
-                    offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
-                    epoch = cache.previousEpoch(epoch.getAsInt());
+    public FetchDataInfo read(RemoteStorageFetchInfo remoteStorageFetchInfo) throws RemoteStorageException, IOException {
+        int fetchMaxBytes = remoteStorageFetchInfo.fetchMaxBytes;
+        TopicPartition tp = remoteStorageFetchInfo.topicPartition;
+        FetchRequest.PartitionData fetchInfo = remoteStorageFetchInfo.fetchInfo;
+
+        boolean includeAbortedTxns = remoteStorageFetchInfo.fetchIsolation == FetchIsolation.TXN_COMMITTED;
+
+        long offset = fetchInfo.fetchOffset;
+        int maxBytes = Math.min(fetchMaxBytes, fetchInfo.maxBytes);
+
+        Optional<UnifiedLog> logOptional = fetchLog.apply(tp);
+        OptionalInt epoch = OptionalInt.empty();
+
+        if (logOptional.isPresent()) {
+            Option<LeaderEpochFileCache> leaderEpochCache = logOptional.get().leaderEpochCache();
+            if (leaderEpochCache.isDefined()) {
+                epoch = leaderEpochCache.get().epochForOffset(offset);
+            }
+        }
+
+        Optional<RemoteLogSegmentMetadata> rlsMetadataOptional = epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(tp, epoch.getAsInt(), offset)
+                : Optional.empty();
+
+        if (!rlsMetadataOptional.isPresent()) {
+            String epochStr = (epoch.isPresent()) ? Integer.toString(epoch.getAsInt()) : "NOT AVAILABLE";
+            throw new OffsetOutOfRangeException("Received request for offset " + offset + " for leader epoch "
+                    + epochStr + " and partition " + tp + " which does not exist in remote tier.");
+        }
+
+        RemoteLogSegmentMetadata remoteLogSegmentMetadata = rlsMetadataOptional.get();
+        int startPos = lookupPositionForOffset(remoteLogSegmentMetadata, offset);
+        InputStream remoteSegInputStream = null;
+        try {
+            // Search forward for the position of the last offset that is greater than or equal to the target offset
+            remoteSegInputStream = remoteLogStorageManager.fetchLogSegment(remoteLogSegmentMetadata, startPos);
+            RemoteLogInputStream remoteLogInputStream = new RemoteLogInputStream(remoteSegInputStream);
+
+            RecordBatch firstBatch = findFirstBatch(remoteLogInputStream, offset);
+
+            if (firstBatch == null)
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+                        includeAbortedTxns ? Optional.of(Collections.emptyList()) : Optional.empty());
+
+            int firstBatchSize = firstBatch.sizeInBytes();
+            // An empty record is sent instead of an incomplete batch when
+            //  - there is no minimum-one-message constraint and
+            //  - the first batch size is more than maximum bytes that can be sent and
+            //  - for FetchRequest version 3 or above.
+            if (!remoteStorageFetchInfo.minOneMessage &&
+                    !remoteStorageFetchInfo.hardMaxBytesLimit &&
+                    firstBatchSize > maxBytes) {
+                return new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY);
+            }
+
+            int updatedFetchSize =
+                    remoteStorageFetchInfo.minOneMessage && firstBatchSize > maxBytes ? firstBatchSize : maxBytes;
+
+            ByteBuffer buffer = ByteBuffer.allocate(updatedFetchSize);
+            int remainingBytes = updatedFetchSize;
+
+            firstBatch.writeTo(buffer);
+            remainingBytes -= firstBatchSize;
+
+            if (remainingBytes > 0) {
+                // read the input stream until min of (EOF stream or buffer's remaining capacity).
+                Utils.readFully(remoteSegInputStream, buffer);
+            }
+            buffer.flip();
+
+            FetchDataInfo fetchDataInfo = new FetchDataInfo(
+                    new LogOffsetMetadata(offset, remoteLogSegmentMetadata.startOffset(), startPos),
+                    MemoryRecords.readableRecords(buffer));
+            if (includeAbortedTxns) {
+                fetchDataInfo = addAbortedTransactions(firstBatch.baseOffset(), remoteLogSegmentMetadata, fetchDataInfo, logOptional.get());
+            }
+
+            return fetchDataInfo;
+        } finally {
+            Utils.closeQuietly(remoteSegInputStream, "RemoteLogSegmentInputStream");
+        }
+    }
+
+    private int lookupPositionForOffset(RemoteLogSegmentMetadata remoteLogSegmentMetadata, long offset) {
+        return indexCache.lookupOffset(remoteLogSegmentMetadata, offset);
+    }
+
+    private FetchDataInfo addAbortedTransactions(long startOffset,
+                                                 RemoteLogSegmentMetadata segmentMetadata,
+                                                 FetchDataInfo fetchInfo,
+                                                 UnifiedLog log) throws RemoteStorageException {
+        int fetchSize = fetchInfo.records.sizeInBytes();
+        OffsetPosition startOffsetPosition = new OffsetPosition(fetchInfo.fetchOffsetMetadata.messageOffset,
+                fetchInfo.fetchOffsetMetadata.relativePositionInSegment);
+
+        OffsetIndex offsetIndex = indexCache.getIndexEntry(segmentMetadata).offsetIndex();
+        long upperBoundOffset = offsetIndex.fetchUpperBoundOffset(startOffsetPosition, fetchSize)
+                .map(x -> x.offset).orElse(segmentMetadata.endOffset() + 1);
+
+        final Set<FetchResponseData.AbortedTransaction> abortedTransactions = new HashSet<>();
+
+        Consumer<List<AbortedTxn>> accumulator =
+                abortedTxns -> abortedTransactions.addAll(abortedTxns.stream()
+                        .map(AbortedTxn::asAbortedTransaction).collect(Collectors.toList()));
+
+        collectAbortedTransactions(startOffset, upperBoundOffset, segmentMetadata, accumulator, log);
+
+        return new FetchDataInfo(fetchInfo.fetchOffsetMetadata,
+                fetchInfo.records,
+                fetchInfo.firstEntryIncomplete,
+                Optional.of(abortedTransactions.isEmpty() ? Collections.emptyList() : new ArrayList<>(abortedTransactions)));
+    }
+
+    private void collectAbortedTransactions(long startOffset,
+                                            long upperBoundOffset,
+                                            RemoteLogSegmentMetadata segmentMetadata,
+                                            Consumer<List<AbortedTxn>> accumulator,
+                                            UnifiedLog log) throws RemoteStorageException {
+        // Search in remote segments first.
+        Optional<RemoteLogSegmentMetadata> nextSegmentMetadataOpt = Optional.of(segmentMetadata);
+        while (nextSegmentMetadataOpt.isPresent()) {
+            Optional<TransactionIndex> txnIndexOpt = nextSegmentMetadataOpt.map(metadata -> indexCache.getIndexEntry(metadata).txnIndex());
+            if (txnIndexOpt.isPresent()) {
+                TxnIndexSearchResult searchResult = txnIndexOpt.get().collectAbortedTxns(startOffset, upperBoundOffset);
+                accumulator.accept(searchResult.abortedTransactions);
+                if (searchResult.isComplete) {
+                    // Return immediately when the search result is complete, it does not need to go through local log segments.
+                    return;
                 }
             }
+
+            nextSegmentMetadataOpt = findNextSegmentMetadata(nextSegmentMetadataOpt.get(), log.leaderEpochCache());
+        }
+
+        // Search in local segments
+        collectAbortedTransactionInLocalSegments(startOffset, upperBoundOffset, accumulator, JavaConverters.asJavaIterator(log.logSegments().iterator()));
+    }
+
+    private void collectAbortedTransactionInLocalSegments(long startOffset,
+                                                          long upperBoundOffset,
+                                                          Consumer<List<AbortedTxn>> accumulator,
+                                                          Iterator<LogSegment> localLogSegments) {
+        while (localLogSegments.hasNext()) {
+            TransactionIndex txnIndex = localLogSegments.next().txnIndex();
+            if (txnIndex != null) {
+                TxnIndexSearchResult searchResult = txnIndex.collectAbortedTxns(startOffset, upperBoundOffset);
+                accumulator.accept(searchResult.abortedTransactions);
+                if (searchResult.isComplete) {
+                    return;
+                }
+            }
+        }
+    }
+
+    private Optional<RemoteLogSegmentMetadata> findNextSegmentMetadata(RemoteLogSegmentMetadata segmentMetadata,
+                                                                       Option<LeaderEpochFileCache> leaderEpochFileCacheOption) throws RemoteStorageException {
+        if (leaderEpochFileCacheOption.isEmpty()) {
+            return Optional.empty();
+        }
+
+        long nextSegmentBaseOffset = segmentMetadata.endOffset() + 1;
+        OptionalInt epoch = leaderEpochFileCacheOption.get().epochForOffset(nextSegmentBaseOffset);
+        return epoch.isPresent()
+                ? fetchRemoteLogSegmentMetadata(segmentMetadata.topicIdPartition().topicPartition(), epoch.getAsInt(), nextSegmentBaseOffset)
+                : Optional.empty();
+    }
+
+    private RecordBatch findFirstBatch(RemoteLogInputStream remoteLogInputStream, long offset) throws IOException {
+        RecordBatch nextBatch = null;
+        // Look for the batch which has the desired offset
+        // We will always have a batch in that segment as it is a non-compacted topic.
+        do {
+            nextBatch = remoteLogInputStream.nextBatch();
+        } while (nextBatch != null && nextBatch.lastOffset() < offset);
+
+        return nextBatch;
+    }
+
+    long findHighestRemoteOffset(TopicIdPartition topicIdPartition, UnifiedLog log) throws RemoteStorageException {
+        Optional<Long> offset = Optional.empty();
+
+        Option<LeaderEpochFileCache> maybeLeaderEpochFileCache = log.leaderEpochCache();
+        if (maybeLeaderEpochFileCache.isDefined()) {
+            LeaderEpochFileCache cache = maybeLeaderEpochFileCache.get();
+            OptionalInt epoch = cache.latestEpoch();
+            while (!offset.isPresent() && epoch.isPresent()) {
+                offset = remoteLogMetadataManager.highestOffsetForEpoch(topicIdPartition, epoch.getAsInt());
+                epoch = cache.previousEpoch(epoch.getAsInt());
+            }
         }
 
         return offset.orElse(-1L);
     }
 
+    /**
+     * Submit a remote log read task.
+     *
+     * This method returns immediately. The read operation is executed in a thread pool.
+     * The callback will be called when the task is done.
+     *
+     * @throws java.util.concurrent.RejectedExecutionException if the task cannot be accepted for execution (task queue is full)
+     */
+    public Future<Void> asyncRead(RemoteStorageFetchInfo fetchInfo, Consumer<RemoteLogReadResult> callback) {
+        return remoteStorageReaderThreadPool.submit(new RemoteLogReader(fetchInfo, this, callback));
+    }
+
     void doHandleLeaderOrFollowerPartitions(TopicIdPartition topicPartition,
                                             Consumer<RLMTask> convertToLeaderOrFollower) {
         RLMTaskWithFuture rlmTaskWithFuture = leaderOrFollowerTasks.computeIfAbsent(topicPartition,
@@ -665,17 +875,40 @@ public class RemoteLogManager implements Closeable {
                 Utils.closeQuietly(remoteLogStorageManager, "RemoteLogStorageManager");
                 Utils.closeQuietly(remoteLogMetadataManager, "RemoteLogMetadataManager");
                 Utils.closeQuietly(indexCache, "RemoteIndexCache");
-                try {
-                    rlmScheduledThreadPool.shutdown();
-                } catch (InterruptedException e) {
-                    // ignore
-                }
+
+                rlmScheduledThreadPool.close();
+                shutdownAndAwaitTermination(remoteStorageReaderThreadPool, "RemoteStorageReaderThreadPool", 10, TimeUnit.SECONDS);
+
                 leaderOrFollowerTasks.clear();
                 closed = true;
             }
         }
     }
 
+    private static void shutdownAndAwaitTermination(ExecutorService pool, String poolName, long timeout, TimeUnit timeUnit) {
+        // This pattern of shutting down thread pool is adopted from here: https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/util/concurrent/ExecutorService.html
+        LOGGER.info("Shutting down of thread pool {} is started", poolName);
+        pool.shutdown(); // Disable new tasks from being submitted
+        try {
+            // Wait a while for existing tasks to terminate
+            if (!pool.awaitTermination(timeout, timeUnit)) {
+                LOGGER.info("Shutting down of thread pool {} could not be completed. It will retry cancelling the tasks using shutdownNow.", poolName);
+                pool.shutdownNow(); // Cancel currently executing tasks
+                // Wait a while for tasks to respond to being cancelled
+                if (!pool.awaitTermination(timeout, timeUnit))
+                    LOGGER.warn("Shutting down of thread pool {} could not be completed even after retrying cancellation of the tasks using shutdownNow.", poolName);
+            }
+        } catch (InterruptedException ex) {
+            // (Re-)Cancel if current thread also interrupted
+            LOGGER.warn("Encountered InterruptedException while shutting down thread pool {}. It will retry cancelling the tasks using shutdownNow.", poolName);
+            pool.shutdownNow();
+            // Preserve interrupt status
+            Thread.currentThread().interrupt();
+        }
+
+        LOGGER.info("Shutting down of thread pool {} is completed", poolName);
+    }
+
     static class RLMScheduledThreadPool {
 
         private static final Logger LOGGER = LoggerFactory.getLogger(RLMScheduledThreadPool.class);
@@ -708,11 +941,8 @@ public class RemoteLogManager implements Closeable {
             return scheduledThreadPool.scheduleWithFixedDelay(runnable, initialDelay, delay, timeUnit);
         }
 
-        public boolean shutdown() throws InterruptedException {
-            LOGGER.info("Shutting down scheduled thread pool");
-            scheduledThreadPool.shutdownNow();
-            //waits for 2 mins to terminate the current tasks
-            return scheduledThreadPool.awaitTermination(2, TimeUnit.MINUTES);
+        public void close() {
+            shutdownAndAwaitTermination(scheduledThreadPool, "RLMScheduledThreadPool", 10, TimeUnit.SECONDS);
         }
     }
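
For illustration only (not part of the diff): a small Java sketch of how the asyncRead API added
above could be used. It bridges the callback into a CompletableFuture and handles the
RejectedExecutionException documented on asyncRead, which is essentially what the Scala
ReplicaManager.processRemoteFetch further down in this commit does. The class and method names here
are hypothetical.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;
    import java.util.concurrent.RejectedExecutionException;

    import kafka.log.remote.RemoteLogManager;
    import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
    import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

    final class RemoteReadExample {
        // Submit a remote read and expose its outcome as a CompletableFuture.
        static CompletableFuture<RemoteLogReadResult> readRemotely(RemoteLogManager rlm,
                                                                   RemoteStorageFetchInfo fetchInfo) {
            CompletableFuture<RemoteLogReadResult> result = new CompletableFuture<>();
            try {
                // asyncRead returns immediately; the callback runs on a remote-log-reader thread.
                Future<Void> task = rlm.asyncRead(fetchInfo, result::complete);
                // task.cancel(true) is what DelayedRemoteFetch.onExpiration does on timeout.
            } catch (RejectedExecutionException e) {
                // The reader pool's bounded task queue is full; fail fast instead of blocking.
                result.completeExceptionally(e);
            }
            return result;
        }
    }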
 
diff --git a/core/src/main/java/kafka/log/remote/RemoteLogReader.java b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
new file mode 100644
index 00000000000..0ed7f722d5b
--- /dev/null
+++ b/core/src/main/java/kafka/log/remote/RemoteLogReader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.slf4j.Logger;
+
+import java.util.Optional;
+import java.util.concurrent.Callable;
+import java.util.function.Consumer;
+
+public class RemoteLogReader implements Callable<Void> {
+    private final Logger logger;
+    private final RemoteStorageFetchInfo fetchInfo;
+    private final RemoteLogManager rlm;
+    private final Consumer<RemoteLogReadResult> callback;
+
+    public RemoteLogReader(RemoteStorageFetchInfo fetchInfo,
+                           RemoteLogManager rlm,
+                           Consumer<RemoteLogReadResult> callback) {
+        this.fetchInfo = fetchInfo;
+        this.rlm = rlm;
+        this.callback = callback;
+        logger = new LogContext() {
+            @Override
+            public String logPrefix() {
+                return "[" + Thread.currentThread().getName() + "]";
+            }
+        }.logger(RemoteLogReader.class);
+    }
+
+    @Override
+    public Void call() {
+        RemoteLogReadResult result;
+        try {
+            logger.debug("Reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+
+            FetchDataInfo fetchDataInfo = rlm.read(fetchInfo);
+            result = new RemoteLogReadResult(Optional.of(fetchDataInfo), Optional.empty());
+        } catch (OffsetOutOfRangeException e) {
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        } catch (Exception e) {
+            logger.error("Error occurred while reading the remote data for {}", fetchInfo.topicPartition, e);
+            result = new RemoteLogReadResult(Optional.empty(), Optional.of(e));
+        }
+
+        logger.debug("Finished reading records from remote storage for topic partition {}", fetchInfo.topicPartition);
+        callback.accept(result);
+
+        return null;
+    }
+}
diff --git a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
index 93d6f4ff3f3..5860aa17693 100644
--- a/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/ReplicaManagerBuilder.java
@@ -26,6 +26,7 @@ import kafka.server.DelayedElectLeader;
 import kafka.server.DelayedFetch;
 import kafka.server.DelayedOperationPurgatory;
 import kafka.server.DelayedProduce;
+import kafka.server.DelayedRemoteFetch;
 import kafka.server.KafkaConfig;
 import kafka.server.MetadataCache;
 import kafka.server.QuotaFactory.QuotaManagers;
@@ -61,6 +62,7 @@ public class ReplicaManagerBuilder {
     private Optional<DelayedOperationPurgatory<DelayedFetch>> delayedFetchPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedDeleteRecords>> delayedDeleteRecordsPurgatory = Optional.empty();
     private Optional<DelayedOperationPurgatory<DelayedElectLeader>> delayedElectLeaderPurgatory = Optional.empty();
+    private Optional<DelayedOperationPurgatory<DelayedRemoteFetch>> delayedRemoteFetchPurgatory = Optional.empty();
     private Optional<String> threadNamePrefix = Optional.empty();
     private Long brokerEpoch = -1L;
     private Optional<AddPartitionsToTxnManager> addPartitionsToTxnManager = Optional.empty();
@@ -140,6 +142,11 @@ public class ReplicaManagerBuilder {
         return this;
     }
 
+    public ReplicaManagerBuilder setDelayedRemoteFetchPurgatory(DelayedOperationPurgatory<DelayedRemoteFetch> delayedRemoteFetchPurgatory) {
+        this.delayedRemoteFetchPurgatory = Optional.of(delayedRemoteFetchPurgatory);
+        return this;
+    }
+
     public ReplicaManagerBuilder setDelayedDeleteRecordsPurgatory(DelayedOperationPurgatory<DelayedDeleteRecords> delayedDeleteRecordsPurgatory) {
         this.delayedDeleteRecordsPurgatory = Optional.of(delayedDeleteRecordsPurgatory);
         return this;
@@ -189,6 +196,7 @@ public class ReplicaManagerBuilder {
                              OptionConverters.toScala(delayedFetchPurgatory),
                              OptionConverters.toScala(delayedDeleteRecordsPurgatory),
                              OptionConverters.toScala(delayedElectLeaderPurgatory),
+                             OptionConverters.toScala(delayedRemoteFetchPurgatory),
                              OptionConverters.toScala(threadNamePrefix),
                              () -> brokerEpoch,
                              OptionConverters.toScala(addPartitionsToTxnManager));
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 0f37d292385..413373d65b0 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -526,7 +526,7 @@ class Partition(val topicPartition: TopicPartition,
     leaderReplicaIdOpt.filter(_ == localBrokerId)
   }
 
-  private def localLogWithEpochOrThrow(
+  def localLogWithEpochOrThrow(
     currentLeaderEpoch: Optional[Integer],
     requireLeader: Boolean
   ): UnifiedLog = {
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index 9452f0afd93..d48a7da30aa 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -270,6 +270,7 @@ class BrokerServer(
         isShuttingDown = isShuttingDown,
         zkClient = None,
         threadNamePrefix = None, // The ReplicaManager only runs on the broker, and already includes the ID in thread names.
+        delayedRemoteFetchPurgatoryParam = None,
         brokerEpochSupplier = () => lifecycleManager.brokerEpoch,
         addPartitionsToTxnManager = Some(addPartitionsToTxnManager)
       )
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index 423b7bcd223..9ce6082e76c 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -163,7 +163,7 @@ class DelayedFetch(
       tp -> status.fetchInfo
     }
 
-    val logReadResults = replicaManager.readFromLocalLog(
+    val logReadResults = replicaManager.readFromLog(
       params,
       fetchInfos,
       quota,
diff --git a/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
new file mode 100644
index 00000000000..5fcf851fe83
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DelayedRemoteFetch.scala
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.TopicIdPartition
+import org.apache.kafka.common.errors._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.storage.internals.log.{FetchParams, FetchPartitionData, LogOffsetMetadata, RemoteLogReadResult, RemoteStorageFetchInfo}
+
+import java.util.concurrent.{CompletableFuture, Future}
+import java.util.{Optional, OptionalInt, OptionalLong}
+import scala.collection._
+
+/**
+ * A remote fetch operation that can be created by the replica manager and watched
+ * in the remote fetch operation purgatory
+ */
+class DelayedRemoteFetch(remoteFetchTask: Future[Void],
+                         remoteFetchResult: CompletableFuture[RemoteLogReadResult],
+                         remoteFetchInfo: RemoteStorageFetchInfo,
+                         fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)],
+                         fetchParams: FetchParams,
+                         localReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                         replicaManager: ReplicaManager,
+                         responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit)
+  extends DelayedOperation(fetchParams.maxWaitMs) {
+
+  /**
+   * The operation can be completed if:
+   *
+   * Case a: This broker is no longer the leader of the partition it tries to fetch
+   * Case b: This broker does not know the partition it tries to fetch
+   * Case c: The remote storage read request completed (succeeded or failed)
+   * Case d: The partition is in an offline log directory on this broker
+   *
+   * Upon completion, should return whatever data is available for each valid partition
+   */
+  override def tryComplete(): Boolean = {
+    fetchPartitionStatus.foreach {
+      case (topicPartition, fetchStatus) =>
+        val fetchOffset = fetchStatus.startOffsetMetadata
+        try {
+          if (fetchOffset != LogOffsetMetadata.UNKNOWN_OFFSET_METADATA) {
+            replicaManager.getPartitionOrException(topicPartition.topicPartition())
+          }
+        } catch {
+          case _: KafkaStorageException => // Case d
+            debug(s"Partition $topicPartition is in an offline log directory, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: UnknownTopicOrPartitionException => // Case b
+            debug(s"Broker no longer knows of partition $topicPartition, satisfy $fetchParams immediately")
+            return forceComplete()
+          case _: NotLeaderOrFollowerException =>  // Case a
+            debug("Broker is no longer the leader or follower of %s, satisfy %s immediately".format(topicPartition, fetchParams))
+            return forceComplete()
+        }
+    }
+    if (remoteFetchResult.isDone) // Case c
+      forceComplete()
+    else
+      false
+  }
+
+  override def onExpiration(): Unit = {
+    // cancel the remote storage read task, if it has not been executed yet
+    val cancelled = remoteFetchTask.cancel(true)
+    if (!cancelled) debug(s"Remote fetch task for RemoteStorageFetchInfo: $remoteFetchInfo could not be cancelled and its isDone value is ${remoteFetchTask.isDone}")
+  }
+
+  /**
+   * Upon completion, read whatever data is available and pass to the complete callback
+   */
+  override def onComplete(): Unit = {
+    val fetchPartitionData = localReadResults.map { case (tp, result) =>
+      if (tp.topicPartition().equals(remoteFetchInfo.topicPartition)
+        && remoteFetchResult.isDone
+        && result.error == Errors.NONE
+        && result.info.delayedRemoteStorageFetch.isPresent) {
+        if (remoteFetchResult.get.error.isPresent) {
+          tp -> ReplicaManager.createLogReadResult(remoteFetchResult.get.error.get).toFetchPartitionData(false)
+        } else {
+          val info = remoteFetchResult.get.fetchDataInfo.get
+          tp -> new FetchPartitionData(
+            result.error,
+            result.highWatermark,
+            result.leaderLogStartOffset,
+            info.records,
+            Optional.empty(),
+            if (result.lastStableOffset.isDefined) OptionalLong.of(result.lastStableOffset.get) else OptionalLong.empty(),
+            info.abortedTransactions,
+            if (result.preferredReadReplica.isDefined) OptionalInt.of(result.preferredReadReplica.get) else OptionalInt.empty(),
+            false)
+        }
+      } else {
+        tp -> result.toFetchPartitionData(false)
+      }
+    }
+
+    responseCallback(fetchPartitionData)
+  }
+}
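
For illustration only (not part of the diff): a Java analogue of the completion protocol that
DelayedRemoteFetch above implements. It is not Kafka's DelayedOperation API; it only shows the
essential idea: the operation completes once the remote read result is available (case c), and on
expiration the outstanding read task is cancelled. The real tryComplete additionally force-completes
for cases a, b, and d by probing the partition.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.Future;

    import org.apache.kafka.storage.internals.log.RemoteLogReadResult;

    final class DelayedRemoteReadSketch {
        private final Future<Void> remoteFetchTask;                             // the submitted read task
        private final CompletableFuture<RemoteLogReadResult> remoteFetchResult; // completed by the read callback

        DelayedRemoteReadSketch(Future<Void> task, CompletableFuture<RemoteLogReadResult> result) {
            this.remoteFetchTask = task;
            this.remoteFetchResult = result;
        }

        // Case c: the remote storage read request completed (succeeded or failed).
        boolean tryComplete() {
            return remoteFetchResult.isDone();
        }

        // On expiration, cancel the remote read if it has not been executed yet.
        void onExpiration() {
            remoteFetchTask.cancel(true);
        }
    }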
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 7a3d2f225b6..261659747ef 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -629,6 +629,7 @@ class KafkaServer(
       brokerTopicStats = brokerTopicStats,
       isShuttingDown = isShuttingDown,
       zkClient = Some(zkClient),
+      delayedRemoteFetchPurgatoryParam = None,
       threadNamePrefix = threadNamePrefix,
       brokerEpochSupplier = brokerEpochSupplier,
       addPartitionsToTxnManager = Some(addPartitionsToTxnManager))
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 92ddb59ed57..0027fe77afe 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -24,6 +24,7 @@ import kafka.log.remote.RemoteLogManager
 import kafka.log.{LogManager, UnifiedLog}
 import kafka.server.HostedPartition.Online
 import kafka.server.QuotaFactory.QuotaManagers
+import kafka.server.ReplicaManager.createLogReadResult
 import kafka.server.checkpoints.{LazyOffsetCheckpoints, OffsetCheckpointFile, OffsetCheckpoints}
 import kafka.server.metadata.ZkMetadataCache
 import kafka.utils.Implicits._
@@ -55,16 +56,16 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel, Node, TopicIdParti
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
 import org.apache.kafka.server.common.MetadataVersion._
-import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
+import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, FetchParams, FetchPartitionData, LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, LogOffsetMetadata, LogReadInfo, RecordValidationException, RemoteLogReadResult, RemoteStorageFetchInfo}
 
 import java.io.File
 import java.nio.file.{Files, Paths}
 import java.util
-import java.util.concurrent.TimeUnit
 import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.locks.Lock
+import java.util.concurrent.{CompletableFuture, Future, RejectedExecutionException, TimeUnit}
 import java.util.{Optional, OptionalInt, OptionalLong}
 import scala.collection.{Map, Seq, Set, mutable}
 import scala.compat.java8.OptionConverters._
@@ -175,6 +176,33 @@ object HostedPartition {
 
 object ReplicaManager {
   val HighWatermarkFilename = "replication-offset-checkpoint"
+
+  def createLogReadResult(highWatermark: Long,
+                          leaderLogStartOffset: Long,
+                          leaderLogEndOffset: Long,
+                          e: Throwable) = {
+    LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+      divergingEpoch = None,
+      highWatermark,
+      leaderLogStartOffset,
+      leaderLogEndOffset,
+      followerLogStartOffset = -1L,
+      fetchTimeMs = -1L,
+      lastStableOffset = None,
+      exception = Some(e))
+  }
+
+  def createLogReadResult(e: Throwable): LogReadResult = {
+    LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+      divergingEpoch = None,
+      highWatermark = UnifiedLog.UnknownOffset,
+      leaderLogStartOffset = UnifiedLog.UnknownOffset,
+      leaderLogEndOffset = UnifiedLog.UnknownOffset,
+      followerLogStartOffset = UnifiedLog.UnknownOffset,
+      fetchTimeMs = -1L,
+      lastStableOffset = None,
+      exception = Some(e))
+  }
 }
 
 class ReplicaManager(val config: KafkaConfig,
@@ -194,6 +222,7 @@ class ReplicaManager(val config: KafkaConfig,
                      delayedFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedFetch]] = None,
                      delayedDeleteRecordsPurgatoryParam: Option[DelayedOperationPurgatory[DelayedDeleteRecords]] = None,
                      delayedElectLeaderPurgatoryParam: Option[DelayedOperationPurgatory[DelayedElectLeader]] = None,
+                     delayedRemoteFetchPurgatoryParam: Option[DelayedOperationPurgatory[DelayedRemoteFetch]] = None,
                      threadNamePrefix: Option[String] = None,
                      val brokerEpochSupplier: () => Long = () => -1,
                      addPartitionsToTxnManager: Option[AddPartitionsToTxnManager] = None
@@ -215,6 +244,9 @@ class ReplicaManager(val config: KafkaConfig,
   val delayedElectLeaderPurgatory = delayedElectLeaderPurgatoryParam.getOrElse(
     DelayedOperationPurgatory[DelayedElectLeader](
       purgatoryName = "ElectLeader", brokerId = config.brokerId))
+  val delayedRemoteFetchPurgatory = delayedRemoteFetchPurgatoryParam.getOrElse(
+    DelayedOperationPurgatory[DelayedRemoteFetch](
+      purgatoryName = "RemoteFetch", brokerId = config.brokerId))
 
   /* epoch of the controller that last changed the leader */
   @volatile private[server] var controllerEpoch: Int = KafkaController.InitialControllerEpoch
@@ -330,6 +362,7 @@ class ReplicaManager(val config: KafkaConfig,
     val topicPartitionOperationKey = TopicPartitionOperationKey(topicPartition)
     delayedProducePurgatory.checkAndComplete(topicPartitionOperationKey)
     delayedFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
+    delayedRemoteFetchPurgatory.checkAndComplete(topicPartitionOperationKey)
   }
 
   /**
@@ -610,7 +643,7 @@ class ReplicaManager(val config: KafkaConfig,
    * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
    * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
    * locks.
-   * 
+   *
    * @param timeout                       maximum time we will wait to append before returning
    * @param requiredAcks                  number of replicas who must acknowledge the append before sending the response
    * @param internalTopicsAllowed         boolean indicating whether internal topics can be appended to
@@ -638,7 +671,7 @@ class ReplicaManager(val config: KafkaConfig,
       val sTime = time.milliseconds
       
       val transactionalProducerIds = mutable.HashSet[Long]()
-      val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) = 
+      val (verifiedEntriesPerPartition, notYetVerifiedEntriesPerPartition) =
         if (transactionStatePartition.isEmpty || !config.transactionPartitionVerificationEnable)
           (entriesPerPartition, Map.empty)
         else {
@@ -648,7 +681,7 @@ class ReplicaManager(val config: KafkaConfig,
             transactionalBatches.foreach(batch => transactionalProducerIds.add(batch.producerId))
             if (transactionalBatches.nonEmpty) {
               getPartitionOrException(topicPartition).hasOngoingTransaction(transactionalBatches.head.producerId)
-            } else { 
+            } else {
               // If there is no producer ID in the batches, no need to verify.
               true
             }
@@ -1121,21 +1154,69 @@ class ReplicaManager(val config: KafkaConfig,
     partition.legacyFetchOffsetsForTimestamp(timestamp, maxNumOffsets, isFromConsumer, fetchOnlyFromLeader)
   }
 
+  /**
+   * Returns [[LogReadResult]] with error if a task for RemoteStorageFetchInfo could not be scheduled successfully
+   * else returns [[None]].
+   */
+  private def processRemoteFetch(remoteFetchInfo: RemoteStorageFetchInfo,
+                                 params: FetchParams,
+                                 responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit,
+                                 logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                 fetchPartitionStatus: Seq[(TopicIdPartition, FetchPartitionStatus)]): Option[LogReadResult] = {
+    val key = new TopicPartitionOperationKey(remoteFetchInfo.topicPartition.topic(), remoteFetchInfo.topicPartition.partition())
+    val remoteFetchResult = new CompletableFuture[RemoteLogReadResult]
+    var remoteFetchTask: Future[Void] = null
+    try {
+      remoteFetchTask = remoteLogManager.get.asyncRead(remoteFetchInfo, (result: RemoteLogReadResult) => {
+        remoteFetchResult.complete(result)
+        delayedRemoteFetchPurgatory.checkAndComplete(key)
+      })
+    } catch {
+      case e: RejectedExecutionException =>
+        // Return the error if any in scheduling the remote fetch task
+        return Some(createLogReadResult(e))
+    }
+
+    val remoteFetch = new DelayedRemoteFetch(remoteFetchTask, remoteFetchResult, remoteFetchInfo,
+      fetchPartitionStatus, params, logReadResults, this, responseCallback)
+
+    delayedRemoteFetchPurgatory.tryCompleteElseWatch(remoteFetch, Seq(key))
+    None
+  }
+
+  private def buildPartitionToFetchPartitionData(logReadResults: Seq[(TopicIdPartition, LogReadResult)],
+                                                 remoteFetchTopicPartition: TopicPartition,
+                                                 error: LogReadResult): Seq[(TopicIdPartition, FetchPartitionData)] = {
+    logReadResults.map { case (tp, result) =>
+      val fetchPartitionData = {
+        if (tp.topicPartition().equals(remoteFetchTopicPartition))
+          error
+        else
+          result
+      }.toFetchPartitionData(false)
+
+      tp -> fetchPartitionData
+    }
+  }
+
   /**
    * Fetch messages from a replica, and wait until enough data can be fetched and return;
    * the callback function will be triggered either when timeout or required fetch info is satisfied.
    * Consumers may fetch from any replica, but followers can only fetch from the leader.
    */
-  def fetchMessages(
-    params: FetchParams,
-    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
-    quota: ReplicaQuota,
-    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit
-  ): Unit = {
+  def fetchMessages(params: FetchParams,
+                    fetchInfos: Seq[(TopicIdPartition, PartitionData)],
+                    quota: ReplicaQuota,
+                    responseCallback: Seq[(TopicIdPartition, FetchPartitionData)] => Unit): Unit = {
+
     // check if this fetch request can be satisfied right away
-    val logReadResults = readFromLocalLog(params, fetchInfos, quota, readFromPurgatory = false)
+    val logReadResults = readFromLog(params, fetchInfos, quota, readFromPurgatory = false)
     var bytesReadable: Long = 0
     var errorReadingData = false
+
+    // The 1st topic-partition that has to be read from remote storage
+    var remoteFetchInfo: Optional[RemoteStorageFetchInfo] = Optional.empty()
+
     var hasDivergingEpoch = false
     var hasPreferredReadReplica = false
     val logReadResultMap = new mutable.HashMap[TopicIdPartition, LogReadResult]
@@ -1145,6 +1226,9 @@ class ReplicaManager(val config: KafkaConfig,
       brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
       if (logReadResult.error != Errors.NONE)
         errorReadingData = true
+      if (!remoteFetchInfo.isPresent && logReadResult.info.delayedRemoteStorageFetch.isPresent) {
+        remoteFetchInfo = logReadResult.info.delayedRemoteStorageFetch
+      }
       if (logReadResult.divergingEpoch.nonEmpty)
         hasDivergingEpoch = true
       if (logReadResult.preferredReadReplica.nonEmpty)
@@ -1153,14 +1237,15 @@ class ReplicaManager(val config: KafkaConfig,
       logReadResultMap.put(topicIdPartition, logReadResult)
     }
 
-    // respond immediately if 1) fetch request does not want to wait
+    // Respond immediately if no remote fetches are required and any of the below conditions is true
+    //                        1) fetch request does not want to wait
     //                        2) fetch request does not require any data
     //                        3) has enough data to respond
     //                        4) some error happens while reading data
     //                        5) we found a diverging epoch
     //                        6) has a preferred read replica
-    if (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
-      hasDivergingEpoch || hasPreferredReadReplica) {
+    if (!remoteFetchInfo.isPresent && (params.maxWaitMs <= 0 || fetchInfos.isEmpty || bytesReadable >= params.minBytes || errorReadingData ||
+      hasDivergingEpoch || hasPreferredReadReplica)) {
       val fetchPartitionData = logReadResults.map { case (tp, result) =>
         val isReassignmentFetch = params.isFromFollower && isAddingReplica(tp.topicPartition, params.replicaId)
         tp -> result.toFetchPartitionData(isReassignmentFetch)
@@ -1175,49 +1260,77 @@ class ReplicaManager(val config: KafkaConfig,
           fetchPartitionStatus += (topicIdPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
         })
       }
-      val delayedFetch = new DelayedFetch(
-        params = params,
-        fetchPartitionStatus = fetchPartitionStatus,
-        replicaManager = this,
-        quota = quota,
-        responseCallback = responseCallback
-      )
-
-      // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
-      val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
-
-      // try to complete the request immediately, otherwise put it into the purgatory;
-      // this is because while the delayed fetch operation is being created, new requests
-      // may arrive and hence make this operation completable.
-      delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+
+      if (remoteFetchInfo.isPresent) {
+        val maybeLogReadResultWithError = processRemoteFetch(remoteFetchInfo.get(), params, responseCallback, logReadResults, fetchPartitionStatus)
+        if (maybeLogReadResultWithError.isDefined) {
+          // If there is an error in scheduling the remote fetch task, return what we currently have
+          // (the data read from local log segment for the other topic-partitions) and an error for the topic-partition
+          // that we couldn't read from remote storage
+          val partitionToFetchPartitionData = buildPartitionToFetchPartitionData(logReadResults, remoteFetchInfo.get().topicPartition, maybeLogReadResultWithError.get)
+          responseCallback(partitionToFetchPartitionData)
+        }
+      } else {
+        // If there is not enough data to respond and there is no remote data, we will let the fetch request
+        // wait for new data.
+        val delayedFetch = new DelayedFetch(
+          params = params,
+          fetchPartitionStatus = fetchPartitionStatus,
+          replicaManager = this,
+          quota = quota,
+          responseCallback = responseCallback
+        )
+
+        // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
+        val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
+
+        // try to complete the request immediately, otherwise put it into the purgatory;
+        // this is because while the delayed fetch operation is being created, new requests
+        // may arrive and hence make this operation completable.
+        delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
+      }
     }
   }
 
   /**
    * Read from multiple topic partitions at the given offset up to maxSize bytes
    */
-  def readFromLocalLog(
+  def readFromLog(
     params: FetchParams,
     readPartitionInfo: Seq[(TopicIdPartition, PartitionData)],
     quota: ReplicaQuota,
-    readFromPurgatory: Boolean
-  ): Seq[(TopicIdPartition, LogReadResult)] = {
+    readFromPurgatory: Boolean): Seq[(TopicIdPartition, LogReadResult)] = {
     val traceEnabled = isTraceEnabled
 
+    def checkFetchDataInfo(partition: Partition, givenFetchedDataInfo: FetchDataInfo) = {
+      if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
+        // If the partition is being throttled, simply return an empty set.
+        new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, MemoryRecords.EMPTY)
+      } else if (!params.hardMaxBytesLimit && givenFetchedDataInfo.firstEntryIncomplete) {
+        // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
+        // progress in such cases and don't need to report a `RecordTooLargeException`
+        new FetchDataInfo(givenFetchedDataInfo.fetchOffsetMetadata, MemoryRecords.EMPTY)
+      } else {
+        givenFetchedDataInfo
+      }
+    }
+
     def read(tp: TopicIdPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
       val offset = fetchInfo.fetchOffset
       val partitionFetchSize = fetchInfo.maxBytes
       val followerLogStartOffset = fetchInfo.logStartOffset
 
       val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
+      var log: UnifiedLog = null
+      var partition: Partition = null
+      val fetchTimeMs = time.milliseconds
       try {
         if (traceEnabled)
           trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
             s"remaining response limit $limitBytes" +
             (if (minOneMessage) s", ignoring response/partition size limits" else ""))
 
-        val partition = getPartitionOrException(tp.topicPartition)
-        val fetchTimeMs = time.milliseconds
+        partition = getPartitionOrException(tp.topicPartition)
 
         // Check if topic ID from the fetch request/session matches the ID in the log
         val topicId = if (tp.topicId == Uuid.ZERO_UUID) None else Some(tp.topicId)
@@ -1246,6 +1359,8 @@ class ReplicaManager(val config: KafkaConfig,
             preferredReadReplica = preferredReadReplica,
             exception = None)
         } else {
+          log = partition.localLogWithEpochOrThrow(fetchInfo.currentLeaderEpoch, params.fetchOnlyLeader())
+
           // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
           val readInfo: LogReadInfo = partition.fetchRecords(
             fetchParams = params,
@@ -1253,19 +1368,9 @@ class ReplicaManager(val config: KafkaConfig,
             fetchTimeMs = fetchTimeMs,
             maxBytes = adjustedMaxBytes,
             minOneMessage = minOneMessage,
-            updateFetchState = !readFromPurgatory
-          )
+            updateFetchState = !readFromPurgatory)
 
-          val fetchDataInfo = if (params.isFromFollower && shouldLeaderThrottle(quota, partition, params.replicaId)) {
-            // If the partition is being throttled, simply return an empty set.
-            new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
-          } else if (!params.hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
-            // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
-            // progress in such cases and don't need to report a `RecordTooLargeException`
-            new FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
-          } else {
-            readInfo.fetchedData
-          }
+          val fetchDataInfo = checkFetchDataInfo(partition, readInfo.fetchedData)
 
           LogReadResult(info = fetchDataInfo,
             divergingEpoch = readInfo.divergingEpoch.asScala,
@@ -1288,17 +1393,10 @@ class ReplicaManager(val config: KafkaConfig,
                  _: FencedLeaderEpochException |
                  _: ReplicaNotAvailableException |
                  _: KafkaStorageException |
-                 _: OffsetOutOfRangeException |
                  _: InconsistentTopicIdException) =>
-          LogReadResult(info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
-            divergingEpoch = None,
-            highWatermark = UnifiedLog.UnknownOffset,
-            leaderLogStartOffset = UnifiedLog.UnknownOffset,
-            leaderLogEndOffset = UnifiedLog.UnknownOffset,
-            followerLogStartOffset = UnifiedLog.UnknownOffset,
-            fetchTimeMs = -1L,
-            lastStableOffset = None,
-            exception = Some(e))
+          createLogReadResult(e)
+        case e: OffsetOutOfRangeException =>
+          handleOffsetOutOfRangeError(tp, params, fetchInfo, adjustedMaxBytes, minOneMessage, log, fetchTimeMs, e)
         case e: Throwable =>
           brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
           brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()
@@ -1335,6 +1433,50 @@ class ReplicaManager(val config: KafkaConfig,
     result
   }
 
+  private def handleOffsetOutOfRangeError(tp: TopicIdPartition, params: FetchParams, fetchInfo: PartitionData,
+                                          adjustedMaxBytes: Int, minOneMessage: Boolean, log: UnifiedLog,
+                                          fetchTimeMs: Long, exception: OffsetOutOfRangeException): LogReadResult = {
+    val offset = fetchInfo.fetchOffset
+    // For offset out of range errors, fall back to tiered storage only if all the conditions below are true.
+    //   1) the remote log manager is enabled and available
+    //   2) the `log` instance is not null; a missing log would have surfaced earlier as NotLeaderOrFollowerException or ReplicaNotAvailableException
+    //   3) the fetch offset lies within the offset range covered by the remote storage layer
+    if (remoteLogManager.isDefined && log != null && log.remoteLogEnabled() &&
+      log.logStartOffset <= offset && offset < log.localLogStartOffset()) {
+      val highWatermark = log.highWatermark
+      val leaderLogStartOffset = log.logStartOffset
+      val leaderLogEndOffset = log.logEndOffset
+
+      if (params.isFromFollower) {
+        // If the fetch is from a follower, send only the offset metadata (the data is already available in
+        // remote storage) and return an error indicating that this offset has moved to tiered storage.
+        createLogReadResult(highWatermark, leaderLogStartOffset, leaderLogEndOffset,
+          new OffsetMovedToTieredStorageException("Given offset " + offset + " is moved to tiered storage"))
+      } else {
+        // For consumer fetch requests, create a dummy FetchDataInfo carrying the remote storage fetch information.
+        // For the first topic-partition that needs remote data, this information is used to read the data in another thread.
+        val fetchDataInfo = new FetchDataInfo(new LogOffsetMetadata(offset), MemoryRecords.EMPTY, false,
+          Optional.empty(),
+          Optional.of(new RemoteStorageFetchInfo(adjustedMaxBytes, minOneMessage, tp.topicPartition(),
+            fetchInfo, params.isolation, params.hardMaxBytesLimit())))
+
+        LogReadResult(fetchDataInfo,
+          divergingEpoch = None,
+          highWatermark,
+          leaderLogStartOffset,
+          leaderLogEndOffset,
+          fetchInfo.logStartOffset,
+          fetchTimeMs,
+          Some(log.lastStableOffset),
+          exception = None)
+      }
+    } else {
+      createLogReadResult(exception)
+    }
+  }
+
   /**
     * Using the configured [[ReplicaSelector]], determine the preferred read replica for a partition given the
     * client metadata, the requested offset, and the current set of replicas. If the preferred read replica is the
@@ -2045,6 +2187,7 @@ class ReplicaManager(val config: KafkaConfig,
     replicaFetcherManager.shutdown()
     replicaAlterLogDirsManager.shutdown()
     delayedFetchPurgatory.shutdown()
+    delayedRemoteFetchPurgatory.shutdown()
     delayedProducePurgatory.shutdown()
     delayedDeleteRecordsPurgatory.shutdown()
     delayedElectLeaderPurgatory.shutdown()
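
    For reference, a minimal sketch (Java, not part of this patch; names are illustrative) of the offset-routing
    rule that handleOffsetOutOfRangeError applies above: an out-of-range fetch offset is a candidate for a remote
    storage read only while it still lies within the leader's overall log range but below the local log start offset.

    public final class RemoteFetchRoutingSketch {

        // Same condition as in handleOffsetOutOfRangeError, expressed as a standalone predicate.
        static boolean shouldReadFromRemoteStorage(long fetchOffset,
                                                   long logStartOffset,
                                                   long localLogStartOffset,
                                                   boolean remoteLogEnabled) {
            return remoteLogEnabled
                    && logStartOffset <= fetchOffset
                    && fetchOffset < localLogStartOffset;
        }

        public static void main(String[] args) {
            // Suppose segments [0, 400) have been tiered away and [400, ...) are still local.
            System.out.println(shouldReadFromRemoteStorage(100L, 0L, 400L, true));  // true  -> remote fetch path
            System.out.println(shouldReadFromRemoteStorage(450L, 0L, 400L, true));  // false -> normal local read
        }
    }
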
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
index c4cfc00b57b..9b9ad697eb8 100644
--- a/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
+++ b/core/src/test/java/kafka/log/remote/RemoteLogManagerTest.java
@@ -167,11 +167,11 @@ public class RemoteLogManagerTest {
         LeaderEpochFileCache cache = new LeaderEpochFileCache(tp, checkpoint);
         when(mockLog.leaderEpochCache()).thenReturn(Option.apply(cache));
         TopicIdPartition tpId = new TopicIdPartition(Uuid.randomUuid(), tp);
-        long offset = remoteLogManager.findHighestRemoteOffset(tpId);
+        long offset = remoteLogManager.findHighestRemoteOffset(tpId, mockLog);
         assertEquals(-1, offset);
 
         when(remoteLogMetadataManager.highestOffsetForEpoch(tpId, 2)).thenReturn(Optional.of(200L));
-        long offset2 = remoteLogManager.findHighestRemoteOffset(tpId);
+        long offset2 = remoteLogManager.findHighestRemoteOffset(tpId, mockLog);
         assertEquals(200, offset2);
     }
 
@@ -261,7 +261,7 @@ public class RemoteLogManagerTest {
 
         RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
         task.convertToLeader(2);
-        task.copyLogSegmentsToRemote();
+        task.copyLogSegmentsToRemote(mockLog);
 
         // verify remoteLogMetadataManager did add the expected RemoteLogSegmentMetadata
         ArgumentCaptor<RemoteLogSegmentMetadata> remoteLogSegmentMetadataArg = ArgumentCaptor.forClass(RemoteLogSegmentMetadata.class);
@@ -318,7 +318,7 @@ public class RemoteLogManagerTest {
 
         RemoteLogManager.RLMTask task = remoteLogManager.new RLMTask(leaderTopicIdPartition);
         task.convertToFollower();
-        task.copyLogSegmentsToRemote();
+        task.copyLogSegmentsToRemote(mockLog);
 
         // verify the remoteLogMetadataManager never add any metadata and remoteStorageManager never copy log segments
         verify(remoteLogMetadataManager, never()).addRemoteLogSegmentMetadata(any(RemoteLogSegmentMetadata.class));
diff --git a/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
new file mode 100644
index 00000000000..aa8dd042a13
--- /dev/null
+++ b/core/src/test/java/kafka/log/remote/RemoteLogReaderTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.log.remote;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.OffsetOutOfRangeException;
+import org.apache.kafka.common.record.Records;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.apache.kafka.storage.internals.log.FetchDataInfo;
+import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
+import org.apache.kafka.storage.internals.log.RemoteLogReadResult;
+import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+
+import java.io.IOException;
+import java.util.function.Consumer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class RemoteLogReaderTest {
+    RemoteLogManager mockRLM = mock(RemoteLogManager.class);
+    LogOffsetMetadata logOffsetMetadata = new LogOffsetMetadata(100);
+    Records records = mock(Records.class);
+
+    @Test
+    public void testRemoteLogReaderWithoutError() throws RemoteStorageException, IOException {
+        FetchDataInfo fetchDataInfo = new FetchDataInfo(logOffsetMetadata, records);
+        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenReturn(fetchDataInfo);
+
+        Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
+        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+        remoteLogReader.call();
+
+        // verify the callback did get invoked with the expected remoteLogReadResult
+        ArgumentCaptor<RemoteLogReadResult> remoteLogReadResultArg = ArgumentCaptor.forClass(RemoteLogReadResult.class);
+        verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
+        RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
+        assertFalse(actualRemoteLogReadResult.error.isPresent());
+        assertTrue(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+        assertEquals(fetchDataInfo, actualRemoteLogReadResult.fetchDataInfo.get());
+    }
+
+    @Test
+    public void testRemoteLogReaderWithError() throws RemoteStorageException, IOException {
+        when(mockRLM.read(any(RemoteStorageFetchInfo.class))).thenThrow(new OffsetOutOfRangeException("error"));
+
+        Consumer<RemoteLogReadResult> callback = mock(Consumer.class);
+        RemoteStorageFetchInfo remoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, new TopicPartition("test", 0), null, null, false);
+        RemoteLogReader remoteLogReader = new RemoteLogReader(remoteStorageFetchInfo, mockRLM, callback);
+        remoteLogReader.call();
+
+        // verify the callback did get invoked with the expected remoteLogReadResult
+        ArgumentCaptor<RemoteLogReadResult> remoteLogReadResultArg = ArgumentCaptor.forClass(RemoteLogReadResult.class);
+        verify(callback, times(1)).accept(remoteLogReadResultArg.capture());
+        RemoteLogReadResult actualRemoteLogReadResult = remoteLogReadResultArg.getValue();
+        assertTrue(actualRemoteLogReadResult.error.isPresent());
+        assertFalse(actualRemoteLogReadResult.fetchDataInfo.isPresent());
+    }
+}
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 1710d713d0f..f26f7079d4b 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -188,7 +188,7 @@ class DelayedFetchTest {
     fetchPartitionData: FetchRequest.PartitionData,
     error: Errors
   ): Unit = {
-    when(replicaManager.readFromLocalLog(
+    when(replicaManager.readFromLog(
       fetchParams,
       readPartitionInfo = Seq((topicIdPartition, fetchPartitionData)),
       quota = replicaQuota,
diff --git a/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
new file mode 100644
index 00000000000..007154f46de
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/DelayedRemoteFetchTest.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import kafka.cluster.Partition
+import org.apache.kafka.common.errors.NotLeaderOrFollowerException
+import org.apache.kafka.common.protocol.{ApiKeys, Errors}
+import org.apache.kafka.common.record.MemoryRecords
+import org.apache.kafka.common.requests.FetchRequest
+import org.apache.kafka.common.{TopicIdPartition, Uuid}
+import org.apache.kafka.storage.internals.log._
+import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.Test
+import org.mockito.Mockito.{mock, when}
+
+import java.util.Optional
+import java.util.concurrent.CompletableFuture
+
+import scala.collection._
+
+class DelayedRemoteFetchTest {
+  private val maxBytes = 1024
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val topicIdPartition = new TopicIdPartition(Uuid.randomUuid(), 0, "topic")
+  private val fetchOffset = 500L
+  private val logStartOffset = 0L
+  private val currentLeaderEpoch = Optional.of[Integer](10)
+  private val replicaId = 1
+
+  private val fetchStatus = FetchPartitionStatus(
+    startOffsetMetadata = new LogOffsetMetadata(fetchOffset),
+    fetchInfo = new FetchRequest.PartitionData(Uuid.ZERO_UUID, fetchOffset, logStartOffset, maxBytes, currentLeaderEpoch))
+  private val fetchParams = buildFollowerFetchParams(replicaId, maxWaitMs = 500)
+
+  @Test
+  def testFetch(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    future.complete(null)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
+    val highWatermark = 100
+    val leaderLogStartOffset = 10
+    val logReadInfo = buildReadResult(Errors.NONE, highWatermark, leaderLogStartOffset)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertTrue(actualTopicPartition.isDefined)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+
+    val fetchResult = fetchResultOpt.get
+    assertEquals(Errors.NONE, fetchResult.error)
+    assertEquals(highWatermark, fetchResult.highWatermark)
+    assertEquals(leaderLogStartOffset, fetchResult.logStartOffset)
+  }
+
+  @Test
+  def testNotLeaderOrFollower(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    // throw an exception from getPartitionOrException
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenThrow(new NotLeaderOrFollowerException(s"Replica for $topicIdPartition not available"))
+
+    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
+
+    val logReadInfo = buildReadResult(Errors.NONE)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    // delayed remote fetch should still be able to complete
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+  }
+
+  @Test
+  def testErrorLogReadInfo(): Unit = {
+    var actualTopicPartition: Option[TopicIdPartition] = None
+    var fetchResultOpt: Option[FetchPartitionData] = None
+
+    def callback(responses: Seq[(TopicIdPartition, FetchPartitionData)]): Unit = {
+      assertEquals(1, responses.size)
+      actualTopicPartition = Some(responses.head._1)
+      fetchResultOpt = Some(responses.head._2)
+    }
+
+    when(replicaManager.getPartitionOrException(topicIdPartition.topicPartition))
+      .thenReturn(mock(classOf[Partition]))
+
+    val future: CompletableFuture[RemoteLogReadResult] = new CompletableFuture[RemoteLogReadResult]()
+    future.complete(null)
+    val fetchInfo: RemoteStorageFetchInfo = new RemoteStorageFetchInfo(0, false, topicIdPartition.topicPartition(), null, null, false)
+
+    // build a read result with error
+    val logReadInfo = buildReadResult(Errors.FENCED_LEADER_EPOCH)
+
+    val delayedRemoteFetch = new DelayedRemoteFetch(null, future, fetchInfo, Seq(topicIdPartition -> fetchStatus), fetchParams,
+      Seq(topicIdPartition -> logReadInfo), replicaManager, callback)
+
+    assertTrue(delayedRemoteFetch.tryComplete())
+    assertTrue(delayedRemoteFetch.isCompleted)
+    assertEquals(topicIdPartition, actualTopicPartition.get)
+    assertTrue(fetchResultOpt.isDefined)
+    assertEquals(Errors.FENCED_LEADER_EPOCH, fetchResultOpt.get.error)
+  }
+
+  private def buildFollowerFetchParams(replicaId: Int,
+                                       maxWaitMs: Int): FetchParams = {
+    new FetchParams(
+      ApiKeys.FETCH.latestVersion,
+      replicaId,
+      1,
+      maxWaitMs,
+      1,
+      maxBytes,
+      FetchIsolation.LOG_END,
+      Optional.empty()
+    )
+  }
+
+  private def buildReadResult(error: Errors,
+                              highWatermark: Int = 0,
+                              leaderLogStartOffset: Int = 0): LogReadResult = {
+    LogReadResult(
+      exception = if (error != Errors.NONE) Some(error.exception) else None,
+      info = new FetchDataInfo(LogOffsetMetadata.UNKNOWN_OFFSET_METADATA, MemoryRecords.EMPTY),
+      divergingEpoch = None,
+      highWatermark = highWatermark,
+      leaderLogStartOffset = leaderLogStartOffset,
+      leaderLogEndOffset = -1L,
+      followerLogStartOffset = -1L,
+      fetchTimeMs = -1L,
+      lastStableOffset = None)
+  }
+
+}
diff --git a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
index 654ffb5ae8d..5419557b1e7 100644
--- a/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
@@ -158,15 +158,20 @@ class ListOffsetsRequestTest extends BaseRequestTest {
   private[this] def fetchOffsetAndEpoch(serverId: Int,
                                         timestamp: Long,
                                         version: Short): (Long, Int) = {
+    val (offset, leaderEpoch, _) = fetchOffsetAndEpochWithError(serverId, timestamp, version)
+    (offset, leaderEpoch)
+  }
+
+  private[this] def fetchOffsetAndEpochWithError(serverId: Int, timestamp: Long, version: Short): (Long, Int, Short) = {
     val partitionData = sendRequest(serverId, timestamp, version)
 
     if (version == 0) {
       if (partitionData.oldStyleOffsets().isEmpty)
-        (-1, partitionData.leaderEpoch)
+        (-1, partitionData.leaderEpoch, partitionData.errorCode())
       else
-        (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch)
+        (partitionData.oldStyleOffsets().asScala.head, partitionData.leaderEpoch, partitionData.errorCode())
     } else
-      (partitionData.offset, partitionData.leaderEpoch)
+      (partitionData.offset, partitionData.leaderEpoch, partitionData.errorCode())
   }
 
   @Test
@@ -202,8 +207,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
     assertEquals((0L, 0), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.EARLIEST_TIMESTAMP, -1))
 
     // The latest offset reflects the updated epoch
-    assertEquals((10L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
-    assertEquals((9L, secondLeaderEpoch), fetchOffsetAndEpoch(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
+    assertEquals((10L, secondLeaderEpoch, Errors.NONE.code()), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.LATEST_TIMESTAMP, -1))
+    assertEquals((9L, secondLeaderEpoch, Errors.NONE.code()), fetchOffsetAndEpochWithError(secondLeaderId, ListOffsetsRequest.MAX_TIMESTAMP, -1))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index ce727f83c74..4e35084d3bd 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -67,7 +67,7 @@ class ReplicaManagerQuotasTest {
       .thenReturn(true)
 
     val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
-    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
+    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
     assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
       "Given two partitions, with only one throttled, we should get the first")
     assertEquals(0, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -85,7 +85,7 @@ class ReplicaManagerQuotasTest {
       .thenReturn(true)
 
     val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
-    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
+    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
     assertEquals(0, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
       "Given two partitions, with both throttled, we should get no messages")
     assertEquals(0, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -103,7 +103,7 @@ class ReplicaManagerQuotasTest {
       .thenReturn(false)
 
     val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
-    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
+    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
     assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
       "Given two partitions, with both non-throttled, we should get both messages")
     assertEquals(1, fetch.find(_._1 == topicIdPartition2).get._2.info.records.batches.asScala.size,
@@ -121,7 +121,7 @@ class ReplicaManagerQuotasTest {
       .thenReturn(true)
 
     val fetchParams = PartitionTest.followerFetchParams(followerReplicaId)
-    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
+    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false)
     assertEquals(1, fetch.find(_._1 == topicIdPartition1).get._2.info.records.batches.asScala.size,
       "Given two partitions, with only one throttled, we should get the first")
 
@@ -137,7 +137,7 @@ class ReplicaManagerQuotasTest {
     when(quota.isQuotaExceeded).thenReturn(true)
 
     val fetchParams = PartitionTest.consumerFetchParams()
-    val fetch = replicaManager.readFromLocalLog(fetchParams, fetchInfo, quota, readFromPurgatory = false).toMap
+    val fetch = replicaManager.readFromLog(fetchParams, fetchInfo, quota, readFromPurgatory = false).toMap
     assertEquals(1, fetch(topicIdPartition1).info.records.batches.asScala.size,
       "Replication throttled partitions should return data for consumer fetch")
     assertEquals(1, fetch(topicIdPartition2).info.records.batches.asScala.size,
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java
index 48c16b9d57e..b5c1118dea1 100644
--- a/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/FetchDataInfo.java
@@ -28,6 +28,7 @@ public class FetchDataInfo {
     public final Records records;
     public final boolean firstEntryIncomplete;
     public final Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions;
+    public final Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch;
 
     public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
                          Records records) {
@@ -38,10 +39,19 @@ public class FetchDataInfo {
                          Records records,
                          boolean firstEntryIncomplete,
                          Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions) {
+        this(fetchOffsetMetadata, records, firstEntryIncomplete, abortedTransactions, Optional.empty());
+    }
+
+    public FetchDataInfo(LogOffsetMetadata fetchOffsetMetadata,
+                         Records records,
+                         boolean firstEntryIncomplete,
+                         Optional<List<FetchResponseData.AbortedTransaction>> abortedTransactions,
+                         Optional<RemoteStorageFetchInfo> delayedRemoteStorageFetch) {
         this.fetchOffsetMetadata = fetchOffsetMetadata;
         this.records = records;
         this.firstEntryIncomplete = firstEntryIncomplete;
         this.abortedTransactions = abortedTransactions;
+        this.delayedRemoteStorageFetch = delayedRemoteStorageFetch;
     }
 
     public static FetchDataInfo empty(long fetchOffset) {
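
    A short usage sketch (assumed wiring, not from the patch) of the new FetchDataInfo constructor: the local
    records are left empty and the remote fetch details ride along in delayedRemoteStorageFetch so that the
    purgatory path can later read the actual bytes from tiered storage. The topic name and sizes are placeholders,
    and the PartitionData argument is omitted (null) for brevity.

    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.storage.internals.log.FetchDataInfo;
    import org.apache.kafka.storage.internals.log.FetchIsolation;
    import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
    import org.apache.kafka.storage.internals.log.RemoteStorageFetchInfo;

    import java.util.Optional;

    public class DelayedRemoteFetchInfoSketch {
        public static void main(String[] args) {
            // Remote fetch details for one partition.
            RemoteStorageFetchInfo remoteFetch = new RemoteStorageFetchInfo(
                1048576, true, new TopicPartition("topic", 0), null, FetchIsolation.LOG_END, false);

            // Empty local records plus the remote fetch info; the broker reads the actual bytes later
            // on the remote storage reader threads.
            FetchDataInfo info = new FetchDataInfo(new LogOffsetMetadata(500L), MemoryRecords.EMPTY,
                false, Optional.empty(), Optional.of(remoteFetch));

            System.out.println(info.delayedRemoteStorageFetch.isPresent()); // true
        }
    }
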
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteLogReadResult.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteLogReadResult.java
new file mode 100644
index 00000000000..06c72ecb80c
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteLogReadResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import java.util.Optional;
+
+public class RemoteLogReadResult {
+    public final Optional<FetchDataInfo> fetchDataInfo;
+    public final Optional<Throwable> error;
+
+    public RemoteLogReadResult(Optional<FetchDataInfo> fetchDataInfo, Optional<Throwable> error) {
+        this.fetchDataInfo = fetchDataInfo;
+        this.error = error;
+    }
+}
+
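
    A hedged sketch of how RemoteLogReadResult is populated on the success and failure paths, mirroring what
    RemoteLogReader passes to its callback; the class and values below are illustrative only.

    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.storage.internals.log.FetchDataInfo;
    import org.apache.kafka.storage.internals.log.LogOffsetMetadata;
    import org.apache.kafka.storage.internals.log.RemoteLogReadResult;

    import java.util.Optional;

    public class RemoteLogReadResultSketch {
        public static void main(String[] args) {
            // Success: data present, no error.
            RemoteLogReadResult ok = new RemoteLogReadResult(
                Optional.of(new FetchDataInfo(new LogOffsetMetadata(0L), MemoryRecords.EMPTY)),
                Optional.empty());

            // Failure: no data, error kept for the fetch response callback.
            RemoteLogReadResult failed = new RemoteLogReadResult(
                Optional.empty(), Optional.of(new RuntimeException("remote storage unavailable")));

            System.out.println(ok.fetchDataInfo.isPresent()); // true
            System.out.println(failed.error.isPresent());     // true
        }
    }
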
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
new file mode 100644
index 00000000000..7e8752d703a
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageFetchInfo.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.requests.FetchRequest;
+
+public class RemoteStorageFetchInfo {
+
+    public final int fetchMaxBytes;
+    public final boolean minOneMessage;
+    public final TopicPartition topicPartition;
+    public final FetchRequest.PartitionData fetchInfo;
+    public final FetchIsolation fetchIsolation;
+    public final boolean hardMaxBytesLimit;
+
+    public RemoteStorageFetchInfo(int fetchMaxBytes, boolean minOneMessage, TopicPartition topicPartition,
+                                  FetchRequest.PartitionData fetchInfo, FetchIsolation fetchIsolation,
+                                  boolean hardMaxBytesLimit) {
+        this.fetchMaxBytes = fetchMaxBytes;
+        this.minOneMessage = minOneMessage;
+        this.topicPartition = topicPartition;
+        this.fetchInfo = fetchInfo;
+        this.fetchIsolation = fetchIsolation;
+        this.hardMaxBytesLimit = hardMaxBytesLimit;
+    }
+
+    @Override
+    public String toString() {
+        return "RemoteStorageFetchInfo{" +
+                "fetchMaxBytes=" + fetchMaxBytes +
+                ", minOneMessage=" + minOneMessage +
+                ", topicPartition=" + topicPartition +
+                ", fetchInfo=" + fetchInfo +
+                ", fetchIsolation=" + fetchIsolation +
+                ", hardMaxBytesLimit=" + hardMaxBytesLimit +
+                '}';
+    }
+}
diff --git a/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
new file mode 100644
index 00000000000..2b7ae15b154
--- /dev/null
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/RemoteStorageThreadPool.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.internals.FatalExitError;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class RemoteStorageThreadPool extends ThreadPoolExecutor {
+    private final Logger logger;
+
+    public RemoteStorageThreadPool(String threadNamePrefix,
+                                   int numThreads,
+                                   int maxPendingTasks) {
+        super(numThreads, numThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(maxPendingTasks),
+                new RemoteStorageThreadFactory(threadNamePrefix));
+        logger = new LogContext() {
+            @Override
+            public String logPrefix() {
+                return "[" + Thread.currentThread().getName() + "]";
+            }
+        }.logger(RemoteStorageThreadPool.class);
+    }
+
+    @Override
+    protected void afterExecute(Runnable runnable, Throwable th) {
+        if (th != null) {
+            if (th instanceof FatalExitError) {
+                logger.error("Stopping the server as it encountered a fatal error.");
+                Exit.exit(((FatalExitError) th).statusCode());
+            } else {
+                if (!isShutdown())
+                    logger.error("Error occurred while executing task: {}", runnable, th);
+            }
+        }
+    }
+
+    private static class RemoteStorageThreadFactory implements ThreadFactory {
+        private final String namePrefix;
+        private final AtomicInteger threadNumber = new AtomicInteger(0);
+
+        RemoteStorageThreadFactory(String namePrefix) {
+            this.namePrefix = namePrefix;
+        }
+
+        @Override
+        public Thread newThread(Runnable r) {
+            return new Thread(r, namePrefix + threadNumber.getAndIncrement());
+        }
+
+    }
+}
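
    Finally, a minimal sketch (assumed sizing and thread-name prefix, not from the patch) of how the new
    RemoteStorageThreadPool can be driven; the broker itself submits RemoteLogReader, a Callable that reads from
    tiered storage and invokes a completion callback, while a plain Runnable stands in for it here.

    import org.apache.kafka.storage.internals.log.RemoteStorageThreadPool;

    public class RemoteStorageThreadPoolSketch {
        public static void main(String[] args) {
            // Hypothetical sizing; the broker derives thread count and queue depth from its remote log configs.
            RemoteStorageThreadPool pool = new RemoteStorageThreadPool("kafka-remote-log-reader-", 2, 100);

            // Submit a stand-in remote read task.
            pool.submit(() ->
                System.out.println("reading a tiered segment on " + Thread.currentThread().getName()));

            pool.shutdown();
        }
    }
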