You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2020/08/25 21:41:52 UTC

[impala] 02/04: IMPALA-9955, IMPALA-9957: Fix not enough reservation for large pages in GroupingAggregator

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit e0a6e942b28909baa0f56e21e3d33adfb5eb19b7
Author: stiga-huang <hu...@gmail.com>
AuthorDate: Mon Jul 27 11:28:10 2020 +0800

    IMPALA-9955,IMPALA-9957: Fix not enough reservation for large pages in GroupingAggregator
    
    The minimum requirement for a spillable operator is ((min_buffers -2) *
    default_buffer_size) + 2 * max_row_size. In the min reservation, we only
    reserve space for two large pages, one for reading, the other for
    writing. However, to make the non-streaming GroupingAggregator work
    correctly, we have to manage these extra reservations carefully, so that
    it doesn't run out of the min reservation when it actually needs to spill
    a large page, or when it actually needs to read a large page.
    
    Specifically, how the large write page reservation is managed depends on
    whether needs_serialize is true or false:
    - If the aggregator needs to serialize the intermediate results when
      spilling a partition, we have to save a large page worth of
      reservation for the serialize stream, in case it needs to write large
      rows. This space can be restored when all the partitions are spilled
      so the serialize stream is not needed until we build/repartition a
      spilled partition and thus have pinned partitions again. If the large
      write page reservation is used, we save it back whenever possible
      after we spill or close a partition.
    - If the aggregator doesn't need the serialize stream at all, we can
      restore the large write page reservation whenever we fail to add a
      large row, before spilling any partitions. Reclaim it whenever
      possible after we spill or close a partition.
    A special case is when we are processing a large row and it's the last
    row in building/repartitioning a spilled partition: the large write page
    reservation can be restored for it regardless of whether we need the
    serialize stream, because the partitions will be read out after this and
    so nothing needs to be spilled.
    
    For the large read page reservation, it's transferred to the spilled
    BufferedTupleStream that we are reading in building/repartitioning a
    spilled partition. The stream will restore some of it when reading a
    large page, and reclaim it when the output row batch is reset. Note that
    the stream is read in attach_on_read mode, so the large page will be
    attached to the row batch's buffers and only gets freed when the row
    batch is reset.
    
    Tests:
    - Add tests in test_spilling_large_rows (test_spilling.py) with
      different row sizes to reproduce the issue.
    - One test in test_spilling_no_debug_action becomes flaky after this
      patch. Revise the query to make the udf allocate larger strings so it
      can consistently pass.
    - Run CORE tests.
    
    Change-Id: I3d9c3a2e7f0da60071b920dec979729e86459775
    Reviewed-on: http://gerrit.cloudera.org:8080/16240
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
    Reviewed-by: Tim Armstrong <ta...@cloudera.com>
---
 be/src/codegen/gen_ir_descriptions.py              |   2 +-
 be/src/exec/grouping-aggregator-ir.cc              |  28 ++-
 be/src/exec/grouping-aggregator-partition.cc       |  65 +++++-
 be/src/exec/grouping-aggregator.cc                 | 232 ++++++++++++++++++---
 be/src/exec/grouping-aggregator.h                  |  61 +++++-
 be/src/runtime/buffered-tuple-stream.cc            |  56 ++++-
 be/src/runtime/buffered-tuple-stream.h             |  16 ++
 be/src/runtime/bufferpool/buffer-pool.cc           |   4 +
 be/src/runtime/bufferpool/buffer-pool.h            |   3 +
 be/src/runtime/bufferpool/reservation-tracker.cc   |   4 +-
 .../queries/QueryTest/spilling-large-rows.test     |  62 ++++++
 .../QueryTest/spilling-no-debug-action.test        |   2 +-
 12 files changed, 474 insertions(+), 61 deletions(-)

diff --git a/be/src/codegen/gen_ir_descriptions.py b/be/src/codegen/gen_ir_descriptions.py
index f085a9c..8da5569 100755
--- a/be/src/codegen/gen_ir_descriptions.py
+++ b/be/src/codegen/gen_ir_descriptions.py
@@ -48,7 +48,7 @@ ir_functions = [
   ["AGG_FN_EVALUATOR_AGG_FN_CTX",
    "_ZNK6impala14AggFnEvaluator10agg_fn_ctxEv"],
   ["GROUPING_AGG_ADD_BATCH_IMPL",
-   "_ZN6impala18GroupingAggregator12AddBatchImplILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxE"],
+   "_ZN6impala18GroupingAggregator12AddBatchImplILb0EEENS_6StatusEPNS_8RowBatchENS_13TPrefetchMode4typeEPNS_12HashTableCtxEb"],
   ["NON_GROUPING_AGG_ADD_BATCH_IMPL",
    "_ZN6impala21NonGroupingAggregator12AddBatchImplEPNS_8RowBatchE"],
   ["GROUPING_AGG_ADD_BATCH_STREAMING_IMPL",
diff --git a/be/src/exec/grouping-aggregator-ir.cc b/be/src/exec/grouping-aggregator-ir.cc
index fb70e3b..18b5f5f 100644
--- a/be/src/exec/grouping-aggregator-ir.cc
+++ b/be/src/exec/grouping-aggregator-ir.cc
@@ -26,7 +26,8 @@ using namespace impala;
 
 template <bool AGGREGATED_ROWS>
 Status GroupingAggregator::AddBatchImpl(RowBatch* batch,
-    TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx) {
+    TPrefetchMode::type prefetch_mode, HashTableCtx* __restrict__ ht_ctx,
+    bool has_more_rows) {
   DCHECK(!hash_partitions_.empty());
   DCHECK(!is_streaming_preagg_);
 
@@ -45,7 +46,8 @@ Status GroupingAggregator::AddBatchImpl(RowBatch* batch,
     EvalAndHashPrefetchGroup<AGGREGATED_ROWS>(batch, group_start, prefetch_mode, ht_ctx);
 
     FOREACH_ROW_LIMIT(batch, group_start, cache_size, batch_iter) {
-      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx));
+      RETURN_IF_ERROR(ProcessRow<AGGREGATED_ROWS>(batch_iter.Get(), ht_ctx,
+          has_more_rows));
       expr_vals_cache->NextRow();
     }
     DCHECK(expr_vals_cache->AtEnd());
@@ -85,7 +87,7 @@ void IR_ALWAYS_INLINE GroupingAggregator::EvalAndHashPrefetchGroup(RowBatch* bat
 
 template <bool AGGREGATED_ROWS>
 Status GroupingAggregator::ProcessRow(
-    TupleRow* __restrict__ row, HashTableCtx* __restrict__ ht_ctx) {
+    TupleRow* __restrict__ row, HashTableCtx* __restrict__ ht_ctx, bool has_more_rows) {
   HashTableCtx::ExprValuesCache* expr_vals_cache = ht_ctx->expr_values_cache();
   // Hoist lookups out of non-null branch to speed up non-null case.
   const uint32_t hash = expr_vals_cache->CurExprValuesHash();
@@ -123,12 +125,14 @@ Status GroupingAggregator::ProcessRow(
 
   // If we are seeing this result row for the first time, we need to construct the
   // result row and initialize it.
-  return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it);
+  return AddIntermediateTuple<AGGREGATED_ROWS>(dst_partition, row, hash, it,
+      has_more_rows);
 }
 
 template <bool AGGREGATED_ROWS>
 Status GroupingAggregator::AddIntermediateTuple(Partition* __restrict__ partition,
-    TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it) {
+    TupleRow* __restrict__ row, uint32_t hash, HashTable::Iterator insert_it,
+    bool has_more_rows) {
   while (true) {
     DCHECK(partition->aggregated_row_stream->is_pinned());
     Tuple* intermediate_tuple = ConstructIntermediateTuple(partition->agg_fn_evals,
@@ -144,6 +148,16 @@ Status GroupingAggregator::AddIntermediateTuple(Partition* __restrict__ partitio
       return std::move(add_batch_status_);
     }
 
+    // If we don't need to reserve extra space for the serialize stream, restore them
+    // before spilling any partitions. One case is we don't need the serialize stream at
+    // all. The other case is this is the last row to add, and 'partition' will be read
+    // out and closed after adding this row, so no partitions need to be spilled.
+    if ((!needs_serialize_ || !has_more_rows)
+        && large_write_page_reservation_.GetReservation() > 0) {
+      RestoreLargeWritePageReservation();
+      continue;
+    }
+
     // We did not have enough memory to add intermediate_tuple to the stream.
     RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
     if (partition->is_spilled()) {
@@ -244,6 +258,6 @@ bool GroupingAggregator::TryAddToHashTable(HashTableCtx* __restrict__ ht_ctx,
 
 // Instantiate required templates.
 template Status GroupingAggregator::AddBatchImpl<false>(
-    RowBatch*, TPrefetchMode::type, HashTableCtx*);
+    RowBatch*, TPrefetchMode::type, HashTableCtx*, bool);
 template Status GroupingAggregator::AddBatchImpl<true>(
-    RowBatch*, TPrefetchMode::type, HashTableCtx*);
+    RowBatch*, TPrefetchMode::type, HashTableCtx*, bool);
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
index 784795d..8837623 100644
--- a/be/src/exec/grouping-aggregator-partition.cc
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -19,6 +19,7 @@
 
 #include <set>
 #include <sstream>
+#include <gutil/strings/substitute.h>
 
 #include "exec/exec-node.h"
 #include "exec/hash-table.inline.h"
@@ -117,25 +118,60 @@ Status GroupingAggregator::Partition::SerializeStreamForSpilling() {
     Status status;
     BufferedTupleStream* new_stream = parent->serialize_stream_.get();
     HashTable::Iterator it = hash_tbl->Begin(parent->ht_ctx_.get());
+    // Marks if we have used the large write page reservation. We only reclaim it after we
+    // finish writing to 'new_stream', because there are no other works interleaving that
+    // could occupy it.
+    bool used_large_page_reservation = false;
     while (!it.AtEnd()) {
       Tuple* tuple = it.GetTuple();
       it.Next();
       AggFnEvaluator::Serialize(agg_fn_evals, tuple);
-      if (UNLIKELY(!new_stream->AddRow(reinterpret_cast<TupleRow*>(&tuple), &status))) {
-        DCHECK(!status.ok()) << "Stream was unpinned - AddRow() only fails on error";
-        // Even if we can't add to new_stream, finish up processing this agg stream to
-        // make clean up easier (someone has to finalize this stream and we don't want to
-        // remember where we are).
-        parent->CleanupHashTbl(agg_fn_evals, it);
-        hash_tbl->Close();
-        hash_tbl.reset();
-        aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
-        return status;
+      TupleRow* row = reinterpret_cast<TupleRow*>(&tuple);
+      if (UNLIKELY(!new_stream->AddRow(row, &status))) {
+        bool row_is_added = false;
+        if (status.ok()) {
+          // Don't get enough unused reservation to add the large row. Restore the saved
+          // reservation for a large write page and try again.
+          DCHECK(!used_large_page_reservation
+              && parent->large_write_page_reservation_.GetReservation() > 0)
+              << "Run out of large page reservation in spilling " << DebugString()
+              << "\nused_large_page_reservation=" << used_large_page_reservation << "\n"
+              << parent->DebugString() << "\n"
+              << parent->buffer_pool_client()->DebugString() << "\n"
+              << new_stream->DebugString() << "\n"
+              << this->aggregated_row_stream->DebugString();
+          used_large_page_reservation = true;
+          parent->RestoreLargeWritePageReservation();
+          row_is_added = new_stream->AddRow(row, &status);
+        }
+        if (UNLIKELY(!row_is_added)) {
+          if (status.ok()) {
+            // Still fail to write the large row after restoring all the extra reservation
+            // for the large page. We can't spill anything else to free some reservation
+            // since we are currently spilling a partition. This indicates a bug that some
+            // of the min reservation are used incorrectly.
+            status = Status(TErrorCode::INTERNAL_ERROR, strings::Substitute(
+                "Internal error: couldn't serialize a large row in $0 of $1, only had $2 "
+                "bytes of unused reservation:\n$3", DebugString(), parent->DebugString(),
+                parent->buffer_pool_client()->GetUnusedReservation(),
+                parent->buffer_pool_client()->DebugString()));
+          }
+          // Even if we can't add to new_stream, finish up processing this agg stream to
+          // make clean up easier (someone has to finalize this stream and we don't want
+          // to remember where we are).
+          parent->CleanupHashTbl(agg_fn_evals, it);
+          hash_tbl->Close();
+          hash_tbl.reset();
+          aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+          return status;
+        }
       }
     }
 
     aggregated_row_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
     aggregated_row_stream.swap(parent->serialize_stream_);
+    // Save back the large write page reservation if we have restored it.
+    if (used_large_page_reservation) parent->SaveLargeWritePageReservation();
     // Recreate the serialize_stream (and reserve 1 buffer) now in preparation for
     // when we need to spill again. We need to have this available before we need
     // to spill to make sure it is available. This should be acquirable since we just
@@ -224,4 +260,13 @@ void GroupingAggregator::Partition::Close(bool finalize_rows) {
   for (AggFnEvaluator* eval : agg_fn_evals) eval->Close(parent->state_);
   if (agg_fn_perm_pool.get() != nullptr) agg_fn_perm_pool->FreeAll();
 }
+
+string GroupingAggregator::Partition::DebugString() const {
+  std::stringstream ss;
+  ss << "Partition " << this << " (id=" << idx << ", level=" << level << ", is_spilled="
+     << is_spilled() << ", is_closed=" << is_closed
+     << ", aggregated_row_stream=" << aggregated_row_stream->DebugString()
+     << ",\nunaggregated_row_stream=" << unaggregated_row_stream->DebugString() << ")";
+  return ss.str();
+}
 } // namespace impala
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index c9a33ac..b391893 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -216,6 +216,25 @@ Status GroupingAggregator::Open(RuntimeState* state) {
     RETURN_IF_ERROR(reservation_manager_.ClaimBufferReservation(state));
   }
 
+  // Init the SubReservations for non-streaming instances. Open() may be called many times
+  // if this is in a subplan. Only init them once.
+  if (!is_streaming_preagg_ && large_write_page_reservation_.is_closed()) {
+    DCHECK(large_read_page_reservation_.is_closed());
+    large_write_page_reservation_.Init(buffer_pool_client());
+    large_read_page_reservation_.Init(buffer_pool_client());
+    // The min reservation only guarantees reading one large page and writing one large
+    // page at the same time. Save the extra reservation so we can restore them when we
+    // actually need them, to avoid accidentally occupy them for other purposes.
+    int64_t extra_reservation = resource_profile_.max_row_buffer_size
+        - resource_profile_.spillable_buffer_size;
+    if (extra_reservation > 0) {
+      DCHECK_GT(buffer_pool_client()->GetUnusedReservation(), extra_reservation * 2)
+        << buffer_pool_client()->DebugString() << "\n" << resource_profile_;
+      SaveLargeReadPageReservation();
+      SaveLargeWritePageReservation();
+    }
+  }
+
   DCHECK(ht_ctx_.get() != nullptr);
   RETURN_IF_ERROR(ht_ctx_->Open(state));
 
@@ -259,6 +278,12 @@ Status GroupingAggregator::GetRowsFromPartition(
     if (output_partition_ != nullptr) {
       output_partition_->Close(false);
       output_partition_ = nullptr;
+      // Try to save the large write page reservation (if it's used) after closing
+      // a partition.
+      if (!large_write_page_reservation_.is_closed()
+          && large_write_page_reservation_.GetReservation() == 0) {
+        TrySaveLargeWritePageReservation();
+      }
     }
     if (aggregated_partitions_.empty() && spilled_partitions_.empty()) {
       // No more partitions, all done.
@@ -420,6 +445,8 @@ void GroupingAggregator::Close(RuntimeState* state) {
   if (serialize_stream_.get() != nullptr) {
     serialize_stream_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   }
+  large_write_page_reservation_.Close();
+  large_read_page_reservation_.Close();
   reservation_manager_.Close(state);
   if (reservation_tracker_ != nullptr) reservation_tracker_->Close();
   // Must be called after tuple_pool_ is freed, so that mem_tracker_ can be closed.
@@ -434,9 +461,9 @@ Status GroupingAggregator::AddBatch(RuntimeState* state, RowBatch* batch) {
   TPrefetchMode::type prefetch_mode = state->query_options().prefetch_mode;
   GroupingAggregatorConfig::AddBatchImplFn add_batch_impl_fn = add_batch_impl_fn_.load();
   if (add_batch_impl_fn != nullptr) {
-    RETURN_IF_ERROR(add_batch_impl_fn(this, batch, prefetch_mode, ht_ctx_.get()));
+    RETURN_IF_ERROR(add_batch_impl_fn(this, batch, prefetch_mode, ht_ctx_.get(), true));
   } else {
-    RETURN_IF_ERROR(AddBatchImpl<false>(batch, prefetch_mode, ht_ctx_.get()));
+    RETURN_IF_ERROR(AddBatchImpl<false>(batch, prefetch_mode, ht_ctx_.get(), true));
   }
 
   return Status::OK();
@@ -582,6 +609,43 @@ void GroupingAggregator::CopyGroupingValues(
   }
 }
 
+int GroupingAggregator::GetNumPinnedPartitions() {
+  // If we are building a spilled partition, all items of hash_partitions_ are the same.
+  // Just need to check the first one.
+  if (hash_partitions_[0] == hash_partitions_[1]) {
+    return hash_partitions_[0]->is_spilled() ? 0 : 1;
+  }
+  int res = 0;
+  for (Partition* hash_partition : hash_partitions_) {
+    if (!hash_partition->is_spilled()) ++res;
+  }
+  return res;
+}
+
+bool GroupingAggregator::AddRowToSpilledStream(BufferedTupleStream* stream,
+    TupleRow* __restrict__ row, Status* status) {
+  DCHECK(!stream->is_pinned());
+  if (LIKELY(stream->AddRow(row, status))) return true;
+  if (!status->ok()) return false;
+  // We fail to add a large row due to run out of unused reservation and fail to increase
+  // the reservation. If we don't have the serialize stream, spilling partitions don't
+  // need extra reservation so we can restore the large write page reservation and try
+  // again. The same if we have the serialize stream but all partitions are spilled.
+  if ((!needs_serialize_ || GetNumPinnedPartitions() == 0)
+      && large_write_page_reservation_.GetReservation() > 0) {
+    RestoreLargeWritePageReservation();
+    if (LIKELY(stream->AddRow(row, status))) {
+      // 'stream' is spilled so the large write page should already be spilled after it's
+      // written.
+      SaveLargeWritePageReservation();
+      return true;
+    }
+    DCHECK(!status->ok()) << "Extra reservation not used by AddRow in "
+        << DebugString() << ":\n" << buffer_pool_client()->DebugString();
+  }
+  return false;
+}
+
 template <bool AGGREGATED_ROWS>
 Status GroupingAggregator::AppendSpilledRow(
     Partition* __restrict__ partition, TupleRow* __restrict__ row) {
@@ -592,16 +656,21 @@ Status GroupingAggregator::AppendSpilledRow(
       partition->unaggregated_row_stream.get();
   DCHECK(!stream->is_pinned());
   Status status;
-  if (LIKELY(stream->AddRow(row, &status))) return Status::OK();
+  if (LIKELY(AddRowToSpilledStream(stream, row, &status))) return Status::OK();
   RETURN_IF_ERROR(status);
 
-  // Keep trying to free memory by spilling until we succeed or hit an error.
-  // Running out of partitions to spill is treated as an error by SpillPartition().
-  while (true) {
+  // Keep trying to free memory by spilling and retry AddRow() until we run out of
+  // partitions or hit an error.
+  for (int n = GetNumPinnedPartitions(); n > 0; --n) {
     RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
-    if (stream->AddRow(row, &status)) return Status::OK();
+    if (LIKELY(AddRowToSpilledStream(stream, row, &status))) return Status::OK();
     RETURN_IF_ERROR(status);
   }
+  // If we have spilled all partitions, we should be able to add the row use the large
+  // write page reservation. This indicates a bug that we can't work in min reservation.
+  return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: No reservation "
+      "for writing a large page even after all partitions are spilled in $0:\n$1",
+      DebugString(), buffer_pool_client()->DebugString()));
 }
 
 void GroupingAggregator::SetDebugOptions(const TDebugOptions& debug_options) {
@@ -620,7 +689,19 @@ void GroupingAggregator::DebugString(int indentation_level, stringstream* out) c
        << "intermediate_tuple_id=" << intermediate_tuple_id_
        << " output_tuple_id=" << output_tuple_id_ << " needs_finalize=" << needs_finalize_
        << " grouping_exprs=" << ScalarExpr::DebugString(grouping_exprs_)
-       << " agg_exprs=" << AggFn::DebugString(agg_fns_);
+       << " agg_exprs=" << AggFn::DebugString(agg_fns_)
+       << " large_read_page_reservation=";
+  if (large_read_page_reservation_.is_closed()) {
+    *out << "<closed>";
+  } else {
+    *out << large_read_page_reservation_.GetReservation();
+  }
+  *out << " large_write_page_reservation=";
+  if (large_write_page_reservation_.is_closed()) {
+    *out << "<closed>";
+  } else {
+    *out << large_write_page_reservation_.GetReservation();
+  }
   *out << ")";
 }
 
@@ -777,8 +858,10 @@ Status GroupingAggregator::BuildSpilledPartition(Partition** built_partition) {
   // significantly, we could do better here by keeping the incomplete hash table in
   // memory and only spilling unaggregated rows that didn't fit in the hash table
   // (somewhat similar to the passthrough pre-aggregation).
-  RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get()));
-  RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get()));
+  RETURN_IF_ERROR(ProcessStream<true>(src_partition->aggregated_row_stream.get(),
+      /* has_more_streams */ src_partition->unaggregated_row_stream->num_rows() > 0));
+  RETURN_IF_ERROR(ProcessStream<false>(src_partition->unaggregated_row_stream.get(),
+      /* has_more_streams */ false));
   src_partition->Close(false);
   spilled_partitions_.pop_front();
   hash_partitions_.clear();
@@ -803,6 +886,9 @@ Status GroupingAggregator::RepartitionSpilledPartition() {
   // Leave the partition in 'spilled_partitions_' to be closed if we hit an error.
   Partition* partition = spilled_partitions_.front();
   DCHECK(partition->is_spilled());
+  DCHECK(partition->aggregated_row_stream->num_rows()
+      + partition->unaggregated_row_stream->num_rows() > 1)
+      << "Should not repartition a single-row partition: " << partition->DebugString();
 
   // Create the new hash partitions to repartition into. This will allocate a
   // write buffer for each partition's aggregated row stream.
@@ -812,22 +898,26 @@ Status GroupingAggregator::RepartitionSpilledPartition() {
   // Rows in this partition could have been spilled into two streams, depending
   // on if it is an aggregated intermediate, or an unaggregated row. Aggregated
   // rows are processed first to save a hash table lookup in AddBatchImpl().
-  RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get()));
-
-  // Prepare write buffers so we can append spilled rows to unaggregated partitions.
-  for (Partition* hash_partition : hash_partitions_) {
-    if (!hash_partition->is_spilled()) continue;
-    // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
-    // reservation and use it to pin the unaggregated write buffer.
-    RETURN_IF_ERROR(hash_partition->aggregated_row_stream->UnpinStream(
-        BufferedTupleStream::UNPIN_ALL));
-    bool got_buffer;
-    RETURN_IF_ERROR(
-        hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
-    DCHECK(got_buffer) << "Accounted in min reservation"
-                       << buffer_pool_client()->DebugString();
+  RETURN_IF_ERROR(ProcessStream<true>(partition->aggregated_row_stream.get(),
+      /* has_more_streams */ partition->unaggregated_row_stream->num_rows() > 0));
+
+  if (partition->unaggregated_row_stream->num_rows() > 0) {
+    // Prepare write buffers so we can append spilled rows to unaggregated partitions.
+    for (Partition* hash_partition : hash_partitions_) {
+      if (!hash_partition->is_spilled()) continue;
+      // The aggregated rows have been repartitioned. Free up at least a buffer's worth of
+      // reservation and use it to pin the unaggregated write buffer.
+      RETURN_IF_ERROR(hash_partition->aggregated_row_stream->UnpinStream(
+          BufferedTupleStream::UNPIN_ALL));
+      bool got_buffer;
+      RETURN_IF_ERROR(
+          hash_partition->unaggregated_row_stream->PrepareForWrite(&got_buffer));
+      DCHECK(got_buffer) << "Accounted in min reservation"
+                         << buffer_pool_client()->DebugString();
+    }
+    RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get(),
+        /* has_more_streams */ false));
   }
-  RETURN_IF_ERROR(ProcessStream<false>(partition->unaggregated_row_stream.get()));
 
   COUNTER_ADD(num_row_repartitioned_, partition->aggregated_row_stream->num_rows());
   COUNTER_ADD(num_row_repartitioned_, partition->unaggregated_row_stream->num_rows());
@@ -844,12 +934,26 @@ Status GroupingAggregator::RepartitionSpilledPartition() {
 }
 
 template <bool AGGREGATED_ROWS>
-Status GroupingAggregator::ProcessStream(BufferedTupleStream* input_stream) {
+Status GroupingAggregator::ProcessStream(BufferedTupleStream* input_stream,
+    bool has_more_streams) {
   DCHECK(!is_streaming_preagg_);
   if (input_stream->num_rows() > 0) {
+    if (!input_stream->is_pinned()) {
+      // This is the only stream that we are currently reading and it's unpinned. Transfer
+      // the large read page reservation to the stream by restoring the reservation and
+      // saving it immediately to the stream.
+      if (large_read_page_reservation_.GetReservation() > 0) {
+        RestoreLargeReadPageReservation();
+        input_stream->SaveLargeReadPageReservation();
+      } else {
+        DCHECK_EQ(resource_profile_.max_row_buffer_size,
+            resource_profile_.spillable_buffer_size)
+            << "Large read page reservation not reclaimed in previous ProcessStream";
+      }
+    }
     while (true) {
       bool got_buffer = false;
-      RETURN_IF_ERROR(input_stream->PrepareForRead(true, &got_buffer));
+      RETURN_IF_ERROR(input_stream->PrepareForRead(/*attach_on_read*/ true, &got_buffer));
       if (got_buffer) break;
       // Did not have a buffer to read the input stream. Spill and try again.
       RETURN_IF_ERROR(SpillPartition(AGGREGATED_ROWS));
@@ -860,13 +964,27 @@ Status GroupingAggregator::ProcessStream(BufferedTupleStream* input_stream) {
     const RowDescriptor* desc =
         AGGREGATED_ROWS ? &intermediate_row_desc_ : &input_row_desc_;
     RowBatch batch(desc, state_->batch_size(), mem_tracker_.get());
+    int64_t rows_read = 0;
     do {
       RETURN_IF_ERROR(input_stream->GetNext(&batch, &eos));
-      RETURN_IF_ERROR(
-          AddBatchImpl<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get()));
+      rows_read += batch.num_rows();
+      if (rows_read == input_stream->num_rows()) DCHECK(eos);
+      bool has_more_rows = AGGREGATED_ROWS ? (has_more_streams || !eos) : !eos;
+      RETURN_IF_ERROR(AddBatchImpl<AGGREGATED_ROWS>(&batch, prefetch_mode, ht_ctx_.get(),
+          has_more_rows));
       RETURN_IF_ERROR(QueryMaintenance(state_));
       batch.Reset();
+      // We are reading in attach_on_read mode, the large read page reservation could be
+      // used by an attached buffer of the large page. It's only freed after resetting the
+      // batch. Save back the large read page reservation if we have used it.
+      input_stream->SaveLargeReadPageReservation();
     } while (!eos);
+    if (!input_stream->is_pinned() && input_stream->HasLargeReadPageReservation()) {
+      // Save back the large read page reservation by restoring it from the stream and
+      // save it immediately.
+      input_stream->RestoreLargeReadPageReservation();
+      SaveLargeReadPageReservation();
+    }
   }
   input_stream->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
   return Status::OK();
@@ -901,7 +1019,12 @@ Status GroupingAggregator::SpillPartition(bool more_aggregate_rows) {
   for (int i = 0; i < PARTITION_FANOUT; ++i) {
     if (hash_partitions_[i] == hash_partitions_[partition_idx]) hash_tbls_[i] = nullptr;
   }
-  return hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
+  Status status = hash_partitions_[partition_idx]->Spill(more_aggregate_rows);
+  // Try to save back the large write page reservation if it's used by a pinned partition.
+  if (status.ok() && large_write_page_reservation_.GetReservation() == 0) {
+    TrySaveLargeWritePageReservation();
+  }
+  return status;
 }
 
 Status GroupingAggregator::MoveHashPartitions(int64_t num_input_rows) {
@@ -994,6 +1117,55 @@ int64_t GroupingAggregator::MinReservation() const {
       + resource_profile_.max_row_buffer_size * 2;
 }
 
+void GroupingAggregator::SaveLargeWritePageReservation() {
+  bool success = TrySaveLargeWritePageReservation();
+  DCHECK(success);
+}
+
+void GroupingAggregator::SaveLargeReadPageReservation() {
+  bool success = TrySaveLargeReadPageReservation();
+  DCHECK(success);
+}
+
+bool GroupingAggregator::TrySaveLargeWritePageReservation() {
+  DCHECK_EQ(large_write_page_reservation_.GetReservation(), 0);
+  int64_t extra_reservation = resource_profile_.max_row_buffer_size
+      - resource_profile_.spillable_buffer_size;
+  if (extra_reservation > 0
+      && buffer_pool_client()->GetUnusedReservation() >= extra_reservation) {
+    buffer_pool_client()->SaveReservation(&large_write_page_reservation_,
+        extra_reservation);
+    return true;
+  }
+  return false;
+}
+
+bool GroupingAggregator::TrySaveLargeReadPageReservation() {
+  DCHECK_EQ(large_read_page_reservation_.GetReservation(), 0);
+  int64_t extra_reservation = resource_profile_.max_row_buffer_size
+      - resource_profile_.spillable_buffer_size;
+  if (buffer_pool_client()->GetUnusedReservation() >= extra_reservation) {
+    buffer_pool_client()->SaveReservation(&large_read_page_reservation_,
+        extra_reservation);
+    return true;
+  }
+  return false;
+}
+
+void GroupingAggregator::RestoreLargeWritePageReservation() {
+  DCHECK_GT(large_write_page_reservation_.GetReservation(), 0);
+  DCHECK_EQ(large_write_page_reservation_.GetReservation(),
+      resource_profile_.max_row_buffer_size - resource_profile_.spillable_buffer_size);
+  buffer_pool_client()->RestoreAllReservation(&large_write_page_reservation_);
+}
+
+void GroupingAggregator::RestoreLargeReadPageReservation() {
+  int64_t extra_read_reservation = large_read_page_reservation_.GetReservation();
+  DCHECK_EQ(extra_read_reservation, resource_profile_.max_row_buffer_size
+          - resource_profile_.spillable_buffer_size);
+  buffer_pool_client()->RestoreAllReservation(&large_read_page_reservation_);
+}
+
 BufferPool::ClientHandle* GroupingAggregator::buffer_pool_client() {
   return reservation_manager_.buffer_pool_client();
 }
diff --git a/be/src/exec/grouping-aggregator.h b/be/src/exec/grouping-aggregator.h
index 8f68dc2..f9f7d0d 100644
--- a/be/src/exec/grouping-aggregator.h
+++ b/be/src/exec/grouping-aggregator.h
@@ -162,7 +162,7 @@ class GroupingAggregatorConfig : public AggregatorConfig {
   const HashTableConfig* hash_table_config_;
 
   typedef Status (*AddBatchImplFn)(
-      GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*);
+      GroupingAggregator*, RowBatch*, TPrefetchMode::type, HashTableCtx*, bool);
   /// Jitted AddBatchImpl function pointer. Null if codegen is disabled.
   CodegenFnPtr<AddBatchImplFn> add_batch_impl_fn_;
 
@@ -279,6 +279,15 @@ class GroupingAggregator : public Aggregator {
 
   /// Allocator for hash table memory.
   std::unique_ptr<Suballocator> ht_allocator_;
+  /// Saved reservation for writing a large page to a spilled stream or writing the last
+  /// large row to a pinned partition when building/repartitioning a spilled partition.
+  /// ('max_page_len' - 'default_page_len') reservation is saved when claiming the initial
+  /// min reservation.
+  BufferPool::SubReservation large_write_page_reservation_;
+  /// Saved reservation for reading a large page from a spilled stream.
+  /// ('max_page_len' - 'default_page_len') reservation is saved when claiming the initial
+  /// min reservation.
+  BufferPool::SubReservation large_read_page_reservation_;
 
   /// MemPool used to allocate memory during Close() when creating new output tuples. The
   /// pool should not be Reset() to allow amortizing memory allocation over a series of
@@ -441,6 +450,8 @@ class GroupingAggregator : public Aggregator {
 
     bool is_spilled() const { return hash_tbl.get() == nullptr; }
 
+    std::string DebugString() const;
+
     GroupingAggregator* parent;
 
     /// If true, this partition is closed and there is nothing left to do.
@@ -527,12 +538,15 @@ class GroupingAggregator : public Aggregator {
   /// 'prefetch_mode' specifies the prefetching mode in use. If it's not PREFETCH_NONE,
   ///     hash table buckets will be prefetched based on the hash values computed. Note
   ///     that 'prefetch_mode' will be substituted with constants during codegen time.
-  //
+  /// 'has_more_rows' is used in building/repartitioning a spilled partition, to indicate
+  ///     whether there are more rows after the given 'batch' in the input. We may restore
+  ///     the 'large_write_page_reservation_' when adding the last row.
+  ///
   /// This function is replaced by codegen. We pass in ht_ctx_.get() as an argument for
   /// performance.
   template <bool AGGREGATED_ROWS>
   Status IR_ALWAYS_INLINE AddBatchImpl(RowBatch* batch, TPrefetchMode::type prefetch_mode,
-      HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
+      HashTableCtx* ht_ctx, bool has_more_rows) WARN_UNUSED_RESULT;
 
   /// Evaluates the rows in 'batch' starting at 'start_row_idx' and stores the results in
   /// the expression values cache in 'ht_ctx'. The number of rows evaluated depends on
@@ -548,7 +562,7 @@ class GroupingAggregator : public Aggregator {
   /// May spill partitions if not enough memory is available.
   template <bool AGGREGATED_ROWS>
   Status IR_ALWAYS_INLINE ProcessRow(
-      TupleRow* row, HashTableCtx* ht_ctx) WARN_UNUSED_RESULT;
+      TupleRow* row, HashTableCtx* ht_ctx, bool has_more_rows) WARN_UNUSED_RESULT;
 
   /// Create a new intermediate tuple in partition, initialized with row. ht_ctx is
   /// the context for the partition's hash table and hash is the precomputed hash of
@@ -559,7 +573,8 @@ class GroupingAggregator : public Aggregator {
   /// for insertion returned from HashTable::FindBuildRowBucket().
   template <bool AGGREGATED_ROWS>
   Status IR_ALWAYS_INLINE AddIntermediateTuple(Partition* partition, TupleRow* row,
-      uint32_t hash, HashTable::Iterator insert_it) WARN_UNUSED_RESULT;
+      uint32_t hash, HashTable::Iterator insert_it, bool has_more_rows)
+      WARN_UNUSED_RESULT;
 
   /// Append a row to a spilled partition. The row may be aggregated or unaggregated
   /// according to AGGREGATED_ROWS. May spill partitions if needed to append the row
@@ -570,7 +585,8 @@ class GroupingAggregator : public Aggregator {
 
   /// Reads all the rows from input_stream and process them by calling AddBatchImpl().
   template <bool AGGREGATED_ROWS>
-  Status ProcessStream(BufferedTupleStream* input_stream) WARN_UNUSED_RESULT;
+  Status ProcessStream(BufferedTupleStream* input_stream, bool has_more_streams)
+      WARN_UNUSED_RESULT;
 
   /// Get rows for the next rowbatch from the next partition. Sets 'partition_eos_' to
   /// true if all rows from all partitions have been returned or the limit is reached.
@@ -682,6 +698,39 @@ class GroupingAggregator : public Aggregator {
   /// as the partitions aggregate stream needs to be serialized and rewritten.
   /// We do not spill streaming preaggregations, so we do not need to reserve any buffers.
   int64_t MinReservation() const;
+
+  /// Try to save the extra reservation ('max_row_buffer_size' - 'spillable_buffer_size')
+  /// for a large write page to 'large_write_page_reservation_'. Do nothing if there is
+  /// not enough unused reservation. Return true on success.
+  bool TrySaveLargeWritePageReservation();
+
+  /// Similar to above but for the large read page.
+  bool TrySaveLargeReadPageReservation();
+
+  /// Same as TrySaveLargeWritePageReservation() but make sure it succeeds.
+  void SaveLargeWritePageReservation();
+
+  /// Same as TrySaveLargeReadPageReservation() but make sure it succeeds.
+  void SaveLargeReadPageReservation();
+
+  /// Restore the extra reservation we saved in 'large_write_page_reservation_' for a
+  /// large write page. 'large_write_page_reservation_' must not be used.
+  void RestoreLargeWritePageReservation();
+
+  /// Similar to above but for the large read page.
+  void RestoreLargeReadPageReservation();
+
+  /// A wrapper of 'stream->AddRow()' to add 'row' to a spilled 'stream'. When it fails to
+  /// add a large row because unused reservation ran out and the reservation could not
+  /// be increased, retries after restoring the large write page reservation, provided we
+  /// don't need to keep that reservation for spilling partitions. On success, returns
+  /// true and saves back the large write page reservation. Otherwise, returns false with
+  /// a non-ok status.
+  bool AddRowToSpilledStream(BufferedTupleStream* stream, TupleRow* __restrict__ row,
+      Status* status);
+
+  /// Gets current number of pinned partitions.
+  int GetNumPinnedPartitions();
 };
 } // namespace impala
 
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 1ca8a92..5d12fdf 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -58,6 +58,7 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state,
     buffer_pool_(ExecEnv::GetInstance()->buffer_pool()),
     buffer_pool_client_(buffer_pool_client),
     read_page_reservation_(buffer_pool_client_),
+    large_read_page_reservation_(buffer_pool_client_),
     write_page_reservation_(buffer_pool_client_),
     default_page_len_(default_page_len),
     max_page_len_(max_page_len),
@@ -170,6 +171,12 @@ string BufferedTupleStream::DebugString() const {
   } else {
     ss << read_page_reservation_.GetReservation();
   }
+  ss << " large_read_page_reservation=";
+  if (large_read_page_reservation_.is_closed()) {
+    ss << "<closed>";
+  } else {
+    ss << large_read_page_reservation_.GetReservation();
+  }
   ss << " write_page_reservation=";
   if (write_page_reservation_.is_closed()) {
     ss << "<closed>";
@@ -258,6 +265,7 @@ void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
     }
   }
   read_page_reservation_.Close();
+  large_read_page_reservation_.Close();
   write_page_reservation_.Close();
   pages_.clear();
   num_pages_ = 0;
@@ -474,6 +482,28 @@ void BufferedTupleStream::InvalidateWriteIterator() {
   }
 }
 
+void BufferedTupleStream::SaveLargeReadPageReservation() {
+  DCHECK(!pinned_);
+  if (large_read_page_reservation_.GetReservation() < max_page_len_ - default_page_len_) {
+    // Reclaim the reservation for reading the next large page.
+    // large_read_page_reservation_ may not be 0 since we might have used only a portion
+    // of it when reading a previous large page that was smaller than max_page_len_.
+    int64_t reservation_to_reclaim = max_page_len_ - default_page_len_
+        - large_read_page_reservation_.GetReservation();
+    buffer_pool_client_->SaveReservation(&large_read_page_reservation_,
+        reservation_to_reclaim);
+  }
+}
+
+void BufferedTupleStream::RestoreLargeReadPageReservation() {
+  DCHECK(!pinned_); // Only expected on an unpinned stream.
+  buffer_pool_client_->RestoreAllReservation(&large_read_page_reservation_);
+}
+
+bool BufferedTupleStream::HasLargeReadPageReservation() {
+  return large_read_page_reservation_.GetReservation() > 0; // Any amount counts.
+}
+
 Status BufferedTupleStream::NextReadPage(ReadIterator* read_iter) {
   DCHECK(read_iter->is_valid());
   DCHECK(!closed_);
@@ -511,16 +541,24 @@ Status BufferedTupleStream::NextReadPage(ReadIterator* read_iter) {
   }
 
   int64_t read_page_len = read_iter->read_page_->len();
-  if (!pinned_ && read_page_len > default_page_len_
-      && buffer_pool_client_->GetUnusedReservation() < read_page_len) {
+  if (!pinned_ && read_page_len > default_page_len_) {
     // If we are iterating over an unpinned stream and encounter a page that is larger
     // than the default page length, then unpinning the previous page may not have
-    // freed up enough reservation to pin the next one. The client is responsible for
-    // ensuring the reservation is available, so this indicates a bug.
-    return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: couldn't pin "
+    // freed up enough reservation to pin the next one. Try to restore some extra saved
+    // reservation for reading a large page.
+    int64_t needed_reservation = read_page_len - default_page_len_;
+    if (large_read_page_reservation_.GetReservation() >= needed_reservation) {
+      buffer_pool_client_->RestoreReservation(&large_read_page_reservation_,
+          needed_reservation);
+    }
+    if (buffer_pool_client_->GetUnusedReservation() < read_page_len) {
+      // Still failed to get enough unused reservation. The client is responsible for
+      // ensuring the reservation is available, so this indicates a bug.
+      return Status(TErrorCode::INTERNAL_ERROR, Substitute("Internal error: couldn't pin "
           "large page of $0 bytes, client only had $1 bytes of unused reservation:\n$2",
           read_page_len, buffer_pool_client_->GetUnusedReservation(),
           buffer_pool_client_->DebugString()));
+    }
   }
   // Ensure the next page is pinned for reading. By this point we should have enough
   // reservation to pin the page. If the stream is pinned, the page is already pinned.
@@ -600,6 +638,14 @@ Status BufferedTupleStream::PrepareForReadInternal(
   } else {
     // Eagerly pin the first page in the stream.
     read_iter->SetReadPage(pages_.begin());
+    if (read_iter == &read_it_ && !pinned_
+        && read_iter->read_page_->len() > default_page_len_) {
+      int64_t extra_needed_reservation = read_iter->read_page_->len() - default_page_len_;
+      if (large_read_page_reservation_.GetReservation() >= extra_needed_reservation) {
+        buffer_pool_client_->RestoreReservation(&large_read_page_reservation_,
+            extra_needed_reservation);
+      }
+    }
     // Check if we need to increment the pin count of the read page.
     RETURN_IF_ERROR(PinPageIfNeeded(&*read_iter->read_page_, pinned_));
     DCHECK(read_iter->read_page_->is_pinned());
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 805cd86..98a588b 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -368,6 +368,18 @@ class BufferedTupleStream {
   Status GetNext(
       RowBatch* batch, bool* eos, std::vector<FlatRowPtr>* flat_rows);
 
+  /// Saves extra reservation of the underlying buffer pool client for a large read page
+  /// in this stream. The caller should make sure there is enough unused reservation in
+  /// the buffer pool. The reservation of 'large_read_page_reservation_' will be
+  /// 'max_page_len_' - 'default_page_len_' after calling this.
+  void SaveLargeReadPageReservation();
+
+  /// Restores the large read page reservation back to the underlying buffer pool client.
+  void RestoreLargeReadPageReservation();
+
+  /// Returns true if we have saved some space for a large read page.
+  bool HasLargeReadPageReservation();
+
   /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
   /// attaches buffers from pinned pages that rows returned from GetNext() may reference.
   /// Otherwise deletes all pages. Does nothing if the stream was already closed. The
@@ -591,6 +603,10 @@ class BufferedTupleStream {
   /// is saved if there is a read iterator, no pinned read page, and the possibility
   /// that the read iterator will advance to a valid page.
   BufferPool::SubReservation read_page_reservation_;
+  /// Extra saved reservation for reading a large page.
+  /// 'max_page_len_' - 'default_page_len_' is saved if this is the only input stream of
+  /// the operator.
+  BufferPool::SubReservation large_read_page_reservation_;
 
   /// Pointer into write_page_ to the byte after the last row written.
   uint8_t* write_ptr_ = nullptr;
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 687f937..4c93a18 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -368,6 +368,10 @@ void BufferPool::ClientHandle::RestoreReservation(SubReservation* src, int64_t b
   DCHECK(success); // Transferring reservation to parent shouldn't fail.
 }
 
+void BufferPool::ClientHandle::RestoreAllReservation(SubReservation* src) {
+  RestoreReservation(src, src->GetReservation()); // Transfer everything 'src' holds.
+}
+
 void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double probability) {
   impl_->reservation()->SetDebugDenyIncreaseReservation(probability);
 }
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 04fcdbe..32decec 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -368,6 +368,9 @@ class BufferPool::ClientHandle {
   /// coordinate to ensure there is sufficient unused reservation.
   void RestoreReservation(SubReservation* src, int64_t bytes);
 
+  /// Same as above but move all of src's unused reservation to this client.
+  void RestoreAllReservation(SubReservation* src);
+
   /// Accessors for this client's reservation corresponding to the identically-named
   /// methods in ReservationTracker.
   int64_t GetReservation() const;
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index 3a7107d..e961e8d 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -433,7 +433,9 @@ void ReservationTracker::CheckConsistency() const {
   DCHECK_LE(reservation_.Load(), reservation_limit_.Load());
   DCHECK_GE(child_reservations_.Load(), 0);
   DCHECK_GE(used_reservation_.Load(), 0);
-  DCHECK_LE(used_reservation_.Load() + child_reservations_.Load(), reservation_.Load());
+  DCHECK_LE(used_reservation_.Load() + child_reservations_.Load(), reservation_.Load())
+      << used_reservation_.Load() << " + " << child_reservations_.Load() << " > "
+      << reservation_.Load();
 
   DCHECK_EQ(reservation_.Load(), counters_.peak_reservation->current_value());
   DCHECK_LE(reservation_.Load(), counters_.peak_reservation->value());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
index db401f9..24a243a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
@@ -191,3 +191,65 @@ int,int,string,string,bigint
 23,3,'3333333333','3',10
 24,4,'4444444444','4',13
 ====
+---- QUERY
+# IMPALA-9955,IMPALA-9957: Test aggregation on a table with both large rows and normal
+# rows. TODO: use UNION ALL to generate the table after IMPALA-10040 is fixed.
+set mem_limit="1gb";
+create table bigstrs2 stored as parquet as
+select *, repeat(string_col, 100000) as bigstr
+from functional.alltypes
+order by id
+limit 10;
+insert into bigstrs2
+select *, repeat(string_col, 10000000) as bigstr
+from functional.alltypes
+order by id
+limit 10;
+====
+---- QUERY
+set mem_limit="1gb";
+set max_row_size=10m;
+select group_concat(string_col), length(bigstr) from bigstrs2
+group by bigstr;
+---- TYPES
+string,int
+---- RESULTS
+'0',100000
+'1',100000
+'2',100000
+'3',100000
+'4',100000
+'5',100000
+'6',100000
+'7',100000
+'8',100000
+'9',100000
+'0',10000000
+'1',10000000
+'2',10000000
+'3',10000000
+'4',10000000
+'5',10000000
+'6',10000000
+'7',10000000
+'8',10000000
+'9',10000000
+====
+---- QUERY
+# IMPALA-9955,IMPALA-9957: Test aggregation on a table with large rows of random length.
+# Can't check the results since the table contents are random. The following queries
+# will fail or crash the impalads if there are bugs in reservation handling.
+set mem_limit="2gb";
+create table bigstrs3 stored as parquet as
+select *, repeat(uuid(), cast(random() * 200000 as int)) as bigstr
+from functional.alltypes
+limit 100;
+# Length of uuid() is 36. So the max row size is 7,200,000.
+set MAX_ROW_SIZE=8m;
+create table my_str_group stored as parquet as
+  select group_concat(string_col) as ss, bigstr
+  from bigstrs3 group by bigstr;
+create table my_cnt stored as parquet as
+  select count(*) as cnt, bigstr
+  from bigstrs3 group by bigstr;
+====
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
index 3ebcea7..a5b6d6f 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-no-debug-action.test
@@ -84,7 +84,7 @@ row_regex: .*NumHashTableBuildsSkipped: .* \([1-9][0-9]*\)
 # Aggregation query that will OOM and fail to spill because of IMPALA-3304 without
 # any help from DEBUG_ACTION.
 set mem_limit=75m;
-select l_orderkey, group_concat(l_comment) comments
+select l_orderkey, group_concat(repeat(l_comment, 10)) comments
 from lineitem
 group by l_orderkey
 order by comments desc