Posted to commits@impala.apache.org by ar...@apache.org on 2022/09/28 14:28:39 UTC

[impala] branch master updated (12059ba14 -> 55194a9c8)

This is an automated email from the ASF dual-hosted git repository.

arawat pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git


    from 12059ba14 IMPALA-11614: Fix test_metrics_are_zero for Ozone
     new d47d305bf IMPALA-11418: A statement that returns at most one row need not spool results
     new 6dfab93fe IMPALA-10791 Add batch reading for remote temporary files
     new 55194a9c8 IMPALA-11576: Fix for test_multiple_storage_locations on S3

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 be/src/runtime/io/CMakeLists.txt                   |   2 +
 be/src/runtime/io/disk-file-test.cc                | 149 +++++++++
 be/src/runtime/io/disk-file.cc                     |  50 +++
 be/src/runtime/io/disk-file.h                      | 363 +++++++++++++++++++-
 be/src/runtime/io/disk-io-mgr-test.cc              | 122 +++++++
 be/src/runtime/io/disk-io-mgr.cc                   | 117 ++++++-
 be/src/runtime/io/request-context.cc               |  28 +-
 be/src/runtime/io/request-context.h                |  31 ++
 be/src/runtime/io/request-ranges.h                 |  43 ++-
 be/src/runtime/io/scan-range.cc                    | 123 ++++---
 be/src/runtime/tmp-file-mgr-internal.h             | 118 ++++++-
 be/src/runtime/tmp-file-mgr-test.cc                | 175 +++++++++-
 be/src/runtime/tmp-file-mgr.cc                     | 369 ++++++++++++++++++---
 be/src/runtime/tmp-file-mgr.h                      |  62 +++-
 be/src/util/mem-info.cc                            |   3 +-
 be/src/util/mem-info.h                             |   3 +-
 be/src/util/metrics.h                              |   4 +-
 common/thrift/metrics.json                         |  20 ++
 .../apache/impala/analysis/AnalysisContext.java    |   9 +
 .../org/apache/impala/catalog/FeIcebergTable.java  |   5 +-
 .../apache/impala/catalog/FileMetadataLoader.java  |   2 +-
 .../org/apache/impala/catalog/HdfsPartition.java   |   8 +-
 .../java/org/apache/impala/service/Frontend.java   |   8 +-
 .../PlannerTest/bloom-filter-assignment.test       |  20 +-
 .../queries/PlannerTest/constant-folding.test      |   4 +-
 ...-runtime-filters-hdfs-num-rows-est-enabled.test |   2 +-
 .../PlannerTest/min-max-runtime-filters.test       |  10 +-
 .../queries/PlannerTest/mt-dop-validation.test     |   8 +-
 .../PlannerTest/parquet-filtering-disabled.test    |   8 +-
 .../queries/PlannerTest/parquet-filtering.test     |  16 +-
 .../queries/PlannerTest/resource-requirements.test |  26 +-
 .../queries/PlannerTest/result-spooling.test       | 104 ++++++
 .../PlannerTest/runtime-filter-query-options.test  |  10 +-
 .../queries/PlannerTest/tablesample.test           |   4 +-
 .../queries/PlannerTest/tpcds/tpcds-q13.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q16.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q23a.test      |  14 +-
 .../queries/PlannerTest/tpcds/tpcds-q32.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q38.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q48.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q87.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q92.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q94.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q95.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q96.test       |  18 +-
 .../queries/PlannerTest/tpcds/tpcds-q97.test       |  18 +-
 .../queries/PlannerTest/tpch-all.test              |  32 +-
 .../queries/PlannerTest/tpch-kudu.test             |   4 +-
 .../queries/PlannerTest/tpch-nested.test           |  16 +-
 .../QueryTest/admission-max-min-mem-limits.test    |  12 +-
 .../QueryTest/dedicated-coord-mem-estimates.test   |   6 +-
 .../queries/QueryTest/explain-level2.test          |   2 +-
 tests/custom_cluster/test_scratch_disk.py          |  42 +++
 tests/query_test/test_observability.py             |   3 +-
 54 files changed, 1990 insertions(+), 365 deletions(-)
 create mode 100644 be/src/runtime/io/disk-file-test.cc


[impala] 01/03: IMPALA-11418: A statement that returns at most one row need not spool results

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d47d305bf42f963ba6fac18c014e96c734e9e9ff
Author: xqhe <he...@126.com>
AuthorDate: Thu Jul 7 15:36:54 2022 +0800

    IMPALA-11418: A statement that returns at most one row need not spool results
    
    A query that returns at most one row can run more efficiently without
    result spooling. When result spooling is enabled, it forces a minimum
    memory reservation in PlanRootSink; for example, 'select 1' gets a 4MB
    minimum memory reservation.
    
    This optimization reduces the statement's resource reservation and
    prevents the 'Failed to get minimum memory reservation' exception when
    not enough memory is available on the host.
    
    Testing:
    - Add tests in result-spooling.test
    
    Change-Id: Icd4d73c21106048df68a270cf03d4abd56bd3aac
    Reviewed-on: http://gerrit.cloudera.org:8080/18711
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
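
The gist of the change is a small analysis-time check: once the analyzed
SelectStmt is known to return at most one row, the spool_query_results query
option is switched off before planning. Below is a minimal, self-contained
Java sketch of that decision; QueryOptions and StmtInfo are hypothetical
stand-ins for Impala's TQueryOptions and SelectStmt, and only the decision
logic mirrors the AnalysisContext.java hunk further down.

// Hedged sketch of the spooling decision added in this commit.
// QueryOptions and StmtInfo are illustrative stand-ins, not Impala classes.
public final class SpoolingDecisionSketch {
  static final class QueryOptions {
    boolean spoolQueryResults = true;  // result spooling is on by default
  }

  static final class StmtInfo {
    // e.g. a constant select, a statement with LIMIT 1, or an ungrouped aggregate
    final boolean returnsAtMostOneRow;
    StmtInfo(boolean returnsAtMostOneRow) { this.returnsAtMostOneRow = returnsAtMostOneRow; }
  }

  // Mirrors the check at the end of analysis: a statement that can produce at
  // most one row does not need to spool its results, so the PlanRootSink gets
  // no minimum memory reservation.
  static void maybeDisableSpooling(StmtInfo stmt, QueryOptions opts) {
    if (stmt != null && stmt.returnsAtMostOneRow) {
      opts.spoolQueryResults = false;
    }
  }

  public static void main(String[] args) {
    QueryOptions opts = new QueryOptions();
    maybeDisableSpooling(new StmtInfo(true), opts);  // think "select 1" or "select count(*) from t"
    System.out.println("spool_query_results=" + opts.spoolQueryResults);  // prints false
  }
}

In the planner test diffs that follow, this shows up as the PLAN-ROOT SINK line
dropping from mem-estimate=4.00MB mem-reservation=4.00MB to
mem-estimate=0B mem-reservation=0B.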
---
 .../apache/impala/analysis/AnalysisContext.java    |   9 ++
 .../java/org/apache/impala/service/Frontend.java   |   8 +-
 .../PlannerTest/bloom-filter-assignment.test       |  20 ++--
 .../queries/PlannerTest/constant-folding.test      |   4 +-
 ...-runtime-filters-hdfs-num-rows-est-enabled.test |   2 +-
 .../PlannerTest/min-max-runtime-filters.test       |  10 +-
 .../queries/PlannerTest/mt-dop-validation.test     |   8 +-
 .../PlannerTest/parquet-filtering-disabled.test    |   8 +-
 .../queries/PlannerTest/parquet-filtering.test     |  16 ++--
 .../queries/PlannerTest/resource-requirements.test |  26 +++---
 .../queries/PlannerTest/result-spooling.test       | 104 +++++++++++++++++++++
 .../PlannerTest/runtime-filter-query-options.test  |  10 +-
 .../queries/PlannerTest/tablesample.test           |   4 +-
 .../queries/PlannerTest/tpcds/tpcds-q13.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q16.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q23a.test      |  14 +--
 .../queries/PlannerTest/tpcds/tpcds-q32.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q38.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q48.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q87.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q92.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q94.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q95.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q96.test       |  18 ++--
 .../queries/PlannerTest/tpcds/tpcds-q97.test       |  18 ++--
 .../queries/PlannerTest/tpch-all.test              |  32 +++----
 .../queries/PlannerTest/tpch-kudu.test             |   4 +-
 .../queries/PlannerTest/tpch-nested.test           |  16 ++--
 .../QueryTest/admission-max-min-mem-limits.test    |  12 +--
 .../QueryTest/dedicated-coord-mem-estimates.test   |   6 +-
 .../queries/QueryTest/explain-level2.test          |   2 +-
 tests/query_test/test_observability.py             |   3 +-
 32 files changed, 316 insertions(+), 200 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
index bb21809a7..f151cf19c 100644
--- a/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
+++ b/fe/src/main/java/org/apache/impala/analysis/AnalysisContext.java
@@ -471,6 +471,15 @@ public class AnalysisContext {
     } finally {
       authzChecker.postAnalyze(authzCtx);
     }
+    // A statement that returns at most one row does not need to spool query results.
+    if (analysisException == null && analysisResult_.stmt_ instanceof SelectStmt &&
+        ((SelectStmt)analysisResult_.stmt_).returnsAtMostOneRow()) {
+      clientRequest.query_options.setSpool_query_results(false);
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Result spooling is disabled due to the statement returning at most "
+            + "one row.");
+      }
+    }
     long durationMs = timeline_.markEvent("Analysis finished") / 1000000;
     LOG.info("Analysis took {} ms", durationMs);
 
diff --git a/fe/src/main/java/org/apache/impala/service/Frontend.java b/fe/src/main/java/org/apache/impala/service/Frontend.java
index 99e2ae7f3..79fdf0aac 100644
--- a/fe/src/main/java/org/apache/impala/service/Frontend.java
+++ b/fe/src/main/java/org/apache/impala/service/Frontend.java
@@ -1821,13 +1821,15 @@ public class Frontend {
       }
       TExecutorGroupSet new_entry = new TExecutorGroupSet(e);
       if (poolService != null) {
-        // Find out the max_mem_limit from the pool service
+        // Find out the max_mem_limit from the pool service. Set to max_mem_limit when it
+        // is greater than 0, otherwise set to max possible threshold value.
         TPoolConfig poolConfig =
             poolService.getPoolConfig(e.getExec_group_name_prefix());
         Preconditions.checkNotNull(poolConfig);
-        new_entry.setMax_mem_limit(poolConfig.getMax_query_mem_limit());
+        new_entry.setMax_mem_limit(poolConfig.getMax_query_mem_limit() > 0 ?
+            poolConfig.getMax_query_mem_limit() : Long.MAX_VALUE);
       } else {
-        // Set to max possible thresold value when there is no pool service
+        // Set to max possible threshold value when there is no pool service
         new_entry.setMax_mem_limit(Long.MAX_VALUE);
       }
       result.add(new_entry);
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
index 9aa82ecc3..c141b52cc 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/bloom-filter-assignment.test
@@ -10,7 +10,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=34.94MB mem-reservation=2.97MB thread-reservation=3 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -61,7 +61,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.44MB mem-reservation=2.94MB thread-reservation=3 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -100,7 +100,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=19.69MB mem-reservation=2.95MB thread-reservation=3 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -145,7 +145,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=19.69MB mem-reservation=2.95MB thread-reservation=3 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -190,7 +190,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=19.69MB mem-reservation=2.95MB thread-reservation=3 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -235,7 +235,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=18.69MB mem-reservation=1.95MB thread-reservation=3
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -280,7 +280,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=54.62MB mem-reservation=5.91MB thread-reservation=4 runtime-filters-memory=2.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -351,7 +351,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=54.62MB mem-reservation=5.91MB thread-reservation=4 runtime-filters-memory=2.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -421,7 +421,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=55.62MB mem-reservation=6.91MB thread-reservation=4 runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -492,7 +492,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=24.38MB mem-reservation=6.89MB thread-reservation=4 runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
index 16f3d2bc4..2baf1d42a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/constant-folding.test
@@ -317,7 +317,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=138.00MB mem-reservation=4.00MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: sum(2 + id)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 02:AGGREGATE [FINALIZE]
 |  output: sum(2 + id), count:merge(*)
@@ -463,7 +463,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=128.02MB mem-reservation=4.00MB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: sum(id + c3)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum(CAST(id AS BIGINT) + CAST(CAST(10 AS INT) + CAST(CAST(20 AS SMALLINT) + CAST(30 AS SMALLINT) AS INT) AS BIGINT))
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
index e5aa22767..40deb4a94 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters-hdfs-num-rows-est-enabled.test
@@ -11,7 +11,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=21.38MB mem-reservation=4.00MB thread-reservation=4
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
index bb9f5ecfa..41b53fb74 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/min-max-runtime-filters.test
@@ -6,7 +6,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.94MB mem-reservation=1.94MB thread-reservation=3
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -44,7 +44,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=6.44MB mem-reservation=1.94MB thread-reservation=3
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -81,7 +81,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=7.94MB mem-reservation=1.94MB thread-reservation=3
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -117,7 +117,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=2.02GB mem-reservation=36.95MB thread-reservation=4
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -173,7 +173,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=6.88MB mem-reservation=3.88MB thread-reservation=4
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=100.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
index f4f0d1469..a22b1732e 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/mt-dop-validation.test
@@ -7,7 +7,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=2.03GB mem-reservation=16.00KB thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -46,7 +46,7 @@ F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=116.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -118,7 +118,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=2.03GB mem-reservation=35.02MB thread-reservation=1 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -161,7 +161,7 @@ F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Instance Resources: mem-estimate=116.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
index eb95d348b..04e2d1ea8 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering-disabled.test
@@ -12,7 +12,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=32.10MB mem-reservation=16.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -50,7 +50,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=128.10MB mem-reservation=88.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -82,7 +82,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=48.10MB mem-reservation=24.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -332,7 +332,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=128.10MB mem-reservation=88.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
index 3d0a4f143..52ff1c575 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/parquet-filtering.test
@@ -11,7 +11,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=32.10MB mem-reservation=16.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -42,7 +42,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=128.10MB mem-reservation=32.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -75,7 +75,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=128.10MB mem-reservation=88.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -111,7 +111,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=128.10MB mem-reservation=32.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -146,7 +146,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=48.10MB mem-reservation=24.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -458,7 +458,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=16.10MB mem-reservation=8.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 05:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -586,7 +586,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=128.10MB mem-reservation=88.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -623,7 +623,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=100.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
index 5fc965b60..554041c6a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/resource-requirements.test
@@ -1867,16 +1867,16 @@ Per-Instance Resources: mem-estimate=120.06MB mem-reservation=38.00MB thread-res
 # Non-grouping aggregation with zero-slot parquet scan
 select count(*) from tpch_parquet.lineitem
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
+Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
 Per-Host Resource Estimates: Memory=10MB
 Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=1.02MB mem-reservation=128.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(tpch_parquet.lineitem.stats: num_rows)
@@ -1894,16 +1894,16 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=4.12MB Threads=3
+Max Per-Host Resource Reservation: Memory=128.00KB Threads=3
 Per-Host Resource Estimates: Memory=10MB
 Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -1934,16 +1934,16 @@ Per-Host Resources: mem-estimate=1.02MB mem-reservation=128.00KB thread-reservat
    tuple-ids=0 row-size=8B cardinality=3
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=4.12MB Threads=2
-Per-Host Resource Estimates: Memory=84MB
+Max Per-Host Resource Reservation: Memory=128.00KB Threads=2
+Per-Host Resource Estimates: Memory=80MB
 Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM tpch_parquet.lineitem
 
 F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -5847,16 +5847,16 @@ PLAN-ROOT SINK
 # Kudu Scan count(*)
 select count(*) from functional_kudu.alltypes
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
+Max Per-Host Resource Reservation: Memory=0B Threads=2
 Per-Host Resource Estimates: Memory=10MB
 Codegen disabled by planner
 Analyzed query: SELECT count(*) FROM functional_kudu.alltypes
 
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=400.00KB mem-reservation=0B thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: sum_init_zero(functional_kudu.alltypes.stats: num_rows)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
index 88d16695a..10b5a6644 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
@@ -138,3 +138,107 @@ Per-Instance Resources: mem-estimate=308.13MB mem-reservation=20.00MB thread-res
    tuple-ids=0 row-size=231B cardinality=6.00M
    in pipelines: 00(GETNEXT)
 ====
+# Select stmt returning one constant row does not need to spool query results.
+select 1, uuid()
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=1
+Per-Host Resource Estimates: Memory=10MB
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: 1, uuid()
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+00:UNION
+   constant-operands=1
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=13B cardinality=1
+   in pipelines: <none>
+====
+# Select from an inline view that returns one row does not need to spool query results
+select a, b from (select 1 a, uuid() b) t
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=0B Threads=1
+Per-Host Resource Estimates: Memory=10MB
+F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=0B mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: a, b
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+00:UNION
+   constant-operands=1
+   mem-estimate=0B mem-reservation=0B thread-reservation=0
+   tuple-ids=0 row-size=13B cardinality=1
+   in pipelines: <none>
+====
+# Select stmt with a 'limit 1' clause does not need to spool query results
+select id from functional.dimtbl limit 1
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=8.00KB Threads=3
+Per-Host Resource Estimates: Memory=16MB
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=16.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: id
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+01:EXCHANGE [UNPARTITIONED]
+|  limit: 1
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=0 row-size=8B cardinality=1
+|  in pipelines: 00(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=2
+00:SCAN HDFS [functional.dimtbl, RANDOM]
+   HDFS partitions=1/1 files=1 size=171B
+   stored statistics:
+     table: rows=10 size=171B
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=10
+   limit: 1
+   mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+   tuple-ids=0 row-size=8B cardinality=1
+   in pipelines: 00(GETNEXT)
+====
+# Select stmt with an aggregate function and no group by does not need to spool query results
+select count(*) from functional.dimtbl
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=8.00KB Threads=3
+Per-Host Resource Estimates: Memory=16MB
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
+PLAN-ROOT SINK
+|  output exprs: count(*)
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
+|
+03:AGGREGATE [FINALIZE]
+|  output: count:merge(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 03(GETNEXT), 01(OPEN)
+|
+02:EXCHANGE [UNPARTITIONED]
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=1 instances=1
+Per-Host Resources: mem-estimate=16.02MB mem-reservation=8.00KB thread-reservation=2
+01:AGGREGATE
+|  output: count(*)
+|  mem-estimate=16.00KB mem-reservation=0B spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=8B cardinality=1
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional.dimtbl, RANDOM]
+   HDFS partitions=1/1 files=1 size=171B
+   stored statistics:
+     table: rows=10 size=171B
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=10
+   mem-estimate=16.00MB mem-reservation=8.00KB thread-reservation=1
+   tuple-ids=0 row-size=0B cardinality=10
+   in pipelines: 00(GETNEXT)
+====
\ No newline at end of file
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
index a8d13fbb6..fa0291438 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/runtime-filter-query-options.test
@@ -676,7 +676,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=5.02MB mem-reservation=5.00MB thread-reservation=3 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -715,7 +715,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=3
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -754,7 +754,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=5.02MB mem-reservation=5.00MB thread-reservation=3 runtime-filters-memory=1.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 03:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -794,7 +794,7 @@ F02:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 06:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -866,7 +866,7 @@ F03:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 07:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
index 2b61a833e..ec8b7f7a1 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tablesample.test
@@ -270,10 +270,10 @@ PLAN-ROOT SINK
 select count(*) from functional_parquet.iceberg_non_partitioned tablesample system(10) repeatable(1234)
 ---- PLAN
 F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=32.02MB mem-reservation=4.00MB thread-reservation=2
+|  Per-Host Resources: mem-estimate=32.02MB mem-reservation=8.00KB thread-reservation=2
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 01:AGGREGATE [FINALIZE]
 |  output: count(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q13.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q13.test
index bcd85ddb4..b50e2c28a 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q13.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q13.test
@@ -54,7 +54,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=316.75MB mem-reservation=26.70MB thread-reservation=7 runtime-filters-memory=5.00MB
 PLAN-ROOT SINK
 |  output exprs: avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 11:AGGREGATE [FINALIZE]
 |  output: avg(CAST(ss_quantity AS BIGINT)), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost)
@@ -180,13 +180,13 @@ PLAN-ROOT SINK
    tuple-ids=2 row-size=39B cardinality=181.75K
    in pipelines: 02(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=40.08MB Threads=14
-Per-Host Resource Estimates: Memory=339MB
+Max Per-Host Resource Reservation: Memory=36.08MB Threads=14
+Per-Host Resource Estimates: Memory=335MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 19:AGGREGATE [FINALIZE]
 |  output: avg:merge(ss_quantity), avg:merge(ss_ext_sales_price), avg:merge(ss_ext_wholesale_cost), sum:merge(ss_ext_wholesale_cost)
@@ -366,13 +366,13 @@ Per-Host Resources: mem-estimate=139.26MB mem-reservation=14.81MB thread-reserva
    tuple-ids=0 row-size=40B cardinality=288.04K
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=56.83MB Threads=16
-Per-Host Resource Estimates: Memory=167MB
+Max Per-Host Resource Reservation: Memory=52.83MB Threads=16
+Per-Host Resource Estimates: Memory=164MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: avg(ss_quantity), avg(ss_ext_sales_price), avg(ss_ext_wholesale_cost), sum(ss_ext_wholesale_cost)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 19:AGGREGATE [FINALIZE]
 |  output: avg:merge(ss_quantity), avg:merge(ss_ext_sales_price), avg:merge(ss_ext_wholesale_cost), sum:merge(ss_ext_wholesale_cost)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q16.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q16.test
index c1a68e474..244687d19 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q16.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q16.test
@@ -30,7 +30,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=570.31MB mem-reservation=45.06MB thread-reservation=7 runtime-filters-memory=4.00MB
 PLAN-ROOT SINK
 |  output exprs: count(cs_order_number), sum(cs_ext_ship_cost), sum(cs_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 13:TOP-N [LIMIT=100]
 |  order by: count(cs_order_number) ASC
@@ -160,13 +160,13 @@ PLAN-ROOT SINK
    tuple-ids=4 row-size=12B cardinality=1.44M
    in pipelines: 04(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=45.77MB Threads=14
-Per-Host Resource Estimates: Memory=588MB
+Max Per-Host Resource Reservation: Memory=41.77MB Threads=14
+Per-Host Resource Estimates: Memory=584MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(cs_order_number), sum(cs_ext_ship_cost), sum(cs_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 13:TOP-N [LIMIT=100]
 |  order by: count(cs_order_number) ASC
@@ -351,13 +351,13 @@ Per-Host Resources: mem-estimate=97.00MB mem-reservation=5.00MB thread-reservati
    tuple-ids=4 row-size=12B cardinality=1.44M
    in pipelines: 04(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=54.58MB Threads=13
-Per-Host Resource Estimates: Memory=216MB
+Max Per-Host Resource Reservation: Memory=50.58MB Threads=13
+Per-Host Resource Estimates: Memory=212MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(cs_order_number), sum(cs_ext_ship_cost), sum(cs_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 13:TOP-N [LIMIT=100]
 |  order by: count(cs_order_number) ASC
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q23a.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q23a.test
index 984257d74..a501f7c83 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q23a.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q23a.test
@@ -55,7 +55,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=633.30MB mem-reservation=84.56MB thread-reservation=11 runtime-filters-memory=10.00MB
 PLAN-ROOT SINK
 |  output exprs: sum(sales)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 49:AGGREGATE [FINALIZE]
 |  output: sum(sales)
@@ -507,13 +507,13 @@ PLAN-ROOT SINK
    tuple-ids=8 row-size=12B cardinality=2.88M
    in pipelines: 09(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=270.38MB Threads=50
+Max Per-Host Resource Reservation: Memory=266.38MB Threads=50
 Per-Host Resource Estimates: Memory=1.34GB
 F31:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(sales)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 89:AGGREGATE [FINALIZE]
 |  output: sum:merge(sales)
@@ -1240,13 +1240,13 @@ Per-Host Resources: mem-estimate=65.14MB mem-reservation=12.75MB thread-reservat
    tuple-ids=8 row-size=12B cardinality=2.88M
    in pipelines: 09(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=501.12MB Threads=63
+Max Per-Host Resource Reservation: Memory=497.12MB Threads=63
 Per-Host Resource Estimates: Memory=1.11GB
 F31:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(sales)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 89:AGGREGATE [FINALIZE]
 |  output: sum:merge(sales)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q32.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q32.test
index 4946724e1..78a4a1e28 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q32.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q32.test
@@ -22,7 +22,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=391.88MB mem-reservation=28.75MB thread-reservation=6 runtime-filters-memory=4.00MB
 PLAN-ROOT SINK
 |  output exprs: sum(cs_ext_discount_amt)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 10:AGGREGATE [FINALIZE]
 |  output: sum(cs_ext_discount_amt)
@@ -132,13 +132,13 @@ PLAN-ROOT SINK
    tuple-ids=3 row-size=16B cardinality=1.44M
    in pipelines: 03(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=39.94MB Threads=13
-Per-Host Resource Estimates: Memory=423MB
+Max Per-Host Resource Reservation: Memory=35.94MB Threads=13
+Per-Host Resource Estimates: Memory=419MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(cs_ext_discount_amt)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 19:AGGREGATE [FINALIZE]
 |  output: sum:merge(cs_ext_discount_amt)
@@ -309,13 +309,13 @@ Per-Host Resources: mem-estimate=158.15MB mem-reservation=13.94MB thread-reserva
    tuple-ids=3 row-size=16B cardinality=1.44M
    in pipelines: 03(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=45.81MB Threads=12
-Per-Host Resource Estimates: Memory=189MB
+Max Per-Host Resource Reservation: Memory=41.81MB Threads=12
+Per-Host Resource Estimates: Memory=185MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(cs_ext_discount_amt)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 19:AGGREGATE [FINALIZE]
 |  output: sum:merge(cs_ext_discount_amt)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q38.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q38.test
index 14437691c..6932c4722 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q38.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q38.test
@@ -34,7 +34,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=622.94MB mem-reservation=190.94MB thread-reservation=10 runtime-filters-memory=10.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 20:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -230,13 +230,13 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=8B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=340.81MB Threads=22
-Per-Host Resource Estimates: Memory=903MB
+Max Per-Host Resource Reservation: Memory=336.81MB Threads=22
+Per-Host Resource Estimates: Memory=899MB
 F12:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 36:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -540,13 +540,13 @@ Per-Host Resources: mem-estimate=80.58MB mem-reservation=46.94MB thread-reservat
    tuple-ids=0 row-size=8B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=484.62MB Threads=25
-Per-Host Resource Estimates: Memory=769MB
+Max Per-Host Resource Reservation: Memory=480.62MB Threads=25
+Per-Host Resource Estimates: Memory=765MB
 F12:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 36:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q48.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q48.test
index 8bd224bc9..37542b51d 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q48.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q48.test
@@ -46,7 +46,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=249.81MB mem-reservation=21.77MB thread-reservation=6 runtime-filters-memory=4.00MB
 PLAN-ROOT SINK
 |  output exprs: sum(ss_quantity)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 09:AGGREGATE [FINALIZE]
 |  output: sum(CAST(ss_quantity AS BIGINT))
@@ -150,13 +150,13 @@ PLAN-ROOT SINK
    tuple-ids=2 row-size=39B cardinality=181.75K
    in pipelines: 02(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=35.08MB Threads=12
-Per-Host Resource Estimates: Memory=272MB
+Max Per-Host Resource Reservation: Memory=31.08MB Threads=12
+Per-Host Resource Estimates: Memory=268MB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(ss_quantity)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 16:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_quantity)
@@ -307,13 +307,13 @@ Per-Host Resources: mem-estimate=104.29MB mem-reservation=9.88MB thread-reservat
    tuple-ids=0 row-size=28B cardinality=288.04K
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=46.89MB Threads=14
-Per-Host Resource Estimates: Memory=146MB
+Max Per-Host Resource Reservation: Memory=42.89MB Threads=14
+Per-Host Resource Estimates: Memory=142MB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(ss_quantity)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 16:AGGREGATE [FINALIZE]
 |  output: sum:merge(ss_quantity)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q87.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q87.test
index 162daeea6..c45aa0be2 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q87.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q87.test
@@ -36,7 +36,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=618.94MB mem-reservation=186.94MB thread-reservation=10 runtime-filters-memory=6.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 20:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -225,13 +225,13 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=8B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=330.81MB Threads=22
-Per-Host Resource Estimates: Memory=893MB
+Max Per-Host Resource Reservation: Memory=326.81MB Threads=22
+Per-Host Resource Estimates: Memory=889MB
 F12:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 36:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -528,13 +528,13 @@ Per-Host Resources: mem-estimate=80.58MB mem-reservation=46.94MB thread-reservat
    tuple-ids=0 row-size=8B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=470.62MB Threads=25
-Per-Host Resource Estimates: Memory=755MB
+Max Per-Host Resource Reservation: Memory=466.62MB Threads=25
+Per-Host Resource Estimates: Memory=751MB
 F12:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 36:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q92.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q92.test
index e910be514..3cd762f93 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q92.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q92.test
@@ -24,7 +24,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=295.88MB mem-reservation=28.75MB thread-reservation=6 runtime-filters-memory=4.00MB
 PLAN-ROOT SINK
 |  output exprs: sum(ws_ext_discount_amt)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 11:TOP-N [LIMIT=100]
 |  order by: sum(ws_ext_discount_amt) ASC
@@ -139,13 +139,13 @@ PLAN-ROOT SINK
    tuple-ids=3 row-size=16B cardinality=719.38K
    in pipelines: 03(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=39.94MB Threads=13
-Per-Host Resource Estimates: Memory=327MB
+Max Per-Host Resource Reservation: Memory=35.94MB Threads=13
+Per-Host Resource Estimates: Memory=323MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(ws_ext_discount_amt)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 11:TOP-N [LIMIT=100]
 |  order by: sum(ws_ext_discount_amt) ASC
@@ -321,13 +321,13 @@ Per-Host Resources: mem-estimate=110.15MB mem-reservation=13.94MB thread-reserva
    tuple-ids=3 row-size=16B cardinality=719.38K
    in pipelines: 03(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=45.81MB Threads=12
-Per-Host Resource Estimates: Memory=156MB
+Max Per-Host Resource Reservation: Memory=41.81MB Threads=12
+Per-Host Resource Estimates: Memory=152MB
 F07:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(ws_ext_discount_amt)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 11:TOP-N [LIMIT=100]
 |  order by: sum(ws_ext_discount_amt) ASC
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q94.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q94.test
index 130f80afd..10f2409b9 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q94.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q94.test
@@ -32,7 +32,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=409.81MB mem-reservation=30.25MB thread-reservation=7 runtime-filters-memory=4.00MB
 PLAN-ROOT SINK
 |  output exprs: count(ws_order_number), sum(ws_ext_ship_cost), sum(ws_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 13:TOP-N [LIMIT=100]
 |  order by: count(ws_order_number) ASC
@@ -162,13 +162,13 @@ PLAN-ROOT SINK
    tuple-ids=6 row-size=8B cardinality=71.76K
    in pipelines: 05(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=42.33MB Threads=15
-Per-Host Resource Estimates: Memory=435MB
+Max Per-Host Resource Reservation: Memory=38.33MB Threads=15
+Per-Host Resource Estimates: Memory=431MB
 F08:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(ws_order_number), sum(ws_ext_ship_cost), sum(ws_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 13:TOP-N [LIMIT=100]
 |  order by: count(ws_order_number) ASC
@@ -360,13 +360,13 @@ Per-Host Resources: mem-estimate=65.00MB mem-reservation=3.00MB thread-reservati
    tuple-ids=4 row-size=12B cardinality=719.38K
    in pipelines: 04(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=48.20MB Threads=14
-Per-Host Resource Estimates: Memory=170MB
+Max Per-Host Resource Reservation: Memory=44.20MB Threads=14
+Per-Host Resource Estimates: Memory=166MB
 F08:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(ws_order_number), sum(ws_ext_ship_cost), sum(ws_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 13:TOP-N [LIMIT=100]
 |  order by: count(ws_order_number) ASC
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q95.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q95.test
index 3d23f1ede..fe88fa396 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q95.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q95.test
@@ -35,7 +35,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=663.16MB mem-reservation=113.88MB thread-reservation=10 runtime-filters-memory=8.00MB
 PLAN-ROOT SINK
 |  output exprs: count(ws_order_number), sum(ws_ext_ship_cost), sum(ws_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 21:TOP-N [LIMIT=100]
 |  order by: count(ws_order_number) ASC
@@ -236,13 +236,13 @@ PLAN-ROOT SINK
    tuple-ids=9 row-size=12B cardinality=719.38K
    in pipelines: 08(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=131.20MB Threads=21
-Per-Host Resource Estimates: Memory=690MB
+Max Per-Host Resource Reservation: Memory=127.20MB Threads=21
+Per-Host Resource Estimates: Memory=686MB
 F12:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(ws_order_number), sum(ws_ext_ship_cost), sum(ws_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 21:TOP-N [LIMIT=100]
 |  order by: count(ws_order_number) ASC
@@ -526,13 +526,13 @@ Per-Host Resources: mem-estimate=67.00MB mem-reservation=5.00MB thread-reservati
    tuple-ids=9 row-size=12B cardinality=719.38K
    in pipelines: 08(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=140.95MB Threads=20
-Per-Host Resource Estimates: Memory=374MB
+Max Per-Host Resource Reservation: Memory=136.95MB Threads=20
+Per-Host Resource Estimates: Memory=370MB
 F12:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.00MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(ws_order_number), sum(ws_ext_ship_cost), sum(ws_net_profit)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 21:TOP-N [LIMIT=100]
 |  order by: count(ws_order_number) ASC
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q96.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q96.test
index 621eb8cfe..4f1067e59 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q96.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q96.test
@@ -20,7 +20,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=168.81MB mem-reservation=10.39MB thread-reservation=5 runtime-filters-memory=3.00MB
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 07:AGGREGATE [FINALIZE]
 |  output: count(*)
@@ -103,13 +103,13 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=12B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=14.39MB Threads=9
-Per-Host Resource Estimates: Memory=173MB
+Max Per-Host Resource Reservation: Memory=10.39MB Threads=9
+Per-Host Resource Estimates: Memory=169MB
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 12:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
@@ -226,13 +226,13 @@ Per-Host Resources: mem-estimate=56.87MB mem-reservation=9.81MB thread-reservati
    tuple-ids=0 row-size=12B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=24.20MB Threads=9
-Per-Host Resource Estimates: Memory=102MB
+Max Per-Host Resource Reservation: Memory=20.20MB Threads=9
+Per-Host Resource Estimates: Memory=98MB
 F04:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: count(*)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 12:AGGREGATE [FINALIZE]
 |  output: count:merge(*)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q97.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q97.test
index 609ce2c36..f22454563 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q97.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpcds/tpcds-q97.test
@@ -41,7 +41,7 @@ F00:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
 |  Per-Host Resources: mem-estimate=281.80MB mem-reservation=106.94MB thread-reservation=5 runtime-filters-memory=2.00MB
 PLAN-ROOT SINK
 |  output exprs: sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NULL THEN 1 ELSE 0 END), sum(CASE WHEN ssci.customer_sk IS NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END), sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 09:AGGREGATE [FINALIZE]
 |  output: sum(CAST(CASE WHEN ss_customer_sk IS NOT NULL AND cs_bill_customer_sk IS NULL THEN CAST(1 AS TINYINT) ELSE CAST(0 AS TINYINT) END AS BIGINT)), sum(CAST(CASE WHEN ss_customer_sk IS NULL AND cs_bill_customer_sk IS NOT NULL THEN CAST(1 AS TINYINT) ELSE CAST(0 AS TINYINT) END AS BIGINT)), sum(CAST(CASE WHEN ss_customer_sk IS NOT NULL AND cs_bill_customer_sk IS NOT NULL THEN CAST(1 AS TINYINT) ELSE CAST(0 AS TINYINT) END AS BIGINT))
@@ -134,13 +134,13 @@ PLAN-ROOT SINK
    tuple-ids=0 row-size=16B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=151.88MB Threads=10
-Per-Host Resource Estimates: Memory=396MB
+Max Per-Host Resource Reservation: Memory=147.88MB Threads=10
+Per-Host Resource Estimates: Memory=392MB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Host Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NULL THEN 1 ELSE 0 END), sum(CASE WHEN ssci.customer_sk IS NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END), sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 17:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NULL THEN 1 ELSE 0 END), sum:merge(CASE WHEN ssci.customer_sk IS NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END), sum:merge(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END)
@@ -286,13 +286,13 @@ Per-Host Resources: mem-estimate=69.00MB mem-reservation=37.94MB thread-reservat
    tuple-ids=0 row-size=16B cardinality=2.88M
    in pipelines: 00(GETNEXT)
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=158.75MB Threads=12
-Per-Host Resource Estimates: Memory=287MB
+Max Per-Host Resource Reservation: Memory=154.75MB Threads=12
+Per-Host Resource Estimates: Memory=283MB
 F06:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
-|  Per-Instance Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+|  Per-Instance Resources: mem-estimate=32.00KB mem-reservation=0B thread-reservation=1
 PLAN-ROOT SINK
 |  output exprs: sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NULL THEN 1 ELSE 0 END), sum(CASE WHEN ssci.customer_sk IS NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END), sum(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END)
-|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|  mem-estimate=0B mem-reservation=0B thread-reservation=0
 |
 17:AGGREGATE [FINALIZE]
 |  output: sum:merge(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NULL THEN 1 ELSE 0 END), sum:merge(CASE WHEN ssci.customer_sk IS NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END), sum:merge(CASE WHEN ssci.customer_sk IS NOT NULL AND csci.customer_sk IS NOT NULL THEN 1 ELSE 0 END)
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
index 23d59df24..b7a7e15eb 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-all.test
@@ -1173,8 +1173,8 @@ PLAN-ROOT SINK
    predicates: l_discount <= 0.07, l_discount >= 0.05, l_quantity < 24, l_shipdate < '1995-01-01', l_shipdate >= '1994-01-01'
    row-size=46B cardinality=600.12K
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=12.00MB Threads=3
-Per-Host Resource Estimates: Memory=268MB
+Max Per-Host Resource Reservation: Memory=8.00MB Threads=3
+Per-Host Resource Estimates: Memory=264MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
@@ -1192,8 +1192,8 @@ PLAN-ROOT SINK
    predicates: l_discount <= 0.07, l_discount >= 0.05, l_quantity < 24, l_shipdate < '1995-01-01', l_shipdate >= '1994-01-01'
    row-size=46B cardinality=600.12K
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=20.00MB Threads=3
-Per-Host Resource Estimates: Memory=180MB
+Max Per-Host Resource Reservation: Memory=16.00MB Threads=3
+Per-Host Resource Estimates: Memory=176MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
@@ -3111,8 +3111,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=46B cardinality=600.12K
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=30.50MB Threads=6
-Per-Host Resource Estimates: Memory=359MB
+Max Per-Host Resource Reservation: Memory=26.50MB Threads=6
+Per-Host Resource Estimates: Memory=355MB
 PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
@@ -3144,8 +3144,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=46B cardinality=600.12K
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=40.50MB Threads=8
-Per-Host Resource Estimates: Memory=258MB
+Max Per-Host Resource Reservation: Memory=36.50MB Threads=8
+Per-Host Resource Estimates: Memory=254MB
 PLAN-ROOT SINK
 |
 07:AGGREGATE [FINALIZE]
@@ -3677,8 +3677,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> tpch.lineitem.l_partkey
    row-size=16B cardinality=6.00M
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=46.75MB Threads=8
-Per-Host Resource Estimates: Memory=625MB
+Max Per-Host Resource Reservation: Memory=42.75MB Threads=8
+Per-Host Resource Estimates: Memory=621MB
 PLAN-ROOT SINK
 |
 12:AGGREGATE [FINALIZE]
@@ -3733,8 +3733,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> tpch.lineitem.l_partkey
    row-size=16B cardinality=6.00M
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=78.62MB Threads=11
-Per-Host Resource Estimates: Memory=445MB
+Max Per-Host Resource Reservation: Memory=74.62MB Threads=11
+Per-Host Resource Estimates: Memory=441MB
 PLAN-ROOT SINK
 |
 12:AGGREGATE [FINALIZE]
@@ -4113,8 +4113,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=72B cardinality=197.63K
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=22.94MB Threads=5
-Per-Host Resource Estimates: Memory=335MB
+Max Per-Host Resource Reservation: Memory=18.94MB Threads=5
+Per-Host Resource Estimates: Memory=331MB
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
@@ -4146,8 +4146,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=72B cardinality=197.63K
 ---- PARALLELPLANS
-Max Per-Host Resource Reservation: Memory=33.88MB Threads=5
-Per-Host Resource Estimates: Memory=218MB
+Max Per-Host Resource Reservation: Memory=29.88MB Threads=5
+Per-Host Resource Estimates: Memory=214MB
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
index 3b77ce96b..1133c5bb7 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-kudu.test
@@ -437,7 +437,7 @@ where
   and l_discount between 0.05 and 0.07
   and l_quantity < 24
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=4.00MB Threads=2
+Max Per-Host Resource Reservation: Memory=0B Threads=2
 Per-Host Resource Estimates: Memory=10MB
 PLAN-ROOT SINK
 |
@@ -1412,7 +1412,7 @@ where
     )
   )
 ---- PLAN
-Max Per-Host Resource Reservation: Memory=5.00MB Threads=3
+Max Per-Host Resource Reservation: Memory=2.94MB Threads=3
 Per-Host Resource Estimates: Memory=15MB
 PLAN-ROOT SINK
 |
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
index b012e9f18..352480215 100644
--- a/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/tpch-nested.test
@@ -806,8 +806,8 @@ PLAN-ROOT SINK
    predicates: l_discount <= 0.07, l_discount >= 0.05, l_quantity < 24, l_shipdate < '1995-01-01', l_shipdate >= '1994-01-01'
    row-size=36B cardinality=1.50M
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=20.00MB Threads=3
-Per-Host Resource Estimates: Memory=356MB
+Max Per-Host Resource Reservation: Memory=16.00MB Threads=3
+Per-Host Resource Estimates: Memory=352MB
 PLAN-ROOT SINK
 |
 03:AGGREGATE [FINALIZE]
@@ -2074,8 +2074,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=36B cardinality=1.50M
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=40.00MB Threads=5
-Per-Host Resource Estimates: Memory=414MB
+Max Per-Host Resource Reservation: Memory=36.00MB Threads=5
+Per-Host Resource Estimates: Memory=410MB
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
@@ -2447,8 +2447,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l.l_partkey, RF002 -> l_partkey
    row-size=24B cardinality=15.00M
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=158.94MB Threads=8
-Per-Host Resource Estimates: Memory=990MB
+Max Per-Host Resource Reservation: Memory=154.94MB Threads=8
+Per-Host Resource Estimates: Memory=986MB
 PLAN-ROOT SINK
 |
 12:AGGREGATE [FINALIZE]
@@ -2674,8 +2674,8 @@ PLAN-ROOT SINK
    runtime filters: RF000 -> l_partkey
    row-size=56B cardinality=1.50M
 ---- DISTRIBUTEDPLAN
-Max Per-Host Resource Reservation: Memory=32.94MB Threads=5
-Per-Host Resource Estimates: Memory=599MB
+Max Per-Host Resource Reservation: Memory=28.94MB Threads=5
+Per-Host Resource Estimates: Memory=595MB
 PLAN-ROOT SINK
 |
 06:AGGREGATE [FINALIZE]
diff --git a/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
index 7e1ff107f..c38ec124d 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/admission-max-min-mem-limits.test
@@ -19,8 +19,8 @@ row_regex: .*Cluster Memory Admitted: 68.00 MB.*
 set request_pool=poolLowMinLimit;
 select * from functional_parquet.alltypes limit 1;
 ---- RUNTIME_PROFILE
-row_regex: .*Per-Host Resource Estimates: Memory=20MB.*
-row_regex: .*Cluster Memory Admitted: 36.09 MB.*
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Cluster Memory Admitted: 32.09 MB.*
 ====
 ---- QUERY
 # No mem_limit set
@@ -32,9 +32,9 @@ select * from functional_parquet.alltypes limit 1;
 ---- CATCH
 Rejected query from pool root.poolLowMaxLimit: minimum memory reservation is greater than
  memory available to the query for buffer reservations. Memory reservation needed given
- the current plan: 4.09 MB. Adjust either the mem_limit or the pool config
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
  (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
- memory limit to be at least 36.09 MB. Note that changing the mem_limit may also change
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
  the plan. See the query profile for more information about the per-node memory
  requirements.
 ====
@@ -92,9 +92,9 @@ select * from functional_parquet.alltypes limit 1;
 ---- CATCH
 Rejected query from pool root.poolLowMinLimit: minimum memory reservation is greater than
  memory available to the query for buffer reservations. Memory reservation needed given
- the current plan: 4.09 MB. Adjust either the mem_limit or the pool config
+ the current plan: 88.00 KB. Adjust either the mem_limit or the pool config
  (max-query-mem-limit, min-query-mem-limit) for the query to allow the query
- memory limit to be at least 36.09 MB. Note that changing the mem_limit may also change
+ memory limit to be at least 32.09 MB. Note that changing the mem_limit may also change
  the plan. See the query profile for more information about the per-node memory
  requirements.
 ====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
index f7ee61dc2..5a3d65bcf 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/dedicated-coord-mem-estimates.test
@@ -47,9 +47,9 @@ row_regex: .*Cluster Memory Admitted: 145.47 MB.*
 # SELECT with a non-grouping aggregate in the coordinator fragment.
 select avg(int_col) from functional.alltypes;
 ---- RUNTIME_PROFILE
-row_regex: .*Per-Host Resource Estimates: Memory=20MB.*
-row_regex: .*Dedicated Coordinator Resource Estimate: Memory=104MB.*
-row_regex: .*Cluster Memory Admitted: 144.08 MB.*
+row_regex: .*Per-Host Resource Estimates: Memory=16MB.*
+row_regex: .*Dedicated Coordinator Resource Estimate: Memory=100MB.*
+row_regex: .*Cluster Memory Admitted: 132.12 MB.*
 ====
 ---- QUERY
 # SELECT with num_nodes=1 and a complex plan in the coordinator.
diff --git a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
index d65e4f6d1..5ffcd130e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/explain-level2.test
@@ -67,7 +67,7 @@ from functional_avro.alltypes t1
   left outer join functional_avro.alltypes t3 on (t2.id = t3.id)
 where t1.month = 1 and t2.year = 2009 and t3.bool_col = false
 ---- RESULTS: VERIFY_IS_SUBSET
-'Per-Host Resource Estimates: Memory=58MB'
+'Per-Host Resource Estimates: Memory=54MB'
 'WARNING: The following tables are missing relevant table and/or column statistics.'
 'functional_avro.alltypes, functional_parquet.alltypessmall'
 ====
diff --git a/tests/query_test/test_observability.py b/tests/query_test/test_observability.py
index a5721b78f..f68f98f32 100644
--- a/tests/query_test/test_observability.py
+++ b/tests/query_test/test_observability.py
@@ -180,7 +180,8 @@ class TestObservability(ImpalaTestSuite):
         "NUM_NODES=1,NUM_SCANNER_THREADS=1,"
         "RUNTIME_FILTER_MODE=OFF,MT_DOP=0,{erasure_coding}TIMEZONE={timezone},"
         "CLIENT_IDENTIFIER="
-        "query_test/test_observability.py::TestObservability::()::test_query_options"
+        "query_test/test_observability.py::TestObservability::()::test_query_options,"
+        "SPOOL_QUERY_RESULTS=0"
         "\n")
     expected_str = expected_str.format(
         erasure_coding="ALLOW_ERASURE_CODED_FILES=1," if IS_EC else "",


[impala] 02/03: IMPALA-10791 Add batch reading for remote temporary files

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 6dfab93fe9cb54aceed0b5203275827980752074
Author: Yida Wu <wy...@gmail.com>
AuthorDate: Tue Oct 26 14:10:53 2021 -0700

    IMPALA-10791 Add batch reading for remote temporary files
    
    The patch adds a feature to batch read from a remote temporary
    file in order to improve read performance for spilled remote
    data.
    
    Originally, the design was to use the local disk file as the
    buffer for batch reads from the remote file, but in practice this
    did not improve performance. Therefore, the design was changed to
    use memory as the read buffer.
    
    Currently, each TmpFileRemote has two DiskFiles: one for the
    remote file and one for the local buffer. The patch adds MemBlocks
    to the local buffer file; each local buffer file is divided evenly
    into several MemBlocks. Moreover, to guarantee that a single page
    is never split across two blocks, the block sizes can differ
    slightly from one another in practice. The default block size is
    the minimum of the default file size and
    MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES, which is 16MB.
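
    As a rough sketch (not the exact code in this patch), the default
    block size selection can be thought of as:

        // Illustrative only; the constant mirrors the 16MB
        // MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES mentioned above.
        #include <algorithm>
        #include <cstdint>

        constexpr int64_t kMaxRemoteReadMemBlockThresholdBytes = 16LL << 20;

        // Pick the smaller of the default file size and the threshold.
        int64_t DefaultReadBufferBlockSize(int64_t default_file_size) {
          return std::min(default_file_size, kMaxRemoteReadMemBlockThresholdBytes);
        }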
    
    When pinning a page, the system checks whether there is enough
    memory for the block that holds the page. If there is, the block
    is kept in memory until all of the pages in the block have been
    read or the query ends. If not, we read the page directly and
    disable this block, which helps avoid duplicated reads of the
    same content from the remote fs.
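
    A minimal sketch of that pin-time decision, assuming the MemBlock
    class introduced by this patch (the helper itself is hypothetical
    and not part of TmpFileMgr):

        // Assumes #include "runtime/io/disk-file.h" from this patch.
        // If memory is available, keep the block as a read buffer;
        // otherwise disable it and let the caller read the page directly.
        bool TryUseReadBufferForPin(impala::io::MemBlock* block, bool enough_memory) {
          if (!enough_memory) {
            bool reserved = false;
            bool allocated = false;
            block->Delete(&reserved, &allocated);  // block becomes DISABLED
            return false;  // caller reads the page directly from the remote file
          }
          return true;  // block stays in memory until fully read or the query ends
        }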
    
    One challenge of the read buffer is where to get the extra memory
    for it, because when Impala starts to spill data, the process is
    already short of memory. By default, the Impala process reserves
    20% of the total system memory as unused memory. We use this
    unused memory for the read buffer because it is reasonable to
    draw on it in an emergency such as spilling, and the read
    buffer's memory is returned immediately after use.
    
    For system reliability, we restrict the read buffer memory to at
    most 10% of the total system memory and 50% of the unused memory.
    Also, if the unused memory is less than 5% of the total system
    memory, the read buffer is disabled.
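
    A hedged sketch of the resulting cap (illustrative helper, not the
    actual implementation):

        #include <algorithm>
        #include <cstdint>

        // At most 10% of total system memory and 50% of unused memory;
        // disabled entirely when unused memory is below 5% of the total.
        int64_t ReadBufferMemoryCap(int64_t total_system_mem, int64_t unused_mem) {
          if (unused_mem < total_system_mem / 20) return 0;  // below 5%: disabled
          return std::min(total_system_mem / 10, unused_mem / 2);
        }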
    
    Two startup options have been added for the new feature.
    
    1. remote_batch_read. Default is false. If set to true, batch
    reading is enabled.
    2. remote_read_memory_buffer_size. Default is 1G. The maximum
    memory that can be used by the read buffer. The value is also
    restricted by the process memory limit and cannot exceed 10% of
    it.
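
    For example, batch reading could be enabled by starting impalad
    with flags such as the following (the buffer size value here is
    only illustrative):

        impalad --remote_batch_read=true --remote_read_memory_buffer_size=1G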
    
    Added metrics ScratchReadsUseMem/ScratchBytesReadUseMem/
    ScratchBytesReadUseLocalDisk to the query profile.
    
    The patch also increases the MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB
    from 256 to 512.
    
    Tests:
    Ran core and exhaustive tests.
    Added and ran TmpFileMgrTest::TestBatchReadingFromRemote.
    Added e2e test test_scratch_dirs_batch_reading.
    
    Change-Id: I1dcc5d0881ffaeff09c5c514306cd668373ad31b
    Reviewed-on: http://gerrit.cloudera.org:8080/17979
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/runtime/io/CMakeLists.txt          |   2 +
 be/src/runtime/io/disk-file-test.cc       | 149 ++++++++++++
 be/src/runtime/io/disk-file.cc            |  50 ++++
 be/src/runtime/io/disk-file.h             | 363 ++++++++++++++++++++++++++++-
 be/src/runtime/io/disk-io-mgr-test.cc     | 122 ++++++++++
 be/src/runtime/io/disk-io-mgr.cc          | 117 ++++++++--
 be/src/runtime/io/request-context.cc      |  28 +--
 be/src/runtime/io/request-context.h       |  31 +++
 be/src/runtime/io/request-ranges.h        |  43 ++--
 be/src/runtime/io/scan-range.cc           | 123 ++++++----
 be/src/runtime/tmp-file-mgr-internal.h    | 118 ++++++++--
 be/src/runtime/tmp-file-mgr-test.cc       | 175 +++++++++++++-
 be/src/runtime/tmp-file-mgr.cc            | 369 ++++++++++++++++++++++++++----
 be/src/runtime/tmp-file-mgr.h             |  62 ++++-
 be/src/util/mem-info.cc                   |   3 +-
 be/src/util/mem-info.h                    |   3 +-
 be/src/util/metrics.h                     |   4 +-
 common/thrift/metrics.json                |  20 ++
 tests/custom_cluster/test_scratch_disk.py |  42 ++++
 19 files changed, 1667 insertions(+), 157 deletions(-)

diff --git a/be/src/runtime/io/CMakeLists.txt b/be/src/runtime/io/CMakeLists.txt
index 1600c9310..cbc1f1357 100644
--- a/be/src/runtime/io/CMakeLists.txt
+++ b/be/src/runtime/io/CMakeLists.txt
@@ -43,6 +43,7 @@ add_dependencies(Io gen-deps)
 add_library(IoTests STATIC
   data-cache-trace-test.cc
   disk-io-mgr-test.cc
+  disk-file-test.cc
 )
 add_dependencies(IoTests gen-deps)
 
@@ -55,6 +56,7 @@ add_executable(data-cache-trace-replayer data-cache-trace-replayer.cc)
 target_link_libraries(data-cache-trace-replayer ${IMPALA_TEST_LINK_LIBS})
 
 ADD_UNIFIED_BE_LSAN_TEST(disk-io-mgr-test DiskIoMgrTest.*)
+ADD_UNIFIED_BE_LSAN_TEST(disk-file-test DiskFileTest.*)
 ADD_UNIFIED_BE_LSAN_TEST(data-cache-trace-test DataCacheTraceTest.*)
 # Exception to unified be: Custom main function (platform tests)
 ADD_BE_LSAN_TEST(data-cache-test)
diff --git a/be/src/runtime/io/disk-file-test.cc b/be/src/runtime/io/disk-file-test.cc
new file mode 100644
index 000000000..5dc917dae
--- /dev/null
+++ b/be/src/runtime/io/disk-file-test.cc
@@ -0,0 +1,149 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "common/names.h"
+#include "runtime/io/disk-io-mgr-internal.h"
+#include "testutil/death-test-util.h"
+
+using namespace std;
+
+namespace impala {
+namespace io {
+class DiskFileTest : public testing::Test {
+ public:
+  void ValidateMemBlockStatus(MemBlockStatus last_status);
+  void ValidateMemBlockStatusTransition(MemBlock& block, MemBlockStatus old_status,
+      MemBlockStatus new_status, bool expect_success);
+};
+
+// last_status is the MemBlock's last status it is going to reach other than
+// MemBlockStatus::DISABLED.
+void DiskFileTest::ValidateMemBlockStatus(MemBlockStatus last_status) {
+  const int block_id = 0;
+  const int64_t block_size = 1024;
+  bool expect_reserved = last_status >= MemBlockStatus::RESERVED;
+  bool expect_alloc = last_status >= MemBlockStatus::ALLOC;
+  bool reserved = false;
+  bool alloc = false;
+  MemBlock block(block_id);
+  ASSERT_TRUE(block.data() == nullptr);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::UNINIT));
+  if (last_status == MemBlockStatus::UNINIT) goto end;
+  block.SetStatus(MemBlockStatus::RESERVED);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::RESERVED));
+  if (last_status == MemBlockStatus::RESERVED) goto end;
+  {
+    unique_lock<SpinLock> read_buffer_lock(*(block.GetLock()));
+    EXPECT_OK(block.AllocLocked(read_buffer_lock, block_size));
+  }
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::ALLOC));
+  ASSERT_TRUE(block.data() != nullptr);
+  if (last_status == MemBlockStatus::ALLOC) goto end;
+  ASSERT_EQ(last_status, MemBlockStatus::WRITTEN);
+  memset(block.data(), 1, block_size);
+  block.SetStatus(MemBlockStatus::WRITTEN);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::WRITTEN));
+  for (int i = 0; i < block_size; i++) {
+    EXPECT_EQ(block.data()[i], 1);
+  }
+end:
+  block.Delete(&reserved, &alloc);
+  ASSERT_EQ(reserved, expect_reserved);
+  ASSERT_EQ(alloc, expect_alloc);
+  ASSERT_TRUE(block.IsStatus(MemBlockStatus::DISABLED));
+  ASSERT_TRUE(block.data() == nullptr);
+}
+
+void DiskFileTest::ValidateMemBlockStatusTransition(MemBlock& block,
+    MemBlockStatus old_status, MemBlockStatus new_status, bool expect_success) {
+  block.status_ = old_status;
+  if (expect_success) {
+    block.SetStatus(new_status);
+    ASSERT_TRUE(block.IsStatus(new_status));
+  } else {
+    IMPALA_ASSERT_DEBUG_DEATH(block.SetStatus(new_status), "");
+  }
+}
+
+// Test the basic flow of a MemBlock.
+TEST_F(DiskFileTest, MemBlockTest) {
+  ValidateMemBlockStatus(MemBlockStatus::UNINIT);
+  ValidateMemBlockStatus(MemBlockStatus::RESERVED);
+  ValidateMemBlockStatus(MemBlockStatus::ALLOC);
+  ValidateMemBlockStatus(MemBlockStatus::WRITTEN);
+}
+
+// Test the MemBlock status transition.
+TEST_F(DiskFileTest, MemBlockStatusTransition) {
+  MemBlock block(0);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::RESERVED, true);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::UNINIT, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::ALLOC, true);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::RESERVED, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::WRITTEN, true);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::ALLOC, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::WRITTEN, MemBlockStatus::DISABLED, true);
+
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::UNINIT, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::RESERVED, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::ALLOC, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::WRITTEN, false);
+  ValidateMemBlockStatusTransition(
+      block, MemBlockStatus::DISABLED, MemBlockStatus::DISABLED, true);
+}
+} // namespace io
+} // namespace impala
diff --git a/be/src/runtime/io/disk-file.cc b/be/src/runtime/io/disk-file.cc
index fc4f5fe0a..767047567 100644
--- a/be/src/runtime/io/disk-file.cc
+++ b/be/src/runtime/io/disk-file.cc
@@ -67,3 +67,53 @@ DiskFile::DiskFile(const string& path, DiskIoMgr* io_mgr, int64_t file_size,
     space_reserved_.Store(true);
   }
 }
+
+DiskFile::DiskFile(const string& path, DiskIoMgr* io_mgr, int64_t file_size,
+    DiskFileType disk_type, int64_t read_buffer_block_size, int num_read_buffer_blocks)
+  : path_(path),
+    file_size_(file_size),
+    disk_type_(disk_type),
+    file_status_(DiskFileStatus::INWRITING) {
+  DCHECK(disk_type == DiskFileType::LOCAL_BUFFER);
+  hdfs_conn_ = nullptr;
+  space_reserved_.Store(false);
+  file_writer_.reset(new LocalFileWriter(io_mgr, path_.c_str(), file_size));
+  read_buffer_ =
+      std::make_unique<ReadBuffer>(read_buffer_block_size, num_read_buffer_blocks);
+}
+
+DiskFile::ReadBuffer::ReadBuffer(
+    int64_t read_buffer_block_size, int64_t num_read_buffer_blocks)
+  : read_buffer_block_size_(read_buffer_block_size),
+    num_of_read_buffer_blocks_(num_read_buffer_blocks) {
+  page_cnts_per_block_ = std::make_unique<int64_t[]>(num_read_buffer_blocks);
+  read_buffer_block_offsets_ = std::make_unique<int64_t[]>(num_read_buffer_blocks);
+  memset(page_cnts_per_block_.get(), 0, num_read_buffer_blocks * sizeof(int64_t));
+  memset(read_buffer_block_offsets_.get(), DISK_FILE_INVALID_FILE_OFFSET,
+      num_read_buffer_blocks * sizeof(int64_t));
+  for (int i = 0; i < num_read_buffer_blocks; i++) {
+    read_buffer_blocks_.emplace_back(std::make_unique<MemBlock>(i));
+  }
+}
+
+void MemBlock::Delete(bool* reserved, bool* allocated) {
+  DCHECK(reserved != nullptr);
+  DCHECK(allocated != nullptr);
+  *reserved = false;
+  *allocated = false;
+  unique_lock<SpinLock> lock(mem_block_lock_);
+  switch (status_) {
+    case MemBlockStatus::WRITTEN:
+    case MemBlockStatus::ALLOC:
+      // Release the memory.
+      DCHECK(data_ != nullptr);
+      free(data_);
+      data_ = nullptr;
+      *allocated = true;
+    case MemBlockStatus::RESERVED:
+      *reserved = true;
+    default:
+      SetStatusLocked(lock, MemBlockStatus::DISABLED);
+      DCHECK(data_ == nullptr);
+  }
+}
diff --git a/be/src/runtime/io/disk-file.h b/be/src/runtime/io/disk-file.h
index 97112420b..f0ef8ce27 100644
--- a/be/src/runtime/io/disk-file.h
+++ b/be/src/runtime/io/disk-file.h
@@ -25,6 +25,7 @@
 #include "runtime/hdfs-fs-cache.h"
 #include "runtime/io/file-writer.h"
 
+#include "util/condition-variable.h"
 #include "util/spinlock.h"
 
 namespace impala {
@@ -36,6 +37,130 @@ class RemoteOperRange;
 class ScanRange;
 class WriteRange;
 
+static const int64_t DISK_FILE_INVALID_FILE_OFFSET = -1;
+
+/// MemBlockStatus indicates the status of a MemBlock.
+/// Normal status change should be: UNINIT -> RESERVED -> ALLOC -> WRITTEN -> DISABLED.
+/// But all status can jump to DISABLED directly.
+/// UNINIT is the default status, indicates the block is not initialized.
+/// RESERVED indicates the memory required by the block is reserved.
+/// ALLOC indicates the memory required by the block is allocated.
+/// WRITTEN indicates the memory is allocated and content has been written to the memory.
+/// DISABLED indicates the MemBlock is disabled, doesn't allow any writes to or reads from
+/// the block. It is a final state, and no memory should be allocated or reserved.
+enum class MemBlockStatus { UNINIT, RESERVED, ALLOC, WRITTEN, DISABLED };
+
+/// Each MemBlock can contain multiple pages, and be used as the buffer to read multiple
+/// pages at a time from the DiskFile.
+/// The caller may need to maintain the status of the MemBlock and make sure the block is
+/// used under the correct status.
+class MemBlock {
+ public:
+  MemBlock(int block_id) : block_id_(block_id), status_(MemBlockStatus::UNINIT) {}
+  virtual ~MemBlock() {
+    // Must be MemBlockStatus::DISABLED before destruction.
+    DCHECK_EQ(static_cast<int>(status_), static_cast<int>(MemBlockStatus::DISABLED));
+    DCHECK(data_ == nullptr);
+  }
+
+  // Release the memory if it is allocated.
+  // The MemBlock status will be set to MemBlockStatus::DISABLED after deletion.
+  // Return to the caller whether the memory is reserved or allocated before deletion.
+  // Must be called before the MemBlock destruction.
+  void Delete(bool* reserved, bool* alloc);
+
+  // Allocate the memory for the MemBlock.
+  // Status must be MemBlockStatus::RESERVED before allocation.
+  // If successfully allocated, the status will be set to MemBlockStatus::ALLOC.
+  Status AllocLocked(const std::unique_lock<SpinLock>& lock, int64_t size) {
+    DCHECK(lock.mutex() == &mem_block_lock_ && lock.owns_lock());
+    DCHECK_EQ(static_cast<int>(status_), static_cast<int>(MemBlockStatus::RESERVED));
+    // Use malloc, could be better to alloc from a buffer pool.
+    data_ = static_cast<uint8_t*>(malloc(size));
+    if (UNLIKELY(data_ == nullptr)) {
+      return Status(strings::Substitute("Couldn't allocate memory for a memory block, "
+                                        "block size: '$0' bytes",
+          size));
+    }
+    SetStatusLocked(lock, MemBlockStatus::ALLOC);
+    return Status::OK();
+  }
+
+  uint8_t* data() { return data_; }
+
+  MemBlockStatus GetStatus() {
+    std::unique_lock<SpinLock> l(mem_block_lock_);
+    return status_;
+  }
+
+  bool IsStatus(MemBlockStatus status) {
+    std::unique_lock<SpinLock> l(mem_block_lock_);
+    return IsStatusLocked(l, status);
+  }
+
+  bool IsStatusLocked(const std::unique_lock<SpinLock>& lock, MemBlockStatus status) {
+    DCHECK(lock.mutex() == &mem_block_lock_ && lock.owns_lock());
+    return status_ == status;
+  }
+
+  void SetStatus(MemBlockStatus status) {
+    std::unique_lock<SpinLock> l(mem_block_lock_);
+    SetStatusLocked(l, status);
+  }
+
+  void SetStatusLocked(const std::unique_lock<SpinLock>& lock, MemBlockStatus status) {
+    DCHECK(lock.mutex() == &mem_block_lock_ && lock.owns_lock());
+    SetInternalStatus(status);
+  }
+
+  /// Return the lock of the memory block.
+  SpinLock* GetLock() { return &mem_block_lock_; }
+
+  /// Return the block id.
+  int block_id() { return block_id_; }
+
+ private:
+  friend class TmpFileRemote;
+  friend class RemoteOperRange;
+  friend class DiskFileTest;
+
+  /// Caller should hold the lock.
+  void SetInternalStatus(MemBlockStatus new_status) {
+    switch (new_status) {
+      case MemBlockStatus::RESERVED: {
+        DCHECK(status_ == MemBlockStatus::UNINIT);
+        break;
+      }
+      case MemBlockStatus::ALLOC: {
+        DCHECK(status_ == MemBlockStatus::RESERVED);
+        break;
+      }
+      case MemBlockStatus::WRITTEN: {
+        DCHECK(status_ == MemBlockStatus::ALLOC);
+        break;
+      }
+      case MemBlockStatus::DISABLED: {
+        break;
+      }
+      default:
+        DCHECK(false) << "Invalid memory block status: " << static_cast<int>(new_status);
+    }
+    status_ = new_status;
+  }
+
+  /// The id of the memory block.
+  const int block_id_;
+
+  /// Protect the members below.
+  SpinLock mem_block_lock_;
+
+  /// The status of the memory block.
+  MemBlockStatus status_;
+
+  /// The data of the memory block, may contain multiple pages.
+  uint8_t* data_ = nullptr;
+};
+
 /// DiskFileType indicates the type of the file handled by the DiskFile.
 /// LOCAL indicates the file is in the local filesystem.
 /// LOCAL_BUFFER indicates the file is used as a buffer in the local filesystem.
@@ -64,8 +189,46 @@ class DiskFile {
   DiskFile(const std::string& path, DiskIoMgr* io_mgr, int64_t file_size,
       DiskFileType disk_type, const hdfsFS* hdfs_conn = nullptr);
 
+  /// Constructor for a file with read buffers.
+  DiskFile(const std::string& path, DiskIoMgr* io_mgr, int64_t file_size,
+      DiskFileType disk_type, int64_t read_buffer_size, int num_read_buffer_blocks);
+
   virtual ~DiskFile() {}
 
+  /// The ReadBuffer is designed for batch reading. Each ReadBuffer belongs to one
+  /// DiskFile, and contains multiple read buffer blocks which are divided from the
+  /// DiskFile by the block size. Each block contains multiple pages.
+  /// When reading a page from the read buffer, we firstly use the offset of the page
+  /// to calculate which block contains the page, then see whether the block is
+  /// available or not. If it is available, the caller can read the page from the block.
+  /// The block is only available after a fetch, which is triggered in TmpFileMgr.
+  /// The default size of a read buffer block is fixed and the number of the block per
+  /// disk file is the default file size divided by the default block size.
+  struct ReadBuffer {
+    ReadBuffer(int64_t read_buffer_block_size, int64_t num_read_buffer_blocks);
+
+    /// The default read buffer block size.
+    const int64_t read_buffer_block_size_;
+
+    /// The number of read buffer blocks per disk file.
+    const int64_t num_of_read_buffer_blocks_;
+
+    /// Each read buffer is a memory block, therefore, the size of read_buffer_blocks_ is
+    /// num_of_read_buffer_blocks_.
+    std::vector<std::unique_ptr<MemBlock>> read_buffer_blocks_;
+
+    /// Protect below members.
+    SpinLock read_buffer_ctrl_lock_;
+
+    /// The statistics for the page number for each read buffer block.
+    /// The size of page_cnts_per_block_ is num_of_read_buffer_blocks_.
+    std::unique_ptr<int64_t[]> page_cnts_per_block_;
+
+    /// The start offsets of each read buffer block to the whole file.
+    /// The size of read_buffer_block_offsets_ is num_of_read_buffer_blocks_.
+    std::unique_ptr<int64_t[]> read_buffer_block_offsets_;
+  };
+
   // Delete the physical file. Caller should hold the exclusive file lock.
   Status Delete(const std::unique_lock<boost::shared_mutex>& lock);
 
@@ -81,7 +244,7 @@ class DiskFile {
   int64_t file_size() const { return file_size_; }
 
   /// Return the actual size of the file.
-  int64_t actual_file_size() const { return actual_file_size_.Load(); }
+  int64_t actual_file_size() { return actual_file_size_.Load(); }
 
   /// If return True, the file is persisted.
   /// The caller should hold the status lock.
@@ -101,6 +264,9 @@ class DiskFile {
     return GetFileStatusLocked(l) == DiskFileStatus::DELETED;
   }
 
+  /// If True, the file is to be deleted.
+  bool is_to_delete() { return to_delete_.Load(); }
+
   /// Set the status of the DiskFile. Caller should not hold the status lock.
   void SetStatus(DiskFileStatus status) {
     std::unique_lock<SpinLock> l(status_lock_);
@@ -114,6 +280,9 @@ class DiskFile {
     SetInternalStatus(status);
   }
 
+  /// Set the flag of to_delete.
+  void SetToDeleteFlag(bool to_delete = true) { to_delete_.Store(to_delete); }
+
   /// Returns the status of the file.
   /// The caller should not hold the status lock.
   DiskFileStatus GetFileStatus() {
@@ -133,15 +302,182 @@ class DiskFile {
   void SetSpaceReserved() { space_reserved_.Store(true); }
   bool IsSpaceReserved() { return space_reserved_.Load(); }
 
-  /// Set actual file size. Should only be called by the TmpFileRemote::AllocateSpace()
-  /// right after the allocation is at capacity, and the function should only be called
-  /// once during the lifetime of the DiskFile.
+  /// Set actual file size.
+  /// The function should only be called once during the lifetime of the DiskFile.
   void SetActualFileSize(int64_t size) {
     DCHECK_EQ(0, actual_file_size_.Load());
     DCHECK_LE(file_size_, size);
     actual_file_size_.Store(size);
   }
 
+  // Update the metadata of read buffer if the file is batch read enabled.
+  // The metadata of read buffer is set when the file is written, because each page may
+  // have different sizes, so for each read buffer block, the number of pages and the
+  // start offset of a block could be different. By updating the metadata, these
+  // information would be recorded.
+  void UpdateReadBufferMetaDataIfNeeded(int64_t offset) {
+    if (!IsBatchReadEnabled()) return;
+    int64_t par_idx = GetReadBufferIndex(offset);
+    DCheckReadBufferIdx(par_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    read_buffer_->page_cnts_per_block_[par_idx]++;
+    int64_t cur_offset = read_buffer_->read_buffer_block_offsets_[par_idx];
+    if (cur_offset == DISK_FILE_INVALID_FILE_OFFSET || offset < cur_offset) {
+      read_buffer_->read_buffer_block_offsets_[par_idx] = offset;
+    }
+  }
+
+  // Return the index of the buffer block by the file offset.
+  int GetReadBufferIndex(int64_t offset) {
+    int read_buffer_idx = offset / read_buffer_block_size();
+    if (read_buffer_idx >= num_of_read_buffers()) {
+      // Because the offset could be a little over the default file size, the index
+      // could equal to the max number of read buffers, but can't be more than it.
+      DCHECK(read_buffer_idx == num_of_read_buffers());
+      read_buffer_idx = num_of_read_buffers() - 1;
+    }
+    DCheckReadBufferIdx(read_buffer_idx);
+    return read_buffer_idx;
+  }
+
+  // Return the start offset by the index of the buffer block.
+  int64_t GetReadBuffStartOffset(int buffer_idx) {
+    DCheckReadBufferIdx(buffer_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    int64_t offset = read_buffer_->read_buffer_block_offsets_[buffer_idx];
+    DCHECK(offset != DISK_FILE_INVALID_FILE_OFFSET);
+    return offset;
+  }
+
+  // Return the actual size of the specific read buffer block.
+  int64_t GetReadBuffActualSize(int buffer_idx) {
+    DCheckReadBufferIdx(buffer_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    int64_t cur_offset = read_buffer_->read_buffer_block_offsets_[buffer_idx];
+    DCHECK(cur_offset != DISK_FILE_INVALID_FILE_OFFSET);
+    while (buffer_idx != num_of_read_buffers() - 1) {
+      DCHECK_LT(buffer_idx, num_of_read_buffers() - 1);
+      int64_t nxt_offset = read_buffer_->read_buffer_block_offsets_[buffer_idx + 1];
+      if (nxt_offset != DISK_FILE_INVALID_FILE_OFFSET) return nxt_offset - cur_offset;
+      buffer_idx++;
+    }
+    int64_t actual_file_size = actual_file_size_.Load();
+    DCHECK_GT(actual_file_size, 0);
+    return actual_file_size - cur_offset;
+  }
+
+  // Return the number of the page count in the read buffer block.
+  int64_t GetReadBuffPageCount(int buffer_idx) {
+    DCheckReadBufferIdx(buffer_idx);
+    std::lock_guard<SpinLock> lock(read_buffer_->read_buffer_ctrl_lock_);
+    return read_buffer_->page_cnts_per_block_[buffer_idx];
+  }
+
+  // Return the read buffer block.
+  MemBlock* GetBufferBlock(int index) {
+    DCheckReadBufferIdx(index);
+    return read_buffer_->read_buffer_blocks_[index].get();
+  }
+
+  // Return the lock of the read buffer block.
+  SpinLock* GetBufferBlockLock(int index) {
+    DCheckReadBufferIdx(index);
+    return read_buffer_->read_buffer_blocks_[index]->GetLock();
+  }
+
+  // Check if there is an available local memory buffer for the specific offset.
+  // Caller should hold the physical lock of the disk file in case the object is
+  // destroyed. But the caller should not hold the lock of the memory block because the
+  // IsStatus() would require the lock.
+  bool CanReadFromReadBuffer(
+      const boost::shared_lock<boost::shared_mutex>& lock, int64_t offset) {
+    if (!IsBatchReadEnabled()) return false;
+    DCHECK(lock.mutex() == &physical_file_lock_ && lock.owns_lock());
+    MemBlock* read_buffer_block = GetBufferBlock(GetReadBufferIndex(offset));
+    return read_buffer_block != nullptr
+        && read_buffer_block->IsStatus(MemBlockStatus::WRITTEN);
+  }
+
+  // Return if batch reading is enabled.
+  bool IsBatchReadEnabled() { return read_buffer_ != nullptr; }
+
+  void DCheckMemBlock(const boost::shared_lock<boost::shared_mutex>& file_lock,
+      MemBlock* read_buffer_block) {
+    DCHECK(file_lock.mutex() == &physical_file_lock_ && file_lock.owns_lock());
+    DCHECK(read_buffer_block != nullptr);
+  }
+
+  // Read the spilled data from the memory buffer.
+  // Caller should hold the physical file lock of the disk file in case the object is
+  // destroyed. Also, caller should guarantee the buffer won't be released during reading,
+  // it is good for the caller to have the lock of the read buffer block.
+  Status ReadFromMemBuffer(int64_t offset_to_file, int64_t len, uint8_t* dst,
+      const boost::shared_lock<boost::shared_mutex>& file_lock) {
+    DCHECK(file_lock.mutex() == &physical_file_lock_ && file_lock.owns_lock());
+    int64_t idx = GetReadBufferIndex(offset_to_file);
+    DCheckReadBufferIdx(idx);
+    uint8_t* read_buffer_block = read_buffer_->read_buffer_blocks_[idx]->data();
+    DCHECK(read_buffer_block != nullptr);
+    int64_t offset_to_block = offset_to_file - GetReadBuffStartOffset(idx);
+    DCHECK_GE(offset_to_block, 0);
+    DCHECK_GE(GetReadBuffActualSize(idx), offset_to_block + len);
+    memcpy(dst, read_buffer_block + offset_to_block, len);
+    return Status::OK();
+  }
+
+  // Helper function to allocate the memory for a reading buffer block.
+  // Caller should hold both physical_file_lock_ and the lock of the read buffer block.
+  Status AllocReadBufferBlockLocked(MemBlock* read_buffer_block, int64_t size,
+      const boost::shared_lock<boost::shared_mutex>& file_lock,
+      const std::unique_lock<SpinLock>& block_lock) {
+    DCheckMemBlock(file_lock, read_buffer_block);
+    return read_buffer_block->AllocLocked(block_lock, size);
+  }
+
+  // Helper function to set the status of a memory block.
+  // Caller should hold the physical_file_lock_.
+  // If the caller holds the lock of the read buffer block, SetStatusLocked() will be
+  // called.
+  void SetReadBufferBlockStatus(MemBlock* read_buffer_block, MemBlockStatus status,
+      const boost::shared_lock<boost::shared_mutex>& file_lock,
+      const std::unique_lock<SpinLock>* block_lock = nullptr) {
+    DCheckMemBlock(file_lock, read_buffer_block);
+    if (block_lock == nullptr) {
+      read_buffer_block->SetStatus(status);
+    } else {
+      read_buffer_block->SetStatusLocked(*block_lock, status);
+    }
+  }
+
+  // Helper function to check the status of a read buffer block.
+  // Caller should hold the physical_file_lock_.
+  // If the caller holds the lock of the read buffer block, IsStatusLocked() will be
+  // called.
+  bool IsReadBufferBlockStatus(MemBlock* read_buffer_block, MemBlockStatus status,
+      const boost::shared_lock<boost::shared_mutex>& file_lock,
+      const std::unique_lock<SpinLock>* block_lock = nullptr) {
+    DCheckMemBlock(file_lock, read_buffer_block);
+    if (block_lock == nullptr) return read_buffer_block->IsStatus(status);
+    return read_buffer_block->IsStatusLocked(*block_lock, status);
+  }
+
+  // Helper function to delete the read buffer block.
+  // Caller should hold the physical_file_lock_, but should not hold the lock of the
+  // read buffer block, because Delete() will hold the read buffer block's lock.
+  template <typename T>
+  void DeleteReadBuffer(
+      MemBlock* read_buffer_block, bool* reserved, bool* alloc, const T& file_lock) {
+    DCHECK(file_lock.mutex() == &physical_file_lock_ && file_lock.owns_lock());
+    DCHECK(read_buffer_block != nullptr);
+    return read_buffer_block->Delete(reserved, alloc);
+  }
+
+  // Return the number of read buffer blocks of this DiskFile.
+  int64_t num_of_read_buffers() { return read_buffer_->num_of_read_buffer_blocks_; }
+
+  // Return the default size of a read buffer.
+  int64_t read_buffer_block_size() { return read_buffer_->read_buffer_block_size_; }
+
  private:
   friend class RemoteOperRange;
   friend class ScanRange;
@@ -183,11 +519,18 @@ class DiskFile {
   /// avoid a deadlock.
   /// The lock order of file lock and status lock (above) should be file lock acquired
   /// first.
+  /// If the disk file has the memory blocks, the lock also protects them from
+  /// destruction.
   boost::shared_mutex physical_file_lock_;
 
   /// The hdfs connection used to connect to the remote scratch path.
   hdfsFS hdfs_conn_;
 
+  /// to_delete_ is set to true if the file is to be deleted.
+  /// It is a flag that lets the deleting thread acquire the unique file lock by
+  /// asking the current lock holder to yield.
+  AtomicBool to_delete_{false};
+
   /// Specify if the file's space is reserved to be allowed to write to the filesystem
   /// because the filesystem may reach the size limit and needs some time before it can
   /// release space for new writes to the filesystem, so the space reserved indicator is
@@ -229,6 +572,10 @@ class DiskFile {
   /// like S3, it is set after a successful upload.
   AtomicInt64 actual_file_size_{0};
 
+  /// The read buffer for the disk file; nullptr if batch reading is not enabled.
+  std::unique_ptr<ReadBuffer> read_buffer_;
+
   /// Internal setter to set the status.
   /// The status is from INWRITING -> PERSISTED -> DELETED, which should not be a
   /// reverse transition.
@@ -258,6 +605,14 @@ class DiskFile {
 
   /// Return the status lock of the file.
   SpinLock* GetStatusLock() { return &status_lock_; }
+
+  /// Helper function to DCHECK that the read buffer control is not NULL and that the
+  /// buffer index is valid.
+  void DCheckReadBufferIdx(int buffer_idx) {
+    DCHECK(read_buffer_ != nullptr);
+    DCHECK_LT(buffer_idx, read_buffer_->num_of_read_buffer_blocks_);
+    DCHECK_GE(buffer_idx, 0);
+  }
 };
 } // namespace io
 } // namespace impala
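
A minimal caller-side sketch of the read path that the DiskFile additions above enable
(hypothetical helper, not part of the patch): the reader takes the shared physical file
lock, checks whether the block covering the offset has already been fetched into memory,
and only then copies out of the read buffer.

    // Sketch only: assumes it is compiled inside the impala::io namespace, that
    // 'buffer_file' is the local-buffer DiskFile owning the read buffer blocks, and
    // that [offset, offset + len) falls inside a single read buffer block.
    Status ReadSpilledBytes(DiskFile* buffer_file, int64_t offset, int64_t len,
        uint8_t* dst) {
      boost::shared_lock<boost::shared_mutex> file_lock(*buffer_file->GetFileLock());
      if (!buffer_file->CanReadFromReadBuffer(file_lock, offset)) {
        // Block not fetched yet; the caller would fall back to a remote read.
        return Status(Substitute("Read buffer miss at offset $0", offset));
      }
      return buffer_file->ReadFromMemBuffer(offset, len, dst, file_lock);
    }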
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 4dab7af3f..52741f0f8 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -2479,5 +2479,127 @@ TEST_F(DiskIoMgrTest, WriteToRemoteDiffPagesSuccess) {
   tmp_file_grp->Close();
   io_mgr.UnregisterContext(io_ctx.get());
 }
+
+// Delete the physical remote file after upload to exercise the failure handling.
+TEST_F(DiskIoMgrTest, WriteToRemoteFileDeleted) {
+  num_oper_ = 0;
+  num_ranges_written_ = 0;
+  string remote_file_path = REMOTE_URL + "/test";
+  string local_buffer_path = LOCAL_BUFFER_PATH + "/test";
+  FLAGS_remote_tmp_file_size = "1K";
+  int64_t file_size = 1024;
+  int64_t block_size = 1024;
+
+  // Delete the hdfs file if it exists.
+  hdfsDelete(hdfsConnect("default", 0), remote_file_path.c_str(), 1);
+
+  // Delete the file in local file system if it exists.
+  vector<string> local_buffer_path_vec;
+  local_buffer_path_vec.push_back(local_buffer_path);
+  Status rm_status = FileSystemUtil::RemovePaths(local_buffer_path_vec);
+
+  TmpFileMgr tmp_file_mgr;
+  DiskIoMgr io_mgr(1, 1, 1, 1, 10);
+  ASSERT_OK(io_mgr.Init());
+  TmpFileGroup* tmp_file_grp = NewRemoteFileGroup(&tmp_file_mgr, &io_mgr);
+  ASSERT_TRUE(tmp_file_grp != nullptr);
+
+  ObjectPool tmp_pool;
+  unique_ptr<RequestContext> io_ctx = io_mgr.RegisterContext();
+
+  TmpFileRemote tmp_file(
+      tmp_file_grp, 0, remote_file_path, local_buffer_path, false, REMOTE_URL.c_str());
+  DiskFile* remote_file = tmp_file.DiskFile();
+  DiskFile* local_buffer_file = tmp_file.DiskBufferFile();
+  tmp_file.GetWriteFile()->SetActualFileSize(file_size);
+
+  // Write some data for testing.
+  size_t write_size_len = sizeof(int32_t);
+  vector<WriteRange*> ranges;
+  vector<int32_t> datas;
+  for (int i = 0; i < file_size / write_size_len; i++) {
+    int32_t* data = tmp_pool.Add(new int32_t);
+    *data = rand();
+    datas.push_back(*data);
+    WriteRange** new_range = tmp_pool.Add(new WriteRange*);
+    WriteRange::WriteDoneCallback callback = [=](const Status& status) {
+      ASSERT_EQ(0, status.code());
+      lock_guard<mutex> l(written_mutex_);
+      num_ranges_written_ = 1;
+      writes_done_.NotifyOne();
+    };
+
+    *new_range = tmp_pool.Add(new WriteRange(remote_file_path, 0, 0, callback));
+    (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
+    (*new_range)->SetDiskFile(tmp_file.GetWriteFile());
+    ranges.push_back(*new_range);
+    EXPECT_OK(io_ctx->AddWriteRange(*new_range));
+    {
+      unique_lock<mutex> lock(written_mutex_);
+      while (num_ranges_written_ < 1) writes_done_.Wait(lock);
+    }
+    num_ranges_written_ = 0;
+    if (i == file_size / write_size_len - 1) {
+      tmp_file.SetAtCapacity();
+    }
+  }
+
+  auto disk_id = io_mgr.RemoteDfsDiskFileOperId();
+  bool upload_ok = false;
+  RemoteOperRange::RemoteOperDoneCallback callback = [&](const Status& status) {
+    upload_ok = status.ok();
+    lock_guard<mutex> l(oper_mutex_);
+    num_oper_ = 1;
+    oper_done_.NotifyOne();
+  };
+
+  // Request to upload the file to the remote.
+  auto oper_range = tmp_pool.Add(new RemoteOperRange(local_buffer_file, remote_file,
+      block_size, disk_id, RequestType::FILE_UPLOAD, &io_mgr, callback));
+  Status add_status = io_ctx->AddRemoteOperRange(oper_range);
+  ASSERT_OK(add_status);
+
+  // Wait until the file is created before calling the deletion.
+  while (!HdfsFileExist(remote_file_path)) {
+    usleep(rand() % 1000);
+  }
+  // Delete the file to create the failure.
+  hdfsDelete(hdfsConnect("default", 0), remote_file_path.c_str(), 1);
+
+  {
+    unique_lock<mutex> lock(oper_mutex_);
+    while (num_oper_ < 1) oper_done_.Wait(lock);
+  }
+
+  // If the deletion happens to cause an upload failure, it must not lead to a crash.
+  // Otherwise the upload succeeds, and reading should then fail because the remote
+  // file has been deleted.
+  EXPECT_FALSE(HdfsFileExist(remote_file_path));
+  if (upload_ok) {
+    // TryEvictFile should succeed and the local buffer file should be evicted.
+    Status try_evict_status = tmp_file_mgr.TryEvictFile(&tmp_file);
+    ASSERT_TRUE(try_evict_status.ok());
+
+    // The local buffer file should no longer exist either.
+    EXPECT_FALSE(FileExist(local_buffer_path));
+
+    // Should fail reading the first range.
+    ScanRange* scan_range = tmp_pool.Add(new ScanRange);
+    auto range = ranges.at(0);
+    size_t buffer_len = sizeof(int32_t);
+    vector<uint8_t> client_buffer(buffer_len);
+    scan_range->Reset(hdfsConnect("default", 0), range->file(), range->len(),
+        range->offset(), 0, false, 1000000,
+        BufferOpts::ReadInto(client_buffer.data(), buffer_len, BufferOpts::NO_CACHING),
+        nullptr, tmp_file.DiskFile(), tmp_file.DiskBufferFile());
+    bool needs_buffers;
+    ASSERT_OK(io_ctx->StartScanRange(scan_range, &needs_buffers));
+    unique_ptr<BufferDescriptor> io_buffer;
+    EXPECT_FALSE(scan_range->GetNext(&io_buffer).ok());
+  }
+  num_oper_ = 0;
+  tmp_file_grp->Close();
+  io_mgr.UnregisterContext(io_ctx.get());
+}
 }
 }
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 7195ad3a3..41a2e64d4 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -263,6 +263,7 @@ Status WriteRange::DoWrite() {
     ret_status = file_writer->Open();
     if (!ret_status.ok()) return DoWriteEnd(queue, ret_status);
     ret_status = file_writer->Write(this, &written_bytes);
+    disk_file_->UpdateReadBufferMetaDataIfNeeded(written_bytes - len_);
     int64_t actual_file_size = disk_file_->actual_file_size();
     // actual_file_size is only set once, otherwise it is 0 by default. If it is still
     // not set, it is impossible to be full.
@@ -296,28 +297,23 @@ Status WriteRange::DoWriteEnd(DiskQueue* queue, const Status& ret_status) {
 
 RemoteOperRange::RemoteOperRange(DiskFile* src_file, DiskFile* dst_file,
     int64_t block_size, int disk_id, RequestType::type type, DiskIoMgr* io_mgr,
-    RemoteOperDoneCallback callback)
-  : RequestRange(type, disk_id),
+    RemoteOperDoneCallback callback, int64_t file_offset)
+  : RequestRange(type, disk_id, file_offset),
     callback_(callback),
     io_mgr_(io_mgr),
     disk_file_src_(src_file),
     disk_file_dst_(dst_file),
     block_size_(block_size) {}
 
-Status RemoteOperRange::DoOper(uint8_t* buffer, int64_t buffer_size) {
-  DCHECK(request_type() == RequestType::FILE_UPLOAD);
-  return DoUpload(buffer, buffer_size);
-}
-
 Status RemoteOperRange::DoUpload(uint8_t* buffer, int64_t buffer_size) {
   DCHECK(disk_file_src_ != nullptr);
   DCHECK(disk_file_dst_ != nullptr);
   hdfsFS hdfs_conn = disk_file_dst_->hdfs_conn_;
-  int64_t file_size = disk_file_src_->actual_file_size_.Load();
+  int64_t file_size = disk_file_src_->actual_file_size();
   DCHECK(hdfs_conn != nullptr);
   DCHECK(file_size != 0);
-  const char* remote_file_path = disk_file_dst_->path().c_str();
-  const char* local_file_path = disk_file_src_->path().c_str();
+  const string& remote_file_path = disk_file_dst_->path();
+  const string& local_file_path = disk_file_src_->path();
   DiskQueue* queue = io_mgr_->disk_queues_[disk_id_];
   Status status = Status::OK();
   int64_t ret, offset = 0;
@@ -337,9 +333,9 @@ Status RemoteOperRange::DoUpload(uint8_t* buffer, int64_t buffer_size) {
   }
 
   RETURN_IF_ERROR(io_mgr_->local_file_system_->OpenForRead(
-      local_file_path, O_RDONLY, S_IRUSR | S_IWUSR, &local_file));
+      local_file_path.c_str(), O_RDONLY, S_IRUSR | S_IWUSR, &local_file));
   hdfsFile remote_hdfs_file =
-      hdfsOpenFile(hdfs_conn, remote_file_path, O_WRONLY, 0, 0, buffer_size);
+      hdfsOpenFile(hdfs_conn, remote_file_path.c_str(), O_WRONLY, 0, 0, buffer_size);
 
   if (remote_hdfs_file == nullptr) {
     status = Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
@@ -350,9 +346,12 @@ Status RemoteOperRange::DoUpload(uint8_t* buffer, int64_t buffer_size) {
   /// Read the blocks from the local buffer file and write the blocks
   /// to the remote file.
   while (file_size != offset) {
+    // If the to_delete flag is set, quit the upload process and close the local file,
+    // but leave the deletion work to the thread that set the to_delete flag.
+    if (disk_file_src_->is_to_delete()) goto end;
     int bytes = min(file_size - offset, buffer_size);
-    status =
-        io_mgr_->local_file_system_->Fread(local_file, buffer, bytes, local_file_path);
+    status = io_mgr_->local_file_system_->Fread(
+        local_file, buffer, bytes, local_file_path.c_str());
     if (!status.ok()) goto end;
     {
       ScopedHistogramTimer write_timer(queue->write_latency());
@@ -377,13 +376,15 @@ end:
     ScopedHistogramTimer write_timer(queue->write_latency());
     if (hdfsCloseFile(hdfs_conn, remote_hdfs_file) != 0) {
       // Try to close the local file if error happens.
-      RETURN_IF_ERROR(io_mgr_->local_file_system_->Fclose(local_file, local_file_path));
+      RETURN_IF_ERROR(
+          io_mgr_->local_file_system_->Fclose(local_file, local_file_path.c_str()));
       return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
           Substitute(
               "Failed to close HDFS file: $0", remote_file_path, GetHdfsErrorMsg("")));
     }
   }
-  RETURN_IF_ERROR(io_mgr_->local_file_system_->Fclose(local_file, local_file_path));
+  RETURN_IF_ERROR(
+      io_mgr_->local_file_system_->Fclose(local_file, local_file_path.c_str()));
   if (status.ok()) {
     disk_file_dst_->SetStatus(io::DiskFileStatus::PERSISTED);
     disk_file_dst_->SetActualFileSize(file_size);
@@ -393,6 +394,80 @@ end:
   return status;
 }
 
+Status RemoteOperRange::DoFetch() {
+  hdfsFS hdfs_conn = disk_file_src_->hdfs_conn_;
+  DCHECK(hdfs_conn != nullptr);
+  // Fetch the data from the source file (remote) to the destination file (local).
+  DCHECK(disk_file_dst_ != nullptr);
+  DCHECK(disk_file_src_ != nullptr);
+  int64_t buffer_idx = disk_file_dst_->GetReadBufferIndex(offset_);
+  int64_t local_file_size = disk_file_dst_->GetReadBuffActualSize(buffer_idx);
+  const string& remote_file_path = disk_file_src_->path();
+  DiskQueue* queue = io_mgr_->disk_queues_[disk_id_];
+  Status status = Status::OK();
+
+  // Get the shared locks to prevent the physical files from being deleted during the
+  // fetch. The locks must be acquired in order, the local file lock first and then the
+  // remote file lock, otherwise it might deadlock.
+  shared_lock<shared_mutex> dstl(disk_file_dst_->physical_file_lock_);
+  shared_lock<shared_mutex> srcl(disk_file_src_->physical_file_lock_);
+
+  // Check if the remote file is deleted.
+  auto src_status = disk_file_src_->GetFileStatus();
+  if (src_status != io::DiskFileStatus::PERSISTED) {
+    DCHECK(src_status == io::DiskFileStatus::DELETED);
+    return Status(Substitute("File has been deleted, path: '$0'", remote_file_path));
+  }
+
+  unique_lock<SpinLock> read_buffer_lock(
+      *(disk_file_dst_->GetBufferBlockLock(buffer_idx)));
+  MemBlock* read_buffer_block = disk_file_dst_->GetBufferBlock(buffer_idx);
+  if (disk_file_dst_->IsReadBufferBlockStatus(
+          read_buffer_block, MemBlockStatus::DISABLED, dstl, &read_buffer_lock)) {
+    // If the read block is disabled, its status doesn't allow any writes to
+    // the block, probably because the query has ended or been cancelled.
+    return Status(Substitute(
+        "Mem block '$0' has been deleted, path: '$1'", buffer_idx, remote_file_path));
+  }
+  RETURN_IF_ERROR(disk_file_dst_->AllocReadBufferBlockLocked(
+      read_buffer_block, local_file_size, dstl, read_buffer_lock));
+  DCHECK(read_buffer_block->data() != nullptr);
+  hdfsFile remote_hdfs_file =
+      hdfsOpenFile(hdfs_conn, remote_file_path.c_str(), O_RDONLY, 0, 0, block_size_);
+  if (remote_hdfs_file == nullptr) {
+    status = Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
+        Substitute("Could not open file: $0: $1", remote_file_path, GetStrErrMsg()));
+  } else {
+    int ret = [&]() {
+      ScopedHistogramTimer read_timer(queue->read_latency());
+      return hdfsPreadFully(hdfs_conn, remote_hdfs_file, offset_,
+          read_buffer_block->data(), local_file_size);
+    }();
+    if (ret != -1) {
+      queue->read_size()->Update(local_file_size);
+      disk_file_dst_->SetReadBufferBlockStatus(
+          read_buffer_block, MemBlockStatus::WRITTEN, dstl, &read_buffer_lock);
+    } else {
+      // The caller may need to handle the error, and deal with the read buffer block.
+      status = Status(TErrorCode::DISK_IO_ERROR, GetBackendString(),
+          GetHdfsErrorMsg("Error reading from HDFS file: ", remote_file_path));
+    }
+  }
+
+  // Try to close the remote file.
+  if (remote_hdfs_file != nullptr && hdfsCloseFile(hdfs_conn, remote_hdfs_file) != 0) {
+    // If there was an error during reading, keep the old status.
+    string close_err_msg = Substitute(
+        "Failed to close HDFS file: $0", remote_file_path, GetHdfsErrorMsg(""));
+    if (status.ok()) {
+      return Status(TErrorCode::DISK_IO_ERROR, GetBackendString(), close_err_msg);
+    } else {
+      LOG(WARNING) << close_err_msg;
+    }
+  }
+  return status;
+}
+
 static void CheckSseSupport() {
   if (!CpuInfo::IsSupported(CpuInfo::SSE4_2)) {
     LOG(WARNING) << "This machine does not support sse4_2.  The default IO system "
@@ -690,7 +765,7 @@ int64_t DiskIoMgr::ComputeIdealBufferReservation(int64_t scan_range_len) {
 // Work is available if there is a RequestContext with
 //  - A ScanRange with a buffer available, or
 //  - A WriteRange in unstarted_write_ranges_ or
-//  - A RemoteOperRange in unstarted_remote_upload_ranges_
+//  - A RemoteOperRange in unstarted_remote_file_op_ranges_.
 RequestRange* DiskQueue::GetNextRequestRange(RequestContext** request_context) {
   // This loops returns either with work to do or when the disk IoMgr shuts down.
   while (true) {
@@ -766,12 +841,18 @@ void DiskQueue::DiskThreadLoop(DiskIoMgr* io_mgr) {
                                 "block size: '$0'",
                   size)));
         } else {
-          Status oper_status = oper_range->DoOper(buffer, size);
+          Status oper_status = oper_range->DoUpload(buffer, size);
           worker_context->OperDone(oper_range, oper_status);
           free(buffer);
         }
         break;
       }
+      case RequestType::FILE_FETCH: {
+        RemoteOperRange* oper_range = static_cast<RemoteOperRange*>(range);
+        Status oper_status = oper_range->DoFetch();
+        worker_context->OperDone(oper_range, oper_status);
+        break;
+      }
       default:
         DCHECK(false) << "Invalid request type: " << range->request_type();
     }
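
For reference, queueing a fetch mirrors how the upload path above is queued; a hedged
sketch follows (the variables 'io_mgr', 'io_ctx', 'remote_file', 'local_buffer_file',
'block_size' and 'offset' are placeholders, not taken from the patch, and ownership
handling is omitted).

    // Sketch: schedule an asynchronous FILE_FETCH of the block starting at 'offset'.
    // The source is the remote DiskFile and the destination is the local buffer file
    // that owns the read buffer blocks.
    RemoteOperRange::RemoteOperDoneCallback cb = [](const Status& status) {
      // On failure the read buffer block is left unwritten and readers fall back to
      // fetching the pages directly from the remote filesystem.
    };
    RemoteOperRange* fetch_range = new RemoteOperRange(remote_file, local_buffer_file,
        block_size, io_mgr->RemoteDfsDiskFileOperId(), RequestType::FILE_FETCH, io_mgr,
        cb, /*file_offset=*/offset);
    RETURN_IF_ERROR(io_ctx->AddRemoteOperRange(fetch_range));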
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 40d741d2c..e1a439be9 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -58,8 +58,8 @@ class RequestContext::PerDiskState {
   const InternalQueue<WriteRange>* unstarted_write_ranges() const {
     return &unstarted_write_ranges_;
   }
-  const InternalQueue<RemoteOperRange>* unstarted_remote_upload_ranges() const {
-    return &unstarted_remote_upload_ranges_;
+  const InternalQueue<RemoteOperRange>* unstarted_remote_file_oper_ranges() const {
+    return &unstarted_remote_file_op_ranges_;
   }
 
   const InternalQueue<RequestRange>* in_flight_ranges() const {
@@ -70,8 +70,8 @@ class RequestContext::PerDiskState {
   InternalQueue<WriteRange>* unstarted_write_ranges() {
     return &unstarted_write_ranges_;
   }
-  InternalQueue<RemoteOperRange>* unstarted_remote_upload_ranges() {
-    return &unstarted_remote_upload_ranges_;
+  InternalQueue<RemoteOperRange>* unstarted_remote_file_oper_ranges() {
+    return &unstarted_remote_file_op_ranges_;
   }
 
   InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
@@ -197,8 +197,8 @@ class RequestContext::PerDiskState {
   /// processed)
   InternalQueue<WriteRange> unstarted_write_ranges_;
 
-  /// A Queue for file operation ranges to process uploading operations to remote disks.
-  InternalQueue<RemoteOperRange> unstarted_remote_upload_ranges_;
+  /// A Queue for file operation ranges to process file uploading or fetching operations.
+  InternalQueue<RemoteOperRange> unstarted_remote_file_op_ranges_;
 };
 
 void RequestContext::ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range) {
@@ -242,7 +242,8 @@ void RequestContext::OperDone(RequestRange* range, const Status& status) {
   if (range->request_type() == RequestType::WRITE) {
     (static_cast<WriteRange*>(range))->callback()(status);
   } else {
-    DCHECK(range->request_type() == RequestType::FILE_UPLOAD);
+    DCHECK(range->request_type() == RequestType::FILE_UPLOAD
+        || range->request_type() == RequestType::FILE_FETCH);
     (static_cast<RemoteOperRange*>(range))->callback()(status);
   }
   {
@@ -313,7 +314,7 @@ void RequestContext::Cancel() {
       }
 
       RemoteOperRange* oper_range;
-      while ((oper_range = disk_state.unstarted_remote_upload_ranges()->Dequeue())
+      while ((oper_range = disk_state.unstarted_remote_file_oper_ranges()->Dequeue())
           != nullptr) {
         remote_oper_callbacks.push_back(oper_range->callback());
       }
@@ -405,10 +406,11 @@ void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock,
     // ScheduleContext() has no effect if already scheduled, so this is safe to do always.
     disk_state->ScheduleContext(lock, this, range->disk_id());
   } else {
-    DCHECK(range->request_type() == RequestType::FILE_UPLOAD);
+    DCHECK(range->request_type() == RequestType::FILE_UPLOAD
+        || range->request_type() == RequestType::FILE_FETCH);
     DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << static_cast<int>(schedule_mode);
     RemoteOperRange* oper_range = static_cast<RemoteOperRange*>(range);
-    disk_state->unstarted_remote_upload_ranges()->Enqueue(oper_range);
+    disk_state->unstarted_remote_file_oper_ranges()->Enqueue(oper_range);
     disk_state->ScheduleContext(lock, this, range->disk_id());
   }
 
@@ -648,10 +650,10 @@ RequestRange* RequestContext::GetNextRequestRange(int disk_id) {
   }
 
   // Do remote temporary files related work.
-  if (!request_disk_state->unstarted_remote_upload_ranges()->empty()) {
+  if (!request_disk_state->unstarted_remote_file_oper_ranges()->empty()) {
     RemoteOperRange* oper_range;
-    if (!request_disk_state->unstarted_remote_upload_ranges()->empty()) {
-      oper_range = request_disk_state->unstarted_remote_upload_ranges()->Dequeue();
+    if (!request_disk_state->unstarted_remote_file_oper_ranges()->empty()) {
+      oper_range = request_disk_state->unstarted_remote_file_oper_ranges()->Dequeue();
       request_disk_state->in_flight_ranges()->Enqueue(oper_range);
     }
   }
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 5424901da..96d609d93 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -173,6 +173,25 @@ class RequestContext {
     bytes_read_counter_ = bytes_read_counter;
   }
 
+  void set_read_use_mem_counter(RuntimeProfile::Counter* read_use_mem_counter) {
+    read_use_mem_counter_ = read_use_mem_counter;
+  }
+
+  void set_bytes_read_use_mem_counter(
+      RuntimeProfile::Counter* bytes_read_use_mem_counter) {
+    bytes_read_use_mem_counter_ = bytes_read_use_mem_counter;
+  }
+
+  void set_read_use_local_disk_counter(
+      RuntimeProfile::Counter* read_use_local_disk_counter) {
+    read_use_local_disk_counter_ = read_use_local_disk_counter;
+  }
+
+  void set_bytes_read_use_local_disk_counter(
+      RuntimeProfile::Counter* bytes_read_use_local_disk_counter) {
+    bytes_read_use_local_disk_counter_ = bytes_read_use_local_disk_counter;
+  }
+
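
A hedged sketch of wiring the new counters into a profile (the counter names and units
below are illustrative assumptions, not defined by this patch; 'io_ctx' and 'profile'
are placeholders for a RequestContext and a RuntimeProfile):

    // Sketch: register the batch-read counters on a RequestContext.
    io_ctx->set_read_use_mem_counter(
        profile->AddCounter("TmpFileReadsFromMem", TUnit::UNIT));
    io_ctx->set_bytes_read_use_mem_counter(
        profile->AddCounter("TmpFileBytesReadFromMem", TUnit::BYTES));
    io_ctx->set_read_use_local_disk_counter(
        profile->AddCounter("TmpFileReadsFromLocalDisk", TUnit::UNIT));
    io_ctx->set_bytes_read_use_local_disk_counter(
        profile->AddCounter("TmpFileBytesReadFromLocalDisk", TUnit::BYTES));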
   void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = read_timer; }
 
   void set_open_file_timer(RuntimeProfile::Counter* open_file_timer) {
@@ -338,6 +357,18 @@ class RequestContext {
   /// Total bytes read for this reader
   RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
 
+  /// Total number of reads from the mem buffer for this reader
+  RuntimeProfile::Counter* read_use_mem_counter_ = nullptr;
+
+  /// Total bytes read from mem buffer for this reader
+  RuntimeProfile::Counter* bytes_read_use_mem_counter_ = nullptr;
+
+  /// Total number of reads from the local disk buffer for this reader
+  RuntimeProfile::Counter* read_use_local_disk_counter_ = nullptr;
+
+  /// Total bytes read from local disk buffer for this reader
+  RuntimeProfile::Counter* bytes_read_use_local_disk_counter_ = nullptr;
+
   /// Total time spent in hdfs reading
   RuntimeProfile::Counter* read_timer_ = nullptr;
 
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index b8bfbacd9..1b355b999 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -109,12 +109,14 @@ class BufferDescriptor {
 };
 
 /// The request type, read or write associated with a request range.
-/// Ohter than those, request type file_upload is the type for remote file operation
-/// ranges, for doing file uploading to the remote.
+/// Other than those, the request types file_upload and file_fetch are for remote file
+/// operation ranges, used for uploading a file to the remote filesystem or fetching a
+/// file from it.
 struct RequestType {
   enum type {
     READ,
     WRITE,
+    FILE_FETCH,
     FILE_UPLOAD,
   };
 };
@@ -147,9 +149,9 @@ class RequestRange : public InternalQueue<RequestRange>::Node {
   RequestType::type request_type() const { return request_type_; }
 
  protected:
-  RequestRange(RequestType::type request_type, int disk_id = -1)
+  RequestRange(RequestType::type request_type, int disk_id = -1, int64_t offset = -1)
     : fs_(nullptr),
-      offset_(-1),
+      offset_(offset),
       len_(-1),
       disk_id_(disk_id),
       request_type_(request_type) {}
@@ -364,6 +366,7 @@ class ScanRange : public RequestRange {
   friend class RequestContext;
   friend class HdfsFileReader;
   friend class LocalFileReader;
+  friend class RemoteOperRange;
 
   /// Initialize internal fields
   void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
@@ -389,9 +392,14 @@ class ScanRange : public RequestRange {
   ReadOutcome DoRead(DiskQueue* queue, int disk_id);
 
   /// The function runs the actual read logic to read content with the specific reader.
-  /// If use_local_buffer is true, it will read from the local buffer with the local
+  /// If use_local_buffer is true, it will read from the local buffer file with the local
   /// buffer reader.
-  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer);
+  /// If use_mem_buffer is true, it will read from a memory block in the local buffer.
+  /// The local_file_lock is used to guarantee the local file is not deleted while
+  /// reading; it must not be null if use_mem_buffer is true.
+  ReadOutcome DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buffer,
+      bool use_mem_buffer,
+      boost::shared_lock<boost::shared_mutex>* local_file_lock = nullptr);
 
   /// Whether to use file handle caching for the current file.
   bool FileHandleCacheEnabled();
@@ -696,19 +704,23 @@ class RemoteOperRange : public RequestRange {
   /// RemoteOperRange was successfully added (i.e. AddRemoteOperRange() succeeded).
   /// No locks are held while the callback is invoked.
   typedef std::function<void(const Status&)> RemoteOperDoneCallback;
-  RemoteOperRange(DiskFile* src_file, DiskFile* dst_file, int64_t file_offset,
-      int disk_id, RequestType::type type, DiskIoMgr* io_mgr,
-      RemoteOperDoneCallback callback);
+  RemoteOperRange(DiskFile* src_file, DiskFile* dst_file, int64_t block_size, int disk_id,
+      RequestType::type type, DiskIoMgr* io_mgr, RemoteOperDoneCallback callback,
+      int64_t file_offset = 0);
 
-  /// Called from a disk I/O thread to do the file operation of this range. The
+  int64_t block_size() { return block_size_; }
+
+  RemoteOperDoneCallback callback() const { return callback_; }
+
+  /// Called from a disk I/O thread to upload the file to a remote filesystem. The
   /// returned Status describes what the result of the read was. 'buff' is the
   /// block buffer which is used for file operations. 'buff_size' is the size of the
   /// block buffer. Caller must not hold 'lock_'.
-  Status DoOper(uint8_t* buff, int64_t buff_size);
-
-  int64_t block_size() { return block_size_; }
+  Status DoUpload(uint8_t* buff, int64_t buff_size);
 
-  RemoteOperDoneCallback callback() const { return callback_; }
+  /// Execute the fetch file operation from a remote filesystem.
+  /// Caller must not hold 'lock_'.
+  Status DoFetch();
 
  private:
   DISALLOW_COPY_AND_ASSIGN(RemoteOperRange);
@@ -732,9 +744,6 @@ class RemoteOperRange : public RequestRange {
 
   /// block size to do the file operation.
   int64_t block_size_;
-
-  /// Execute the upload file operation.
-  Status DoUpload(uint8_t* buff, int64_t buff_size);
 };
 
 inline bool BufferDescriptor::is_cached() const {
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 09776b0c3..1e33a75a6 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -148,10 +148,12 @@ bool ScanRange::FileHandleCacheEnabled() {
   return false;
 }
 
-ReadOutcome ScanRange::DoReadInternal(
-    DiskQueue* queue, int disk_id, bool use_local_buff) {
+ReadOutcome ScanRange::DoReadInternal(DiskQueue* queue, int disk_id, bool use_local_buff,
+    bool use_mem_buffer, shared_lock<shared_mutex>* local_file_lock) {
   int64_t bytes_remaining = bytes_to_read_ - bytes_read_;
   DCHECK_GT(bytes_remaining, 0);
+  // use_local_buff and use_mem_buffer can't both be true.
+  DCHECK(!(use_local_buff && use_mem_buffer));
 
   unique_ptr<BufferDescriptor> buffer_desc;
   FileReader* file_reader = nullptr;
@@ -177,43 +179,75 @@ ReadOutcome ScanRange::DoReadInternal(
       buffer_manager_->add_iomgr_buffer_cumulative_bytes_used(buffer_desc->buffer_len());
     }
     read_in_flight_ = true;
-    if (use_local_buff) {
-      file_reader = local_buffer_reader_.get();
-      file_ = disk_buffer_file_->path();
-    } else {
-      file_reader = file_reader_.get();
+    // Set the correct reader to read the range if the memory buffer is not available.
+    if (!use_mem_buffer) {
+      if (use_local_buff) {
+        file_reader = local_buffer_reader_.get();
+        file_ = disk_buffer_file_->path();
+      } else {
+        file_reader = file_reader_.get();
+      }
+      use_local_buffer_ = use_local_buff;
     }
-    use_local_buffer_ = use_local_buff;
   }
-  DCHECK(file_reader != nullptr);
-
-  // No locks in this section.  Only working on local vars.  We don't want to hold a
-  // lock across the read call.
-  // To use the file handle cache:
-  // 1. It must be enabled at the daemon level.
-  // 2. It must be enabled for the particular filesystem.
-  bool use_file_handle_cache = FileHandleCacheEnabled();
-  VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
-            << " file handle cache for " << (expected_local_ ? "local" : "remote")
-            << " file " << file();
-  Status read_status = file_reader->Open(use_file_handle_cache);
+
   bool eof = false;
-  if (read_status.ok()) {
-    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
-    COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
-
-    if (sub_ranges_.empty()) {
-      DCHECK(cache_.data == nullptr);
-      read_status =
-          file_reader->ReadFromPos(queue, offset_ + bytes_read_, buffer_desc->buffer_,
-              min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
-              &buffer_desc->len_, &eof);
-    } else {
-      read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader);
+  Status read_status = Status::OK();
+
+  if (use_mem_buffer) {
+    // The only scenario that uses the memory buffer is reading spilled temporary
+    // files, where the range is supposed to be read in one round.
+    // For efficiency, the lock of the memory block is not held here; safety is
+    // implicitly guaranteed by the physical lock of the disk file, which is required
+    // while removing the disk file and its memory blocks. The other case of removing
+    // the memory block is when all of its pages have been read, and that can only
+    // happen after this read.
+    DCHECK(local_file_lock != nullptr);
+    read_status = disk_buffer_file_->ReadFromMemBuffer(
+        offset_, bytes_to_read_, buffer_desc->buffer_, *local_file_lock);
+    if (read_status.ok()) {
+      buffer_desc->len_ = bytes_to_read_;
+      eof = true;
+      COUNTER_ADD_IF_NOT_NULL(reader_->read_use_mem_counter_, 1L);
+      COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_use_mem_counter_, buffer_desc->len_);
+      COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
     }
+  } else {
+    DCHECK(file_reader != nullptr);
 
-    COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
-    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
+    // No locks in this section.  Only working on local vars.  We don't want to hold a
+    // lock across the read call.
+    // To use the file handle cache:
+    // 1. It must be enabled at the daemon level.
+    // 2. It must be enabled for the particular filesystem.
+    bool use_file_handle_cache = FileHandleCacheEnabled();
+    VLOG_FILE << (use_file_handle_cache ? "Using" : "Skipping")
+              << " file handle cache for " << (expected_local_ ? "local" : "remote")
+              << " file " << file();
+
+    read_status = file_reader->Open(use_file_handle_cache);
+    if (read_status.ok()) {
+      COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
+      COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
+
+      if (sub_ranges_.empty()) {
+        DCHECK(cache_.data == nullptr);
+        read_status =
+            file_reader->ReadFromPos(queue, offset_ + bytes_read_, buffer_desc->buffer_,
+                min(bytes_to_read() - bytes_read_, buffer_desc->buffer_len_),
+                &buffer_desc->len_, &eof);
+      } else {
+        read_status = ReadSubRanges(queue, buffer_desc.get(), &eof, file_reader);
+      }
+
+      COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
+      COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
+      if (use_local_buffer_) {
+        COUNTER_ADD_IF_NOT_NULL(reader_->read_use_local_disk_counter_, 1L);
+        COUNTER_ADD_IF_NOT_NULL(
+            reader_->bytes_read_use_local_disk_counter_, buffer_desc->len_);
+      }
+    }
   }
 
   DCHECK(buffer_desc->buffer_ != nullptr);
@@ -246,7 +280,7 @@ ReadOutcome ScanRange::DoReadInternal(
   // Store the state we need before calling EnqueueReadyBuffer().
   bool eosr = buffer_desc->eosr();
   // No more reads for this scan range - we can close it.
-  if (eosr) file_reader->Close();
+  if (eosr && file_reader != nullptr) file_reader->Close();
   // Read successful - enqueue the buffer and return the appropriate outcome.
   if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
   // At this point, if eosr=true, then we cannot touch the state of this scan range
@@ -256,7 +290,9 @@ ReadOutcome ScanRange::DoReadInternal(
 
 ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
   bool use_local_buffer = false;
-  if (disk_file_ != nullptr && disk_file_->disk_type() != DiskFileType::LOCAL) {
+  bool use_mem_buffer = false;
+  if (disk_file_ != nullptr && disk_file_->disk_type() != DiskFileType::LOCAL
+      && disk_buffer_file_ != nullptr) {
     // The sequence for acquiring the locks should always be from the local to
     // the remote to avoid deadlocks.
     shared_lock<shared_mutex> local_file_lock(*(disk_buffer_file_->GetFileLock()));
@@ -270,18 +306,25 @@ ReadOutcome ScanRange::DoRead(DiskQueue* queue, int disk_id) {
         // the only case could be the query is cancelled, so that both files are deleted.
         return ReadOutcome::CANCELLED;
       }
-      // If the local buffer exists, we can read from the local buffer, otherwise,
-      // we will read from the remote file system.
+
+      // The range can be read locally in two cases:
+      // 1. The local buffer file is not deleted (evicted) yet.
+      // 2. A block of the file that contains the range has been fetched and stored
+      // in memory.
+      // If neither case holds, the range needs to be read from the remote filesystem.
       if (!disk_buffer_file_->is_deleted(buffer_file_lock)) {
         use_local_buffer = true;
+      } else if (disk_buffer_file_->CanReadFromReadBuffer(local_file_lock, offset_)) {
+        use_mem_buffer = true;
       } else {
         // Read from the remote file. The remote file must be in persisted status.
         DCHECK(disk_file_->is_persisted(file_lock));
       }
     }
-    return DoReadInternal(queue, disk_id, use_local_buffer);
+    return DoReadInternal(
+        queue, disk_id, use_local_buffer, use_mem_buffer, &local_file_lock);
   }
-  return DoReadInternal(queue, disk_id, use_local_buffer);
+  return DoReadInternal(queue, disk_id, use_local_buffer, use_mem_buffer);
 }
 
 Status ScanRange::ReadSubRanges(
diff --git a/be/src/runtime/tmp-file-mgr-internal.h b/be/src/runtime/tmp-file-mgr-internal.h
index cbdebec7d..dbe7566ef 100644
--- a/be/src/runtime/tmp-file-mgr-internal.h
+++ b/be/src/runtime/tmp-file-mgr-internal.h
@@ -30,6 +30,9 @@
 #include "util/hdfs-util.h"
 
 namespace impala {
+namespace io {
+class DiskIoMgr;
+}
 
 /// TmpFile is a handle to a physical file in a temporary directory. File space
 /// can be allocated and files removed using AllocateSpace() and Remove(). Used
@@ -84,7 +87,11 @@ class TmpFile {
   int64_t len() const { return allocation_offset_; }
 
   /// Returns the disk id of the temporary file.
-  int disk_id() const { return disk_id_; }
+  virtual int disk_id(bool is_file_op = false) const {
+    // The disk id for file operations should only be supported in TmpFileRemote.
+    DCHECK(!is_file_op);
+    return disk_id_;
+  }
 
   /// Returns if the temporary file is in local file system.
   bool is_local() { return expected_local_; }
@@ -166,9 +173,9 @@ class TmpFileLocal : public TmpFile {
   TmpFileLocal(TmpFileGroup* file_group, TmpFileMgr::DeviceId device_id,
       const std::string& path, bool expected_local = true);
 
-  bool AllocateSpace(int64_t num_bytes, int64_t* offset);
-  io::DiskFile* GetWriteFile();
-  Status Remove();
+  bool AllocateSpace(int64_t num_bytes, int64_t* offset) override;
+  io::DiskFile* GetWriteFile() override;
+  Status Remove() override;
 };
 
 /// TmpFileRemote is a derived class of TmpFile to provide methods to handle a
@@ -179,7 +186,7 @@ class TmpFileLocal : public TmpFile {
 /// read or upload on the file.A remote temporary file can have two DiskFiles, a local
 /// buffer and a remote file.
 /// Each DiskFile owns two type of locks, a file lock and a status lock.
-/// DiskFile::lock_  -- file lock
+/// DiskFile::physical_file_lock_  -- file lock
 /// DiskFile::status_lock_ -- status lock
 /// For doing file deleting operation, a unique file lock is needed. For other types of
 /// operations on the file, like reading or writing, a shared file lock is needed to
@@ -212,10 +219,48 @@ class TmpFileRemote : public TmpFile {
       bool expected_local = false, const char* url = nullptr);
   ~TmpFileRemote();
 
-  bool AllocateSpace(int64_t num_bytes, int64_t* offset);
-  io::DiskFile* GetWriteFile();
+  bool AllocateSpace(int64_t num_bytes, int64_t* offset) override;
+  io::DiskFile* GetWriteFile() override;
   TmpDir* GetLocalBufferDir() const;
-  Status Remove();
+  Status Remove() override;
+
+  /// Returns the buffer file handle for reading.
+  /// If the local file is not evicted, return immediately.
+  /// If the local file is evicted and batch reading is enabled, may also send a request
+  /// to asynchronously fetch a block from the remote filesystem into memory.
+  io::DiskFile* GetReadBufferFile(int64_t offset);
+
+  /// Send a request to the disk queue to fetch a block asynchronously from the remote
+  /// filesystem.
+  /// If the content is in the buffer block, "fetched" will be set to true. Otherwise,
+  /// the caller should fetch the page from the remote filesystem.
+  void AsyncFetchReadBufferBlock(io::DiskFile* read_buffer_file,
+      io::MemBlock* read_buffer_block, int buffer_idx, bool* fetched);
+
+  /// Get the read buffer block index from the offset to the file.
+  int GetReadBufferIndex(int64_t offset);
+
+  /// Increase the counter of pages that have been read from the buffer block.
+  /// Return true if all the pages of the block have been read.
+  bool IncrementReadPageCount(int buffer_idx);
+
+  /// Try to delete the buffer block and release the reservation.
+  template <typename T>
+  void TryDeleteReadBuffer(const T& lock, int buffer_idx);
+
+  /// Try to delete the buffer block and release the reservation with exclusive lock.
+  void TryDeleteReadBufferExcl(int buffer_idx) {
+    std::unique_lock<boost::shared_mutex> lock(*(disk_buffer_file_->GetFileLock()));
+    TryDeleteReadBuffer(lock, buffer_idx);
+  }
+
+  /// Try to delete the buffer block and release the reservation with shared lock.
+  /// Use the exclusive one unless sure that no one else would access the specific
+  /// read buffer block during deletion and the scenario requires high performance.
+  void TryDeleteMemReadBufferShared(int buffer_idx) {
+    boost::shared_lock<boost::shared_mutex> lock(*(disk_buffer_file_->GetFileLock()));
+    TryDeleteReadBuffer(lock, buffer_idx);
+  }
 
   /// Returns the size of the file.
   int64_t file_size() const { return file_size_; }
@@ -259,6 +304,19 @@ class TmpFileRemote : public TmpFile {
     return buffer_returned_;
   }
 
+  /// Set the flag on both files to indicate the file is going to be deleted.
+  void SetToDeleteFlag(bool to_delete = true) {
+    disk_buffer_file_->SetToDeleteFlag(to_delete);
+    disk_file_->SetToDeleteFlag(to_delete);
+  }
+
+  /// Returns the disk id of the temporary file.
+  /// If is_file_op is true, return the disk id dedicated to file operations.
+  int disk_id(bool is_file_op = false) const override {
+    if (!is_file_op) return disk_id_;
+    return disk_id_file_op_;
+  }
+
  private:
   friend class TmpWriteHandle;
   friend class TmpFileMgr;
@@ -271,14 +329,17 @@ class TmpFileRemote : public TmpFile {
   /// remaining space.
   int64_t file_size_ = 0;
 
+  /// The default size of a read buffer block.
+  int64_t read_buffer_block_size_ = 0;
+
+  /// The id of the disk for file operations.
+  int disk_id_file_op_ = 0;
+
   /// Bogus value of mtime for HDFS files.
   const int64_t mtime_{100000};
 
-  /// The range for doing file uploading.
-  std::unique_ptr<io::RemoteOperRange> upload_range_;
-
   // The pointer of the disk buffer file, which is the local buffer
-  // of the disk file when disk file is a remote disk file.
+  // of the remote disk file. The buffer is for writing.
   std::unique_ptr<io::DiskFile> disk_buffer_file_;
 
   /// The hdfs connection used to connect to the remote scratch path.
@@ -288,6 +349,12 @@ class TmpFileRemote : public TmpFile {
   /// assigned space is equal to or just over the default file size.
   bool at_capacity_ = false;
 
+  /// The range for doing file uploading.
+  std::unique_ptr<io::RemoteOperRange> upload_range_;
+
+  /// The ranges for doing fetch operations from a remote filesystem.
+  std::vector<std::unique_ptr<io::RemoteOperRange>> fetch_ranges_;
+
   /// Protect below members.
   SpinLock lock_;
 
@@ -297,6 +364,27 @@ class TmpFileRemote : public TmpFile {
   /// True if the buffer of the file is returned to the pool. We assume that the buffer
   /// only returns once and only needs to be returned when the buffer space is reserved.
   bool buffer_returned_ = false;
+
+  // The number of pages that have been read per read buffer block.
+  std::unique_ptr<int64_t[]> disk_read_page_cnts_;
+
+  // Return the start offset of the read buffer block.
+  int64_t GetReadBuffStartOffset(int buffer_idx) {
+    DCHECK(disk_buffer_file_ != nullptr);
+    return disk_buffer_file_->GetReadBuffStartOffset(buffer_idx);
+  }
+
+  // Return the page count of the read buffer block.
+  int64_t GetReadBuffPageCount(int buffer_idx) {
+    DCHECK(disk_buffer_file_ != nullptr);
+    return disk_buffer_file_->GetReadBuffPageCount(buffer_idx);
+  }
+
+  /// Internal DCHECK for the buffer index.
+  void DCheckReadBufferIdx(int buffer_idx) {
+    DCHECK_LT(buffer_idx, file_group_->tmp_file_mgr()->GetNumReadBuffersPerFile());
+    DCHECK_GE(buffer_idx, 0);
+  }
 };
 
 /// TmpFileDummy is a derived class of TmpFile for dummy allocation, used in
@@ -304,9 +392,9 @@ class TmpFileRemote : public TmpFile {
 class TmpFileDummy : public TmpFile {
  public:
   TmpFileDummy() : TmpFile(nullptr, -1, "") { disk_type_ = io::DiskFileType::DUMMY; }
-  bool AllocateSpace(int64_t num_bytes, int64_t* offset) { return true; }
-  io::DiskFile* GetWriteFile() { return nullptr; }
-  Status Remove() { return Status::OK(); }
+  bool AllocateSpace(int64_t num_bytes, int64_t* offset) override { return true; }
+  io::DiskFile* GetWriteFile() override { return nullptr; }
+  Status Remove() override { return Status::OK(); }
 };
 
 /// A configured temporary directory that TmpFileMgr allocates files in.
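
The page accounting declared above is meant to let a read buffer block be freed as soon
as every page in it has been consumed; a hedged sketch of the intended call sequence
(the caller and variable names are placeholders, not taken from the patch):

    // Sketch: after the page at 'offset' has been handed back to the buffer pool,
    // bump the per-block read count and drop the block once it is fully consumed.
    int idx = tmp_file->GetReadBufferIndex(offset);
    if (tmp_file->IncrementReadPageCount(idx)) {
      // Every page of this block has been read; free the memory block and release
      // its reservation against the read buffer limit.
      tmp_file->TryDeleteReadBufferExcl(idx);
    }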
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index d75b2f87f..038b7e8ee 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -58,6 +58,8 @@ DECLARE_int32(stress_scratch_write_delay_ms);
 #endif
 DECLARE_string(remote_tmp_file_size);
 DECLARE_int32(wait_for_spill_buffer_timeout_s);
+DECLARE_bool(remote_batch_read);
+DECLARE_string(remote_read_memory_buffer_size);
 
 namespace impala {
 
@@ -73,6 +75,10 @@ static const string HDFS_LOCAL_URL = "hdfs://localhost:20500/tmp";
 static const string REMOTE_URL = HDFS_LOCAL_URL;
 static const string LOCAL_BUFFER_PATH = "/tmp/tmp-file-mgr-test-buffer";
 
+/// Read buffer sizes for TestBatchReadingSetMaxBytes().
+static const int64_t READ_BUFFER_SMALL_SIZE_BYTES = 1 << 20; // 1MB
+static const int64_t READ_BUFFER_EXTREMELY_BIG_SIZE_BYTES = 100ll << 30; // 100GB
+
 class TmpFileMgrTest : public ::testing::Test {
  public:
   static const int DEFAULT_PRIORITY = numeric_limits<int>::max();
@@ -85,6 +91,8 @@ class TmpFileMgrTest : public ::testing::Test {
     FLAGS_stress_scratch_write_delay_ms = 0;
 #endif
     FLAGS_remote_tmp_file_size = "8MB";
+    FLAGS_remote_read_memory_buffer_size = "1GB";
+    FLAGS_remote_batch_read = false;
 
     metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
     profile_ = RuntimeProfile::Create(&obj_pool_, "tmp-file-mgr-test");
@@ -147,6 +155,14 @@ class TmpFileMgrTest : public ::testing::Test {
     ASSERT_EQ(hwm_value->GetValue(), exp_hwm_value);
   }
 
+  /// Check that the current scratch space read buffer HWM is higher than zero.
+  void checkHWMReadBuffMetrics() {
+    AtomicHighWaterMarkGauge* hwm_value =
+        metrics_->FindMetricForTesting<AtomicHighWaterMarkGauge>(
+            "tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark");
+    ASSERT_TRUE(hwm_value->GetValue() > 0);
+  }
+
   void RemoveAndCreateDirs(const vector<string>& dirs) {
     for (const string& dir: dirs) {
       ASSERT_OK(FileSystemUtil::RemoveAndCreateDirectory(dir));
@@ -157,6 +173,40 @@ class TmpFileMgrTest : public ::testing::Test {
     ASSERT_OK(FileSystemUtil::RemovePaths(dirs));
   }
 
+  int64_t GetReadBufferMaxAllowedBytes(TmpFileMgr* mgr) {
+    return mgr->tmp_dirs_remote_ctrl_.CalcMaxReadBufferBytes();
+  }
+
+  int64_t GetReadBufferCurrentMaxBytes(TmpFileMgr* mgr) {
+    return mgr->tmp_dirs_remote_ctrl_.max_read_buffer_size_;
+  }
+
+  void CreateAndGetGeneralRemoteTmpDir(vector<string>* tmp_dirs) {
+    FLAGS_remote_tmp_file_size = "1K";
+    vector<string> tmp_create_dirs{{LOCAL_BUFFER_PATH}};
+    RemoveAndCreateDirs(tmp_create_dirs);
+    tmp_dirs->push_back(Substitute(LOCAL_BUFFER_PATH + ":$0", 4096));
+    tmp_dirs->push_back(REMOTE_URL);
+  }
+
+  // Helper for TestBatchReadingSetMaxBytes() to set the read buffer size and check
+  // whether we should expect the system to use the max allowed bytes instead of the
+  // specified buffer size in the testcase.
+  void SetReadBufferSizeHelper(int64_t buffer_size, bool* expect_max_allowed_bytes) {
+    ASSERT_TRUE(expect_max_allowed_bytes != nullptr);
+    if (buffer_size == READ_BUFFER_SMALL_SIZE_BYTES) {
+      FLAGS_remote_read_memory_buffer_size = "1MB";
+      *expect_max_allowed_bytes = false;
+    } else if (buffer_size == READ_BUFFER_EXTREMELY_BIG_SIZE_BYTES) {
+      FLAGS_remote_read_memory_buffer_size = "100GB";
+      // If buffer size is too large for the system, we expect the actual capacity of
+      // the read buffer to be changed to the max allowed bytes in the testcase.
+      *expect_max_allowed_bytes = true;
+    } else {
+      ASSERT_TRUE(false) << "Unexpected buffer size " << buffer_size;
+    }
+  }
+
   /// Helper to call the private CreateFiles() method and return
   /// the created files.
   static Status CreateFiles(
@@ -262,6 +312,30 @@ class TmpFileMgrTest : public ::testing::Test {
     EXPECT_EQ(end, search->second.end);
   }
 
+  /// Helper to wait for the disk file to change to a specific status. Times out after
+  /// 2 seconds.
+  static void WaitForDiskFileStatus(DiskFile* file, DiskFileStatus status) {
+    int wait_times = 10;
+    while (true) {
+      if (file->GetFileStatus() == status) {
+        break;
+      }
+      // Suppose the upload should be finished in two seconds.
+      ASSERT_TRUE(wait_times-- > 0);
+      usleep(200 * 1000);
+    }
+  }
+
+  /// Helper to get the remote temporary file from the temporary file group.
+  static TmpFileRemote* GetRemoteTmpFileByFileGroup(TmpFileGroup& file_group, int idx) {
+    return static_cast<TmpFileRemote*>(file_group.tmp_files_remote_[idx].get());
+  }
+
+  /// Helper to get the number of remote temporary files from the temporary file group.
+  static int GetRemoteTmpFileNum(TmpFileGroup& file_group) {
+    return file_group.tmp_files_remote_.size();
+  }
+
   /// Helpers to call WriteHandle methods.
   void Cancel(TmpWriteHandle* handle) { handle->Cancel(); }
   void WaitForWrite(TmpWriteHandle* handle) {
@@ -1796,7 +1870,7 @@ TEST_F(TmpFileMgrTest, TestMixTmpFileLimits) {
   RemoveAndCreateDirs(tmp_create_dirs);
   tmp_dirs.push_back(REMOTE_URL);
   int64_t alloc_size = 1024;
-  int64_t file_size = 256 * 1024 * 1024;
+  int64_t file_size = 512 * 1024 * 1024;
   int64_t offset;
   TmpFile* alloc_file;
   FLAGS_remote_tmp_file_size = "512MB";
@@ -2061,4 +2135,103 @@ TEST_F(TmpFileMgrTest, TestRemoteUploadFailed) {
   test_env_->TearDownQueries();
 }
 
+/// Test using batch reading while reading the remote spilled data.
+TEST_F(TmpFileMgrTest, TestBatchReadingFromRemote) {
+  TmpFileMgr tmp_file_mgr;
+  vector<string> tmp_dirs;
+  CreateAndGetGeneralRemoteTmpDir(&tmp_dirs);
+  FLAGS_remote_read_memory_buffer_size = "1MB";
+  FLAGS_remote_batch_read = true;
+  int64_t page_size_big = 512;
+  int64_t page_size_small = 256;
+  // There should be two files:
+  // file 1: page_big + page_big = 1024B
+  // file 2: page_big + page_small + page_big = 1280B
+  // This covers two cases, a file with the default size and a file slightly over the
+  // default size.
+  int64_t file_size_1 = 2 * page_size_big;
+  int64_t file_size_2 = 2 * page_size_big + page_size_small;
+  ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get()));
+  TUniqueId id;
+  TmpFileGroup file_group(&tmp_file_mgr, io_mgr(), profile_, id);
+
+  vector<unique_ptr<MemRange>> mem_ranges;
+  vector<unique_ptr<TmpWriteHandle>> handles;
+  WriteRange::WriteDoneCallback callback =
+      bind(mem_fn(&TmpFileMgrTest::SignalCallback), this, _1);
+  const int PAGE_NUM = 5;
+  vector<vector<uint8_t>> data(PAGE_NUM);
+  for (int i = 0; i < PAGE_NUM; i++) {
+    int64_t page_size = page_size_big;
+    if (i == 3) {
+      page_size = page_size_small;
+    }
+    data[i].resize(page_size);
+    std::iota(data[i].begin(), data[i].end(), i);
+    mem_ranges.emplace_back(make_unique<MemRange>(data[i].data(), page_size));
+    unique_ptr<TmpWriteHandle> handle;
+    ASSERT_OK(file_group.Write(*(mem_ranges[i].get()), callback, &handle));
+    WaitForWrite(handle.get());
+    handles.emplace_back(move(handle));
+  }
+  WaitForCallbacks(PAGE_NUM);
+
+  // There should be two files in the TmpFileMgr.
+  ASSERT_EQ(GetRemoteTmpFileNum(file_group), 2);
+  auto file1 = GetRemoteTmpFileByFileGroup(file_group, 0);
+  auto file2 = GetRemoteTmpFileByFileGroup(file_group, 1);
+  WaitForDiskFileStatus(file1->DiskFile(), DiskFileStatus::PERSISTED);
+  WaitForDiskFileStatus(file1->DiskBufferFile(), DiskFileStatus::PERSISTED);
+  WaitForDiskFileStatus(file2->DiskFile(), DiskFileStatus::PERSISTED);
+  WaitForDiskFileStatus(file2->DiskBufferFile(), DiskFileStatus::PERSISTED);
+
+  // Check Actual Size is as expected.
+  ASSERT_EQ(file1->DiskBufferFile()->actual_file_size(), file_size_1);
+  ASSERT_EQ(file2->DiskBufferFile()->actual_file_size(), file_size_2);
+
+  // Remove the local buffers in order to read from the remote fs.
+  ASSERT_OK(tmp_file_mgr.TryEvictFile(file1));
+  ASSERT_OK(tmp_file_mgr.TryEvictFile(file2));
+
+  // Read the data.
+  for (int i = 0; i < PAGE_NUM; i++) {
+    vector<uint8_t> tmp;
+    tmp.resize(mem_ranges[i]->len());
+    ASSERT_OK(file_group.Read(handles[i].get(), MemRange(tmp.data(), tmp.size())));
+    EXPECT_EQ(0, memcmp(tmp.data(), mem_ranges[i]->data(), tmp.size()));
+    file_group.DestroyWriteHandle(move(handles[i]));
+  }
+
+  // Check the metrics to confirm the read buffer was actually used for reading.
+  checkHWMReadBuffMetrics();
+
+  file_group.Close();
+  test_env_->TearDownQueries();
+}
+
+// Test to set different capacities of the read buffer for remote spilling.
+TEST_F(TmpFileMgrTest, TestBatchReadingSetMaxBytes) {
+  vector<int64_t> buffer_sizes(
+      {READ_BUFFER_SMALL_SIZE_BYTES, READ_BUFFER_EXTREMELY_BIG_SIZE_BYTES});
+  FLAGS_remote_batch_read = true;
+  for (int64_t buffer_size : buffer_sizes) {
+    TmpFileMgr tmp_file_mgr;
+    vector<string> tmp_dirs;
+    bool expect_max_allowed_bytes = false;
+    CreateAndGetGeneralRemoteTmpDir(&tmp_dirs);
+    SetReadBufferSizeHelper(buffer_size, &expect_max_allowed_bytes);
+    ASSERT_OK(tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics_.get()));
+    int64_t max_allowed_bytes = GetReadBufferMaxAllowedBytes(&tmp_file_mgr);
+    int64_t cur_max_bytes = GetReadBufferCurrentMaxBytes(&tmp_file_mgr);
+    // If the buffer size is too large, then the current max bytes of the read buffer
+    // should be set to the system max allowed bytes instead of the buffer size.
+    if (expect_max_allowed_bytes) {
+      EXPECT_EQ(cur_max_bytes, max_allowed_bytes);
+    } else {
+      EXPECT_EQ(cur_max_bytes, buffer_size);
+    }
+    metrics_.reset(new MetricGroup("tmp-file-mgr-test"));
+  }
+}
+
 } // namespace impala
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 51828fbaf..be84ca420 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -50,6 +50,7 @@
 #include "util/hdfs-util.h"
 #include "util/histogram-metric.h"
 #include "util/kudu-status-util.h"
+#include "util/mem-info.h"
 #include "util/os-util.h"
 #include "util/parse-util.h"
 #include "util/pretty-printer.h"
@@ -103,6 +104,9 @@ DEFINE_string(remote_tmp_file_size, "16M",
 DEFINE_string(remote_tmp_file_block_size, "1M",
     "Specify the size of the block for doing file uploading and fetching. The block "
     "size should be power of 2 and less than the size of remote temporary file.");
+DEFINE_string(remote_read_memory_buffer_size, "1G",
+    "Specify the maximum size of read memory buffers for the remote temporary "
+    "files. Only valid when --remote_batch_read is true.");
 DEFINE_bool(remote_tmp_files_avail_pool_lifo, false,
     "If true, lifo is the algo to evict the local buffer files during spilling "
     "to the remote. Otherwise, fifo would be used.");
@@ -110,6 +114,10 @@ DEFINE_int32(wait_for_spill_buffer_timeout_s, 60,
     "Specify the timeout duration waiting for the buffer to write (second). If a spilling"
     "opertion fails to get a buffer from the pool within the duration, the operation"
     "fails.");
+DEFINE_bool(remote_batch_read, false,
+    "Set if the system uses batch reading for the remote temporary files. Batch reading "
+    "allows reading a block asynchronously when the buffer pool is trying to pin one "
+    "page of that block.");
 
 using boost::algorithm::is_any_of;
 using boost::algorithm::join;
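
As a rough illustration of how the two new flags are consumed together, the following sketch mirrors the pattern used by the TmpFileMgrTest changes earlier in this patch; the tmp_dirs and metrics variables are assumptions made for the example, not part of this change.

    // Minimal sketch, assuming 'tmp_dirs' holds a local buffer dir plus a remote scratch
    // dir (e.g. on HDFS or S3) and 'metrics' is an initialized MetricGroup.
    FLAGS_remote_batch_read = true;                 // turn on batch reading
    FLAGS_remote_read_memory_buffer_size = "512M";  // cap the in-memory read buffer
    TmpFileMgr tmp_file_mgr;
    // InitCustom() invokes SetUpReadBufferParams(), which may clamp the configured cap
    // to the host's allowance or fall back to non-batch reading if the value is invalid.
    Status status = tmp_file_mgr.InitCustom(tmp_dirs, true, "", false, metrics.get());
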
@@ -131,7 +139,21 @@ constexpr int64_t TmpFileMgr::HOLE_PUNCH_BLOCK_SIZE_BYTES;
 
 const string TMP_SUB_DIR_NAME = "impala-scratch";
 const uint64_t AVAILABLE_SPACE_THRESHOLD_MB = 1024;
-const uint64_t MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB = 256;
+const uint64_t MAX_REMOTE_TMPFILE_SIZE_THRESHOLD_MB = 512;
+
+// For spilling to a remote fs, the maximum size of a read memory block.
+const uint64_t MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES = 16 * 1024 * 1024;
+
+// The memory limits for the memory buffer used to read spilled data back from the
+// remote fs. The maximum bytes of the read buffer are limited by
+// REMOTE_READ_BUFFER_MAX_MEM_PERCENT, a percentage of the total memory, and
+// REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT, a percentage of the remaining memory
+// not used by the process.
+// Also, if the remaining memory is less than REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT
+// of the total memory, the read buffer for remote spilled data is disabled.
+const double REMOTE_READ_BUFFER_MAX_MEM_PERCENT = 0.1;
+const double REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT = 0.5;
+const double REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT = 0.05;
 
 // Metric keys
 const string TMP_FILE_MGR_ACTIVE_SCRATCH_DIRS = "tmp-file-mgr.active-scratch-dirs";
@@ -141,6 +163,10 @@ const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK =
     "tmp-file-mgr.scratch-space-bytes-used-high-water-mark";
 const string TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED =
     "tmp-file-mgr.scratch-space-bytes-used";
+const string TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK =
+    "tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark";
+const string TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED =
+    "tmp-file-mgr.scratch-read-memory-buffer-used";
 const string SCRATCH_DIR_BYTES_USED_FORMAT =
     "tmp-file-mgr.scratch-space-bytes-used.dir-$0";
 const string LOCAL_BUFF_BYTES_USED_FORMAT = "tmp-file-mgr.local-buff-bytes-used.dir-$0";
@@ -235,6 +261,18 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
     return Status(Substitute("Invalid value of wait_for_spill_buffer_timeout_us '$0'",
         FLAGS_wait_for_spill_buffer_timeout_s));
   }
+
+  tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = FLAGS_remote_batch_read;
+  if (tmp_dirs_remote_ctrl_.remote_batch_read_enabled_) {
+    Status setup_read_buffer_status = tmp_dirs_remote_ctrl_.SetUpReadBufferParams();
+    if (!setup_read_buffer_status.ok()) {
+      LOG(WARNING) << "Disabled the read buffer for the remote temporary files "
+                      "due to errors in read buffer parameters: "
+                   << setup_read_buffer_status.msg().msg();
+      tmp_dirs_remote_ctrl_.remote_batch_read_enabled_ = false;
+    }
+  }
+
   // Below options are using for test by setting different modes to implement the
   // spilling to the remote.
   tmp_dirs_remote_ctrl_.remote_tmp_files_avail_pool_lifo_ =
@@ -357,6 +395,10 @@ Status TmpFileMgr::InitCustom(const vector<string>& tmp_dir_specifiers,
       metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED_HIGH_WATER_MARK,
           TMP_FILE_MGR_SCRATCH_SPACE_BYTES_USED, 0);
 
+  scratch_read_memory_buffer_used_metric_ =
+      metrics->AddHWMGauge(TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED_HIGH_WATER_MARK,
+          TMP_FILE_MGR_SCRATCH_READ_MEMORY_BUFFER_USED, 0);
+
   initialized_ = true;
 
   if ((tmp_dirs_.empty() && local_buff_dir_ == nullptr) && !tmp_dirs.empty()) {
@@ -444,17 +486,28 @@ Status TmpFileMgr::DequeueTmpFilesPool(shared_ptr<TmpFile>* tmp_file, bool quick
       tmp_file, quick_return);
 }
 
+void TmpFileMgr::ReleaseTmpFileReadBuffer(
+    const unique_lock<shared_mutex>& file_lock, TmpFile* tmp_file) {
+  DCHECK(tmp_file != nullptr);
+  DCHECK(IsRemoteBatchReadingEnabled());
+  TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file);
+  for (int i = 0; i < GetNumReadBuffersPerFile(); i++) {
+    tmp_file_remote->TryDeleteReadBuffer(file_lock, i);
+  }
+}
+
 Status TmpFileMgr::TryEvictFile(TmpFile* tmp_file) {
   DCHECK(tmp_file != nullptr);
   if (tmp_file->disk_type() == io::DiskFileType::DUMMY) return Status::OK();
 
+  TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file);
+  DiskFile* buffer_file = tmp_file_remote->DiskBufferFile();
+
   // Remove the buffer of the TmpFile.
   // After deletion of the buffer, if the TmpFile doesn't exist in the remote file system
   // either, that means the TmpFile shared pointer can be removed from the TmpFileMgr,
   // because in this case, the physical file is considered no longer in the system.
-  // Fetch the unique locks of the files when doing the deletion.
-  TmpFileRemote* tmp_file_remote = static_cast<TmpFileRemote*>(tmp_file);
-  DiskFile* buffer_file = tmp_file_remote->DiskBufferFile();
+  // Hold the unique locks of the files during the deletion.
   Status status = Status::OK();
   {
     unique_lock<shared_mutex> buffer_lock(buffer_file->physical_file_lock_);
@@ -540,6 +593,58 @@ string TmpFileMgr::GetTmpDirPath(DeviceId device_id) const {
   }
 }
 
+int64_t TmpFileMgr::TmpDirRemoteCtrl::CalcMaxReadBufferBytes() {
+  int64_t max_allow_bytes = 0;
+  int64_t process_bytes_limit;
+  int64_t total_avail_mem;
+  if (!ChooseProcessMemLimit(&process_bytes_limit, &total_avail_mem).ok()) {
+    // Return 0 to disable the read buffer if the process and system limits are unknown.
+    return max_allow_bytes;
+  }
+  DCHECK_GE(total_avail_mem, process_bytes_limit);
+  // Only allow the read buffer if the memory not being used is larger than
+  // REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT of the total memory.
+  if ((total_avail_mem - process_bytes_limit)
+      > total_avail_mem * REMOTE_READ_BUFFER_DISABLE_THRESHOLD_PERCENT) {
+    // Max allowed bytes are the minimum of REMOTE_READ_BUFFER_MAX_MEM_PERCENT of the
+    // total memory and REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT of the unused memory.
+    max_allow_bytes = min((total_avail_mem - process_bytes_limit)
+            * REMOTE_READ_BUFFER_MEM_HARD_LIMIT_PERCENT,
+        total_avail_mem * REMOTE_READ_BUFFER_MAX_MEM_PERCENT);
+  }
+  return max_allow_bytes;
+}
+
+Status TmpFileMgr::TmpDirRemoteCtrl::SetUpReadBufferParams() {
+  bool is_percent;
+  // If the temporary file size is smaller than the max block size, set the block size
+  // to the file size.
+  read_buffer_block_size_ =
+      remote_tmp_file_size_ < MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES ?
+      remote_tmp_file_size_ :
+      MAX_REMOTE_READ_MEM_BLOCK_THRESHOLD_BYTES;
+  num_read_buffer_blocks_per_file_ =
+      static_cast<int>(remote_tmp_file_size_ / read_buffer_block_size_);
+  max_read_buffer_size_ =
+      ParseUtil::ParseMemSpec(FLAGS_remote_read_memory_buffer_size, &is_percent, 0);
+  if (max_read_buffer_size_ <= 0) {
+    return Status(Substitute("Invalid value of remote_read_memory_buffer_size '$0'",
+        FLAGS_remote_read_memory_buffer_size));
+  }
+  // Calculate the max allowed bytes for the read buffer.
+  int64_t max_allow_bytes = CalcMaxReadBufferBytes();
+  DCHECK_GE(max_allow_bytes, 0);
+  if (max_read_buffer_size_ > max_allow_bytes) {
+    max_read_buffer_size_ = max_allow_bytes;
+    LOG(WARNING) << "The remote read memory buffer size exceeds the maximum "
+                    "allowed and is reduced to "
+                 << max_allow_bytes << " bytes.";
+  }
+  LOG(INFO) << "Using " << max_read_buffer_size_
+            << " bytes for the batch reading buffer of TmpFileMgr.";
+  return Status::OK();
+}
+
 TmpDir::TmpDir(const string& raw_path, const string& prefix, bool is_local)
   : raw_path_(raw_path), prefix_(prefix), is_local_dir_(is_local), is_parsed_(false) {}
 
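
To make the sizing rules above concrete, here is a small standalone sketch (not Impala code) that applies the same arithmetic to assumed example numbers: a host with 128GB of memory, a 96GB process limit, and 64MB remote temporary files.

    #include <algorithm>
    #include <cstdint>
    #include <iostream>

    int main() {
      // Assumed example values; the real numbers come from ChooseProcessMemLimit().
      const int64_t total_avail_mem = 128LL << 30;     // 128GB of system memory
      const int64_t process_bytes_limit = 96LL << 30;  // 96GB process mem limit
      const int64_t unused = total_avail_mem - process_bytes_limit;  // 32GB left over

      // CalcMaxReadBufferBytes(): enabled only if the unused memory exceeds 5% of the
      // total, capped at min(50% of the unused memory, 10% of the total memory).
      int64_t max_allow_bytes = 0;
      if (unused > total_avail_mem * 0.05) {
        max_allow_bytes = std::min<int64_t>(unused * 0.5, total_avail_mem * 0.1);
      }

      // SetUpReadBufferParams(): a 64MB remote temporary file is read back in 16MB
      // blocks, i.e. 4 read buffer blocks per file.
      const int64_t remote_tmp_file_size = 64LL << 20;
      const int64_t block_size = std::min<int64_t>(remote_tmp_file_size, 16LL << 20);
      const int num_blocks_per_file = static_cast<int>(remote_tmp_file_size / block_size);

      // Prints ~12.8GB for the cap, 16MB blocks and 4 blocks per file, so the default
      // --remote_read_memory_buffer_size of 1G would be accepted unchanged.
      std::cout << max_allow_bytes << " " << block_size << " " << num_blocks_per_file
                << std::endl;
      return 0;
    }
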
@@ -872,9 +977,11 @@ TmpFileRemote::TmpFileRemote(TmpFileGroup* file_group, TmpFileMgr::DeviceId devi
   if (IsHdfsPath(hdfs_url, false)) {
     disk_type_ = io::DiskFileType::DFS;
     disk_id_ = file_group->io_mgr_->RemoteDfsDiskId();
+    disk_id_file_op_ = file_group->io_mgr_->RemoteDfsDiskFileOperId();
   } else if (IsS3APath(hdfs_url, false)) {
     disk_type_ = io::DiskFileType::S3;
     disk_id_ = file_group->io_mgr_->RemoteS3DiskId();
+    disk_id_file_op_ = file_group->io_mgr_->RemoteS3DiskFileOperId();
     options = file_group_->tmp_file_mgr_->s3a_options();
   }
   Status status = HdfsFsCache::instance()->GetConnection(
@@ -883,8 +990,23 @@ TmpFileRemote::TmpFileRemote(TmpFileGroup* file_group, TmpFileMgr::DeviceId devi
   local_buffer_path_ = local_buffer_path;
   disk_file_ = make_unique<io::DiskFile>(path_, file_group->io_mgr_,
       file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(), disk_type_, &hdfs_conn_);
-  disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_, file_group_->io_mgr_,
-      file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(), io::DiskFileType::LOCAL_BUFFER);
+  if (file_group_->tmp_file_mgr_->IsRemoteBatchReadingEnabled()) {
+    read_buffer_block_size_ = file_group_->tmp_file_mgr_->GetReadBufferBlockSize();
+    int num_of_read_buffers = file_group_->tmp_file_mgr_->GetNumReadBuffersPerFile();
+    disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_,
+        file_group_->io_mgr_, file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(),
+        io::DiskFileType::LOCAL_BUFFER, read_buffer_block_size_, num_of_read_buffers);
+    disk_read_page_cnts_ = std::make_unique<int64_t[]>(num_of_read_buffers);
+    DCHECK(disk_read_page_cnts_.get() != nullptr);
+    memset(disk_read_page_cnts_.get(), 0, num_of_read_buffers * sizeof(int64_t));
+    for (int i = 0; i < num_of_read_buffers; i++) {
+      fetch_ranges_.emplace_back(nullptr);
+    }
+  } else {
+    disk_buffer_file_ = make_unique<io::DiskFile>(local_buffer_path_,
+        file_group_->io_mgr_, file_group_->tmp_file_mgr_->GetRemoteTmpFileSize(),
+        io::DiskFileType::LOCAL_BUFFER);
+  }
 }
 
 TmpFileRemote::~TmpFileRemote() {
@@ -910,6 +1032,133 @@ io::DiskFile* TmpFileRemote::GetWriteFile() {
   return disk_buffer_file_.get();
 }
 
+int TmpFileRemote::GetReadBufferIndex(int64_t offset) {
+  DCHECK(disk_buffer_file_ != nullptr);
+  return disk_buffer_file_->GetReadBufferIndex(offset);
+}
+
+void TmpFileRemote::AsyncFetchReadBufferBlock(io::DiskFile* read_buffer_file,
+    io::MemBlock* read_buffer_block, int read_buffer_idx, bool* fetched) {
+  DCHECK(fetched != nullptr);
+  *fetched = false;
+  {
+    shared_lock<shared_mutex> read_file_lock(*(read_buffer_file->GetFileLock()));
+    unique_lock<SpinLock> mem_block_lock(*(read_buffer_block->GetLock()));
+    // Check the block status.
+    // If the block is disabled, the caller won't be able to use this buffer block.
+    // If the block is written, it has already been fetched; set the fetched flag and
+    // return immediately.
+    // If the block is uninitialized, start fetching it immediately but don't wait for
+    // the fetch to finish, so that it doesn't block the current page read.
+    // If the block is in the reserved or alloc status, another thread is handling the
+    // block; we don't wait here because blocking could be expensive.
+    if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block,
+            io::MemBlockStatus::DISABLED, read_file_lock, &mem_block_lock)) {
+      return;
+    } else if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block,
+                   io::MemBlockStatus::WRITTEN, read_file_lock, &mem_block_lock)) {
+      *fetched = true;
+      return;
+    } else if (read_buffer_file->IsReadBufferBlockStatus(read_buffer_block,
+                   io::MemBlockStatus::UNINIT, read_file_lock, &mem_block_lock)) {
+      bool dofetch = true;
+      int64_t mem_size_limit =
+          file_group_->tmp_file_mgr()->GetRemoteMaxTotalReadBufferSize();
+      auto read_mem_counter =
+          file_group_->tmp_file_mgr()->scratch_read_memory_buffer_used_metric_;
+      if (read_mem_counter->Increment(read_buffer_file->read_buffer_block_size())
+          > mem_size_limit) {
+        read_mem_counter->Increment(-1 * read_buffer_file->read_buffer_block_size());
+        dofetch = false;
+      }
+      if (dofetch) {
+        read_buffer_file->SetReadBufferBlockStatus(read_buffer_block,
+            io::MemBlockStatus::RESERVED, read_file_lock, &mem_block_lock);
+        RemoteOperRange::RemoteOperDoneCallback fetch_callback =
+            [read_buffer_block, tmp_file = this](const Status& fetch_status) {
+              if (!fetch_status.ok()) {
+                // Disable the read buffer if fails to fetch.
+                tmp_file->TryDeleteReadBufferExcl(read_buffer_block->block_id());
+              }
+            };
+        fetch_ranges_[read_buffer_idx].reset(new RemoteOperRange(disk_file_.get(),
+            read_buffer_file, file_group_->tmp_file_mgr()->GetRemoteTmpBlockSize(),
+            disk_id(true), RequestType::FILE_FETCH, file_group_->io_mgr_, fetch_callback,
+            GetReadBuffStartOffset(read_buffer_idx)));
+        Status add_status = file_group_->io_ctx_->AddRemoteOperRange(
+            fetch_ranges_[read_buffer_idx].get());
+        if (!add_status.ok()) {
+          read_buffer_file->SetReadBufferBlockStatus(read_buffer_block,
+              io::MemBlockStatus::DISABLED, read_file_lock, &mem_block_lock);
+        }
+      } else {
+        read_buffer_file->SetReadBufferBlockStatus(read_buffer_block,
+            io::MemBlockStatus::DISABLED, read_file_lock, &mem_block_lock);
+      }
+    }
+  }
+  *fetched = true;
+  return;
+}
+
+io::DiskFile* TmpFileRemote::GetReadBufferFile(int64_t offset) {
+  // If the local buffer file exists, return the file directly.
+  // If it has been deleted (probably due to eviction) and batch reading is enabled,
+  // try to fetch the current block asynchronously if it is not already present in the
+  // memory buffer.
+  // If the local buffer file is deleted and the read memory buffer doesn't contain the
+  // block right now, return nullptr to indicate there is no buffer available.
+  io::DiskFile* read_buffer_file = disk_buffer_file_.get();
+  if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) {
+    return read_buffer_file;
+  }
+  if (!file_group_->tmp_file_mgr()->IsRemoteBatchReadingEnabled()) return nullptr;
+  int read_buffer_idx = GetReadBufferIndex(offset);
+  io::MemBlock* read_buffer_block = disk_buffer_file_->GetBufferBlock(read_buffer_idx);
+  bool fetched = false;
+  io::MemBlockStatus block_status = read_buffer_block->GetStatus();
+  if (block_status == io::MemBlockStatus::DISABLED) {
+    // do nothing
+  } else if (block_status == io::MemBlockStatus::WRITTEN) {
+    fetched = true;
+  } else {
+    AsyncFetchReadBufferBlock(
+        read_buffer_file, read_buffer_block, read_buffer_idx, &fetched);
+  }
+  return fetched ? read_buffer_file : nullptr;
+}
+
+bool TmpFileRemote::IncrementReadPageCount(int buffer_idx) {
+  int64_t read_count = 0;
+  int64_t total_num = 0;
+  DCheckReadBufferIdx(buffer_idx);
+  total_num = GetReadBuffPageCount(buffer_idx);
+  {
+    lock_guard<SpinLock> lock(lock_);
+    read_count = ++disk_read_page_cnts_[buffer_idx];
+  }
+  // Return true if all the pages of the block have been read.
+  return read_count == total_num;
+}
+
+template <typename T>
+void TmpFileRemote::TryDeleteReadBuffer(const T& lock, int buffer_idx) {
+  DCheckReadBufferIdx(buffer_idx);
+  bool reserved = false;
+  bool allocated = false;
+  DCHECK(disk_buffer_file_->IsBatchReadEnabled());
+  DCHECK(lock.mutex() == disk_buffer_file_->GetFileLock() && lock.owns_lock());
+  disk_buffer_file_->DeleteReadBuffer(
+      disk_buffer_file_->GetBufferBlock(buffer_idx), &reserved, &allocated, lock);
+  if (reserved || allocated) {
+    // Because the reservation will increase the current allocated read buffer usage
+    // ahead of the real allocation, we need to decrease it if the block is reserved
+    // or allocated.
+    file_group_->tmp_file_mgr_->scratch_read_memory_buffer_used_metric_->Increment(
+        -1 * read_buffer_block_size_);
+  }
+}
+
 TmpDir* TmpFileRemote::GetLocalBufferDir() const {
   return file_group_->tmp_file_mgr_->GetLocalBufferDir();
 }
@@ -919,33 +1168,44 @@ Status TmpFileRemote::Remove() {
   // If True, we need to enqueue the file back to the pool after deletion.
   bool to_return_the_buffer = false;
 
-  // The order of acquiring the lock must be from local to remote to avoid deadlocks.
-  unique_lock<shared_mutex> buffer_file_lock(*(disk_buffer_file_->GetFileLock()));
-  unique_lock<shared_mutex> file_lock(*(disk_file_->GetFileLock()));
+  // Set a flag to notify other threads holding the file lock to release it; since the
+  // remove process needs a unique lock, this accelerates acquiring the mutex.
+  SetToDeleteFlag();
 
-  // Delete the local buffer file if exists.
-  if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) {
-    status = disk_buffer_file_->Delete(buffer_file_lock);
-    if (!status.ok()) {
-      // If the physical file is failed to delete, log a warning, and set a deleted flag
-      // anyway.
-      LOG(WARNING) << "Delete file: " << disk_buffer_file_->path() << " failed.";
-      disk_buffer_file_->SetStatus(io::DiskFileStatus::DELETED);
-    } else if (disk_file_->GetFileStatus() != io::DiskFileStatus::PERSISTED
-        && disk_buffer_file_->IsSpaceReserved()) {
-      // If the file is not uploaded and the buffer space is reserved, we need to return
-      // the buffer to the pool after deletion of the TmpFile. The buffer of a uploaded
-      // file should have been returned to the pool after upload operation completes.
-      to_return_the_buffer = true;
-    } else {
-      // Do nothing.
+  {
+    // The order of acquiring the lock must be from local to remote to avoid deadlocks.
+    unique_lock<shared_mutex> buffer_file_lock(*(disk_buffer_file_->GetFileLock()));
+    unique_lock<shared_mutex> file_lock(*(disk_file_->GetFileLock()));
+
+    // Delete the local buffer file if exists.
+    if (disk_buffer_file_->GetFileStatus() != io::DiskFileStatus::DELETED) {
+      status = disk_buffer_file_->Delete(buffer_file_lock);
+      if (!status.ok()) {
+        // If the physical file fails to be deleted, log a warning and set the deleted
+        // flag anyway.
+        LOG(WARNING) << "Delete file: " << disk_buffer_file_->path() << " failed.";
+        disk_buffer_file_->SetStatus(io::DiskFileStatus::DELETED);
+      } else if (disk_file_->GetFileStatus() != io::DiskFileStatus::PERSISTED
+          && disk_buffer_file_->IsSpaceReserved()) {
+        // If the file is not uploaded and the buffer space is reserved, we need to return
+        // the buffer to the pool after deletion of the TmpFile. The buffer of an uploaded
+        // file should have been returned to the pool after the upload operation completes.
+        to_return_the_buffer = true;
+      } else {
+        // Do nothing.
+      }
     }
-  }
 
-  // Set the remote file status to deleted. The physical remote files would be deleted
-  // during deconstruction of TmpFileGroup by deleting the entire remote
-  // directory for efficiency consideration.
-  disk_file_->SetStatus(io::DiskFileStatus::DELETED);
+    // Set the remote file status to deleted. The physical remote files are deleted
+    // during destruction of the TmpFileGroup by removing the entire remote
+    // directory for efficiency reasons.
+    disk_file_->SetStatus(io::DiskFileStatus::DELETED);
+
+    // Try to delete all the read buffers.
+    if (file_group_->tmp_file_mgr_->IsRemoteBatchReadingEnabled()) {
+      file_group_->tmp_file_mgr_->ReleaseTmpFileReadBuffer(buffer_file_lock, this);
+    }
+  }
 
   // Update the metrics.
   GetDir()->bytes_used_metric()->Increment(-file_size_);
@@ -972,6 +1232,13 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
         ADD_COUNTER(profile, "UncompressedScratchBytesWritten", TUnit::BYTES)),
     read_counter_(ADD_COUNTER(profile, "ScratchReads", TUnit::UNIT)),
     bytes_read_counter_(ADD_COUNTER(profile, "ScratchBytesRead", TUnit::BYTES)),
+    read_use_mem_counter_(ADD_COUNTER(profile, "ScratchReadsUseMem", TUnit::UNIT)),
+    bytes_read_use_mem_counter_(
+        ADD_COUNTER(profile, "ScratchBytesReadUseMem", TUnit::BYTES)),
+    read_use_local_disk_counter_(
+        ADD_COUNTER(profile, "ScratchReadsUseLocalDisk", TUnit::UNIT)),
+    bytes_read_use_local_disk_counter_(
+        ADD_COUNTER(profile, "ScratchBytesReadUseLocalDisk", TUnit::BYTES)),
     scratch_space_bytes_used_counter_(
         ADD_COUNTER(profile, "ScratchFileUsedBytes", TUnit::BYTES)),
     disk_read_timer_(ADD_TIMER(profile, "TotalReadBlockTime")),
@@ -987,6 +1254,10 @@ TmpFileGroup::TmpFileGroup(TmpFileMgr* tmp_file_mgr, DiskIoMgr* io_mgr,
     free_ranges_(64) {
   DCHECK(tmp_file_mgr != nullptr);
   io_ctx_ = io_mgr_->RegisterContext();
+  io_ctx_->set_read_use_mem_counter(read_use_mem_counter_);
+  io_ctx_->set_bytes_read_use_mem_counter(bytes_read_use_mem_counter_);
+  io_ctx_->set_read_use_local_disk_counter(read_use_local_disk_counter_);
+  io_ctx_->set_bytes_read_use_local_disk_counter(bytes_read_use_local_disk_counter_);
   // Populate the priority based index ranges.
   const std::vector<std::unique_ptr<TmpDir>>& tmp_dirs = tmp_file_mgr_->tmp_dirs_;
   if (tmp_dirs.size() > 0) {
@@ -1352,29 +1623,29 @@ Status TmpFileGroup::ReadAsync(TmpWriteHandle* handle, MemRange buffer) {
   // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
   // since the write is not in flight.
   handle->read_range_ = scan_range_pool_.Add(new ScanRange);
-
+  int64_t offset = handle->write_range_->offset();
   if (handle->file_ != nullptr && !handle->file_->is_local()) {
     TmpFileRemote* tmp_file = static_cast<TmpFileRemote*>(handle->file_);
-    DiskFile* disk_buffer_file = tmp_file->DiskBufferFile();
-    DiskFile* disk_file = tmp_file->DiskFile();
+    DiskFile* local_read_buffer_file = tmp_file->GetReadBufferFile(offset);
+    DiskFile* remote_file = tmp_file->DiskFile();
     // Reset the read_range, use the remote filesystem's disk id.
-    handle->read_range_->Reset(tmp_file->hdfs_conn_, disk_file->path().c_str(),
-        handle->write_range_->len(), handle->write_range_->offset(), tmp_file->disk_id(),
-        false, tmp_file->mtime_,
+    handle->read_range_->Reset(tmp_file->hdfs_conn_, remote_file->path().c_str(),
+        handle->write_range_->len(), offset, tmp_file->disk_id(), false, tmp_file->mtime_,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING),
-        nullptr, disk_file, disk_buffer_file);
+        nullptr, remote_file, local_read_buffer_file);
   } else {
     // Read from local.
     handle->read_range_->Reset(nullptr, handle->write_range_->file(),
-        handle->write_range_->len(), handle->write_range_->offset(),
-        handle->write_range_->disk_id(), false, ScanRange::INVALID_MTIME,
+        handle->write_range_->len(), offset, handle->write_range_->disk_id(), false,
+        ScanRange::INVALID_MTIME,
         BufferOpts::ReadInto(
             read_buffer.data(), read_buffer.len(), BufferOpts::NO_CACHING));
   }
 
   read_counter_->Add(1);
   bytes_read_counter_->Add(read_buffer.len());
+
   bool needs_buffers;
   RETURN_IF_ERROR(io_ctx_->StartScanRange(handle->read_range_, &needs_buffers));
   DCHECK(!needs_buffers) << "Already provided a buffer";
@@ -1431,6 +1702,16 @@ Status TmpFileGroup::WaitForAsyncRead(
     if (!status.ok()) goto exit;
   }
 exit:
+  if (handle->file_ != nullptr && !handle->file_->is_local()) {
+    auto tmp_file = static_cast<TmpFileRemote*>(handle->file_);
+    // If all the pages of a specific read buffer have been read, try to delete the
+    // read buffer.
+    if (tmp_file_mgr()->IsRemoteBatchReadingEnabled()) {
+      int buffer_idx = tmp_file->GetReadBufferIndex(handle->write_range_->offset());
+      bool all_read = tmp_file->IncrementReadPageCount(buffer_idx);
+      if (all_read) tmp_file->TryDeleteMemReadBufferShared(buffer_idx);
+    }
+  }
   // Always return the buffer before exiting to avoid leaking it.
   if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer));
   handle->read_range_ = nullptr;
@@ -1762,15 +2043,13 @@ void TmpWriteHandle::WriteComplete(const Status& write_status) {
       // Do file upload if the local buffer file is finished.
       if (write_range_->is_full()) {
         TmpFileRemote* tmp_file = static_cast<TmpFileRemote*>(file_);
-        int disk_id = tmp_file->DiskFile()->disk_type() == io::DiskFileType::S3 ?
-            parent_->io_mgr_->RemoteS3DiskFileOperId() :
-            parent_->io_mgr_->RemoteDfsDiskFileOperId();
         RemoteOperRange::RemoteOperDoneCallback u_callback =
             [this, tmp_file](
                 const Status& upload_status) { UploadComplete(tmp_file, upload_status); };
-        tmp_file->upload_range_.reset(new RemoteOperRange(tmp_file->DiskBufferFile(),
-            tmp_file->DiskFile(), parent_->tmp_file_mgr()->GetRemoteTmpBlockSize(),
-            disk_id, RequestType::FILE_UPLOAD, parent_->io_mgr_, u_callback));
+        tmp_file->upload_range_.reset(
+            new RemoteOperRange(tmp_file->DiskBufferFile(), tmp_file->DiskFile(),
+                parent_->tmp_file_mgr()->GetRemoteTmpBlockSize(), tmp_file->disk_id(true),
+                RequestType::FILE_UPLOAD, parent_->io_mgr_, u_callback));
         status = parent_->io_ctx_->AddRemoteOperRange(tmp_file->upload_range_.get());
       }
     }
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 9f3691c9b..a6a3cb5a7 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -49,6 +49,7 @@ namespace io {
   class DiskIoMgr;
   class RequestContext;
   class ScanRange;
+  class DiskFile;
   class WriteRange;
   class RemoteOperRange;
 }
@@ -136,21 +137,40 @@ class TmpFileMgr {
   /// A configuration for the control parameters of remote temporary directories.
   /// The struct is used by TmpFileMgr and has the same lifecycle as TmpFileMgr.
   struct TmpDirRemoteCtrl {
+    /// Calculate the maximum allowed read buffer bytes for the remote spilling.
+    int64_t CalcMaxReadBufferBytes();
+
+    /// A helper function to set up the parameters of read buffers for remote files.
+    Status SetUpReadBufferParams() WARN_UNUSED_RESULT;
+
     /// The high water mark metric for local buffer directory.
     AtomicInt64 local_buff_dir_bytes_high_water_mark_{0};
 
     /// The default size of a remote temporary file.
     int64_t remote_tmp_file_size_;
 
+    /// The default size of a read buffer block for a remote temporary file.
+    int64_t read_buffer_block_size_;
+
+    /// The number of read buffer blocks for a remote file; it is derived from
+    /// remote_tmp_file_size_ / read_buffer_block_size_.
+    int num_read_buffer_blocks_per_file_;
+
     /// The default block size of a remote temporary file. The block is used as a buffer
     /// while doing upload and fetch a remote temporary file.
     int64_t remote_tmp_block_size_;
 
+    /// The maximum total size of read buffers for remote spilling.
+    int64_t max_read_buffer_size_;
+
     /// Specify the mode to enqueue the tmp file to the pool.
     /// If true, the file would be placed in the first to be poped out from the pool.
     /// If false, the file would be placed in the last of the pool.
     bool remote_tmp_files_avail_pool_lifo_;
 
+    /// Indicates if batch reading is enabled for the remote temporary files.
+    bool remote_batch_read_enabled_;
+
     /// Temporary file buffer pool managed by TmpFileMgr, is only activated when there is
     /// a remote scratch space is registered. So, if TmpFileMgr::HasRemoteDir() is true,
     /// the tmp_file_pool_ is non-null. Otherwise, it is null.
@@ -197,6 +217,21 @@ class TmpFileMgr {
     return tmp_dirs_remote_ctrl_.remote_tmp_file_size_;
   }
 
+  /// Return the read buffer block size for a remote temporary file.
+  int64_t GetReadBufferBlockSize() const {
+    return tmp_dirs_remote_ctrl_.read_buffer_block_size_;
+  }
+
+  /// Return the number of read buffer blocks for a remote temporary file.
+  int GetNumReadBuffersPerFile() const {
+    return tmp_dirs_remote_ctrl_.num_read_buffer_blocks_per_file_;
+  }
+
+  /// Return the maximum total size of all the read buffer blocks for remote spilling.
+  int64_t GetRemoteMaxTotalReadBufferSize() const {
+    return tmp_dirs_remote_ctrl_.max_read_buffer_size_;
+  }
+
   /// Return the remote temporary block size.
   int64_t GetRemoteTmpBlockSize() const {
     return tmp_dirs_remote_ctrl_.remote_tmp_block_size_;
@@ -213,6 +248,11 @@ class TmpFileMgr {
     return tmp_dirs_remote_ctrl_.remote_tmp_files_avail_pool_lifo_;
   }
 
+  // Return if batch reading for remote temporary files is enabled.
+  bool IsRemoteBatchReadingEnabled() {
+    return tmp_dirs_remote_ctrl_.remote_batch_read_enabled_;
+  }
+
   /// Return the local buffer dir for remote spilling.
   TmpDir* GetLocalBufferDir() const;
 
@@ -257,6 +297,11 @@ class TmpFileMgr {
   /// one would wait util a file is dequeued.
   Status DequeueTmpFilesPool(std::shared_ptr<TmpFile>* tmp_file, bool quick_return);
 
+  /// Releases all the read buffer memory of a temporary file.
+  /// The caller must hold the unique lock of the buffer file.
+  void ReleaseTmpFileReadBuffer(
+      const std::unique_lock<boost::shared_mutex>& lock, TmpFile* tmp_file);
+
   /// Try to delete the buffer of a TmpFile to make some space for other buffers.
   /// May return an error status if error happens during deletion of the buffer.
   Status TryEvictFile(TmpFile* tmp_file);
@@ -350,6 +395,9 @@ class TmpFileMgr {
 
   /// Metrics to track the scratch space HWM.
   AtomicHighWaterMarkGauge* scratch_bytes_used_metric_ = nullptr;
+
+  /// Metrics to track the read memory buffer HWM.
+  AtomicHighWaterMarkGauge* scratch_read_memory_buffer_used_metric_ = nullptr;
 };
 
 /// Represents a group of temporary files - one per disk with a scratch directory. The
@@ -553,6 +601,18 @@ class TmpFileGroup {
   /// Number of bytes read from disk (includes reads started but not yet complete).
   RuntimeProfile::Counter* const bytes_read_counter_;
 
+  /// Number of read operations that use the in-memory read buffer.
+  RuntimeProfile::Counter* const read_use_mem_counter_;
+
+  /// Number of bytes read from the in-memory read buffer.
+  RuntimeProfile::Counter* const bytes_read_use_mem_counter_;
+
+  /// Number of read operations that use the local disk buffer.
+  RuntimeProfile::Counter* const read_use_local_disk_counter_;
+
+  /// Number of bytes read from the local disk buffer.
+  RuntimeProfile::Counter* const bytes_read_use_local_disk_counter_;
+
   /// Amount of scratch space allocated in bytes.
   RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
 
@@ -566,7 +626,7 @@ class TmpFileGroup {
   /// is not enabled.
   RuntimeProfile::Counter* compression_timer_;
 
-  /// Protects tmp_files_remote_ptrs_lock_.
+  /// Protects tmp_files_remote_ptrs_.
   SpinLock tmp_files_remote_ptrs_lock_;
 
   /// A map of raw pointer and its shared_ptr of remote TmpFiles.
diff --git a/be/src/util/mem-info.cc b/be/src/util/mem-info.cc
index df932cb70..43cfd24ff 100644
--- a/be/src/util/mem-info.cc
+++ b/be/src/util/mem-info.cc
@@ -198,7 +198,7 @@ string ThpConfig::DebugString() const {
   return stream.str();
 }
 
-Status ChooseProcessMemLimit(int64_t* bytes_limit) {
+Status ChooseProcessMemLimit(int64_t* bytes_limit, int64_t* process_avail_mem) {
   // Depending on the system configuration, we detect the total amount of memory
   // available to the system - either the available physical memory, or if overcommitting
   // is turned off, we use the memory commit limit from /proc/meminfo (see IMPALA-1690).
@@ -262,6 +262,7 @@ Status ChooseProcessMemLimit(int64_t* bytes_limit) {
                  << " exceeds CGroup memory limit of "
                  << PrettyPrinter::PrintBytes(cgroup_mem_limit);
   }
+  if (process_avail_mem) *process_avail_mem = avail_mem;
   return Status::OK();
 }
 }
diff --git a/be/src/util/mem-info.h b/be/src/util/mem-info.h
index f36e8f705..cabc24fde 100644
--- a/be/src/util/mem-info.h
+++ b/be/src/util/mem-info.h
@@ -126,5 +126,6 @@ class MemInfo {
 /// the memory available to the daemon process. Returns an error if the memory limit is
 /// invalid or another error is encountered that should prevent starting up the daemon.
 /// Logs the memory limit chosen and any relevant diagnostics related to that choice.
-Status ChooseProcessMemLimit(int64_t* bytes_limit);
+/// If avail_mem is not nullptr, the available system memory in bytes is returned in it.
+Status ChooseProcessMemLimit(int64_t* bytes_limit, int64_t* avail_mem = nullptr);
 }
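
A brief usage sketch of the extended signature (the variable names are illustrative only):

    // Existing callers that only need the process limit are unchanged, because the new
    // out-parameter defaults to nullptr.
    int64_t bytes_limit;
    Status limit_status = ChooseProcessMemLimit(&bytes_limit);

    // New callers such as TmpDirRemoteCtrl::CalcMaxReadBufferBytes() additionally ask
    // for the available system memory in the same call and disable the remote read
    // buffer if the call fails.
    int64_t process_bytes_limit;
    int64_t total_avail_mem;
    Status mem_status = ChooseProcessMemLimit(&process_bytes_limit, &total_avail_mem);
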
diff --git a/be/src/util/metrics.h b/be/src/util/metrics.h
index 0970caf81..fd9c45d5e 100644
--- a/be/src/util/metrics.h
+++ b/be/src/util/metrics.h
@@ -270,9 +270,11 @@ class AtomicHighWaterMarkGauge : public ScalarMetric<int64_t, TMetricKind::GAUGE
 
   /// Adds 'delta' to the current value atomically.
   /// The hwm value is also updated atomically.
-  void Increment(int64_t delta) {
+  /// The updated current value is also returned.
+  int64_t Increment(int64_t delta) {
     const int64_t new_val = current_value_->Increment(delta);
     UpdateMax(new_val);
+    return new_val;
   }
 
   IntGauge* current_value() const { return current_value_; }
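
Returning the updated value makes an atomic "bump, check, roll back on failure" pattern possible. Below is a minimal sketch of that pattern, modelled on the read-buffer reservation in TmpFileRemote::AsyncFetchReadBufferBlock(); the helper name TryReserveReadBufferBytes is invented for the example.

    // Returns true if 'block_size' more bytes fit under 'limit', leaving the gauge and
    // its high water mark updated; otherwise undoes the increment and returns false.
    bool TryReserveReadBufferBytes(
        AtomicHighWaterMarkGauge* used_metric, int64_t block_size, int64_t limit) {
      if (used_metric->Increment(block_size) > limit) {
        used_metric->Increment(-block_size);  // Roll back the failed reservation.
        return false;
      }
      return true;
    }
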
diff --git a/common/thrift/metrics.json b/common/thrift/metrics.json
index d84453ca8..a5abd6651 100644
--- a/common/thrift/metrics.json
+++ b/common/thrift/metrics.json
@@ -2420,6 +2420,26 @@
     "kind": "GAUGE",
     "key": "tmp-file-mgr.scratch-space-bytes-used.dir-$0"
   },
+  {
+    "description": "The current total read memory buffer bytes for all scratch directories.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Read memory buffer used for scratch directories",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "tmp-file-mgr.scratch-read-memory-buffer-used"
+  },
+  {
+    "description": "The high water mark for read memory buffer bytes of all scratch directories.",
+    "contexts": [
+      "IMPALAD"
+    ],
+    "label": "Read memory buffer HWM for scratch directories",
+    "units": "BYTES",
+    "kind": "GAUGE",
+    "key": "tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark"
+  },
   {
     "description": "The current total spilled bytes for the local buffer directory.",
     "contexts": [
diff --git a/tests/custom_cluster/test_scratch_disk.py b/tests/custom_cluster/test_scratch_disk.py
index 17dd34ec0..a5ca75bbe 100644
--- a/tests/custom_cluster/test_scratch_disk.py
+++ b/tests/custom_cluster/test_scratch_disk.py
@@ -45,6 +45,13 @@ class TestScratchDir(CustomClusterTestSuite):
       from tpch.orders
       order by o_orderdate
       """
+  # A query against a big table with an order by requires spilling to disk if the
+  # intermediate results don't fit in memory.
+  spill_query_big_table = """
+      select l_orderkey, l_linestatus, l_shipdate, l_comment
+      from tpch.lineitem
+      order by l_orderkey
+      """
   # Query without order by can be executed without spilling to disk.
   in_mem_query = """
       select o_orderdate, o_custkey, o_comment from tpch.orders
@@ -440,3 +447,38 @@ class TestScratchDir(CustomClusterTestSuite):
     # assert that we did use the scratch space and should be integer times of the
     # remote file size.
     assert (total_size > 0 and total_size % (8 * 1024 * 1024) == 0)
+
+  @pytest.mark.execute_serially
+  @SkipIf.not_hdfs
+  def test_scratch_dirs_batch_reading(self, vector):
+    # Make the local buffer directory small enough that the query spills to the remote one.
+    normal_dirs = self.generate_dirs(1)
+    normal_dirs[0] = '{0}:2MB:{1}'.format(normal_dirs[0], 1)
+    normal_dirs.append('hdfs://localhost:20500/tmp')
+    self._start_impala_cluster([
+      '--impalad_args=-logbuflevel=-1 -scratch_dirs={0}'.format(','.join(normal_dirs)),
+      '--impalad_args=--allow_multiple_scratch_dirs_per_device=true',
+      '--impalad_args=--buffer_pool_clean_pages_limit=1m',
+      '--impalad_args=--remote_tmp_file_size=1M',
+      '--impalad_args=--remote_tmp_file_block_size=1m',
+      '--impalad_args=--remote_read_memory_buffer_size=1GB',
+      '--impalad_args=--remote_batch_read=true'],
+      cluster_size=1,
+      expected_num_impalads=1)
+    self.assert_impalad_log_contains("INFO", "Using scratch directory ",
+                                    expected_count=len(normal_dirs) - 1)
+    vector.get_value('exec_option')['buffer_pool_limit'] = self.buffer_pool_limit
+    impalad = self.cluster.impalads[0]
+    client = impalad.service.create_beeswax_client()
+    handle = self.execute_query_async_using_client(client, self.spill_query, vector)
+    verifier = MetricVerifier(impalad.service)
+    verifier.wait_for_metric("impala-server.num-fragments-in-flight", 2)
+    results = client.fetch(self.spill_query, handle)
+    assert results.success
+    metrics0 = self.get_metric(
+      'tmp-file-mgr.scratch-read-memory-buffer-used-high-water-mark')
+    assert (metrics0 > 0)
+    metrics1 = self.get_metric('tmp-file-mgr.scratch-space-bytes-used-high-water-mark')
+    assert (metrics1 > 0)
+    client.close_query(handle)
+    client.close()


[impala] 03/03: IMPALA-11576: Fix for test_multiple_storage_locations on S3

Posted by ar...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

arawat pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 55194a9c83bba52c64d0d2e75d5ebd3d8c67b3e2
Author: Gergely Fürnstáhl <gf...@cloudera.com>
AuthorDate: Tue Sep 27 11:54:35 2022 +0200

    IMPALA-11576: Fix for test_multiple_storage_locations on S3
    
    Fixed absolute/relative path handling in filesystems not supporting
    block location (e.g. S3).
    
    Testing:
     - test_multiple_storage_location passes on S3
    
    Change-Id: I08badd96d060a2377c9a1eafa287a3adf8fa11db
    Reviewed-on: http://gerrit.cloudera.org:8080/19045
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java    | 5 ++---
 .../main/java/org/apache/impala/catalog/FileMetadataLoader.java   | 2 +-
 fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java     | 8 ++++----
 3 files changed, 7 insertions(+), 8 deletions(-)

diff --git a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
index 5c25c0960..37b657b0c 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeIcebergTable.java
@@ -537,10 +537,9 @@ public interface FeIcebergTable extends FeFsTable {
         relPath = relUri.getPath();
       }
 
-
       if (!FileSystemUtil.supportsStorageIds(fs)) {
-        return HdfsPartition.FileDescriptor.createWithNoBlocks(fileStatus,
-            StringUtils.isNotEmpty(relPath) ? relPath : absPath);
+        return HdfsPartition.FileDescriptor.createWithNoBlocks(
+            fileStatus, relPath, absPath);
       }
 
       BlockLocation[] locations;
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 272a24f0a..ee42fab49 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -259,7 +259,7 @@ public class FileMetadataLoader {
   private FileDescriptor createFd(FileSystem fs, FileStatus fileStatus,
       String relPath, Reference<Long> numUnknownDiskIds) throws IOException {
     if (!FileSystemUtil.supportsStorageIds(fs)) {
-      return FileDescriptor.createWithNoBlocks(fileStatus, relPath);
+      return FileDescriptor.createWithNoBlocks(fileStatus, relPath, null);
     }
     BlockLocation[] locations;
     if (fileStatus instanceof LocatedFileStatus) {
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
index 88b7eec26..9d481d540 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsPartition.java
@@ -221,11 +221,11 @@ public class HdfsPartition extends CatalogObjectImpl
      * Creates the file descriptor of a file represented by 'fileStatus' that
      * resides in a filesystem that doesn't support the BlockLocation API (e.g. S3).
      */
-    public static FileDescriptor createWithNoBlocks(FileStatus fileStatus,
-        String relPath) {
+    public static FileDescriptor createWithNoBlocks(
+        FileStatus fileStatus, String relPath, String absPath) {
       FlatBufferBuilder fbb = new FlatBufferBuilder(1);
-      return new FileDescriptor(createFbFileDesc(fbb, fileStatus, relPath, null, false,
-          null));
+      return new FileDescriptor(
+          createFbFileDesc(fbb, fileStatus, relPath, null, false, absPath));
     }
 
     /**