Posted to commits@impala.apache.org by ar...@apache.org on 2022/09/28 14:28:41 UTC

[impala] 02/03: IMPALA-10791 Add batch reading for remote temporary files

This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6dfab93fe9cb54aceed0b5203275827980752074
Author: Yida Wu <wy...@gmail.com>
AuthorDate: Tue Oct 26 14:10:53 2021 -0700

    IMPALA-10791 Add batch reading for remote temporary files
    
    This patch adds batch reading from remote temporary files to
    improve read performance for spilled data stored on a remote
    filesystem.
    
    The original design used a local disk file as the buffer for
    batch reads from the remote file, but in practice this did not
    improve performance. The design was therefore changed to use
    memory as the read buffer.
    
    Currently, each TmpFileRemote has two DiskFiles: one for the
    remote file and one for the local buffer. The patch adds
    MemBlocks to the local buffer file, which is divided evenly into
    several MemBlocks. To guarantee that a single page is never split
    across two blocks, the actual block sizes may differ slightly
    from each other. The default block size is the smaller of the
    default file size and MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES
    (16MB).
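
    A minimal sketch of the block layout described above, assuming a
    fixed nominal block size and a known number of blocks per file.
    ReadBufferLayout and its methods are hypothetical, loosely modeled
    on the ReadBuffer metadata in disk-file.h: a page belongs to the
    block that contains its start offset, a block starts at the lowest
    page offset recorded for it, and its actual size runs up to the
    next non-empty block (or the end of the file), so no page is split.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical sketch of the read-buffer block layout. kInvalidOffset mirrors
// DISK_FILE_INVALID_FILE_OFFSET, but ReadBufferLayout is not an Impala class.
constexpr int64_t kInvalidOffset = -1;

class ReadBufferLayout {
 public:
  ReadBufferLayout(int64_t block_size, int64_t num_blocks)
    : block_size_(block_size),
      page_counts_(num_blocks, 0),
      start_offsets_(num_blocks, kInvalidOffset) {}

  // Map a file offset to a block index. A page may start slightly past the
  // nominal file size, so any overflow is assigned to the last block.
  int64_t BlockIndex(int64_t offset) const {
    int64_t last = static_cast<int64_t>(start_offsets_.size()) - 1;
    return std::min(offset / block_size_, last);
  }

  // Record a page written at 'offset'. A block starts at its lowest page offset,
  // so each page lies entirely inside the block that holds its start offset.
  void RecordPage(int64_t offset) {
    int64_t idx = BlockIndex(offset);
    ++page_counts_[idx];
    if (start_offsets_[idx] == kInvalidOffset || offset < start_offsets_[idx]) {
      start_offsets_[idx] = offset;
    }
  }

  // Actual size of a non-empty block: up to the start of the next non-empty
  // block, or up to the end of the file if there is none.
  int64_t ActualSize(int64_t idx, int64_t actual_file_size) const {
    int64_t n = static_cast<int64_t>(start_offsets_.size());
    for (int64_t next = idx + 1; next < n; ++next) {
      if (start_offsets_[next] != kInvalidOffset) {
        return start_offsets_[next] - start_offsets_[idx];
      }
    }
    return actual_file_size - start_offsets_[idx];
  }

  int64_t PageCount(int64_t idx) const { return page_counts_[idx]; }
  int64_t StartOffset(int64_t idx) const { return start_offsets_[idx]; }

 private:
  const int64_t block_size_;
  std::vector<int64_t> page_counts_;
  std::vector<int64_t> start_offsets_;
};
```

    For example, with a nominal block size of 100 bytes, pages written
    at offsets 0, 60, 120, 180 and 240, and a 310-byte file, the three
    blocks span 120, 120 and 70 bytes.
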
    
    When a page is pinned, the system checks whether there is enough
    memory for the block that holds the page. If there is, the block
    is kept in memory until all of its pages have been read or the
    query ends. If not, the page is read directly and the block is
    disabled, to avoid reading the same content from the remote
    filesystem more than once.
    
    One challenge is where to find extra memory for the read buffer:
    when Impala starts to spill, the process is already short of
    memory. By default, the Impala process leaves 20% of total
    system memory unused, and the read buffer draws on this unused
    memory. This is reasonable because it is acceptable to use this
    memory in an emergency such as spilling, and the read buffer
    memory is returned immediately after use.
    
    For system reliability, the read buffer may use no more than 10%
    of total system memory and no more than 50% of the unused memory.
    If the unused memory is less than 5% of total system memory, the
    read buffer is disabled.
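
    The limits above can be written as a small function. This is a
    hypothetical helper (ReadBufferBudget is not an Impala function)
    that takes byte counts and returns 0 when the read buffer should
    be disabled.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper expressing the read buffer limits. All values are bytes;
// a return value of 0 means the read buffer is disabled.
int64_t ReadBufferBudget(int64_t total_system_mem, int64_t unused_mem) {
  // Disable the buffer when less than 5% of system memory is left unused
  // (unused < total / 20, written without division).
  if (unused_mem * 20 < total_system_mem) return 0;
  // Otherwise cap it at min(10% of system memory, 50% of unused memory).
  return std::min(total_system_mem / 10, unused_mem / 2);
}
```

    With the default 20% of system memory left unused, both caps
    coincide at 10% of system memory.
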
    
    Two startup options are added for this feature:
    
    1. remote_batch_read. Default is false. If true, batch reading
    is enabled.
    2. remote_read_memory_buffer_size. Default is 1G. The maximum
    memory the read buffer may use. The value is also restricted by
    the process memory limit: it cannot exceed 10% of the process
    memory limit.
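
    A hypothetical sketch of how the two options combine, assuming the
    configured size has already been converted to bytes (1G is taken as
    1 GiB for illustration). EffectiveReadBufferBytes is not an Impala
    function, and how the real flags are parsed and where the clamp is
    applied is not shown in this message.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical combination of the two startup options. 'remote_batch_read'
// and 'configured_bytes' stand in for the flag values; 0 means no read buffer.
int64_t EffectiveReadBufferBytes(
    bool remote_batch_read, int64_t configured_bytes, int64_t process_mem_limit) {
  // Batch reading is off unless remote_batch_read is set (default false).
  if (!remote_batch_read) return 0;
  // The configured size may not exceed 10% of the process memory limit.
  return std::min(configured_bytes, process_mem_limit / 10);
}
```
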
    
    Added metrics ScratchReadsUseMem/ScratchBytesReadUseMem/
    ScratchBytesReadUseLocalDisk to the query profile.
    
    The patch also increases the MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB
    from 256 to 512.
    
    Tests:
    Ran core and exhaustive tests.
    Added and ran TmpFileMgrTest::TestBatchReadingFromRemote.
    Added e2e test test_scratch_dirs_batch_reading.
    
    Change-Id: I1dcc5d0881ffaeff09c5c514306cd668373ad31b
    Reviewed-on: http://gerrit.cloudera.org:8080/17979
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/CMakeLists.txt          |   2 +
 be/src/runtime/io/disk-file-test.cc       | 149 ++++++++++++
 be/src/runtime/io/disk-file.cc            |  50 ++++
 be/src/runtime/io/disk-file.h             | 363 ++++++++++++++++++++++++++++-
 be/src/runtime/io/disk-io-mgr-test.cc     | 122 ++++++++++
 be/src/runtime/io/disk-io-mgr.cc          | 117 ++++++++--
 be/src/runtime/io/request-context.cc      |  28 +--
 be/src/runtime/io/request-context.h       |  31 +++
 be/src/runtime/io/request-ranges.h        |  43 ++--
 be/src/runtime/io/scan-range.cc           | 123 ++++++----
 be/src/runtime/tmp-file-mgr-internal.h    | 118 ++++++++--
 be/src/runtime/tmp-file-mgr-test.cc       | 175 +++++++++++++-
 be/src/runtime/tmp-file-mgr.cc            | 369 ++++++++++++++++++++++++++----
 be/src/runtime/tmp-file-mgr.h             |  62 ++++-
 be/src/util/mem-info.cc                   |   3 +-
 be/src/util/mem-info.h                    |   3 +-
 be/src/util/metrics.h                     |   4 +-
 common/thrift/metrics.json                |  20 ++
 tests/custom_cluster/test_scratch_disk.py |  42 ++++
 19 files changed, 1667 insertions(+), 157 deletions(-)

diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index 1600c9310..cbc1f1357 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -43,6 +43,7 @@ add_dependencies(Io gen-deps)
 add_library(IoTests STATIC
   data-cache-trace-test.cc
   disk-io-mgr-test.cc
+  disk-file-test.cc
 )
 add_dependencies(IoTests gen-deps)
 
@@ -55,6 +56,7 @@ add_executable(data-cache-trace-replayer data-cache-trace-replayer.cc)
 target_link_libraries(data-cache-trace-replayer ${IMPALA_TEST_LINK_LIBS})
 
 ADD_UNIFIED_BE_LSAN_TEST(disk-io-mgr-test DiskIoMgrTest.*)
+ADD_UNIFIED_BE_LSAN_TEST(disk-file-test DiskFileTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(data-cache-trace-test DataCacheTraceTest.*)
 # Exception to unified be: Custom main function (platform tests)
 ADD_BE_LSAN_TEST(data-cache-test)
diff --git a/be/src/runtime/io/disk-file-test.cc b/be/src/runtime/io/disk-file-test.cc
new file mode 100644
index 000000000..5dc917dae
--- /dev/null
+++ b/be/src/runtime/io/disk-file-test.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/names.h"
+#include "runtime/io/disk-io-mgr-internal.h"
+#include "testutil/death-test-util.h"
+
+using namespace std;
+
+namespace impala {
+namespace io {
+class DiskFileTest : public testing::Test {
+ public:
+  void ValidateMemBlockStatus(MemBlockStatus last_status);
+  void ValidateMemBlockStatusTransition(MemBlock& block, MemBlockStatus old_status,
+      MemBlockStatus new_status, bool expect_success);
+};
+
+// last_status is the last status the MemBlock reaches before it is set to
+// MemBlockStatus::DISABLED.
+void DiskFileTest::ValidateMemBlockStatus(MemBlockStatus last_status) {
+  const int block_id = 0;
+  const int64_t block_size = 1024;
+  bool expect_reserved = last_status >= MemBlockStatus::RESERVED;
+  bool expect_alloc = last_status >= MemBlockStatus::ALLOC;
+  bool reserved = false;
+  bool alloc = false;
+  MemBlock block(block_id);
+  ASSERT_TRUE(block.data() == nullptr);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::UNINIT));
+  if (last_status == MemBlockStatus::UNINIT) goto end;
+  block.SetStatus(MemBlockStatus::RESERVED);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::RESERVED));
+  if (last_status == MemBlockStatus::RESERVED) goto end;
+  {
+    unique_lock<SpinLock> read_buffer_lock(*(block.GetLock()));
+    EXPECT_OK(block.AllocLocked(read_buffer_lock, block_size));
+  }
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::ALLOC));
+  ASSERT_TRUE(block.data() != nullptr);
+  if (last_status == MemBlockStatus::ALLOC) goto end;
+  ASSERT_EQ(last_status, MemBlockStatus::WRITTEN);
+  memset(block.data(), 1, block_size);
+  block.SetStatus(MemBlockStatus::WRITTEN);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::WRITTEN));
+  for (int i = 0; i < block_size; i++) {
+    EXPECT_EQ(block.data()[i], 1);
+  }
+end:
+  block.Delete(&reserved, &alloc);
+  ASSERT_EQ(reserved, expect_reserved);
+  ASSERT_EQ(alloc, expect_alloc);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::DISABLED));
+  ASSERT_TRUE(block.data() == nullptr);
+}
+
+void DiskFileTest::ValidateMemBlockStatusTransition(MemBlock& block,
+    MemBlockStatus old_status, MemBlockStatus new_status, bool expect_success) {
+  block.status_ = old_status;
+  if (expect_success) {
+    block.SetStatus(new_status);
+    ASSERT_TRUE(block.IsStatus(new_status));
+  } else {
+    IMPALA_ASSERT_DEBUG_DEATH(block.SetStatus(new_status), "");
+  }
+}
+
+// Test the basic flow of a MemBlock.
+TEST_F(DiskFileTest, MemBlockTest) {
+  ValidateMemBlockStatus(MemBlockStatus::UNINIT);
+  ValidateMemBlockStatus(MemBlockStatus::RESERVED);
+  ValidateMemBlockStatus(MemBlockStatus::ALLOC);
+  ValidateMemBlockStatus(MemBlockStatus::WRITTEN);
+}
+
+// Test the MemBlock status transition.
+TEST_F(DiskFileTest, MemBlockStatusTransition) {
+  MemBlock block(0);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::RESERVED, true);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::ALLOC, true);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::WRITTEN, true);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::DISABLED, true);
+}
+} // namespace io
+} // namespace impala
diff --git a/be/src/runtime/io/disk-file.cc b/be/src/runtime/io/disk-file.cc
index fc4f5fe0a..767047567 100644
--- a/be/src/runtime/io/disk-file.cc
+++ b/be/src/runtime/io/disk-file.cc
@@ -67,3 +67,53 @@ DiskFile::DiskFile(const string& path, DiskIoMgr* io_mgr, int64_t file_size,
     space_reserved_.Store(true);
   }
 }
+
+DiskFile::DiskFile(const string& path, DiskIoMgr* io_mgr, int64_t file_size,
+    DiskFileType disk_type, int64_t read_buffer_block_size, int num_read_buffer_blocks)
+  : path_(path),
+    file_size_(file_size),
+    disk_type_(disk_type),
+    file_status_(DiskFileStatus::INWRITING) {
+  DCHECK(disk_type == DiskFileType::LOCAL_BUFFER);
+  hdfs_conn_ = nullptr;
+  space_reserved_.Store(false);
+  file_writer_.reset(new LocalFileWriter(io_mgr, path_.c_str(), file_size));
+  read_buffer_ =
+      std::make_unique<ReadBuffer>(read_buffer_block_size, num_read_buffer_blocks);
+}
+
+DiskFile::ReadBuffer::ReadBuffer(
+    int64_t read_buffer_block_size, int64_t num_read_buffer_blocks)
+  : read_buffer_block_size_(read_buffer_block_size),
+    num_of_read_buffer_blocks_(num_read_buffer_blocks) {
+  page_cnts_per_block_ = std::make_unique<int64_t[]>(num_read_buffer_blocks);
+  read_buffer_block_offsets_ = std::make_unique<int64_t[]>(num_read_buffer_blocks);
+  memset(page_cnts_per_block_.get(), 0, num_read_buffer_blocks * sizeof(int64_t));
+  memset(read_buffer_block_offsets_.get(), DISK_FILE_INVALID_FILE_OFFSET,
+      num_read_buffer_blocks * sizeof(int64_t));
+  for (int i = 0; i < num_read_buffer_blocks; i++) {
+    read_buffer_blocks_.emplace_back(std::make_unique<MemBlock>(i));
+  }
+}
+
+void MemBlock::Delete(bool* reserved, bool* allocated) {
+  DCHECK(reserved != nullptr);
+  DCHECK(allocated != nullptr);
+  *reserved = false;
+  *allocated = false;
+  unique_lock<SpinLock> lock(mem_block_lock_);
+  switch (status_) {
+    case MemBlockStatus::WRITTEN:
+    case MemBlockStatus::ALLOC:
+      // Release the memory.
+      DCHECK(data_ != nullptr);
+      free(data_);
+      data_ = nullptr;
+      *allocated = true;
+    case MemBlockStatus::RESERVED:
+      *reserved = true;
+    default:
+      SetStatusLocked(lock, MemBlockStatus::DISABLED);
+      DCHECK(data_ == nullptr);
+  }
+}
diff --git a/be/src/runtime/io/disk-file.h b/be/src/runtime/io/disk-file.h
index 97112420b..f0ef8ce27 100644
--- a/be/src/runtime/io/disk-file.h
+++ b/be/src/runtime/io/disk-file.h
@@ -25,6 +25,7 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/file-writer.h"
 
+#include "util/condition-variable.h"
 #include "util/spinlock.h"
 
 namespace impala {
@@ -36,6 +37,130 @@ class RemoteOperRange;
 class ScanRange;
 class WriteRange;
 
+static const int64_t DISK_FILE_INVALID_FILE_OFFSET = -1;
+
+/// MemBlockStatus indicates the status of a MemBlock.
+/// The normal status transition is: UNINIT -> RESERVED -> ALLOC -> WRITTEN -> DISABLED.
+/// Any status can also transition directly to DISABLED.
+/// UNINIT is the default status and indicates the block is not initialized.
+/// RESERVED indicates the memory required by the block is reserved.
+/// ALLOC indicates the memory required by the block is allocated.
+/// WRITTEN indicates the memory is allocated and content has been written to the memory.
+/// DISABLED indicates the MemBlock is disabled, doesn't allow any writes to or reads from
+/// the block. It is a final state, and no memory should be allocated or reserved.
+enum class MemBlockStatus { UNINIT, RESERVED, ALLOC, WRITTEN, DISABLED };
+
+/// Each MemBlock can contain multiple pages, and be used as the buffer to read multiple
+/// pages at a time from the DiskFile.
+/// The caller may need to maintain the status of the MemBlock and make sure the block is
+/// used under the correct status.
+class MemBlock {
+ public:
+  MemBlock(int block_id) : block_id_(block_id), status_(MemBlockStatus::UNINIT) {}
+  virtual ~MemBlock() {
+    // Must be MemBlockStatus::DISABLED before destruction.
+    DCHECK_EQ(static_cast<int>(status_), static_cast<int>(MemBlockStatus::DISABLED));
+    DCHECK(data_ == nullptr);
+  }
+
+  // Release the memory if it is allocated.
+  // The MemBlock status will be set to MemBlockStatus::DISABLED after deletion.
+  // Return to the caller whether the memory is reserved or allocated before deletion.
+  // Must be called before the MemBlock destruction.
+  void Delete(bool* reserved, bool* alloc);
+
+  // Allocate the memory for the MemBlock.
+  // Status must be MemBlockStatus::RESERVED before allocation.
+  // If successfully allocated, the status will be set to MemBlockStatus::ALLOC.
+  Status AllocLocked(const std::unique_lock<SpinLock>& lock, int64_t size) {
+    DCHECK(lock.mutex() == &mem_block_lock_ && lock.owns_lock());
+    DCHECK_EQ(static_cast<int>(status_), static_cast<int>(MemBlockStatus::RESERVED));
+    // Use malloc, could be better to alloc from a buffer pool.
+    data_ = static_cast<uint8_t*>(malloc(size));
+    if (UNLIKELY(data_ == nullptr)) {
+      return Status(strings::Substitute("Couldn't allocate memory for a memory block, "
+                                        "block size: '$0' bytes",
+          size));
+    }
+    SetStatusLocked(lock, MemBlockStatus::ALLOC);
+    return Status::OK();
+  }
+
+  uint8_t* data() { return data_; }
+
+  MemBlockStatus GetStatus() {
+    std::unique_lock<SpinLock> l(mem_block_lock_);
+    return status_;
+  }
+
+  bool IsStatus(MemBlockStatus status) {
+    std::unique_lock<SpinLock> l(mem_block_lock_);
+    return IsStatusLocked(l, status);
+  }
+
+  bool IsStatusLocked(const std::unique_lock<SpinLock>& lock, MemBlockStatus status) {
+    DCHECK(lock.mutex() == &mem_block_lock_ && lock.owns_lock());
+    return status_ == status;
+  }
+
+  void SetStatus(MemBlockStatus status) {
+    std::unique_lock<SpinLock> l(mem_block_lock_);
+    SetStatusLocked(l, status);
+  }
+
+  void SetStatusLocked(const std::unique_lock<SpinLock>& lock, MemBlockStatus status) {
+    DCHECK(lock.mutex() == &mem_block_lock_ && lock.owns_lock());
+    SetInternalStatus(status);
+  }
+
+  /// Return the lock of the memory block.
+  SpinLock* GetLock() { return &mem_block_lock_; }
+
+  /// Return the block id.
+  int block_id() { return block_id_; }
+
+ private:
+  friend class TmpFileRemote;
+  friend class RemoteOperRange;
+  friend class DiskFileTest;
+
+  /// Caller should hold the lock.
+  void SetInternalStatus(MemBlockStatus new_status) {
+    switch (new_status) {
+      case MemBlockStatus::RESERVED: {
+        DCHECK(status_ == MemBlockStatus::UNINIT);
+        break;
+      }
+      case MemBlockStatus::ALLOC: {
+        DCHECK(status_ == MemBlockStatus::RESERVED);
+        break;
+      }
+      case MemBlockStatus::WRITTEN: {
+        DCHECK(status_ == MemBlockStatus::ALLOC);
+        break;
+      }
+      case MemBlockStatus::DISABLED: {
+        break;
+      }
+      default:
+        DCHECK(false) << "Invalid memory block status: " << static_cast<int>(new_status);
+    }
+    status_ = new_status;
+  }
+
+  /// The id of the memory block.
+  const int block_id_;
+
+  /// Protect the members below.
+  SpinLock mem_block_lock_;
+
+  /// The status of the memory block.
+  MemBlockStatus status_;
+
+  /// The data of the memory block, may contain multiple pages.
+  uint8_t* data_ = nullptr;
+};
+
 /// DiskFileType indicates the type of the file handled by the DiskFile.
 /// LOCAL indicates the file is in the local filesystem.
 /// LOCAL_BUFFER indicates the file is used as a buffer in the local filesystem.
@@ -64,8 +189,46 @@ class DiskFile {
   DiskFile(const std::string& path, DiskIoMgr* io_mgr, int64_t file_size,
       DiskFileType disk_type, const hdfsFS* hdfs_conn = nullptr);
 
+  /// Constructor for a file with read buffers.
+  DiskFile(const std::string& path, DiskIoMgr* io_mgr, int64_t file_size,
+      DiskFileType disk_type, int64_t read_buffer_size, int num_read_buffer_blocks);
+
   virtual ~DiskFile() {}
 
+  /// The ReadBuffer is designed for batch reading. Each ReadBuffer belongs to one
+  /// DiskFile, and contains multiple read buffer blocks which are divided from the
+  /// DiskFile by the block size. Each block contains multiple pages.
+  /// When reading a page from the read buffer, we first use the offset of the page
+  /// to calculate which block contains the page, then see whether the block is
+  /// available or not. If it is available, the caller can read the page from the block.
+  /// The block is only available after a fetch, which is triggered in TmpFileMgr.
+  /// The default size of a read buffer block is fixed, and the number of blocks per
+  /// disk file is the default file size divided by the default block size.
+  struct ReadBuffer {
+    ReadBuffer(int64_t read_buffer_block_size, int64_t num_read_buffer_blocks);
+
+    /// The default read buffer block size.
+    const int64_t read_buffer_block_size_;
+
+    /// The number of read buffer blocks per disk file.
+    const int64_t num_of_read_buffer_blocks_;
+
+    /// Each read buffer is a memory block, therefore, the size of read_buffer_blocks_ is
+    /// num_of_read_buffer_blocks_.
+    std::vector<std::unique_ptr<MemBlock>> read_buffer_blocks_;
+
+    /// Protects the members below.
+    SpinLock read_buffer_ctrl_lock_;
+
+    /// The number of pages in each read buffer block.
+    /// The size of page_cnts_per_block_ is num_of_read_buffer_blocks_.
+    std::unique_ptr<int64_t[]> page_cnts_per_block_;
+
+    /// The start offsets of each read buffer block to the whole file.
+    /// The size of read_buffer_block_offsets_ is num_of_read_buffer_blocks_.
+    std::unique_ptr<int64_t[]> read_buffer_block_offsets_;
+  };
+
   // Delete the physical file. Caller should hold the exclusive file lock.
   Status Delete(const std::unique_lock<boost::shared_mutex>& lock);
 
@@ -81,7 +244,7 @@ class DiskFile {
   int64_t file_size() const { return file_size_; }
 
   /// Return the actual size of the file.
-  int64_t actual_file_size() const { return actual_file_size_.Load(); }
+  int64_t actual_file_size() { return actual_file_size_.Load(); }
 
   /// If return True, the file is persisted.
   /// The caller should hold the status lock.
@@ -101,6 +264,9 @@ class DiskFile {
     return GetFileStatusLocked(l) == DiskFileStatus::DELETED;
   }
 
+  /// If True, the file is to be deleted.
+  bool is_to_delete() { return to_delete_.Load(); }
+
   /// Set the status of the DiskFile. Caller should not hold the status lock.
   void SetStatus(DiskFileStatus status) {
     std::unique_lock<SpinLock> l(status_lock_);
@@ -114,6 +280,9 @@ class DiskFile {
     SetInternalStatus(status);
   }
 
+  /// Set the flag of to_delete.
+  void SetToDeleteFlag(bool to_delete = true) { to_delete_.Store(to_delete); }
+
   /// Returns the status of the file.
   /// The caller should not hold the status lock.
   DiskFileStatus GetFileStatus() {
@@ -133,15 +302,182 @@ class DiskFile {
   void SetSpaceReserved() { space_reserved_.Store(true); }
   bool IsSpaceReserved() { return space_reserved_.Load(); }
 
-  /// Set actual file size. Should only be called by the TmpFileRemote::AllocateSpace()
-  /// right after the allocation is at capacity, and the function should only be called
-  /// once during the lifetime of the DiskFile.
+  /// Set actual file size.
+  /// The function should only be called once during the lifetime of the DiskFile.
   void SetActualFileSize(int64_t size) {
     DCHECK_EQ(0, actual_file_size_.Load());
     DCHECK_LE(file_size_, size);
     actual_file_size_.Store(size);
   }
 
+  // Update the read buffer metadata if batch reading is enabled for the file.
+  // The metadata is set as the file is written: because pages may have different
+  // sizes, the page count and start offset can differ from block to block, so each
+  // written page updates the page count and, if needed, the start offset of the
+  // block that contains it.
+  void UpdateReadBufferMetaDataIfNeeded(int64_t offset) {
+    if (!IsBatchReadEnabled()) return;
+    int64_t par_idx = GetReadBufferIndex(offset);
+    DCheckReadBufferIdx(par_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    read_buffer_->page_cnts_per_block_[par_idx]++;
+    int64_t cur_offset = read_buffer_->read_buffer_block_offsets_[par_idx];
+    if (cur_offset == DISK_FILE_INVALID_FILE_OFFSET || offset < cur_offset) {
+      read_buffer_->read_buffer_block_offsets_[par_idx] = offset;
+    }
+  }
+
+  // Return the index of the buffer block by the file offset.
+  int GetReadBufferIndex(int64_t offset) {
+    int read_buffer_idx = offset / read_buffer_block_size();
+    if (read_buffer_idx >= num_of_read_buffers()) {
+      // Because the offset could be a little over the default file size, the index
+      // could equal the number of read buffers, but can never exceed it.
+      DCHECK(read_buffer_idx == num_of_read_buffers());
+      read_buffer_idx = num_of_read_buffers() - 1;
+    }
+    DCheckReadBufferIdx(read_buffer_idx);
+    return read_buffer_idx;
+  }
+
+  // Return the start offset by the index of the buffer block.
+  int64_t GetReadBuffStartOffset(int buffer_idx) {
+    DCheckReadBufferIdx(buffer_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    int64_t offset = read_buffer_->read_buffer_block_offsets_[buffer_idx];
+    DCHECK(offset != DISK_FILE_INVALID_FILE_OFFSET);
+    return offset;
+  }
+
+  // Return the actual size of the specific read buffer block.
+  int64_t GetReadBuffActualSize(int buffer_idx) {
+    DCheckReadBufferIdx(buffer_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    int64_t cur_offset = read_buffer_->read_buffer_block_offsets_[buffer_idx];
+    DCHECK(cur_offset != DISK_FILE_INVALID_FILE_OFFSET);
+    while (buffer_idx != num_of_read_buffers() - 1) {
+      DCHECK_LT(buffer_idx, num_of_read_buffers() - 1);
+      int64_t nxt_offset = read_buffer_->read_buffer_block_offsets_[buffer_idx + 1];
+      if (nxt_offset != DISK_FILE_INVALID_FILE_OFFSET) return nxt_offset - cur_offset;
+      buffer_idx++;
+    }
+    int64_t actual_file_size = actual_file_size_.Load();
+    DCHECK_GT(actual_file_size, 0);
+    return actual_file_size - cur_offset;
+  }
+
+  // Return the number of pages in the read buffer block.
+  int64_t GetReadBuffPageCount(int buffer_idx) {
+    DCheckReadBufferIdx(buffer_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    return read_buffer_->page_cnts_per_block_[buffer_idx];
+  }
+
+  // Return the read buffer block.
+  MemBlock* GetBufferBlock(int index) {
+    DCheckReadBufferIdx(index);
+    return read_buffer_->read_buffer_blocks_[index].get();
+  }
+
+  // Return the lock of the read buffer block.
+  SpinLock* GetBufferBlockLock(int index) {
+    DCheckReadBufferIdx(index);
+    return read_buffer_->read_buffer_blocks_[index]->GetLock();
+  }
+
+  // Check if there is an available local memory buffer for the specific offset.
+  // Caller should hold the physical lock of the disk file in case the object is
+  // destroyed. But the caller should not hold the lock of the memory block because the
+  // IsStatus() would require the lock.
+  bool CanReadFromReadBuffer(
+      const boost::shared_lock<boost::shared_mutex>& lock, int64_t offset) {
+    if (!IsBatchReadEnabled()) return false;
+    DCHECK(lock.mutex() == &physical_file_lock_ && lock.owns_lock());
+    MemBlock* read_buffer_block = GetBufferBlock(GetReadBufferIndex(offset));
+    return read_buffer_block != nullptr
+        && read_buffer_block->IsStatus(MemBlockStatus::WRITTEN);
+  }
+
+  // Return if batch reading is enabled.
+  bool IsBatchReadEnabled() { return read_buffer_ != nullptr; }
+
+  void DCheckMemBlock(const boost::shared_lock<boost::shared_mutex>& file_lock,
+      MemBlock* read_buffer_block) {
+    DCHECK(file_lock.mutex() == &physical_file_lock_ && file_lock.owns_lock());
+    DCHECK(read_buffer_block != nullptr);
+  }
+
+  // Read the spilled data from the memory buffer.
+  // Caller should hold the physical file lock of the disk file in case the object is
+  // destroyed. Also, caller should guarantee the buffer won't be released during reading,
+  // it is good for the caller to have the lock of the read buffer block.
+  Status ReadFromMemBuffer(int64_t offset_to_file, int64_t len, uint8_t* dst,
+      const boost::shared_lock<boost::shared_mutex>& file_lock) {
+    DCHECK(file_lock.mutex() == &physical_file_lock_ && file_lock.owns_lock());
+    int64_t idx = GetReadBufferIndex(offset_to_file);
+    DCheckReadBufferIdx(idx);
+    uint8_t* read_buffer_block = read_buffer_->read_buffer_blocks_[idx]->data();
+    DCHECK(read_buffer_block != nullptr);
+    int64_t offset_to_block = offset_to_file - GetReadBuffStartOffset(idx);
+    DCHECK_GE(offset_to_block, 0);
+    DCHECK_GE(GetReadBuffActualSize(idx), offset_to_block + len);
+    memcpy(dst, read_buffer_block + offset_to_block, len);
+    return Status::OK();
+  }
+
+  // Helper function to allocate the memory for a read buffer block.
+  // Caller should hold both physical_file_lock_ and the lock of the read buffer block.
+  Status AllocReadBufferBlockLocked(MemBlock* read_buffer_block, int64_t size,
+      const boost::shared_lock<boost::shared_mutex>& file_lock,
+      const std::unique_lock<SpinLock>& block_lock) {
+    DCheckMemBlock(file_lock, read_buffer_block);
+    return read_buffer_block->AllocLocked(block_lock, size);
+  }
+
+  // Helper function to set the status of a memory block.
+  // Caller should hold the physical_file_lock_.
+  // If the caller holds the lock of the read buffer block, SetStatusLocked() will be
+  // called.
+  void SetReadBufferBlockStatus(MemBlock* read_buffer_block, MemBlockStatus status,
+      const boost::shared_lock<boost::shared_mutex>& file_lock,
+      const std::unique_lock<SpinLock>* block_lock = nullptr) {
+    DCheckMemBlock(file_lock, read_buffer_block);
+    if (block_lock == nullptr) {
+      read_buffer_block->SetStatus(status);
+    } else {
+      read_buffer_block->SetStatusLocked(*block_lock, status);
+    }
+  }
+
+  // Helper function to check the status of a read buffer block.
+  // Caller should hold the physical_file_lock_.
+  // If the caller holds the lock of the read buffer block, IsStatusLocked() will be
+  // called.
+  bool IsReadBufferBlockStatus(MemBlock* read_buffer_block, MemBlockStatus status,
+      const boost::shared_lock<boost::shared_mutex>& file_lock,
+      const std::unique_lock<SpinLock>* block_lock = nullptr) {
+    DCheckMemBlock(file_lock, read_buffer_block);
+    if (block_lock == nullptr) return read_buffer_block->IsStatus(status);
+    return read_buffer_block->IsStatusLocked(*block_lock, status);
+  }
+
+  // Helper function to delete the read buffer block.
+  // Caller should hold the physical_file_lock_, but should not hold the lock of the
+  // read buffer block, because Delete() will hold the read buffer block's lock.
+  template <typename T>
+  void DeleteReadBuffer(
+      MemBlock* read_buffer_block, bool* reserved, bool* alloc, const T& file_lock) {
+    DCHECK(file_lock.mutex() == &physical_file_lock_ && file_lock.owns_lock());
+    DCHECK(read_buffer_block != nullptr);
+    return read_buffer_block->Delete(reserved, alloc);
+  }
+
+  // Return the number of read buffer blocks of this DiskFile.
+  int64_t num_of_read_buffers() { return read_buffer_->num_of_read_buffer_blocks_; }
+
+  // Return the default size of a read buffer.
+  int64_t read_buffer_block_size() { return read_buffer_->read_buffer_block_size_; }
+
  private:
   friend class RemoteOperRange;
   friend class ScanRange;
@@ -183,11 +519,18 @@ class DiskFile {
   /// avoid a deadlock.
   /// The lock order of file lock and status lock (above) should be file lock acquired
   /// first.
+  /// If the disk file has memory blocks, the lock also protects them from
+  /// destruction.
   boost::shared_mutex physical_file_lock_;
 
   /// The hdfs connection used to connect to the remote scratch path.
   hdfsFS hdfs_conn_;
 
+  /// Set to true when the file is about to be deleted. The deleting thread sets this
+  /// flag to ask the current holder of the file lock to yield, so that it can acquire
+  /// the unique file lock.
+  AtomicBool to_delete_{false};
+
   /// Specify if the file's space is reserved to be allowed to write to the filesystem
   /// because the filesystem may reach the size limit and needs some time before it can
   /// release space for new writes to the filesystem, so the space reserved indicator is
@@ -229,6 +572,10 @@ class DiskFile {
   /// like S3, it is set after a successful upload.
   AtomicInt64 actual_file_size_{0};
 
+  /// The read buffer for the disk file; nullptr if batch reading is not enabled.
+  std::unique_ptr<ReadBuffer> read_buffer_;
+
   /// Internal setter to set the status.
   /// The status is from INWRITING -> PERSISTED -> DELETED, which should not be a
   /// reverse transition.
@@ -258,6 +605,14 @@ class DiskFile {
 
   /// Return the status lock of the file.
   SpinLock* GetStatusLock() { return &status_lock_; }
+
+  /// Helper function to DCHECK that the read buffer is not null and that the buffer
+  /// index is within range.
+  void DCheckReadBufferIdx(int buffer_idx) {
+    DCHECK(read_buffer_ != nullptr);
+    DCHECK_LT(buffer_idx, read_buffer_->num_of_read_buffer_blocks_);
+    DCHECK_GE(buffer_idx, 0);
+  }
 };
 } // namespace io
 } // namespace impala
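The DiskFile helpers above take an optional pointer to an already-held block lock, so one function serves callers both with and without that lock. A minimal standalone sketch of the pattern, assuming std::mutex in place of Impala's SpinLock and hypothetical MemBlockSketch/BlockStatus types in place of MemBlock/MemBlockStatus (neither is the real class):

```cpp
#include <cassert>
#include <mutex>

// Hypothetical block states; the real MemBlockStatus values may differ.
enum class BlockStatus { UNINIT, ALLOC, WRITTEN, DISABLED };

// Hypothetical stand-in for MemBlock: a status guarded by the block's own lock.
class MemBlockSketch {
 public:
  // Acquires the block lock internally; the caller must NOT already hold it.
  void SetStatus(BlockStatus s) {
    std::unique_lock<std::mutex> l(lock_);
    SetStatusLocked(l, s);
  }

  // The caller already holds the block lock; only verify it is the right lock.
  void SetStatusLocked(const std::unique_lock<std::mutex>& l, BlockStatus s) {
    assert(l.mutex() == &lock_ && l.owns_lock());
    status_ = s;
  }

  bool IsStatus(BlockStatus s) {
    std::unique_lock<std::mutex> l(lock_);
    return IsStatusLocked(l, s);
  }

  bool IsStatusLocked(const std::unique_lock<std::mutex>& l, BlockStatus s) {
    assert(l.mutex() == &lock_ && l.owns_lock());
    return status_ == s;
  }

  std::mutex* lock() { return &lock_; }

 private:
  std::mutex lock_;
  BlockStatus status_ = BlockStatus::UNINIT;
};

// Same shape as SetReadBufferBlockStatus(): a null 'block_lock' means "not held".
void SetBlockStatus(MemBlockSketch* block, BlockStatus s,
    const std::unique_lock<std::mutex>* block_lock = nullptr) {
  if (block_lock == nullptr) {
    block->SetStatus(s);
  } else {
    block->SetStatusLocked(*block_lock, s);
  }
}
```

Passing the held lock instead of re-locking avoids a self-deadlock on a non-recursive lock; the asserts on l.mutex() play the role of the DCHECKs in the patch.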
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 4dab7af3f..52741f0f8 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -2479,5 +2479,127 @@ TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) {
   tmp_file_grp->Close();
   io_mgr.UnregisterContext(io_ctx.get());
 }
+
+// Delete the physical remote file after upload.
+TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
+  num_oper_ = 0;
+  num_ranges_written_ = 0;
+  string remote_file_path = REMOTE_URL + "/test";
+  string local_buffer_path = LOCAL_BUFFER_PATH + "/test";
+  FLAGS_remote_tmp_file_size = "1K";
+  int64_t file_size = 1024;
+  int64_t block_size = 1024;
+
+  // Delete the hdfs file if it exists.
+  hdfsDelete(hdfsConnect("default", 0), remote_file_path.c_str(), 1);
+
+  // Delete the file in local file system if it exists.
+  vector<string> local_buffer_path_vec;
+  local_buffer_path_vec.push_back(local_buffer_path);
+  Status rm_status = FileSystemUtil::RemovePaths(local_buffer_path_vec);
+
+  TmpFileMgr tmp_file_mgr;
+  DiskIoMgr io_mgr(1, 1, 1, 1, 10);
+  ASSERT_OK(io_mgr.Init());
+  TmpFileGroup* tmp_file_grp = NewRemoteFileGroup(&tmp_file_mgr, &io_mgr);
+  ASSERT_TRUE(tmp_file_grp != nullptr);
+
+  ObjectPool tmp_pool;
+  unique_ptr<RequestContext> io_ctx = io_mgr.RegisterContext();
+
+  TmpFileRemote tmp_file(
+      tmp_file_grp, 0, remote_file_path, local_buffer_path, false, REMOTE_URL.c_str());
+  DiskFile* remote_file = tmp_file.DiskFile();
+  DiskFile* local_buffer_file = tmp_file.DiskBufferFile();
+  tmp_file.GetWriteFile()->SetActualFileSize(file_size);
+
+  // Write some data for testing.
+  size_t write_size_len = sizeof(int32_t);
+  vector<WriteRange*> ranges;
+  vector<int32_t> datas;
+  for (int i = 0; i < file_size / write_size_len; i++) {
+    int32_t* data = tmp_pool.Add(new int32_t);
+    *data = rand();
+    datas.push_back(*data);
+    WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+    WriteRange::WriteDoneCallback callback = [=](const Status& status) {
+      ASSERT_EQ(0, status.code());
+      lock_guard<mutex> l(written_mutex_);
+      num_ranges_written_ = 1;
+      writes_done_.NotifyOne();
+    };
+
+    *new_range = tmp_pool.Add(new WriteRange(remote_file_path, 0, 0, callback));
+    (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+    (*new_range)->SetDiskFile(tmp_file.GetWriteFile());
+    ranges.push_back(*new_range);
+    EXPECT_OK(io_ctx->AddWriteRange(*new_range));
+    {
+      unique_lock<mutex> lock(written_mutex_);
+      while (num_ranges_written_ < 1) writes_done_.Wait(lock);
+    }
+    num_ranges_written_ = 0;
+    if (i == file_size / write_size_len - 1) {
+      tmp_file.SetAtCapacity();
+    }
+  }
+
+  auto disk_id = io_mgr.RemoteDfsDiskFileOperId();
+  bool upload_ok = false;
+  RemoteOperRange::RemoteOperDoneCallback callback = [&](const Status& status) {
+    upload_ok = status.ok();
+    lock_guard<mutex> l(oper_mutex_);
+    num_oper_ = 1;
+    oper_done_.NotifyOne();
+  };
+
+  // Request to upload the file to the remote.
+  auto oper_range = tmp_pool.Add(new RemoteOperRange(local_buffer_file, remote_file,
+      block_size, disk_id, RequestType::FILE_UPLOAD, &io_mgr, callback));
+  Status add_status = io_ctx->AddRemoteOperRange(oper_range);
+  ASSERT_OK(add_status);
+
+  // Wait until the file is created before calling the deletion.
+  while (!HdfsFileExist(remote_file_path)) {
+    usleep(rand() % 1000);
+  }
+  // Delete the file to create the failure.
+  hdfsDelete(hdfsConnect("default", 0), remote_file_path.c_str(), 1);
+
+  {
+    unique_lock<mutex> lock(oper_mutex_);
+    while (num_oper_ < 1) oper_done_.Wait(lock);
+  }
+
+  // If the deletion happens early enough to make the upload fail, it must not lead to
+  // a crash. Otherwise the upload succeeds, and reading should fail because the remote
+  // file has been deleted.
+  EXPECT_FALSE(HdfsFileExist(remote_file_path));
+  if (upload_ok) {
+    // Call TryEvictFile(); the local buffer file should be evicted.
+    Status try_evict_status = tmp_file_mgr.TryEvictFile(&tmp_file);
+    ASSERT_TRUE(try_evict_status.ok());
+
+    // Neither the remote file nor the local buffer file should exist now.
+    EXPECT_FALSE(FileExist(local_buffer_path));
+
+    // Should fail reading the first range.
+    ScanRange* scan_range = tmp_pool.Add(new ScanRange);
+    auto range = ranges.at(0);
+    size_t buffer_len = sizeof(int32_t);
+    vector<uint8_t> client_buffer(buffer_len);
+    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
+        range->offset(), 0, false, 1000000,
+        BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
+        nullptr, tmp_file.DiskFile(), tmp_file.DiskBufferFile());
+    bool needs_buffers;
+    ASSERT_OK(io_ctx->StartScanRange(scan_range, &needs_buffers));
+    unique_ptr<BufferDescriptor> io_buffer;
+    EXPECT_FALSE(scan_range->GetNext(&io_buffer).ok());
+  }
+  num_oper_ = 0;
+  tmp_file_grp->Close();
+  io_mgr.UnregisterContext(io_ctx.get());
+}
 }
 }
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 7195ad3a3..41a2e64d4 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -263,6 +263,7 @@ Status WriteRange::DoWrite() {
     ret_status = file_writer->Open();
     if (!ret_status.ok()) return DoWriteEnd(queue, ret_status);
     ret_status = file_writer->Write(this, &written_bytes);
+    disk_file_->UpdateReadBufferMetaDataIfNeeded(written_bytes - len_);
     int64_t actual_file_size = disk_file_->actual_file_size();
     // actual_file_size is only set once, otherwise it is 0 by default. If it is still
     // not set, it is impossible to be full.
@@ -296,28 +297,23 @@ Status WriteRange::DoWriteEnd(DiskQueue* queue, const Status& ret_status) {
 
 RemoteOperRange::RemoteOperRange(DiskFile* src_file, DiskFile* dst_file,
     int64_t block_size, int disk_id, RequestType::type type, DiskIoMgr* io_mgr,
-    RemoteOperDoneCallback callback)
-  : RequestRange(type, disk_id),
+    RemoteOperDoneCallback callback, int64_t file_offset)
+  : RequestRange(type, disk_id, file_offset),
     callback_(callback),
     io_mgr_(io_mgr),
     disk_file_src_(src_file),
     disk_file_dst_(dst_file),
     block_size_(block_size) {}
 
-Status RemoteOperRange::DoOper(uint8_t* buffer, int64_t buffer_size) {
-  DCHECK(request_type() == RequestType::FILE_UPLOAD);
-  return DoUpload(buffer, buffer_size);
-}
-
 Status RemoteOperRange::DoUpload(uint8_t* buffer, int64_t buffer_size) {
   DCHECK(disk_file_src_ != nullptr);
   DCHECK(disk_file_dst_ != nullptr);
   hdfsFS hdfs_conn = disk_file_dst_->hdfs_conn_;
-  int64_t file_size = disk_file_src_->actual_file_size_.Load();
+  int64_t file_size = disk_file_src_->actual_file_size();
   DCHECK(hdfs_conn != nullptr);
   DCHECK(file_size != 0);
-  const char* remote_file_path = disk_file_dst_->path().c_str();
-  const char* local_file_path = disk_file_src_->path().c_str();
+  const string& remote_file_path = disk_file_dst_->path();
+  const string& local_file_path = disk_file_src_->path();
   DiskQueue* queue = io_mgr_->disk_queues_[disk_id_];
   Status status = Status::OK();
   int64_t ret, offset = 0;
@@ -337,9 +333,9 @@ Status RemoteOperRange::DoUpload(uint8_t* buffer, int64_t buffer_size) {
   }
 
   RETURN_IF_ERROR(io_mgr_->local_file_system_->OpenForRead(
-      local_file_path, O_RDONLY, S_IRUSR | S_IWUSR, &local_file));
+      local_file_path.c_str(), O_RDONLY, S_IRUSR | S_IWUSR, &local_file));
   hdfsFile remote_hdfs_file =
-      hdfsOpenFile(hdfs_conn, remote_file_path, O_WRONLY, 0, 0, buffer_size);
+      hdfsOpenFile(hdfs_conn, remote_file_path.c_str(), O_WRONLY, 0, 0, buffer_size);
 
   if (remote_hdfs_file == nullptr) {
     status = Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
@@ -350,9 +346,12 @@ Status RemoteOperRange::DoUpload(uint8_t* buffer, int64_t buffer_size) {
   /// Read the blocks from the local buffer file and write the blocks
   /// to the remote file.
   while (file_size != offset) {
+    // If the to_delete flag is set, quit the upload and close the local file, but leave
+    // the deletion to the thread that set the flag.
+    if (disk_file_src_->is_to_delete()) goto end;
     int bytes = min(file_size - offset, buffer_size);
-    status =
-        io_mgr_->local_file_system_->Fread(local_file, buffer, bytes, local_file_path);
+    status = io_mgr_->local_file_system_->Fread(
+        local_file, buffer, bytes, local_file_path.c_str());
     if (!status.ok()) goto end;
     {
       ScopedHistogramTimer write_timer(queue->write_latency());
@@ -377,13 +376,15 @@ end:
     ScopedHistogramTimer write_timer(queue->write_latency());
     if (hdfsCloseFile(hdfs_conn, remote_hdfs_file) != 0) {
       // Try to close the local file if error happens.
-      RETURN_IF_ERROR(io_mgr_->local_file_system_->Fclose(local_file, local_file_path));
+      RETURN_IF_ERROR(
+          io_mgr_->local_file_system_->Fclose(local_file, local_file_path.c_str()));
       return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
           Substitute(
               "Failed to close HDFS file: $0", remote_file_path, GetHdfsErrorMsg("")));
     }
   }
-  RETURN_IF_ERROR(io_mgr_->local_file_system_->Fclose(local_file, local_file_path));
+  RETURN_IF_ERROR(
+      io_mgr_->local_file_system_->Fclose(local_file, local_file_path.c_str()));
   if (status.ok()) {
     disk_file_dst_->SetStatus(io::DiskFileStatus::PERSISTED);
     disk_file_dst_->SetActualFileSize(file_size);
@@ -393,6 +394,80 @@ end:
   return status;
 }
 
+Status RemoteOperRange::DoFetch() {
+  hdfsFS hdfs_conn = disk_file_src_->hdfs_conn_;
+  DCHECK(hdfs_conn != nullptr);
+  // Fetch the data from the source file (remote) to the destination file (local).
+  DCHECK(disk_file_dst_ != nullptr);
+  DCHECK(disk_file_src_ != nullptr);
+  int64_t buffer_idx = disk_file_dst_->GetReadBufferIndex(offset_);
+  int64_t local_file_size = disk_file_dst_->GetReadBuffActualSize(buffer_idx);
+  const string& remote_file_path = disk_file_src_->path();
+  DiskQueue* queue = io_mgr_->disk_queues_[disk_id_];
+  Status status = Status::OK();
+
+  // Acquire shared locks to prevent the physical files from being deleted during the
+  // fetch. Always lock the local file before the remote file to avoid deadlocks.
+  shared_lock<shared_mutex> dstl(disk_file_dst_->physical_file_lock_);
+  shared_lock<shared_mutex> srcl(disk_file_src_->physical_file_lock_);
+
+  // Check if the remote file is deleted.
+  auto src_status = disk_file_src_->GetFileStatus();
+  if (src_status != io::DiskFileStatus::PERSISTED) {
+    DCHECK(src_status == io::DiskFileStatus::DELETED);
+    return Status(Substitute("File has been deleted, path: '$0'", remote_file_path));
+  }
+
+  unique_lock<SpinLock> read_buffer_lock(
+      *(disk_file_dst_->GetBufferBlockLock(buffer_idx)));
+  MemBlock* read_buffer_bloc = disk_file_dst_->GetBufferBlock(buffer_idx);
+  if (disk_file_dst_->IsReadBufferBlockStatus(
+          read_buffer_bloc, MemBlockStatus::DISABLED, dstl, &read_buffer_lock)) {
+    // A disabled block does not accept any writes, most likely because the query has
+    // ended or been cancelled.
+    return Status(Substitute(
+        "Mem block '$0' has been deleted, path: '$1'", buffer_idx, remote_file_path));
+  }
+  RETURN_IF_ERROR(disk_file_dst_->AllocReadBufferBlockLocked(
+      read_buffer_bloc, local_file_size, dstl, read_buffer_lock));
+  DCHECK(read_buffer_bloc->data() != nullptr);
+  hdfsFile remote_hdfs_file =
+      hdfsOpenFile(hdfs_conn, remote_file_path.c_str(), O_RDONLY, 0, 0, block_size_);
+  if (remote_hdfs_file == nullptr) {
+    status = Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
+        Substitute("Could not open file: $0: $1", remote_file_path, GetStrErrMsg()));
+  } else {
+    int ret = [&]() {
+      ScopedHistogramTimer read_timer(queue->read_latency());
+      return hdfsPreadFully(hdfs_conn, remote_hdfs_file, offset_,
+          read_buffer_bloc->data(), local_file_size);
+    }();
+    if (ret != -1) {
+      queue->read_size()->Update(local_file_size);
+      disk_file_dst_->SetReadBufferBlockStatus(
+          read_buffer_bloc, MemBlockStatus::WRITTEN, dstl, &read_buffer_lock);
+    } else {
+      // The caller may need to handle the error and clean up the read buffer block.
+      status = Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
+          GetHdfsErrorMsg("Error reading from HDFS file: ", remote_file_path));
+    }
+  }
+
+  // Try to close the remote file.
+  if (remote_hdfs_file != nullptr && hdfsCloseFile(hdfs_conn, remote_hdfs_file) != 0) {
+    // If there was an error during reading, keep the old status.
+    string close_err_msg = Substitute(
+        "Failed to close HDFS file: $0: $1", remote_file_path, GetHdfsErrorMsg(""));
+    if (status.ok()) {
+      return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(), close_err_msg);
+    } else {
+      LOG(WARNING) << close_err_msg;
+    }
+  }
+  return status;
+}
+
 static void CheckSseSupport() {
   if (!CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
     LOG(WARNING) << "This machine does not support sse4_2.  The default IO system "
@@ -690,7 +765,7 @@ int64_t DiskIoMgr::ComputeIdealBufferReservation(int64_t scan_range_len) {
 // Work is available if there is a RequestContext with
 //  - A ScanRange with a buffer available, or
 //  - A WriteRange in unstarted_write_ranges_ or
-//  - A RemoteOperRange in unstarted_remote_upload_ranges_
+//  - A RemoteOperRange in unstarted_remote_file_op_ranges_.
 RequestRange* DiskQueue::GetNextRequestRange(RequestContext** request_context) {
   // This loops returns either with work to do or when the disk IoMgr shuts down.
   while (true) {
@@ -766,12 +841,18 @@ void DiskQueue::DiskThreadLoop(DiskIoMgr* io_mgr) {
                                 "block size: '$0'",
                   size)));
         } else {
-          Status oper_status = oper_range->DoOper(buffer, size);
+          Status oper_status = oper_range->DoUpload(buffer, size);
           worker_context->OperDone(oper_range, oper_status);
           free(buffer);
         }
         break;
       }
+      case RequestType::FILE_FETCH: {
+        RemoteOperRange* oper_range = static_cast<RemoteOperRange*>(range);
+        Status oper_status = oper_range->DoFetch();
+        worker_context->OperDone(oper_range, oper_status);
+        break;
+      }
       default:
         DCHECK(false) << "Invalid request type: " << range->request_type();
     }
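DoUpload() now checks is_to_delete() between blocks so that a deleting thread waiting for the unique file lock is not stuck behind a long upload. A minimal sketch of that handshake, assuming std::shared_mutex (C++17) in place of boost::shared_mutex and an in-memory vector in place of the local and HDFS files; FileSketch, UploadInChunks, and DeletePhysicalFile are hypothetical names:

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <vector>

// Hypothetical file stand-in: data guarded by a reader/writer "physical file lock"
// plus an atomic flag that asks shared-lock holders to yield.
struct FileSketch {
  std::shared_mutex physical_file_lock;
  std::atomic<bool> to_delete{false};
  std::vector<char> data;
  bool deleted = false;
};

// Copies 'src.data' into 'dst' in 'chunk'-sized pieces under a shared lock.
// Returns false if it stopped early because deletion was requested.
bool UploadInChunks(FileSketch& src, std::vector<char>* dst, std::size_t chunk) {
  std::shared_lock<std::shared_mutex> l(src.physical_file_lock);
  std::size_t offset = 0;
  while (offset != src.data.size()) {
    // Quit and leave the actual deletion to the thread that set the flag.
    if (src.to_delete.load()) return false;
    std::size_t n = std::min(src.data.size() - offset, chunk);
    auto first = src.data.begin() + static_cast<std::ptrdiff_t>(offset);
    dst->insert(dst->end(), first, first + static_cast<std::ptrdiff_t>(n));
    offset += n;
  }
  return true;
}

// Sets the flag first, then takes the unique lock once all shared holders have left.
void DeletePhysicalFile(FileSketch& f) {
  f.to_delete.store(true);
  std::unique_lock<std::shared_mutex> l(f.physical_file_lock);
  f.data.clear();
  f.deleted = true;
}
```

The flag only shortens the wait; correctness still comes from the unique lock, since the deletion cannot proceed while any shared holder remains.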
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 40d741d2c..e1a439be9 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -58,8 +58,8 @@ class RequestContext::PerDiskState {
   const InternalQueue<WriteRange>* unstarted_write_ranges() const {
     return &unstarted_write_ranges_;
   }
-  const InternalQueue<RemoteOperRange>* unstarted_remote_upload_ranges() const {
-    return &unstarted_remote_upload_ranges_;
+  const InternalQueue<RemoteOperRange>* unstarted_remote_file_oper_ranges() const {
+    return &unstarted_remote_file_op_ranges_;
   }
 
   const InternalQueue<RequestRange>* in_flight_ranges() const {
@@ -70,8 +70,8 @@ class RequestContext::PerDiskState {
   InternalQueue<WriteRange>* unstarted_write_ranges() {
     return &unstarted_write_ranges_;
   }
-  InternalQueue<RemoteOperRange>* unstarted_remote_upload_ranges() {
-    return &unstarted_remote_upload_ranges_;
+  InternalQueue<RemoteOperRange>* unstarted_remote_file_oper_ranges() {
+    return &unstarted_remote_file_op_ranges_;
   }
 
   InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
@@ -197,8 +197,8 @@ class RequestContext::PerDiskState {
   /// processed)
   InternalQueue<WriteRange> unstarted_write_ranges_;
 
-  /// A Queue for file operation ranges to process uploading operations to remote disks.
-  InternalQueue<RemoteOperRange> unstarted_remote_upload_ranges_;
+  /// A queue of remote file operation ranges, for file uploads and fetches.
+  InternalQueue<RemoteOperRange> unstarted_remote_file_op_ranges_;
 };
 
 void RequestContext::ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range) {
@@ -242,7 +242,8 @@ void RequestContext::OperDone(RequestRange* range, const Status& status) {
   if (range->request_type() == RequestType::WRITE) {
     (static_cast<WriteRange*>(range))->callback()(status);
   } else {
-    DCHECK(range->request_type() == RequestType::FILE_UPLOAD);
+    DCHECK(range->request_type() == RequestType::FILE_UPLOAD
+        || range->request_type() == RequestType::FILE_FETCH);
     (static_cast<RemoteOperRange*>(range))->callback()(status);
   }
   {
@@ -313,7 +314,7 @@ void RequestContext::Cancel() {
       }
 
       RemoteOperRange* oper_range;
-      while ((oper_range = disk_state.unstarted_remote_upload_ranges()->Dequeue())
+      while ((oper_range = disk_state.unstarted_remote_file_oper_ranges()->Dequeue())
           != nullptr) {
         remote_oper_callbacks.push_back(oper_range->callback());
       }
@@ -405,10 +406,11 @@ void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock,
     // ScheduleContext() has no effect if already scheduled, so this is safe to do always.
     disk_state->ScheduleContext(lock, this, range->disk_id());
   } else {
-    DCHECK(range->request_type() == RequestType::FILE_UPLOAD);
+    DCHECK(range->request_type() == RequestType::FILE_UPLOAD
+        || range->request_type() == RequestType::FILE_FETCH);
     DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << static_cast<int>(schedule_mode);
     RemoteOperRange* oper_range = static_cast<RemoteOperRange*>(range);
-    disk_state->unstarted_remote_upload_ranges()->Enqueue(oper_range);
+    disk_state->unstarted_remote_file_oper_ranges()->Enqueue(oper_range);
     disk_state->ScheduleContext(lock, this, range->disk_id());
   }
 
@@ -648,10 +650,10 @@ RequestRange* RequestContext::GetNextRequestRange(int disk_id) {
   }
 
   // Do remote temporary files related work.
-  if (!request_disk_state->unstarted_remote_upload_ranges()->empty()) {
+  if (!request_disk_state->unstarted_remote_file_oper_ranges()->empty()) {
     RemoteOperRange* oper_range;
-    if (!request_disk_state->unstarted_remote_upload_ranges()->empty()) {
-      oper_range = request_disk_state->unstarted_remote_upload_ranges()->Dequeue();
+    if (!request_disk_state->unstarted_remote_file_oper_ranges()->empty()) {
+      oper_range = request_disk_state->unstarted_remote_file_oper_ranges()->Dequeue();
       request_disk_state->in_flight_ranges()->Enqueue(oper_range);
     }
   }
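With this change a single per-disk queue carries both FILE_UPLOAD and FILE_FETCH ranges, and the disk thread dispatches on request_type(). A simplified, single-threaded sketch of that routing; ReqType, RemoteOpSketch, and RemoteOpQueue are hypothetical stand-ins for RequestType, RemoteOperRange, and the per-disk state:

```cpp
#include <cassert>
#include <deque>
#include <string>

// Hypothetical mirror of RequestType.
enum class ReqType { READ, WRITE, FILE_FETCH, FILE_UPLOAD };

// Hypothetical stand-in for a RemoteOperRange.
struct RemoteOpSketch {
  ReqType type;
  std::string path;
};

class RemoteOpQueue {
 public:
  // Like AddRangeToDisk(): only remote file operations may be enqueued here.
  void Enqueue(const RemoteOpSketch& op) {
    assert(op.type == ReqType::FILE_UPLOAD || op.type == ReqType::FILE_FETCH);
    q_.push_back(op);
  }

  bool empty() const { return q_.empty(); }

  // Like DiskThreadLoop(): pop one operation and run the handler for its type.
  std::string RunNext() {
    assert(!q_.empty());
    RemoteOpSketch op = q_.front();
    q_.pop_front();
    switch (op.type) {
      case ReqType::FILE_UPLOAD: return "upload " + op.path;
      case ReqType::FILE_FETCH: return "fetch " + op.path;
      default: return "invalid";
    }
  }

 private:
  std::deque<RemoteOpSketch> q_;
};
```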
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 5424901da..96d609d93 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -173,6 +173,25 @@ class RequestContext {
     bytes_read_counter_ = bytes_read_counter;
   }
 
+  void set_read_use_mem_counter(RuntimeProfile::Counter* read_use_mem_counter) {
+    read_use_mem_counter_ = read_use_mem_counter;
+  }
+
+  void set_bytes_read_use_mem_counter(
+      RuntimeProfile::Counter* bytes_read_use_mem_counter) {
+    bytes_read_use_mem_counter_ = bytes_read_use_mem_counter;
+  }
+
+  void set_read_use_local_disk_counter(
+      RuntimeProfile::Counter* read_use_local_disk_counter) {
+    read_use_local_disk_counter_ = read_use_local_disk_counter;
+  }
+
+  void set_bytes_read_use_local_disk_counter(
+      RuntimeProfile::Counter* bytes_read_use_local_disk_counter) {
+    bytes_read_use_local_disk_counter_ = bytes_read_use_local_disk_counter;
+  }
+
   void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = read_timer; }
 
   void set_open_file_timer(RuntimeProfile::Counter* open_file_timer) {
@@ -338,6 +357,18 @@ class RequestContext {
   /// Total bytes read for this reader
   RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
 
+  /// Total number of reads from the memory buffer for this reader
+  RuntimeProfile::Counter* read_use_mem_counter_ = nullptr;
+
+  /// Total bytes read from the memory buffer for this reader
+  RuntimeProfile::Counter* bytes_read_use_mem_counter_ = nullptr;
+
+  /// Total number of reads from the local disk buffer for this reader
+  RuntimeProfile::Counter* read_use_local_disk_counter_ = nullptr;
+
+  /// Total bytes read from the local disk buffer for this reader
+  RuntimeProfile::Counter* bytes_read_use_local_disk_counter_ = nullptr;
+
   /// Total time spent in hdfs reading
   RuntimeProfile::Counter* read_timer_ = nullptr;
 
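The four new counters are optional: they are installed through setters and updated with COUNTER_ADD_IF_NOT_NULL, so a reader without them simply skips the update. A minimal sketch of the attribution logic used in DoReadInternal(), with CounterSketch, AddIfNotNull, ReaderCounters, and RecordRead as hypothetical stand-ins rather than Impala's RuntimeProfile API:

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for RuntimeProfile::Counter.
struct CounterSketch {
  int64_t value = 0;
  void Add(int64_t delta) { value += delta; }
};

// Same idea as COUNTER_ADD_IF_NOT_NULL: a null counter means "not tracked".
inline void AddIfNotNull(CounterSketch* c, int64_t delta) {
  if (c != nullptr) c->Add(delta);
}

// Hypothetical per-reader counters mirroring the ones in RequestContext.
struct ReaderCounters {
  CounterSketch* bytes_read = nullptr;
  CounterSketch* read_use_mem = nullptr;
  CounterSketch* bytes_read_use_mem = nullptr;
  CounterSketch* read_use_local_disk = nullptr;
  CounterSketch* bytes_read_use_local_disk = nullptr;
};

// Records one completed read of 'len' bytes and attributes it to its source.
void RecordRead(const ReaderCounters& r, int64_t len, bool from_mem,
    bool from_local_disk) {
  assert(len >= 0);
  AddIfNotNull(r.bytes_read, len);
  if (from_mem) {
    AddIfNotNull(r.read_use_mem, 1);
    AddIfNotNull(r.bytes_read_use_mem, len);
  } else if (from_local_disk) {
    AddIfNotNull(r.read_use_local_disk, 1);
    AddIfNotNull(r.bytes_read_use_local_disk, len);
  }
}
```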
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index b8bfbacd9..1b355b999 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -109,12 +109,14 @@ class BufferDescriptor {
 };
 
 /// The request type, read or write associated with a request range.
-/// Ohter than those, request type file_upload is the type for remote file operation
-/// ranges, for doing file uploading to the remote.
+/// Other than those, the request types FILE_UPLOAD and FILE_FETCH are used by remote
+/// file operation ranges, to upload a file to the remote filesystem or to fetch a file
+/// from it.
 struct RequestType {
   enum type {
     READ,
     WRITE,
+    FILE_FETCH,
     FILE_UPLOAD,
   };
 };
@@ -147,9 +149,9 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   RequestType::type request_type() const { return request_type_; }
 
  protected:
-  RequestRange(RequestType::type request_type, int disk_id = -1)
+  RequestRange(RequestType::type request_type, int disk_id = -1, int64_t offset = -1)
     : fs_(nullptr),
-      offset_(-1),
+      offset_(offset),
       len_(-1),
       disk_id_(disk_id),
       request_type_(request_type) {}
@@ -364,6 +366,7 @@ class ScanRange : public RequestRange {
   friend class RequestContext;
   friend class HdfsFileReader;
   friend class LocalFileReader;
+  friend class RemoteOperRange;
 
   /// Initialize internal fields
   void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
@@ -389,9 +392,14 @@ class ScanRange : public RequestRange {
   ReadOutcome DoRead(DiskQueue* queue, int disk_id);
 
   /// The function runs the actual read logic to read content with the specific reader.
-  /// If use_local_buffer is true, it will read from the local buffer with the local
+  /// If use_local_buffer is true, it will read from the local buffer file with the local
   /// buffer reader.
-  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer);
+  /// If use_mem_buffer is true, it will read from a memory block in the local buffer.
+  /// The local_file_lock guarantees that the local file is not deleted while reading;
+  /// it must not be null if use_mem_buffer is true.
+  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer,
+      bool use_mem_buffer,
+      boost::shared_lock<boost::shared_mutex>* local_file_lock = nullptr);
 
   /// Whether to use file handle caching for the current file.
   bool FileHandleCacheEnabled();
@@ -696,19 +704,23 @@ class RemoteOperRange : public RequestRange {
   /// RemoteOperRange was successfully added (i.e. AddRemoteOperRange() succeeded).
   /// No locks are held while the callback is invoked.
   typedef std::function<void(const Status&)> RemoteOperDoneCallback;
-  RemoteOperRange(DiskFile* src_file, DiskFile* dst_file, int64_t file_offset,
-      int disk_id, RequestType::type type, DiskIoMgr* io_mgr,
-      RemoteOperDoneCallback callback);
+  RemoteOperRange(DiskFile* src_file, DiskFile* dst_file, int64_t block_size, int disk_id,
+      RequestType::type type, DiskIoMgr* io_mgr, RemoteOperDoneCallback callback,
+      int64_t file_offset = 0);
 
-  /// Called from a disk I/O thread to do the file operation of this range. The
+  int64_t block_size() { return block_size_; }
+
+  RemoteOperDoneCallback callback() const { return callback_; }
+
+  /// Called from a disk I/O thread to upload the file to a remote filesystem. The
   /// returned Status describes what the result of the read was. 'buff' is the
   /// block buffer which is used for file operations. 'buff_size' is the size of the
   /// block buffer. Caller must not hold 'lock_'.
-  Status DoOper(uint8_t* buff, int64_t buff_size);
-
-  int64_t block_size() { return block_size_; }
+  Status DoUpload(uint8_t* buff, int64_t buff_size);
 
-  RemoteOperDoneCallback callback() const { return callback_; }
+  /// Execute the fetch file operation from a remote filesystem.
+  /// Caller must not hold 'lock_'.
+  Status DoFetch();
 
  private:
   DISALLOW_COPY_AND_ASSIGN(RemoteOperRange);
@@ -732,9 +744,6 @@ class RemoteOperRange : public RequestRange {
 
   /// block size to do the file operation.
   int64_t block_size_;
-
-  /// Execute the upload file operation.
-  Status DoUpload(uint8_t* buff, int64_t buff_size);
 };
 
 inline bool BufferDescriptor::is_cached() const {
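DoFetch() maps the range's file_offset to a read buffer block with GetReadBufferIndex(). Because blocks may differ slightly in size so that no page is split across two blocks, dividing by a fixed block size is not enough in general. A minimal sketch of one way to do this, assuming the page sizes are known up front and using a greedy split plus a binary search over block start offsets; BuildBlockStarts and BlockIndexForOffset are hypothetical, and the patch's actual layout logic may differ:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// Splits a file made of whole pages into blocks of roughly 'target' bytes without
// cutting any page in two. Returns the start offset of each block.
std::vector<int64_t> BuildBlockStarts(const std::vector<int64_t>& page_sizes,
    int64_t target) {
  assert(target > 0);
  std::vector<int64_t> starts{0};
  int64_t offset = 0;
  int64_t block_start = 0;
  for (int64_t page : page_sizes) {
    // Start a new block before this page if it would push the current block past
    // the target and the current block already holds at least one page.
    if (offset > block_start && offset + page - block_start > target) {
      starts.push_back(offset);
      block_start = offset;
    }
    offset += page;
  }
  return starts;
}

// Maps a byte offset to the index of the block that contains it.
int64_t BlockIndexForOffset(const std::vector<int64_t>& starts, int64_t offset) {
  assert(!starts.empty() && offset >= 0);
  // upper_bound finds the first block starting after 'offset'; the previous one owns it.
  auto it = std::upper_bound(starts.begin(), starts.end(), offset);
  return static_cast<int64_t>(it - starts.begin()) - 1;
}
```

With a target of 8 bytes and pages of 4, 4, 4, 6 and 2 bytes, the blocks start at offsets 0, 8 and 12, so offset 8 falls into block 1 and offset 19 into block 2.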
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 09776b0c3..1e33a75a6 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -148,10 +148,12 @@ bool ScanRange::FileHandleCacheEnabled() {
   return false;
 }
 
-ReadOutcome ScanRange::DoReadInternal(
-    DiskQueue* queue, int disk_id, bool use_local_buff) {
+ReadOutcome ScanRange::DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buff,
+    bool use_mem_buffer, shared_lock<shared_mutex>* local_file_lock) {
   int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
+  // use_local_buff and use_mem_buffer are mutually exclusive.
+  DCHECK(!(use_local_buff && use_mem_buffer));
 
   unique_ptr<BufferDescriptor> buffer_desc;
   FileReader* file_reader = nullptr;
@@ -177,43 +179,75 @@ ReadOutcome ScanRange::DoReadInternal(
       buffer_manager_->add_iomgr_buffer_cumulative_bytes_used(buffer_desc->buffer_len());
     }
     read_in_flight_ = true;
-    if (use_local_buff) {
-      file_reader = local_buffer_reader_.get();
-      file_ = disk_buffer_file_->path();
-    } else {
-      file_reader = file_reader_.get();
+    // Unless reading from the memory buffer, pick the reader for the range.
+    if (!use_mem_buffer) {
+      if (use_local_buff) {
+        file_reader = local_buffer_reader_.get();
+        file_ = disk_buffer_file_->path();
+      } else {
+        file_reader = file_reader_.get();
+      }
+      use_local_buffer_ = use_local_buff;
     }
-    use_local_buffer_ = use_local_buff;
   }
-  DCHECK(file_reader != nullptr);
-
-  // No locks in this section.  Only working on local vars.  We don't want to hold a
-  // lock across the read call.
-  // To use the file handle cache:
-  // 1. It must be enabled at the daemon level.
-  // 2. It must be enabled for the particular filesystem.
-  bool use_file_handle_cache = FileHandleCacheEnabled();
-  VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
-            << " file handle cache for " << (expected_local_ ? "local" : "remote")
-            << " file " << file();
-  Status read_status = file_reader->Open(use_file_handle_cache);
+
   bool eof = false;
-  if (read_status.ok()) {
-    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
-    COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
-
-    if (sub_ranges_.empty()) {
-      DCHECK(cache_.data == nullptr);
-      read_status =
-          file_reader->ReadFromPos(queue, offset_ + bytes_read_, buffer_desc->buffer_,
-              min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
-              &buffer_desc->len_, &eof);
-    } else {
-      read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader);
+  Status read_status = Status::OK();
+
+  if (use_mem_buffer) {
+    // The memory buffer is only used for temporary files, and the range is expected to
+    // be read in one pass.
+    // For efficiency, the memory block's lock is not taken here. Safety is implicitly
+    // guaranteed by the disk file's physical lock, which must be held to remove the disk
+    // file and its memory blocks. The only other case where a memory block is removed is
+    // after all of its pages have been read, which can only happen after this read.
+    DCHECK(local_file_lock != nullptr);
+    read_status = disk_buffer_file_->ReadFromMemBuffer(
+        offset_, bytes_to_read_, buffer_desc->buffer_, *local_file_lock);
+    if (read_status.ok()) {
+      buffer_desc->len_ = bytes_to_read_;
+      eof = true;
+      COUNTER_ADD_IF_NOT_NULL(reader_->read_use_mem_counter_, 1L);
+      COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_use_mem_counter_, buffer_desc->len_);
+      COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
     }
+  } else {
+    DCHECK(file_reader != nullptr);
 
-    COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
-    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
+    // No locks in this section.  Only working on local vars.  We don't want to hold a
+    // lock across the read call.
+    // To use the file handle cache:
+    // 1. It must be enabled at the daemon level.
+    // 2. It must be enabled for the particular filesystem.
+    bool use_file_handle_cache = FileHandleCacheEnabled();
+    VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
+              << " file handle cache for " << (expected_local_ ? "local" : "remote")
+              << " file " << file();
+
+    read_status = file_reader->Open(use_file_handle_cache);
+    if (read_status.ok()) {
+      COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
+      COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
+
+      if (sub_ranges_.empty()) {
+        DCHECK(cache_.data == nullptr);
+        read_status =
+            file_reader->ReadFromPos(queue, offset_ + bytes_read_, buffer_desc->buffer_,
+                min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
+                &buffer_desc->len_, &eof);
+      } else {
+        read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader);
+      }
+
+      COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
+      COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
+      if (use_local_buffer_) {
+        COUNTER_ADD_IF_NOT_NULL(reader_->read_use_local_disk_counter_, 1L);
+        COUNTER_ADD_IF_NOT_NULL(
+            reader_->bytes_read_use_local_disk_counter_, buffer_desc->len_);
+      }
+    }
   }
 
   DCHECK(buffer_desc->buffer_ != nullptr);
@@ -246,7 +280,7 @@ ReadOutcome ScanRange::DoReadInternal(
   // Store the state we need before calling EnqueueReadyBuffer().
   bool eosr = buffer_desc->eosr();
   // No more reads for this scan range - we can close it.
-  if (eosr) file_reader->Close();
+  if (eosr && file_reader != nullptr) file_reader->Close();
   // Read successful - enqueue the buffer and return the appropriate outcome.
   if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
   // At this point, if eosr=true, then we cannot touch the state of this scan range
@@ -256,7 +290,9 @@ ReadOutcome ScanRange::DoReadInternal(
 
 ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
   bool use_local_buffer = false;
-  if (disk_file_ != nullptr && disk_file_->disk_type() != DiskFileType::LOCAL) {
+  bool use_mem_buffer = false;
+  if (disk_file_ != nullptr && disk_file_->disk_type() != DiskFileType::LOCAL
+      && disk_buffer_file_ != nullptr) {
     // The sequence for acquiring the locks should always be from the local to
     // the remote to avoid deadlocks.
     shared_lock<shared_mutex> local_file_lock(*(disk_buffer_file_->GetFileLock()));
@@ -270,18 +306,25 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
         // the only case could be the query is cancelled, so that both files are deleted.
         return ReadOutcome::CANCELLED;
       }
-      // If the local buffer exists, we can read from the local buffer, otherwise,
-      // we will read from the remote file system.
+
+      // The range can be read locally in two cases:
+      // 1. The local buffer file has not been deleted (evicted) yet.
+      // 2. The block of the file that contains the range has been read and stored in
+      // memory.
+      // If neither case applies, the range must be read from the remote.
       if (!disk_buffer_file_->is_deleted(buffer_file_lock)) {
         use_local_buffer = true;
+      } else if (disk_buffer_file_->CanReadFromReadBuffer(local_file_lock, offset_)) {
+        use_mem_buffer = true;
       } else {
         // Read from the remote file. The remote file must be in persisted status.
         DCHECK(disk_file_->is_persisted(file_lock));
       }
     }
-    return DoReadInternal(queue, disk_id, use_local_buffer);
+    return DoReadInternal(
+        queue, disk_id, use_local_buffer, use_mem_buffer, &local_file_lock);
   }
-  return DoReadInternal(queue, disk_id, use_local_buffer);
+  return DoReadInternal(queue, disk_id, use_local_buffer, use_mem_buffer);
 }
 
 Status ScanRange::ReadSubRanges(
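The read-source selection in ScanRange::DoRead() can be summarized by the minimal sketch below. It assumes simplified, hypothetical types (ReadSource, RemoteFileState, ChooseReadSource, ChooseReadSourceLocked) that are not part of Impala. It only models the order of the checks (local buffer file, then in-memory read buffer block, then remote file) and the local-before-remote lock ordering described in the comments.

```cpp
#include <cassert>
#include <shared_mutex>

// Hypothetical stand-ins for illustration only; these are not Impala types.
enum class ReadSource { LOCAL_BUFFER_FILE, MEM_BUFFER_BLOCK, REMOTE_FILE };

struct RemoteFileState {
  bool local_buffer_deleted;  // The local buffer file has been evicted.
  bool block_in_memory;       // The block holding the range is already in memory.
  bool remote_persisted;      // The remote copy has been fully uploaded.
};

// Same order of checks as DoRead(): local buffer file first, then the in-memory
// read buffer block, and only then the remote filesystem.
ReadSource ChooseReadSource(const RemoteFileState& s) {
  if (!s.local_buffer_deleted) return ReadSource::LOCAL_BUFFER_FILE;
  if (s.block_in_memory) return ReadSource::MEM_BUFFER_BLOCK;
  // Reading from the remote requires a persisted remote file (a DCHECK in the patch).
  assert(s.remote_persisted);
  return ReadSource::REMOTE_FILE;
}

// Takes the shared locks local-first, then remote, the same order the deletion
// path uses for its exclusive locks, which avoids lock-order deadlocks.
ReadSource ChooseReadSourceLocked(std::shared_mutex& local_file_lock,
    std::shared_mutex& remote_file_lock, const RemoteFileState& s) {
  std::shared_lock<std::shared_mutex> local(local_file_lock);
  std::shared_lock<std::shared_mutex> remote(remote_file_lock);
  return ChooseReadSource(s);
}
```

In the patch itself, the in-memory case additionally keeps the local file lock held and passes it to DoReadInternal(), which is what makes the lock-free ReadFromMemBuffer() call safe.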
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index cbdebec7d..dbe7566ef 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -30,6 +30,9 @@
 #include "util/hdfs-util.h"
 
 namespace impala {
+namespace io {
+class DiskIoMgr;
+}
 
 /// TmpFile is a handle to a physical file in a temporary directory. File space
 /// can be allocated and files removed using AllocateSpace() and Remove(). Used
@@ -84,7 +87,11 @@ class TmpFile {
   int64_t len() const { return allocation_offset_; }
 
   /// Returns the disk id of the temporary file.
-  int disk_id() const { return disk_id_; }
+  virtual int disk_id(bool is_file_op = false) const {
+    // The disk id for file operations should only be supported in TmpFileRemote.
+    DCHECK(!is_file_op);
+    return disk_id_;
+  }
 
   /// Returns if the temporary file is in local file system.
   bool is_local() { return expected_local_; }
@@ -166,9 +173,9 @@ class TmpFileLocal : public TmpFile {
   TmpFileLocal(TmpFileGroup* file_group, TmpFileMgr::DeviceId device_id,
       const std::string& path, bool expected_local = true);
 
-  bool AllocateSpace(int64_t num_bytes, int64_t* offset);
-  io::DiskFile* GetWriteFile();
-  Status Remove();
+  bool AllocateSpace(int64_t num_bytes, int64_t* offset) override;
+  io::DiskFile* GetWriteFile() override;
+  Status Remove() override;
 };
 
 /// TmpFileRemote is a derived class of TmpFile to provide methods to handle a
@@ -179,7 +186,7 @@ class TmpFileLocal : public TmpFile {
 /// read or upload on the file.A remote temporary file can have two DiskFiles, a local
 /// buffer and a remote file.
 /// Each DiskFile owns two type of locks, a file lock and a status lock.
-/// DiskFile::lock_  -- file lock
+/// DiskFile::physical_file_lock_  -- file lock
 /// DiskFile::status_lock_ -- status lock
 /// For doing file deleting operation, a unique file lock is needed. For other types of
 /// operations on the file, like reading or writing, a shared file lock is needed to
@@ -212,10 +219,48 @@ class TmpFileRemote : public TmpFile {
       bool expected_local = false, const char* url = nullptr);
   ~TmpFileRemote();
 
-  bool AllocateSpace(int64_t num_bytes, int64_t* offset);
-  io::DiskFile* GetWriteFile();
+  bool AllocateSpace(int64_t num_bytes, int64_t* offset) override;
+  io::DiskFile* GetWriteFile() override;
   TmpDir* GetLocalBufferDir() const;
-  Status Remove();
+  Status Remove() override;
+
+  /// Returns the buffer file handle for reading.
+  /// If the local file is not evicted, returns immediately.
+  /// If the local file is evicted and batch reading is enabled, it may also send a
+  /// request to asynchronously fetch a block from the remote filesystem into memory.
+  io::DiskFile* GetReadBufferFile(int64_t offset);
+
+  /// Sends a request to the disk queue to asynchronously fetch a block from the remote
+  /// filesystem.
+  /// If the content is already in the buffer block, "fetched" is set to true.
+  /// Otherwise, the caller should fetch the page from the remote filesystem.
+  void AsyncFetchReadBufferBlock(io::DiskFile* read_buffer_file,
+      io::MemBlock* read_buffer_block, int buffer_idx, bool* fetched);
+
+  /// Returns the index of the read buffer block that contains the given file offset.
+  int GetReadBufferIndex(int64_t offset);
+
+  /// Increments the count of pages that have been read from the buffer block.
+  /// Returns true if all the pages of the block have been read.
+  bool IncrementReadPageCount(int buffer_idx);
+
+  /// Try to delete the buffer block and release the reservation.
+  template <typename T>
+  void TryDeleteReadBuffer(const T& lock, int buffer_idx);
+
+  /// Try to delete the buffer block and release the reservation with exclusive lock.
+  void TryDeleteReadBufferExcl(int buffer_idx) {
+    std::unique_lock<boost::shared_mutex> lock(*(disk_buffer_file_->GetFileLock()));
+    TryDeleteReadBuffer(lock, buffer_idx);
+  }
+
+  /// Try to delete the buffer block and release the reservation with a shared lock.
+  /// Prefer the exclusive version unless it is certain that no other thread will
+  /// access this read buffer block during deletion and performance is critical.
+  void TryDeleteMemReadBufferShared(int buffer_idx) {
+    boost::shared_lock<boost::shared_mutex> lock(*(disk_buffer_file_->GetFileLock()));
+    TryDeleteReadBuffer(lock, buffer_idx);
+  }
 
   /// Returns the size of the file.
   int64_t file_size() const { return file_size_; }
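The contract of AsyncFetchReadBufferBlock() and the status checks in its implementation can be pictured as a small state machine. The sketch below is a model under stated assumptions: BlockStatus, FetchAction, MemBlockModel and DecideFetch are hypothetical names; the state names come from the implementation comments; and moving an uninitialized block to RESERVED (or to DISABLED when no memory is available) simplifies the reservation step described in the commit message.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical block states, named after the statuses in the implementation comments.
enum class BlockStatus { UNINITIALIZED, RESERVED, ALLOC, WRITTEN, DISABLED };

// What the reader of the current page should do.
enum class FetchAction {
  USE_BLOCK,    // The block is already in memory; read the page from it.
  START_FETCH,  // An async fetch of the block was requested; read this page remotely.
  READ_REMOTE   // The block is disabled or busy; read this page remotely.
};

struct MemBlockModel {
  std::mutex lock;
  BlockStatus status = BlockStatus::UNINITIALIZED;
};

// Decides under the block lock and never waits for another thread.
FetchAction DecideFetch(MemBlockModel* block, bool memory_available, bool* fetched) {
  assert(block != nullptr && fetched != nullptr);
  std::lock_guard<std::mutex> guard(block->lock);
  *fetched = false;
  switch (block->status) {
    case BlockStatus::WRITTEN:
      *fetched = true;
      return FetchAction::USE_BLOCK;
    case BlockStatus::UNINITIALIZED:
      if (!memory_available) {
        // No memory for the block: disable it to avoid duplicated remote reads.
        block->status = BlockStatus::DISABLED;
        return FetchAction::READ_REMOTE;
      }
      // Claim the block so that only one thread issues the fetch.
      block->status = BlockStatus::RESERVED;
      return FetchAction::START_FETCH;
    case BlockStatus::RESERVED:
    case BlockStatus::ALLOC:
    case BlockStatus::DISABLED:
      break;
  }
  return FetchAction::READ_REMOTE;
}
```

Busy blocks (RESERVED or ALLOC) fall through to a direct remote read rather than waiting, matching the comment that blocking there could be expensive.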
@@ -259,6 +304,19 @@ class TmpFileRemote : public TmpFile {
     return buffer_returned_;
   }
 
+  /// Sets a flag on both files to indicate that they are going to be deleted.
+  void SetToDeleteFlag(bool to_delete = true) {
+    disk_buffer_file_->SetToDeleteFlag(to_delete);
+    disk_file_->SetToDeleteFlag(to_delete);
+  }
+
+  /// Returns the disk id of the temporary file.
+  /// If is_file_op is true, returns the disk id dedicated to file operations.
+  int disk_id(bool is_file_op = false) const override {
+    if (!is_file_op) return disk_id_;
+    return disk_id_file_op_;
+  }
+
  private:
   friend class TmpWriteHandle;
   friend class TmpFileMgr;
@@ -271,14 +329,17 @@ class TmpFileRemote : public TmpFile {
   /// remaining space.
   int64_t file_size_ = 0;
 
+  /// The default size of a read buffer block.
+  int64_t read_buffer_block_size_ = 0;
+
+  /// The id of the disk for file operations.
+  int disk_id_file_op_ = 0;
+
   /// Bogus value of mtime for HDFS files.
   const int64_t mtime_{100000};
 
-  /// The range for doing file uploading.
-  std::unique_ptr<io::RemoteOperRange> upload_range_;
-
   // The pointer of the disk buffer file, which is the local buffer
-  // of the disk file when disk file is a remote disk file.
+  // of the remote disk file. The buffer is for writing.
   std::unique_ptr<io::DiskFile> disk_buffer_file_;
 
   /// The hdfs connection used to connect to the remote scratch path.
@@ -288,6 +349,12 @@ class TmpFileRemote : public TmpFile {
   /// assigned space is equal to or just over the default file size.
   bool at_capacity_ = false;
 
+  /// The range for doing file uploading.
+  std::unique_ptr<io::RemoteOperRange> upload_range_;
+
+  /// The ranges for doing fetch operations from a remote filesystem.
+  std::vector<std::unique_ptr<io::RemoteOperRange>> fetch_ranges_;
+
   /// Protect below members.
   SpinLock lock_;
 
@@ -297,6 +364,27 @@ class TmpFileRemote : public TmpFile {
   /// True if the buffer of the file is returned to the pool. We assume that the buffer
   /// only returns once and only needs to be returned when the buffer space is reserved.
   bool buffer_returned_ = false;
+
+  // The number of pages that have been read from each read buffer block.
+  std::unique_ptr<int64_t[]> disk_read_page_cnts_;
+
+  // Return the start offset of the read buffer block.
+  int64_t GetReadBuffStartOffset(int buffer_idx) {
+    DCHECK(disk_buffer_file_ != nullptr);
+    return disk_buffer_file_->GetReadBuffStartOffset(buffer_idx);
+  }
+
+  // Return the page count of the read buffer block.
+  int64_t GetReadBuffPageCount(int buffer_idx) {
+    DCHECK(disk_buffer_file_ != nullptr);
+    return disk_buffer_file_->GetReadBuffPageCount(buffer_idx);
+  }
+
+  /// Internal DCHECK for the buffer index.
+  void DCheckReadBufferIdx(int buffer_idx) {
+    DCHECK_LT(buffer_idx, file_group_->tmp_file_mgr()->GetNumReadBuffersPerFile());
+    DCHECK_GE(buffer_idx, 0);
+  }
 };
 
 /// TmpFileDummy is a derived class of TmpFile for dummy allocation, used in
@@ -304,9 +392,9 @@ class TmpFileRemote : public TmpFile {
 class TmpFileDummy : public TmpFile {
  public:
   TmpFileDummy() : TmpFile(nullptr, -1, "") { disk_type_ = io::DiskFileType::DUMMY; }
-  bool AllocateSpace(int64_t num_bytes, int64_t* offset) { return true; }
-  io::DiskFile* GetWriteFile() { return nullptr; }
-  Status Remove() { return Status::OK(); }
+  bool AllocateSpace(int64_t num_bytes, int64_t* offset) override { return true; }
+  io::DiskFile* GetWriteFile() override { return nullptr; }
+  Status Remove() override { return Status::OK(); }
 };
 
 /// A configured temporary directory that TmpFileMgr allocates files in.
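The read buffer helpers declared for TmpFileRemote (GetReadBufferIndex(), IncrementReadPageCount() and the per-block counts in disk_read_page_cnts_) suggest the bookkeeping sketched below. This is a hypothetical model, not the DiskFile implementation, which is not shown in this excerpt. It assumes pages are written at increasing offsets, that a page always stays in the block where it starts (so, as the commit message notes, block sizes can differ a little), and that the block size is min(remote_tmp_file_size, 16MB), as in SetUpReadBufferParams().

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

// 16MB, the value of MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES in the patch.
constexpr int64_t kMaxReadMemBlockBytes = 16LL * 1024 * 1024;

// Hypothetical bookkeeping for the read buffer blocks of one remote temporary file.
// Requires default_file_size > 0.
class ReadBufferBlocks {
 public:
  explicit ReadBufferBlocks(int64_t default_file_size)
    : block_size_(std::min(default_file_size, kMaxReadMemBlockBytes)),
      num_blocks_(static_cast<int>(default_file_size / block_size_)),
      start_offsets_{0},
      pages_total_(num_blocks_, 0),
      pages_read_(num_blocks_, 0) {}

  // Records a page written at 'offset'. A page stays in the block where it starts,
  // so block sizes can differ somewhat from block_size_, and the last block absorbs
  // any bytes beyond the default file size.
  void AddPage(int64_t offset) {
    assert(offset >= start_offsets_.back());
    int cur = static_cast<int>(start_offsets_.size()) - 1;
    if (cur + 1 < num_blocks_ && offset >= (cur + 1) * block_size_) {
      start_offsets_.push_back(offset);
      ++cur;
    }
    ++pages_total_[cur];
  }

  // Index of the block containing 'offset': the last block starting at or before it.
  int GetReadBufferIndex(int64_t offset) const {
    auto it = std::upper_bound(start_offsets_.begin(), start_offsets_.end(), offset);
    return static_cast<int>(it - start_offsets_.begin()) - 1;
  }

  // Counts one more page read from block 'idx'; returns true once every page of the
  // block has been read, i.e. when the block's memory could be released.
  bool IncrementReadPageCount(int idx) {
    assert(idx >= 0 && idx < num_blocks_);
    return ++pages_read_[idx] == pages_total_[idx];
  }

  int64_t block_size() const { return block_size_; }
  int num_blocks() const { return num_blocks_; }

 private:
  const int64_t block_size_;
  const int num_blocks_;
  std::vector<int64_t> start_offsets_;  // Start offset of each block seen so far.
  std::vector<int64_t> pages_total_;    // Pages written into each block.
  std::vector<int64_t> pages_read_;     // Pages read from each block.
};
```

For the 1K files used in the new tests, this yields a single block per file, so the 1280B file simply keeps its last page in that block.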
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index d75b2f87f..038b7e8ee 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -58,6 +58,8 @@ DECLARE_int32(stress_scratch_write_delay_ms);
 #endif
 DECLARE_string(remote_tmp_file_size);
 DECLARE_int32(wait_for_spill_buffer_timeout_s);
+DECLARE_bool(remote_batch_read);
+DECLARE_string(remote_read_memory_buffer_size);
 
 namespace impala {
 
@@ -73,6 +75,10 @@ static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp";
 static const string REMOTE_URL = HDFS_LOCAL_URL;
 static const string LOCAL_BUFFER_PATH = "/tmp/tmp-file-mgr-test-buffer";
 
+/// Read buffer sizes for TestBatchReadingSetMaxBytes().
+static const int64_t READ_BUFFER_SMALL_SIZE_BYTES = 1 << 20; // 1MB
+static const int64_t READ_BUFFER_EXTREMELY_BIG_SIZE_BYTES = 100ll << 30; // 100GB
+
 class TmpFileMgrTest : public ::testing::Test {
  public:
   static const int DEFAULT_PRIORITY = numeric_limits<int>::max();
@@ -85,6 +91,8 @@ class TmpFileMgrTest : public ::testing::Test {
     FLAGS_stress_scratch_write_delay_ms = 0;
 #endif
     FLAGS_remote_tmp_file_size = "8MB";
+    FLAGS_remote_read_memory_buffer_size = "1GB";
+    FLAGS_remote_batch_read = false;
 
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
     profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
@@ -147,6 +155,14 @@ class TmpFileMgrTest : public ::testing::Test {
     ASSERT_EQ(hwm_value->GetValue(), exp_hwm_value);
   }
 
+  /// Checks that the high-water mark of the scratch read memory buffer is above zero.
+  void checkHWMReadBuffMetrics() {
+    AtomicHighWaterMarkGauge* hwm_value =
+        metrics_->FindMetricForTesting<AtomicHighWaterMarkGauge>(
+            "tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark");
+    ASSERT_TRUE(hwm_value->GetValue() > 0);
+  }
+
   void RemoveAndCreateDirs(const vector<string>& dirs) {
     for (const string& dir: dirs) {
       ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
@@ -157,6 +173,40 @@ class TmpFileMgrTest : public ::testing::Test {
     ASSERT_OK(FileSystemUtil::RemovePaths(dirs));
   }
 
+  int64_t GetReadBufferMaxAllowedBytes(TmpFileMgr* mgr) {
+    return mgr->tmp_dirs_remote_ctrl_.CalcMaxReadBufferBytes();
+  }
+
+  int64_t GetReadBufferCurrentMaxBytes(TmpFileMgr* mgr) {
+    return mgr->tmp_dirs_remote_ctrl_.max_read_buffer_size_;
+  }
+
+  void CreateAndGetGeneralRemoteTmpDir(vector<string>* tmp_dirs) {
+    FLAGS_remote_tmp_file_size = "1K";
+    vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}};
+    RemoveAndCreateDirs(tmp_create_dirs);
+    tmp_dirs->push_back(Substitute(LOCAL_BUFFER_PATH + ":$0", 4096));
+    tmp_dirs->push_back(REMOTE_URL);
+  }
+
+  // Helper for TestBatchReadingSetMaxBytes() to set the read buffer size and check
+  // whether we should expect the system to use the max allowed bytes instead of the
+  // specified buffer size in the testcase.
+  void SetReadBufferSizeHelper(int64_t buffer_size, bool* expect_max_allowed_bytes) {
+    ASSERT_TRUE(expect_max_allowed_bytes != nullptr);
+    if (buffer_size == READ_BUFFER_SMALL_SIZE_BYTES) {
+      FLAGS_remote_read_memory_buffer_size = "1MB";
+      *expect_max_allowed_bytes = false;
+    } else if (buffer_size == READ_BUFFER_EXTREMELY_BIG_SIZE_BYTES) {
+      FLAGS_remote_read_memory_buffer_size = "100GB";
+      // If buffer size is too large for the system, we expect the actual capacity of
+      // the read buffer to be changed to the max allowed bytes in the testcase.
+      *expect_max_allowed_bytes = true;
+    } else {
+      ASSERT_TRUE(false) << "Unexpected buffer size " << buffer_size;
+    }
+  }
+
   /// Helper to call the private CreateFiles() method and return
   /// the created files.
   static Status CreateFiles(
@@ -262,6 +312,30 @@ class TmpFileMgrTest : public ::testing::Test {
     EXPECT_EQ(end, search->second.end);
   }
 
+  /// Helper that waits for the disk file to reach the given status. Times out after
+  /// about 2 seconds.
+  static void WaitForDiskFileStatus(DiskFile* file, DiskFileStatus status) {
+    int wait_times = 10;
+    while (true) {
+      if (file->GetFileStatus() == status) {
+        break;
+      }
+      // Assume the upload finishes within two seconds.
+      ASSERT_TRUE(wait_times-- > 0);
+      usleep(200 * 1000);
+    }
+  }
+
+  /// Helper to get the remote temporary file from the temporary file group.
+  static TmpFileRemote* GetRemoteTmpFileByFileGroup(TmpFileGroup& file_group, int idx) {
+    return static_cast<TmpFileRemote*>(file_group.tmp_files_remote_[idx].get());
+  }
+
+  /// Helper to get the number of remote temporary files from the temporary file group.
+  static int GetRemoteTmpFileNum(TmpFileGroup& file_group) {
+    return file_group.tmp_files_remote_.size();
+  }
+
   /// Helpers to call WriteHandle methods.
   void Cancel(TmpWriteHandle* handle) { handle->Cancel(); }
   void WaitForWrite(TmpWriteHandle* handle) {
@@ -1796,7 +1870,7 @@ TEST_F(TmpFileMgrTest, TestMixTmpFileLimits) {
   RemoveAndCreateDirs(tmp_create_dirs);
   tmp_dirs.push_back(REMOTE_URL);
   int64_t alloc_size = 1024;
-  int64_t file_size = 256 * 1024 * 1024;
+  int64_t file_size = 512 * 1024 * 1024;
   int64_t offset;
   TmpFile* alloc_file;
   FLAGS_remote_tmp_file_size = "512MB";
@@ -2061,4 +2135,103 @@ TEST_F(TmpFileMgrTest, TestRemoteUploadFailed) {
   test_env_->TearDownQueries();
 }
 
+/// Test using batch reading while reading the remote spilled data.
+TEST_F(TmpFileMgrTest, TestBatchReadingFromRemote) {
+  TmpFileMgr tmp_file_mgr;
+  vector<string> tmp_dirs;
+  CreateAndGetGeneralRemoteTmpDir(&tmp_dirs);
+  FLAGS_remote_read_memory_buffer_size = "1MB";
+  FLAGS_remote_batch_read = true;
+  int64_t page_size_big = 512;
+  int64_t page_size_small = 256;
+  // There should be two files:
+  // file 1. page_big + page_big = 1024B
+  // file 2. page_big + page_small + page_big = 1280B
+  // This covers two cases: a file of exactly the default size and a file slightly
+  // over the default size.
+  int64_t file_size_1 = 2 * page_size_big;
+  int64_t file_size_2 = 2 * page_size_big + page_size_small;
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get()));
+  TUniqueId id;
+  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
+
+  vector<unique_ptr<MemRange>> mem_ranges;
+  vector<unique_ptr<TmpWriteHandle>> handles;
+  WriteRange::WriteDoneCallback callback =
+      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
+  const int PAGE_NUM = 5;
+  vector<vector<uint8_t>> data(PAGE_NUM);
+  for (int i = 0; i < PAGE_NUM; i++) {
+    int64_t page_size = page_size_big;
+    if (i == 3) {
+      page_size = page_size_small;
+    }
+    data[i].resize(page_size);
+    std::iota(data[i].begin(), data[i].end(), i);
+    mem_ranges.emplace_back(make_unique<MemRange>(data[i].data(), page_size));
+    unique_ptr<TmpWriteHandle> handle;
+    ASSERT_OK(file_group.Write(*(mem_ranges[i].get()), callback, &handle));
+    WaitForWrite(handle.get());
+    handles.emplace_back(move(handle));
+  }
+  WaitForCallbacks(PAGE_NUM);
+
+  // There should be two files in the TmpFileMgr.
+  ASSERT_EQ(GetRemoteTmpFileNum(file_group), 2);
+  auto file1 = GetRemoteTmpFileByFileGroup(file_group, 0);
+  auto file2 = GetRemoteTmpFileByFileGroup(file_group, 1);
+  WaitForDiskFileStatus(file1->DiskFile(), DiskFileStatus::PERSISTED);
+  WaitForDiskFileStatus(file1->DiskBufferFile(), DiskFileStatus::PERSISTED);
+  WaitForDiskFileStatus(file2->DiskFile(), DiskFileStatus::PERSISTED);
+  WaitForDiskFileStatus(file2->DiskBufferFile(), DiskFileStatus::PERSISTED);
+
+  // Check that the actual file sizes are as expected.
+  ASSERT_EQ(file1->DiskBufferFile()->actual_file_size(), file_size_1);
+  ASSERT_EQ(file2->DiskBufferFile()->actual_file_size(), file_size_2);
+
+  // Remove the local buffers in order to read from the remote fs.
+  ASSERT_OK(tmp_file_mgr.TryEvictFile(file1));
+  ASSERT_OK(tmp_file_mgr.TryEvictFile(file2));
+
+  // Read the data.
+  for (int i = 0; i < PAGE_NUM; i++) {
+    vector<uint8_t> tmp;
+    tmp.resize(mem_ranges[i]->len());
+    ASSERT_OK(file_group.Read(handles[i].get(), MemRange(tmp.data(), tmp.size())));
+    EXPECT_EQ(0, memcmp(tmp.data(), mem_ranges[i]->data(), tmp.size()));
+    file_group.DestroyWriteHandle(move(handles[i]));
+  }
+
+  // Check the metrics to confirm that the read buffer was used for reading.
+  checkHWMReadBuffMetrics();
+
+  file_group.Close();
+  test_env_->TearDownQueries();
+}
+
+// Test to set different capacities of the read buffer for remote spilling.
+TEST_F(TmpFileMgrTest, TestBatchReadingSetMaxBytes) {
+  vector<int64_t> buffer_sizes(
+      {READ_BUFFER_SMALL_SIZE_BYTES, READ_BUFFER_EXTREMELY_BIG_SIZE_BYTES});
+  FLAGS_remote_batch_read = true;
+  for (int64_t buffer_size : buffer_sizes) {
+    TmpFileMgr tmp_file_mgr;
+    vector<string> tmp_dirs;
+    bool expect_max_allowed_bytes = false;
+    CreateAndGetGeneralRemoteTmpDir(&tmp_dirs);
+    SetReadBufferSizeHelper(buffer_size, &expect_max_allowed_bytes);
+    ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get()));
+    int64_t max_allowed_bytes = GetReadBufferMaxAllowedBytes(&tmp_file_mgr);
+    int64_t cur_max_bytes = GetReadBufferCurrentMaxBytes(&tmp_file_mgr);
+    // If the buffer size is too large, then the current max bytes of the read buffer
+    // should be set to the system max allowed bytes instead of the buffer size.
+    if (expect_max_allowed_bytes) {
+      EXPECT_EQ(cur_max_bytes, max_allowed_bytes);
+    } else {
+      EXPECT_EQ(cur_max_bytes, buffer_size);
+    }
+    metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
+  }
+}
+
 } // namespace impala
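The expected file sizes in TestBatchReadingFromRemote follow from the allocation rule documented for TmpFileRemote::at_capacity_: a file keeps accepting pages until its assigned space is equal to or just over the default file size (1K in this test). The hypothetical helper below, which is not part of the test or of TmpFileMgr, replays that rule for the test's page sequence.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical replay of the at_capacity_ rule: pages are appended to the current
// file until its size reaches or just exceeds default_file_size, then a new file
// is started. Returns the resulting size of each file.
std::vector<int64_t> AssignPagesToFiles(
    const std::vector<int64_t>& page_sizes, int64_t default_file_size) {
  assert(default_file_size > 0);
  std::vector<int64_t> file_sizes;
  bool at_capacity = true;  // Forces creation of the first file.
  for (int64_t page : page_sizes) {
    if (at_capacity) {
      file_sizes.push_back(0);
      at_capacity = false;
    }
    file_sizes.back() += page;
    at_capacity = file_sizes.back() >= default_file_size;
  }
  return file_sizes;
}
```

With pages of 512, 512, 512, 256 and 512 bytes, the first file closes at exactly 1024B and the second at 1280B, which is what the test asserts via actual_file_size().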
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 51828fbaf..be84ca420 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -50,6 +50,7 @@
 #include "util/hdfs-util.h"
 #include "util/histogram-metric.h"
 #include "util/kudu-status-util.h"
+#include "util/mem-info.h"
 #include "util/os-util.h"
 #include "util/parse-util.h"
 #include "util/pretty-printer.h"
@@ -103,6 +104,9 @@ DEFINE_string(remote_tmp_file_size, "16M",
 DEFINE_string(remote_tmp_file_block_size, "1M",
     "Specify the size of the block for doing file uploading and fetching. The block "
     "size should be power of 2 and less than the size of remote temporary file.");
+DEFINE_string(remote_read_memory_buffer_size, "1G",
+    "Specify the maximum size of read memory buffers for the remote temporary "
+    "files. Only valid when --remote_batch_read is true.");
 DEFINE_bool(remote_tmp_files_avail_pool_lifo, false,
     "If true, lifo is the algo to evict the local buffer files during spilling "
     "to the remote. Otherwise, fifo would be used.");
@@ -110,6 +114,10 @@ DEFINE_int32(wait_for_spill_buffer_timeout_s, 60,
     "Specify the timeout duration waiting for the buffer to write (second). If a spilling"
     "opertion fails to get a buffer from the pool within the duration, the operation"
     "fails.");
+DEFINE_bool(remote_batch_read, false,
+    "If true, the system uses batch reading for the remote temporary files. Batch "
+    "reading allows reading a block asynchronously when the buffer pool is trying to "
+    "pin one page of that block.");
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
@@ -131,7 +139,21 @@ constexpr int64_t TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES;
 
 const string TMP_SUB_DIR_NAME = "impala-scratch";
 const uint64_t AVAILABLE_SPACE_THRESHOLD_MB = 1024;
-const uint64_t MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB = 256;
+const uint64_t MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB = 512;
+
+// For spilling to remote fs, the max size of a read memory block.
+const uint64_t MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES = 16 * 1024 * 1024;
+
+// Memory limits for the buffer used to read spilled data from the remote fs.
+// The maximum size of the read buffer is bounded by both
+// REMOTE_READ_BUFFER_MAX_MEM_PERCENT, a fraction of the total memory, and
+// REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT, a fraction of the remaining memory that
+// is not reserved by the process memory limit.
+// Also, if the remaining memory is less than REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT
+// of the total memory, the read buffer for remote spilled data is disabled.
+const double REMOTE_READ_BUFFER_MAX_MEM_PERCENT = 0.1;
+const double REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT = 0.5;
+const double REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT = 0.05;
 
 // Metric keys
 const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dirs";
@@ -141,6 +163,10 @@ const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK =
     "tmp-file-mgr.scratch-space-bytes-used-high-water-mark";
 const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED =
     "tmp-file-mgr.scratch-space-bytes-used";
+const string TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK =
+    "tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark";
+const string TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED =
+    "tmp-file-mgr.scratch-read-memory-buffer-used";
 const string SCRATCH_DIR_BYTES_USED_FORMAT =
     "tmp-file-mgr.scratch-space-bytes-used.dir-$0";
 const string LOCAL_BUFF_BYTES_USED_FORMAT = "tmp-file-mgr.local-buff-bytes-used.dir-$0";
@@ -235,6 +261,18 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
     return Status(Substitute("Invalid value of wait_for_spill_buffer_timeout_us '$0'",
         FLAGS_wait_for_spill_buffer_timeout_s));
   }
+
+  tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = FLAGS_remote_batch_read;
+  if (tmp_dirs_remote_ctrl_.remote_batch_read_enabled_) {
+    Status setup_read_buffer_status = tmp_dirs_remote_ctrl_.SetUpReadBufferParams();
+    if (!setup_read_buffer_status.ok()) {
+      LOG(WARNING) << "Disabled the read buffer for the remote temporary files "
+                      "due to errors in read buffer parameters: "
+                   << setup_read_buffer_status.msg().msg();
+      tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = false;
+    }
+  }
+
   // Below options are using for test by setting different modes to implement the
   // spilling to the remote.
   tmp_dirs_remote_ctrl_.remote_tmp_files_avail_pool_lifo_ =
@@ -357,6 +395,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
       metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK,
           TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED, 0);
 
+  scratch_read_memory_buffer_used_metric_ =
+      metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK,
+          TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED, 0);
+
   initialized_ = true;
 
   if ((tmp_dirs_.empty() && local_buff_dir_ == nullptr) && !tmp_dirs.empty()) {
@@ -444,17 +486,28 @@ Status TmpFileMgr::DequeueTmpFilesPool(shared_ptr<TmpFile>* tmp_file, bool quick
       tmp_file, quick_return);
 }
 
+void TmpFileMgr::ReleaseTmpFileReadBuffer(
+    const unique_lock<shared_mutex>& file_lock, TmpFile* tmp_file) {
+  DCHECK(tmp_file != nullptr);
+  DCHECK(IsRemoteBatchReadingEnabled());
+  TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file);
+  for (int i = 0; i < GetNumReadBuffersPerFile(); i++) {
+    tmp_file_remote->TryDeleteReadBuffer(file_lock, i);
+  }
+}
+
 Status TmpFileMgr::TryEvictFile(TmpFile* tmp_file) {
   DCHECK(tmp_file != nullptr);
   if (tmp_file->disk_type() == io::DiskFileType::DUMMY) return Status::OK();
 
+  TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file);
+  DiskFile* buffer_file = tmp_file_remote->DiskBufferFile();
+
   // Remove the buffer of the TmpFile.
   // After deletion of the buffer, if the TmpFile doesn't exist in the remote file system
   // either, that means the TmpFile shared pointer can be removed from the TmpFileMgr,
   // because in this case, the physical file is considered no longer in the system.
-  // Fetch the unique locks of the files when doing the deletion.
-  TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file);
-  DiskFile* buffer_file = tmp_file_remote->DiskBufferFile();
+  // Hold the unique locks of the files during the deletion.
   Status status = Status::OK();
   {
     unique_lock<shared_mutex> buffer_lock(buffer_file->physical_file_lock_);
@@ -540,6 +593,58 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
   }
 }
 
+int64_t TmpFileMgr::TmpDirRemoteCtrl::CalcMaxReadBufferBytes() {
+  int64_t max_allow_bytes = 0;
+  int64_t process_bytes_limit;
+  int64_t total_avail_mem;
+  if (!ChooseProcessMemLimit(&process_bytes_limit, &total_avail_mem).ok()) {
+    // Return 0 to disable read buffer if unable to get the process and system limit.
+    return max_allow_bytes;
+  }
+  DCHECK_GE(total_avail_mem, process_bytes_limit);
+  // Only allow the read buffer if the memory outside the process limit is larger than
+  // REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT of the total memory.
+  if ((total_avail_mem - process_bytes_limit)
+      > total_avail_mem * REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT) {
+    // Max allowed bytes are the minimum of REMOTE_READ_BUFFER_MAX_MEM_PERCENT of the
+    // total memory and REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT of the unused memory.
+    max_allow_bytes = min((total_avail_mem - process_bytes_limit)
+            * REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT,
+        total_avail_mem * REMOTE_READ_BUFFER_MAX_MEM_PERCENT);
+  }
+  return max_allow_bytes;
+}
+
+Status TmpFileMgr::TmpDirRemoteCtrl::SetUpReadBufferParams() {
+  bool is_percent;
+  // If the temporary file size is smaller than the max block size, use the file size
+  // as the block size.
+  read_buffer_block_size_ =
+      remote_tmp_file_size_ < MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES ?
+      remote_tmp_file_size_ :
+      MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES;
+  num_read_buffer_blocks_per_file_ =
+      static_cast<int>(remote_tmp_file_size_ / read_buffer_block_size_);
+  max_read_buffer_size_ =
+      ParseUtil::ParseMemSpec(FLAGS_remote_read_memory_buffer_size, &is_percent, 0);
+  if (max_read_buffer_size_ <= 0) {
+    return Status(Substitute("Invalid value of remote_read_memory_buffer_size '$0'",
+        FLAGS_remote_read_memory_buffer_size));
+  }
+  // Calculate the max allowed bytes for the read buffer.
+  int64_t max_allow_bytes = CalcMaxReadBufferBytes();
+  DCHECK_GE(max_allow_bytes, 0);
+  if (max_read_buffer_size_ > max_allow_bytes) {
+    max_read_buffer_size_ = max_allow_bytes;
+    LOG(WARNING) << "The remote read memory buffer size exceeds the maximum "
+                    "allowed and is reduced to "
+                 << max_allow_bytes << " bytes.";
+  }
+  LOG(INFO) << "Using " << max_read_buffer_size_
+            << " bytes for the batch reading buffer of TmpFileMgr.";
+  return Status::OK();
+}
+
 TmpDir::TmpDir(const string& raw_path, const string& prefix, bool is_local)
   : raw_path_(raw_path), prefix_(prefix), is_local_dir_(is_local), is_parsed_(false) {}
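The limit computed by CalcMaxReadBufferBytes() and applied in SetUpReadBufferParams() can be checked with a small worked example. The sketch below reuses the three constants from the patch but takes the total available memory and the process memory limit as plain arguments instead of calling ChooseProcessMemLimit(); the function names MaxReadBufferBytes and EffectiveReadBufferBytes are illustrative only.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Values of the constants defined in the patch.
constexpr double kMaxMemPercent = 0.1;     // REMOTE_READ_BUFFER_MAX_MEM_PERCENT
constexpr double kHardLimitPercent = 0.5;  // REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT
constexpr double kDisablePercent = 0.05;   // REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT

// Same formula as TmpDirRemoteCtrl::CalcMaxReadBufferBytes(); returns 0 when the
// read buffer should be disabled.
int64_t MaxReadBufferBytes(int64_t total_avail_mem, int64_t process_bytes_limit) {
  assert(total_avail_mem >= process_bytes_limit);
  const int64_t unused = total_avail_mem - process_bytes_limit;
  if (unused <= total_avail_mem * kDisablePercent) return 0;
  return static_cast<int64_t>(
      std::min(unused * kHardLimitPercent, total_avail_mem * kMaxMemPercent));
}

// Mirrors SetUpReadBufferParams(): the configured size is capped at the allowed max.
int64_t EffectiveReadBufferBytes(
    int64_t requested, int64_t total_avail_mem, int64_t process_bytes_limit) {
  return std::min(requested, MaxReadBufferBytes(total_avail_mem, process_bytes_limit));
}
```

For example, on a host with 100GB and a 90GB process limit, 10GB is unused, so the cap is min(0.5 * 10GB, 0.1 * 100GB) = 5GB: a 1MB request is kept, while a 100GB request is reduced to 5GB, the same two situations TestBatchReadingSetMaxBytes exercises.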
 
@@ -872,9 +977,11 @@ TmpFileRemote::TmpFileRemote(TmpFileGroup* file_group, TmpFileMgr::DeviceId devi
   if (IsHdfsPath(hdfs_url, false)) {
     disk_type_ = io::DiskFileType::DFS;
     disk_id_ = file_group->io_mgr_->RemoteDfsDiskId();
+    disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId();
   } else if (IsS3APath(hdfs_url, false)) {
     disk_type_ = io::DiskFileType::S3;
     disk_id_ = file_group->io_mgr_->RemoteS3DiskId();
+    disk_id_file_op_ = file_group->io_mgr_->RemoteS3DiskFileOperId();
     options = file_group_->tmp_file_mgr_->s3a_options();
   }
   Status status = HdfsFsCache::instance()->GetConnection(
@@ -883,8 +990,23 @@ TmpFileRemote::TmpFileRemote(TmpFileGroup* file_group, TmpFileMgr::DeviceId devi
   local_buffer_path_ = local_buffer_path;
   disk_file_ = make_unique<io::DiskFile>(path_, file_group->io_mgr_,
       file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(), disk_type_, &hdfs_conn_);
-  disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_, file_group_->io_mgr_,
-      file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(), io::DiskFileType::LOCAL_BUFFER);
+  if (file_group_->tmp_file_mgr_->IsRemoteBatchReadingEnabled()) {
+    read_buffer_block_size_ = file_group_->tmp_file_mgr_->GetReadBufferBlockSize();
+    int num_of_read_buffers = file_group_->tmp_file_mgr_->GetNumReadBuffersPerFile();
+    disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_,
+        file_group_->io_mgr_, file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(),
+        io::DiskFileType::LOCAL_BUFFER, read_buffer_block_size_, num_of_read_buffers);
+    disk_read_page_cnts_ = std::make_unique<int64_t[]>(num_of_read_buffers);
+    DCHECK(disk_read_page_cnts_.get() != nullptr);
+    memset(disk_read_page_cnts_.get(), 0, num_of_read_buffers * sizeof(int64_t));
+    for (int i = 0; i < num_of_read_buffers; i++) {
+      fetch_ranges_.emplace_back(nullptr);
+    }
+  } else {
+    disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_,
+        file_group_->io_mgr_, file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(),
+        io::DiskFileType::LOCAL_BUFFER);
+  }
 }
 
 TmpFileRemote::~TmpFileRemote() {
@@ -910,6 +1032,133 @@ io::DiskFile* TmpFileRemote::GetWriteFile() {
   return disk_buffer_file_.get();
 }
 
+int TmpFileRemote::GetReadBufferIndex(int64_t offset) {
+  DCHECK(disk_buffer_file_ != nullptr);
+  return disk_buffer_file_->GetReadBufferIndex(offset);
+}
+
+void TmpFileRemote::AsyncFetchReadBufferBlock(io::DiskFile* read_buffer_file,
+    io::MemBlock* read_buffer_block, int read_buffer_idx, bool* fetched) {
+  DCHECK(fetched != nullptr);
+  *fetched = false;
+  {
+    shared_lock<shared_mutex> read_file_lock(*(read_buffer_file->GetFileLock()));
+    unique_lock<SpinLock> mem_bloc_lock(*(read_buffer_block->GetLock()));
+    // Check the block status.
+    // If the block is disabled, the caller won't be able to use this buffer block.
+    // If the block is written, it has already been fetched; set the fetched flag and
+    // return immediately.
+    // If the block is uninitialized, start fetching it immediately but don't wait for
+    // the fetch, so that it doesn't block the current page read.
+    // If the block is in reserved or alloc status, another thread is already handling
+    // it; don't wait here, because blocking could be expensive.
+    if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block,
+            io::MemBlockStatus::DISABLED, read_file_lock, &mem_bloc_lock)) {
+      return;
+    } else if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block,
+                   io::MemBlockStatus::WRITTEN, read_file_lock, &mem_bloc_lock)) {
+      *fetched = true;
+      return;
+    } else if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block,
+                   io::MemBlockStatus::UNINIT, read_file_lock, &mem_bloc_lock)) {
+      bool dofetch = true;
+      int64_t mem_size_limit =
+          file_group_->tmp_file_mgr()->GetRemoteMaxTotalReadBufferSize();
+      auto read_mem_counter =
+          file_group_->tmp_file_mgr()->scratch_read_memory_buffer_used_metric_;
+      if (read_mem_counter->Increment(read_buffer_file->read_buffer_block_size())
+          > mem_size_limit) {
+        read_mem_counter->Increment(-1 * read_buffer_file->read_buffer_block_size());
+        dofetch = false;
+      }
+      if (dofetch) {
+        read_buffer_file->SetReadBufferBlockStatus(read_buffer_block,
+            io::MemBlockStatus::RESERVED, read_file_lock, &mem_bloc_lock);
+        RemoteOperRange::RemoteOperDoneCallback fetch_callback =
+            [read_buffer_block, tmp_file = this](const Status& fetch_status) {
+              if (!fetch_status.ok()) {
+                // Disable the read buffer if fails to fetch.
+                tmp_file->TryDeleteReadBufferExcl(read_buffer_block->block_id());
+              }
+            };
+        fetch_ranges_[read_buffer_idx].reset(new RemoteOperRange(disk_file_.get(),
+            read_buffer_file, file_group_->tmp_file_mgr()->GetRemoteTmpBlockSize(),
+            disk_id(true), RequestType::FILE_FETCH, file_group_->io_mgr_, fetch_callback,
+            GetReadBuffStartOffset(read_buffer_idx)));
+        Status add_status = file_group_->io_ctx_->AddRemoteOperRange(
+            fetch_ranges_[read_buffer_idx].get());
+        if (!add_status.ok()) {
+          read_buffer_file->SetReadBufferBlockStatus(read_buffer_block,
+              io::MemBlockStatus::DISABLED, read_file_lock, &mem_bloc_lock);
+        }
+      } else {
+        read_buffer_file->SetReadBufferBlockStatus(read_buffer_block,
+            io::MemBlockStatus::DISABLED, read_file_lock, &mem_bloc_lock);
+      }
+    }
+  }
+  *fetched = true;
+  return;
+}
+
+io::DiskFile* TmpFileRemote::GetReadBufferFile(int64_t offset) {
+  // If the local buffer file exists, return the file directly.
+  // If it has been deleted (probably due to eviction) and batch reading is enabled,
+  // try to fetch the current block asynchronously if it is not already in the memory
+  // buffer.
+  // If the local buffer file is deleted and the read memory buffer doesn't have the
+  // block right now, return nullptr to indicate that no buffer is available.
+  io::DiskFile* read_buffer_file = disk_buffer_file_.get();
+  if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) {
+    return read_buffer_file;
+  }
+  if (!file_group_->tmp_file_mgr()->IsRemoteBatchReadingEnabled()) return nullptr;
+  int read_buffer_idx = GetReadBufferIndex(offset);
+  io::MemBlock* read_buffer_block = disk_buffer_file_->GetBufferBlock(read_buffer_idx);
+  bool fetched = false;
+  io::MemBlockStatus block_status = read_buffer_block->GetStatus();
+  if (block_status == io::MemBlockStatus::DISABLED) {
+    // do nothing
+  } else if (block_status == io::MemBlockStatus::WRITTEN) {
+    fetched = true;
+  } else {
+    AsyncFetchReadBufferBlock(
+        read_buffer_file, read_buffer_block, read_buffer_idx, &fetched);
+  }
+  return fetched ? read_buffer_file : nullptr;
+}
+
+bool TmpFileRemote::IncrementReadPageCount(int buffer_idx) {
+  int64_t read_count = 0;
+  int64_t total_num = 0;
+  DCheckReadBufferIdx(buffer_idx);
+  total_num = GetReadBuffPageCount(buffer_idx);
+  {
+    lock_guard<SpinLock> lock(lock_);
+    read_count = ++disk_read_page_cnts_[buffer_idx];
+  }
+  // Return true if all the pages of the block have been read.
+  return read_count == total_num;
+}
+
+template <typename T>
+void TmpFileRemote::TryDeleteReadBuffer(const T& lock, int buffer_idx) {
+  DCheckReadBufferIdx(buffer_idx);
+  bool reserved = false;
+  bool allocated = false;
+  DCHECK(disk_buffer_file_->IsBatchReadEnabled());
+  DCHECK(lock.mutex() == disk_buffer_file_->GetFileLock() && lock.owns_lock());
+  disk_buffer_file_->DeleteReadBuffer(
+      disk_buffer_file_->GetBufferBlock(buffer_idx), &reserved, &allocated, lock);
+  if (reserved || allocated) {
+    // Because the reservation will increase the current allocated read buffer usage
+    // ahead of the real allocation, we need to decrease it if the block is reserved
+    // or allocated.
+    file_group_->tmp_file_mgr_->scratch_read_memory_buffer_used_metric_->Increment(
+        -1 * read_buffer_block_size_);
+  }
+}
+
 TmpDir* TmpFileRemote::GetLocalBufferDir() const {
   return file_group_->tmp_file_mgr_->GetLocalBufferDir();
 }
@@ -919,33 +1168,44 @@ Status TmpFileRemote::Remove() {
   // If True, we need to enqueue the file back to the pool after deletion.
   bool to_return_the_buffer = false;
 
-  // The order of acquiring the lock must be from local to remote to avoid deadlocks.
-  unique_lock<shared_mutex> buffer_file_lock(*(disk_buffer_file_->GetFileLock()));
-  unique_lock<shared_mutex> file_lock(*(disk_file_->GetFileLock()));
+  // Set a flag to notify other threads holding the file lock to release it. Since the
+  // removal needs a unique lock, this speeds up acquiring the mutex.
+  SetToDeleteFlag();
 
-  // Delete the local buffer file if exists.
-  if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) {
-    status = disk_buffer_file_->Delete(buffer_file_lock);
-    if (!status.ok()) {
-      // If the physical file is failed to delete, log a warning, and set a deleted flag
-      // anyway.
-      LOG(WARNING) << "Delete file: " << disk_buffer_file_->path() << " failed.";
-      disk_buffer_file_->SetStatus(io::DiskFileStatus::DELETED);
-    } else if (disk_file_->GetFileStatus() != io::DiskFileStatus::PERSISTED
-        && disk_buffer_file_->IsSpaceReserved()) {
-      // If the file is not uploaded and the buffer space is reserved, we need to return
-      // the buffer to the pool after deletion of the TmpFile. The buffer of a uploaded
-      // file should have been returned to the pool after upload operation completes.
-      to_return_the_buffer = true;
-    } else {
-      // Do nothing.
+  {
+    // The order of acquiring the lock must be from local to remote to avoid deadlocks.
+    unique_lock<shared_mutex> buffer_file_lock(*(disk_buffer_file_->GetFileLock()));
+    unique_lock<shared_mutex> file_lock(*(disk_file_->GetFileLock()));
+
+    // Delete the local buffer file if exists.
+    if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) {
+      status = disk_buffer_file_->Delete(buffer_file_lock);
+      if (!status.ok()) {
+        // If the physical file fails to be deleted, log a warning and set the deleted
+        // flag anyway.
+        LOG(WARNING) << "Delete file: " << disk_buffer_file_->path() << " failed.";
+        disk_buffer_file_->SetStatus(io::DiskFileStatus::DELETED);
+      } else if (disk_file_->GetFileStatus() != io::DiskFileStatus::PERSISTED
+          && disk_buffer_file_->IsSpaceReserved()) {
+        // If the file is not uploaded and the buffer space is reserved, we need to return
+        // the buffer to the pool after deletion of the TmpFile. The buffer of an uploaded
+        // file should have been returned to the pool after upload operation completes.
+        to_return_the_buffer = true;
+      } else {
+        // Do nothing.
+      }
     }
-  }
 
-  // Set the remote file status to deleted. The physical remote files would be deleted
-  // during deconstruction of TmpFileGroup by deleting the entire remote
-  // directory for efficiency consideration.
-  disk_file_->SetStatus(io::DiskFileStatus::DELETED);
+    // Set the remote file status to deleted. The physical remote files are deleted
+    // during destruction of TmpFileGroup by deleting the entire remote directory,
+    // for efficiency.
+    disk_file_->SetStatus(io::DiskFileStatus::DELETED);
+
+    // Try to delete all the read buffers.
+    if (file_group_->tmp_file_mgr_->IsRemoteBatchReadingEnabled()) {
+      file_group_->tmp_file_mgr_->ReleaseTmpFileReadBuffer(buffer_file_lock, this);
+    }
+  }
 
   // Update the metrics.
   GetDir()->bytes_used_metric()->Increment(-file_size_);
@@ -972,6 +1232,13 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
         ADD_COUNTER(profile, "UncompressedScratchBytesWritten", TUnit::BYTES)),
     read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
     bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
+    read_use_mem_counter_(ADD_COUNTER(profile, "ScratchReadsUseMem", TUnit::UNIT)),
+    bytes_read_use_mem_counter_(
+        ADD_COUNTER(profile, "ScratchBytesReadUseMem", TUnit::BYTES)),
+    read_use_local_disk_counter_(
+        ADD_COUNTER(profile, "ScratchReadsUseLocalDisk", TUnit::UNIT)),
+    bytes_read_use_local_disk_counter_(
+        ADD_COUNTER(profile, "ScratchBytesReadUseLocalDisk", TUnit::BYTES)),
     scratch_space_bytes_used_counter_(
         ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
     disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
@@ -987,6 +1254,10 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
   io_ctx_ = io_mgr_->RegisterContext();
+  io_ctx_->set_read_use_mem_counter(read_use_mem_counter_);
+  io_ctx_->set_bytes_read_use_mem_counter(bytes_read_use_mem_counter_);
+  io_ctx_->set_read_use_local_disk_counter(read_use_local_disk_counter_);
+  io_ctx_->set_bytes_read_use_local_disk_counter(bytes_read_use_local_disk_counter_);
   // Populate the priority based index ranges.
   const std::vector<std::unique_ptr<TmpDir>>& tmp_dirs = tmp_file_mgr_->tmp_dirs_;
   if (tmp_dirs.size() > 0) {
@@ -1352,29 +1623,29 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
   // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
   // since the write is not in flight.
   handle->read_range_ = scan_range_pool_.Add(new ScanRange);
-
+  int64_t offset = handle->write_range_->offset();
   if (handle->file_ != nullptr && !handle->file_->is_local()) {
     TmpFileRemote* tmp_file = static_cast<TmpFileRemote*>(handle->file_);
-    DiskFile* disk_buffer_file = tmp_file->DiskBufferFile();
-    DiskFile* disk_file = tmp_file->DiskFile();
+    DiskFile* local_read_buffer_file = tmp_file->GetReadBufferFile(offset);
+    DiskFile* remote_file = tmp_file->DiskFile();
     // Reset the read_range, use the remote filesystem's disk id.
-    handle->read_range_->Reset(tmp_file->hdfs_conn_, disk_file->path().c_str(),
-        handle->write_range_->len(), handle->write_range_->offset(), tmp_file->disk_id(),
-        false, tmp_file->mtime_,
+    handle->read_range_->Reset(tmp_file->hdfs_conn_, remote_file->path().c_str(),
+        handle->write_range_->len(), offset, tmp_file->disk_id(), false, tmp_file->mtime_,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING),
-        nullptr, disk_file, disk_buffer_file);
+        nullptr, remote_file, local_read_buffer_file);
   } else {
     // Read from local.
     handle->read_range_->Reset(nullptr, handle->write_range_->file(),
-        handle->write_range_->len(), handle->write_range_->offset(),
-        handle->write_range_->disk_id(), false, ScanRange::INVALID_MTIME,
+        handle->write_range_->len(), offset, handle->write_range_->disk_id(), false,
+        ScanRange::INVALID_MTIME,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
   }
 
   read_counter_->Add(1);
   bytes_read_counter_->Add(read_buffer.len());
+
   bool needs_buffers;
   RETURN_IF_ERROR(io_ctx_->StartScanRange(handle->read_range_, &needs_buffers));
   DCHECK(!needs_buffers) << "Already provided a buffer";
@@ -1431,6 +1702,16 @@ Status TmpFileGroup::WaitForAsyncRead(
     if (!status.ok()) goto exit;
   }
 exit:
+  if (handle->file_ != nullptr && !handle->file_->is_local()) {
+    auto tmp_file = static_cast<TmpFileRemote*>(handle->file_);
+    // If all the pages of a specific read buffer have been read, try to delete the
+    // read buffer.
+    if (tmp_file_mgr()->IsRemoteBatchReadingEnabled()) {
+      int buffer_idx = tmp_file->GetReadBufferIndex(handle->write_range_->offset());
+      bool all_read = tmp_file->IncrementReadPageCount(buffer_idx);
+      if (all_read) tmp_file->TryDeleteMemReadBufferShared(buffer_idx);
+    }
+  }
   // Always return the buffer before exiting to avoid leaking it.
   if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer));
   handle->read_range_ = nullptr;
@@ -1762,15 +2043,13 @@ void TmpWriteHandle::WriteComplete(const Status& write_status) {
       // Do file upload if the local buffer file is finished.
       if (write_range_->is_full()) {
         TmpFileRemote* tmp_file = static_cast<TmpFileRemote*>(file_);
-        int disk_id = tmp_file->DiskFile()->disk_type() == io::DiskFileType::S3 ?
-            parent_->io_mgr_->RemoteS3DiskFileOperId() :
-            parent_->io_mgr_->RemoteDfsDiskFileOperId();
         RemoteOperRange::RemoteOperDoneCallback u_callback =
             [this, tmp_file](
                 const Status& upload_status) { UploadComplete(tmp_file, upload_status); };
-        tmp_file->upload_range_.reset(new RemoteOperRange(tmp_file->DiskBufferFile(),
-            tmp_file->DiskFile(), parent_->tmp_file_mgr()->GetRemoteTmpBlockSize(),
-            disk_id, RequestType::FILE_UPLOAD, parent_->io_mgr_, u_callback));
+        tmp_file->upload_range_.reset(
+            new RemoteOperRange(tmp_file->DiskBufferFile(), tmp_file->DiskFile(),
+                parent_->tmp_file_mgr()->GetRemoteTmpBlockSize(), tmp_file->disk_id(true),
+                RequestType::FILE_UPLOAD, parent_->io_mgr_, u_callback));
         status = parent_->io_ctx_->AddRemoteOperRange(tmp_file->upload_range_.get());
       }
     }
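The block-status handling in AsyncFetchReadBufferBlock() can be summarized with a small standalone sketch. It is not the Impala implementation. BlockStatus, ReadBufferBudget and DecideFetch are hypothetical names, locking is left to the caller, and queuing the RemoteOperRange is reduced to a flag. Like the diff, the sketch reserves the block's bytes against the limit before fetching and rolls the reservation back when the limit would be exceeded. It also reports fetched = true after the UNINIT branch, whether or not a fetch was actually scheduled.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical mirror of io::MemBlockStatus (only the states named in the diff).
enum class BlockStatus { UNINIT, RESERVED, ALLOC, WRITTEN, DISABLED };

// Hypothetical process-wide budget standing in for
// scratch_read_memory_buffer_used_metric_ and GetRemoteMaxTotalReadBufferSize().
class ReadBufferBudget {
 public:
  explicit ReadBufferBudget(int64_t limit) : limit_(limit) {}

  // Optimistically add 'bytes'; roll back and fail if the limit is exceeded.
  bool TryReserve(int64_t bytes) {
    const int64_t new_val = used_.fetch_add(bytes) + bytes;
    if (new_val > limit_) {
      used_.fetch_sub(bytes);
      return false;
    }
    return true;
  }

  void Release(int64_t bytes) { used_.fetch_sub(bytes); }
  int64_t used() const { return used_.load(); }

 private:
  const int64_t limit_;
  std::atomic<int64_t> used_{0};
};

// Hypothetical decision step; the caller is assumed to hold the block's lock.
// Returns the value the diff would store in '*fetched'.
bool DecideFetch(BlockStatus* status, ReadBufferBudget* budget, int64_t block_size,
    bool* fetch_scheduled) {
  *fetch_scheduled = false;
  switch (*status) {
    case BlockStatus::DISABLED:
      return false;  // Caller reads the page directly from the remote file.
    case BlockStatus::WRITTEN:
      return true;  // Block is already in memory.
    case BlockStatus::UNINIT:
      if (budget->TryReserve(block_size)) {
        *status = BlockStatus::RESERVED;  // The real code queues a fetch here.
        *fetch_scheduled = true;
      } else {
        *status = BlockStatus::DISABLED;  // No memory left: never buffer this block.
      }
      return true;  // Mirrors the diff: set after the UNINIT branch either way.
    default:
      return true;  // RESERVED/ALLOC: another thread owns the fetch; don't wait.
  }
}
```

Disabling a block that cannot get memory, instead of retrying later, matches the commit message's reasoning: it avoids reading the same content from the remote filesystem twice.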
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 9f3691c9b..a6a3cb5a7 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -49,6 +49,7 @@ namespace io {
   class DiskIoMgr;
   class RequestContext;
   class ScanRange;
+  class DiskFile;
   class WriteRange;
   class RemoteOperRange;
 }
@@ -136,21 +137,40 @@ class TmpFileMgr {
   /// A configuration for the control parameters of remote temporary directories.
   /// The struct is used by TmpFileMgr and has the same lifecycle as TmpFileMgr.
   struct TmpDirRemoteCtrl {
+    /// Calculate the maximum allowed read buffer bytes for the remote spilling.
+    int64_t CalcMaxReadBufferBytes();
+
+    /// A helper function to set up the parameters of read buffers for remote files.
+    Status SetUpReadBufferParams() WARN_UNUSED_RESULT;
+
     /// The high water mark metric for local buffer directory.
     AtomicInt64 local_buff_dir_bytes_high_water_mark_{0};
 
     /// The default size of a remote temporary file.
     int64_t remote_tmp_file_size_;
 
+    /// The default size of a read buffer block for a remote temporary file.
+    int64_t read_buffer_block_size_;
+
+    /// The number of read buffer blocks for a remote file, derived from
+    /// remote_tmp_file_size_ / read_buffer_block_size_.
+    int num_read_buffer_blocks_per_file_;
+
     /// The default block size of a remote temporary file. The block is used as a buffer
     /// while doing upload and fetch a remote temporary file.
     int64_t remote_tmp_block_size_;
 
+    /// The maximum total size of read buffer for remote spilling.
+    int64_t max_read_buffer_size_;
+
     /// Specify the mode to enqueue the tmp file to the pool.
     /// If true, the file would be placed in the first to be poped out from the pool.
     /// If false, the file would be placed in the last of the pool.
     bool remote_tmp_files_avail_pool_lifo_;
 
+    /// Indicates if batch reading is enabled for the remote temporary files.
+    bool remote_batch_read_enabled_;
+
     /// Temporary file buffer pool managed by TmpFileMgr, is only activated when there is
     /// a remote scratch space is registered. So, if TmpFileMgr::HasRemoteDir() is true,
     /// the tmp_file_pool_ is non-null. Otherwise, it is null.
@@ -197,6 +217,21 @@ class TmpFileMgr {
     return tmp_dirs_remote_ctrl_.remote_tmp_file_size_;
   }
 
+  /// Return the read buffer block size for a remote temporary file.
+  int64_t GetReadBufferBlockSize() const {
+    return tmp_dirs_remote_ctrl_.read_buffer_block_size_;
+  }
+
+  /// Return the number of read buffer blocks for a remote temporary file.
+  int GetNumReadBuffersPerFile() const {
+    return tmp_dirs_remote_ctrl_.num_read_buffer_blocks_per_file_;
+  }
+
+  /// Return the maximum total size of all the read buffer blocks for remote spilling.
+  int64_t GetRemoteMaxTotalReadBufferSize() const {
+    return tmp_dirs_remote_ctrl_.max_read_buffer_size_;
+  }
+
   /// Return the remote temporary block size.
   int64_t GetRemoteTmpBlockSize() const {
     return tmp_dirs_remote_ctrl_.remote_tmp_block_size_;
@@ -213,6 +248,11 @@ class TmpFileMgr {
     return tmp_dirs_remote_ctrl_.remote_tmp_files_avail_pool_lifo_;
   }
 
+  /// Return true if batch reading for remote temporary files is enabled.
+  bool IsRemoteBatchReadingEnabled() {
+    return tmp_dirs_remote_ctrl_.remote_batch_read_enabled_;
+  }
+
   /// Return the local buffer dir for remote spilling.
   TmpDir* GetLocalBufferDir() const;
 
@@ -257,6 +297,11 @@ class TmpFileMgr {
   /// one would wait util a file is dequeued.
   Status DequeueTmpFilesPool(std::shared_ptr<TmpFile>* tmp_file, bool quick_return);
 
+  /// Releases all the read buffer memory of a temporary file.
+  /// Caller needs to hold the unique lock of the buffer file.
+  void ReleaseTmpFileReadBuffer(
+      const std::unique_lock<boost::shared_mutex>& lock, TmpFile* tmp_file);
+
   /// Try to delete the buffer of a TmpFile to make some space for other buffers.
   /// May return an error status if error happens during deletion of the buffer.
   Status TryEvictFile(TmpFile* tmp_file);
@@ -350,6 +395,9 @@ class TmpFileMgr {
 
   /// Metrics to track the scratch space HWM.
   AtomicHighWaterMarkGauge* scratch_bytes_used_metric_ = nullptr;
+
+  /// Metrics to track the read memory buffer HWM.
+  AtomicHighWaterMarkGauge* scratch_read_memory_buffer_used_metric_ = nullptr;
 };
 
 /// Represents a group of temporary files - one per disk with a scratch directory. The
@@ -553,6 +601,18 @@ class TmpFileGroup {
   /// Number of bytes read from disk (includes reads started but not yet complete).
   RuntimeProfile::Counter* const bytes_read_counter_;
 
+  /// Number of read operations that use the memory buffer.
+  RuntimeProfile::Counter* const read_use_mem_counter_;
+
+  /// Number of bytes read from the memory buffer.
+  RuntimeProfile::Counter* const bytes_read_use_mem_counter_;
+
+  /// Number of read operations that use local disk buffer.
+  RuntimeProfile::Counter* const read_use_local_disk_counter_;
+
+  /// Number of bytes read from local disk buffer.
+  RuntimeProfile::Counter* const bytes_read_use_local_disk_counter_;
+
   /// Amount of scratch space allocated in bytes.
   RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
 
@@ -566,7 +626,7 @@ class TmpFileGroup {
   /// is not enabled.
   RuntimeProfile::Counter* compression_timer_;
 
-  /// Protects tmp_files_remote_ptrs_lock_.
+  /// Protects tmp_files_remote_ptrs_.
   SpinLock tmp_files_remote_ptrs_lock_;
 
   /// A map of raw pointer and its shared_ptr of remote TmpFiles.
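The following simplified model shows how a page offset maps to a read buffer block and how per-block read counts decide when the block's memory can be released. In the diff, these roles belong to GetReadBufferIndex(), IncrementReadPageCount() and TryDeleteMemReadBufferShared(). ReadPageTracker is a hypothetical name. The sketch assumes uniform block sizes and a fixed number of pages per block. The commit message notes that real block sizes may differ slightly so that no page is split across two blocks, and the real per-block page total comes from GetReadBuffPageCount().

```cpp
#include <cassert>
#include <cstdint>
#include <mutex>
#include <vector>

// Hypothetical tracker. Assumes every block has the same size, so the number of
// blocks is file_size / block_size (as num_read_buffer_blocks_per_file_ is described).
class ReadPageTracker {
 public:
  ReadPageTracker(int64_t file_size, int64_t block_size, int64_t pages_per_block)
    : block_size_(block_size),
      pages_per_block_(pages_per_block),
      read_counts_(static_cast<size_t>(file_size / block_size), 0) {}

  int NumBlocks() const { return static_cast<int>(read_counts_.size()); }

  // Map a page's offset within the file to the index of the block holding it.
  int BlockIndex(int64_t offset) const { return static_cast<int>(offset / block_size_); }

  // Count one more page read from block 'idx'. Returns true exactly once, when the
  // last page is read, i.e. when the block's memory could be released.
  bool IncrementReadPageCount(int idx) {
    std::lock_guard<std::mutex> l(lock_);
    return ++read_counts_[idx] == pages_per_block_;
  }

 private:
  const int64_t block_size_;
  const int64_t pages_per_block_;
  std::mutex lock_;
  std::vector<int64_t> read_counts_;
};
```

Counting reads per block, instead of freeing a block on its first read, keeps a block in memory until every page it holds has been consumed. This matches the commit message's rule that a block stays in memory until all of its pages are read or the query ends.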
diff --git a/be/src/util/mem-info.cc b/be/src/util/mem-info.cc
index df932cb70..43cfd24ff 100644
--- a/be/src/util/mem-info.cc
+++ b/be/src/util/mem-info.cc
@@ -198,7 +198,7 @@ string ThpConfig::DebugString() const {
   return stream.str();
 }
 
-Status ChooseProcessMemLimit(int64_t* bytes_limit) {
+Status ChooseProcessMemLimit(int64_t* bytes_limit, int64_t* process_avail_mem) {
   // Depending on the system configuration, we detect the total amount of memory
   // available to the system - either the available physical memory, or if overcommitting
   // is turned off, we use the memory commit limit from /proc/meminfo (see IMPALA-1690).
@@ -262,6 +262,7 @@ Status ChooseProcessMemLimit(int64_t* bytes_limit) {
                  << " exceeds CGroup memory limit of "
                  << PrettyPrinter::PrintBytes(cgroup_mem_limit);
   }
+  if (process_avail_mem) *process_avail_mem = avail_mem;
   return Status::OK();
 }
 }
diff --git a/be/src/util/mem-info.h b/be/src/util/mem-info.h
index f36e8f705..cabc24fde 100644
--- a/be/src/util/mem-info.h
+++ b/be/src/util/mem-info.h
@@ -126,5 +126,6 @@ class MemInfo {
 /// the memory available to the daemon process. Returns an error if the memory limit is
 /// invalid or another error is encountered that should prevent starting up the daemon.
 /// Logs the memory limit chosen and any relevant diagnostics related to that choice.
-Status ChooseProcessMemLimit(int64_t* bytes_limit);
+/// If 'avail_mem' is not nullptr, it is set to the available system memory in bytes.
+Status ChooseProcessMemLimit(int64_t* bytes_limit, int64_t* avail_mem = nullptr);
 }
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 0970caf81..fd9c45d5e 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -270,9 +270,11 @@ class AtomicHighWaterMarkGauge : public ScalarMetric<int64_t, TMetricKind::GAUGE
 
   /// Adds 'delta' to the current value atomically.
   /// The hwm value is also updated atomically.
-  void Increment(int64_t delta) {
+  /// The updated current value is also returned.
+  int64_t Increment(int64_t delta) {
     const int64_t new_val = current_value_->Increment(delta);
     UpdateMax(new_val);
+    return new_val;
   }
 
   IntGauge* current_value() const { return current_value_; }
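Returning the new value from Increment() lets AsyncFetchReadBufferBlock() add a block's size, compare the result with the limit, and subtract it again if the limit is exceeded, without a separate read that could race with other threads. The following is a minimal std::atomic-based sketch of such a gauge. HwmGauge is a hypothetical name. The lock-free compare-and-swap maximum is one common way to maintain a high water mark, not a copy of Impala's UpdateMax().

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for AtomicHighWaterMarkGauge.
class HwmGauge {
 public:
  // Adds 'delta' atomically, raises the high water mark if needed, and returns
  // the updated current value so callers can check it against a limit.
  int64_t Increment(int64_t delta) {
    const int64_t new_val = current_.fetch_add(delta) + delta;
    UpdateMax(new_val);
    return new_val;
  }

  int64_t current() const { return current_.load(); }
  int64_t high_water_mark() const { return max_.load(); }

 private:
  // Lock-free max: on failure compare_exchange_weak reloads 'old_max', so the
  // loop retries only while 'v' is still larger than the stored maximum.
  void UpdateMax(int64_t v) {
    int64_t old_max = max_.load();
    while (v > old_max && !max_.compare_exchange_weak(old_max, v)) {
    }
  }

  std::atomic<int64_t> current_{0};
  std::atomic<int64_t> max_{0};
};
```

One consequence of the increment-then-roll-back pattern is that the high water mark can briefly record a value above the limit, even though the rejected reservation is undone immediately.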
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index d84453ca8..a5abd6651 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2420,6 +2420,26 @@
     "kind": "GAUGE",
     "key": "tmp-file-mgr.scratch-space-bytes-used.dir-$0"
   },
+  {
+    "description": "The current total read memory buffer bytes for all scratch directories.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Read memory buffer used for scratch directories",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "tmp-file-mgr.scratch-read-memory-buffer-used"
+  },
+  {
+    "description": "The high water mark for read memory buffer bytes of all scratch directories.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Read memory buffer HWM for scratch directories",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark"
+  },
   {
     "description": "The current total spilled bytes for the local buffer directory.",
     "contexts": [
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 17dd34ec0..a5ca75bbe 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -45,6 +45,13 @@ class TestScratchDir(CustomClusterTestSuite):
       from tpch.orders
       order by o_orderdate
       """
+  # Query against a big table with order by requires spill to disk if intermediate
+  # results don't fit in memory.
+  spill_query_big_table = """
+      select l_orderkey, l_linestatus, l_shipdate, l_comment
+      from tpch.lineitem
+      order by l_orderkey
+      """
   # Query without order by can be executed without spilling to disk.
   in_mem_query = """
       select o_orderdate, o_custkey, o_comment from tpch.orders
@@ -440,3 +447,38 @@ class TestScratchDir(CustomClusterTestSuite):
     # assert that we did use the scratch space and should be integer times of the
     # remote file size.
     assert (total_size > 0 and total_size % (8 * 1024 * 1024) == 0)
+
+  @pytest.mark.execute_serially
+  @SkipIf.not_hdfs
+  def test_scratch_dirs_batch_reading(self, vector):
+    # Set the buffer directory small enough to spill to the remote one.
+    normal_dirs = self.generate_dirs(1)
+    normal_dirs[0] = '{0}:2MB:{1}'.format(normal_dirs[0], 1)
+    normal_dirs.append('hdfs://localhost:20500/tmp')
+    self._start_impala_cluster([
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
+      '--impalad_args=--buffer_pool_clean_pages_limit=1m',
+      '--impalad_args=--remote_tmp_file_size=1M',
+      '--impalad_args=--remote_tmp_file_block_size=1m',
+      '--impalad_args=--remote_read_memory_buffer_size=1GB',
+      '--impalad_args=--remote_batch_read=true'],
+      cluster_size=1,
+      expected_num_impalads=1)
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(normal_dirs) - 1)
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    verifier = MetricVerifier(impalad.service)
+    verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    metrics0 = self.get_metric(
+      'tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark')
+    assert (metrics0 > 0)
+    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used-high-water-mark')
+    assert (metrics1 > 0)
+    client.close_query(handle)
+    client.close()