Posted to commits@impala.apache.org by ta...@apache.org on 2019/08/24 00:44:38 UTC

[impala] 02/02: IMPALA-8818: Replace deque with spillable queue in BufferedPRS

This is an automated email from the ASF dual-hosted git repository.

tarmstrong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit d037ac8304b43f6e4bb4c6ba2eb1910a9e921c24
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Tue Jul 30 08:56:00 2019 -0700

    IMPALA-8818: Replace deque with spillable queue in BufferedPRS
    
    Replaces DequeRowBatchQueue with SpillableRowBatchQueue in
    BufferedPlanRootSink. A few changes to BufferedPlanRootSink were
    necessary for it to work with the spillable queue; however, the
    synchronization logic is unchanged.
    
    SpillableRowBatchQueue is a wrapper around a BufferedTupleStream and
    a ReservationManager. It takes in a TBackendResourceProfile that
    specifies the max / min memory reservation the BufferedTupleStream can
    use to buffer rows. The 'max_unpinned_bytes' parameter limits the max
    number of bytes that can be unpinned in the BufferedTupleStream. The
    limit is a 'soft' limit because calls to AddBatch may push the amount of
    unpinned memory over the limit. The queue is non-blocking and not thread
    safe. It provides AddBatch and GetBatch methods. Calls to AddBatch spill
    if the BufferedTupleStream does not have enough reservation to fit the
    entire RowBatch.
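    
    The soft-limit behaviour described above can be sketched with a toy
    model that tracks byte counts only. This is an illustration under
    stated assumptions, not the SpillableRowBatchQueue implementation:
    the class name, its methods, and the "unpin everything on the first
    batch that does not fit" rule are all hypothetical simplifications.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Toy model of a queue with a pinned budget and a soft unpinned limit. It tracks
// byte counts only; every name here is hypothetical, not Impala's API.
class ToySpillableQueue {
 public:
  ToySpillableQueue(int64_t max_pinned_bytes, int64_t max_unpinned_bytes)
    : max_pinned_bytes_(max_pinned_bytes), max_unpinned_bytes_(max_unpinned_bytes) {}

  bool IsEmpty() const { return batch_sizes_.empty(); }

  // "Full" means the unpinned bytes have reached the limit. Callers are expected
  // to check this before AddBatch; AddBatch itself never refuses a batch, which is
  // what makes the limit soft.
  bool IsFull() const { return unpinned_bytes_ >= max_unpinned_bytes_; }

  // Adds a batch of 'bytes' bytes. If it does not fit in the pinned budget, the
  // whole queue is unpinned ("spilled") and stays unpinned from then on.
  void AddBatch(int64_t bytes) {
    if (pinned_ && pinned_bytes_ + bytes > max_pinned_bytes_) {
      unpinned_bytes_ += pinned_bytes_;
      pinned_bytes_ = 0;
      pinned_ = false;
    }
    (pinned_ ? pinned_bytes_ : unpinned_bytes_) += bytes;
    batch_sizes_.push_back(bytes);
  }

  // Removes the oldest batch and returns its size. Must not be called when empty.
  int64_t GetBatch() {
    assert(!IsEmpty());
    int64_t bytes = batch_sizes_.front();
    batch_sizes_.pop_front();
    (pinned_ ? pinned_bytes_ : unpinned_bytes_) -= bytes;
    return bytes;
  }

  int64_t pinned_bytes() const { return pinned_bytes_; }
  int64_t unpinned_bytes() const { return unpinned_bytes_; }

 private:
  const int64_t max_pinned_bytes_;
  const int64_t max_unpinned_bytes_;
  bool pinned_ = true;
  int64_t pinned_bytes_ = 0;
  int64_t unpinned_bytes_ = 0;
  std::deque<int64_t> batch_sizes_;
};
```

    Like the queue described above, the toy is non-blocking and not
    thread safe, so a caller would hold a lock and wait on IsFull()
    before calling AddBatch().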
    
    Adds two new query options: 'MAX_PINNED_RESULT_SPOOLING_MEMORY' and
    'MAX_UNPINNED_RESULT_SPOOLING_MEMORY', which bound the amount of pinned
    and unpinned memory that a query can use for spooling, respectively.
    MAX_PINNED_RESULT_SPOOLING_MEMORY must be <=
    MAX_UNPINNED_RESULT_SPOOLING_MEMORY in order to allow all the pinned
    data in the BufferedTupleStream to be unpinned. This is enforced in a
    new method in QueryOptions called 'ValidateQueryOptions'.
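    
    A minimal sketch of the constraint stated above, assuming both
    limits are plain byte counts. The struct and function below are
    hypothetical and are not the ValidateQueryOptions method added by
    this patch; any special values the real options may accept are not
    modelled.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Hypothetical holder for the two limits, in bytes.
struct SpoolingLimits {
  int64_t max_pinned_bytes;
  int64_t max_unpinned_bytes;
};

// Returns an empty string if the limits are consistent, otherwise an error message.
// The pinned limit must not exceed the unpinned limit, so that everything pinned
// can still be unpinned without going over the unpinned limit.
std::string ValidateSpoolingLimits(const SpoolingLimits& limits) {
  if (limits.max_pinned_bytes > limits.max_unpinned_bytes) {
    return "MAX_PINNED_RESULT_SPOOLING_MEMORY ("
        + std::to_string(limits.max_pinned_bytes)
        + ") must be <= MAX_UNPINNED_RESULT_SPOOLING_MEMORY ("
        + std::to_string(limits.max_unpinned_bytes) + ")";
  }
  return "";
}
```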
    
    Planner Changes:
    
    PlanRootSink.java now computes a full ResourceProfile if result spooling
    is enabled. The min mem reservation is bounded by the size of the read and
    write pages used by the BufferedTupleStream. The max mem reservation is
    bounded by 'MAX_PINNED_RESULT_SPOOLING_MEMORY'. The mem estimate is
    computed by estimating the size of the result set using stats.
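    
    One way to read the three bounds above, written in C++ for
    consistency with the backend code rather than as the Java in
    PlanRootSink.java. The names are hypothetical, and two details are
    assumptions of this sketch rather than facts from the patch: the
    read and write pages are taken to be the same size, and the
    estimate is clamped to the [min, max] reservation range.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical stand-in for the planner's resource profile.
struct ToyResourceProfile {
  int64_t min_reservation;
  int64_t max_reservation;
  int64_t mem_estimate;
};

ToyResourceProfile ComputeSpoolingProfile(int64_t page_bytes,
    int64_t max_pinned_bytes, int64_t estimated_rows, int64_t avg_row_bytes) {
  // Assumption: one read page plus one write page of equal size.
  int64_t min_reservation = 2 * page_bytes;
  // The pinned-memory option caps the reservation, but never below the minimum.
  int64_t max_reservation = std::max(min_reservation, max_pinned_bytes);
  // Result set size from stats, clamped into the reservation range (assumption).
  int64_t result_bytes = estimated_rows * avg_row_bytes;
  int64_t mem_estimate =
      std::max(min_reservation, std::min(result_bytes, max_reservation));
  return ToyResourceProfile{min_reservation, max_reservation, mem_estimate};
}
```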
    
    BufferedTupleStream Re-Factoring:
    
    For the most part, using a BufferedTupleStream outside an ExecNode works
    properly. However, some changes were necessary:
    * The message for the MAX_ROW_SIZE error is ExecNode specific. In order to
    fix this, this patch introduces the concept of an ExecNode 'label' which
    is a more generic version of an ExecNode 'id'.
    * The definition of TBackendResourceProfile lived in PlanNodes.thrift;
    it was moved to its own file (ResourceProfile.thrift) so it can be used
    by DataSinks.thrift.
    * Modified BufferedTupleStream so it internally tracks how many bytes
    are unpinned (necessary for 'MAX_UNPINNED_RESULT_SPOOLING_MEMORY').
    
    Metrics:
    * Added a few of the metrics mentioned in IMPALA-8825 to
    BufferedPlanRootSink. Specifically, added timers to track how much time
    is spent waiting in the BufferedPlanRootSink 'Send' and 'GetNext'
    methods.
    * The BufferedTupleStream in the SpillableRowBatchQueue exposes several
    BufferPool metrics such as number of reserved and unpinned bytes.
    
    Bug Fixes:
    * Fixed a bug in BufferedPlanRootSink where the MemPool used by the
    expression evaluators was not being cleared incrementally.
    * Fixed a bug where the inactive timer was not being properly updated in
    BufferedPlanRootSink.
    * Fixed a bug where RowBatch memory was not freed if
    BufferedPlanRootSink::GetNext terminated early because it could not
    handle requests where num_results < BATCH_SIZE.
    
    Testing:
    * Added new tests to test_result_spooling.py.
    * Updated errors thrown in spilling-large-rows.test.
    * Ran exhaustive tests.
    
    Change-Id: I10f9e72374cdf9501c0e5e2c5b39c13688ae65a9
    Reviewed-on: http://gerrit.cloudera.org:8080/14039
    Reviewed-by: Sahil Takiar <st...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/generated-sources/gen-cpp/CMakeLists.txt        |   2 +
 be/src/exec/analytic-eval-node.cc                  |   2 +-
 be/src/exec/blocking-plan-root-sink.cc             |   5 +
 be/src/exec/blocking-plan-root-sink.h              |   7 +
 be/src/exec/buffered-plan-root-sink.cc             |  62 +++++----
 be/src/exec/buffered-plan-root-sink.h              |  48 ++++++-
 be/src/exec/data-sink.cc                           |   5 +-
 be/src/exec/exec-node.cc                           |   9 ++
 be/src/exec/exec-node.h                            |   4 +
 be/src/exec/grouping-aggregator-partition.cc       |   6 +-
 be/src/exec/grouping-aggregator.cc                 |   2 +-
 be/src/exec/partial-sort-node.cc                   |   2 +-
 be/src/exec/partitioned-hash-join-builder.cc       |  14 +-
 be/src/exec/partitioned-hash-join-builder.h        |  17 ++-
 be/src/exec/partitioned-hash-join-node.cc          |   6 +-
 be/src/exec/plan-root-sink.h                       |   5 +
 be/src/exec/sort-node.cc                           |   2 +-
 be/src/runtime/CMakeLists.txt                      |   2 +-
 be/src/runtime/buffered-tuple-stream-test.cc       |  25 ++--
 be/src/runtime/buffered-tuple-stream.cc            |  11 +-
 be/src/runtime/buffered-tuple-stream.h             |  23 +++-
 be/src/runtime/deque-row-batch-queue.cc            |  66 ----------
 be/src/runtime/deque-row-batch-queue.h             |  70 ----------
 be/src/runtime/sorter.cc                           |  10 +-
 be/src/runtime/sorter.h                            |   8 +-
 be/src/runtime/spillable-row-batch-queue.cc        | 143 +++++++++++++++++++++
 be/src/runtime/spillable-row-batch-queue.h         | 138 ++++++++++++++++++++
 be/src/service/impala-beeswax-server.cc            |   1 +
 be/src/service/impala-server.cc                    |   7 +
 be/src/service/query-options-test.cc               |  43 +++++++
 be/src/service/query-options.cc                    |  34 +++++
 be/src/service/query-options.h                     |  14 +-
 common/thrift/CMakeLists.txt                       |   1 +
 common/thrift/DataSinks.thrift                     |  10 +-
 common/thrift/ImpalaInternalService.thrift         |   6 +
 common/thrift/ImpalaService.thrift                 |  18 ++-
 common/thrift/PlanNodes.thrift                     |  24 +---
 common/thrift/ResourceProfile.thrift               |  40 ++++++
 common/thrift/generate_error_codes.py              |   4 +-
 .../org/apache/impala/planner/PlanRootSink.java    |  57 +++++++-
 .../org/apache/impala/planner/ResourceProfile.java |   7 +-
 .../impala/planner/ResourceProfileBuilder.java     |   7 +-
 .../org/apache/impala/planner/PlannerTest.java     |  16 +++
 .../queries/PlannerTest/result-spooling.test       | 135 +++++++++++++++++++
 .../queries/QueryTest/spilling-large-rows.test     |   8 +-
 tests/query_test/test_result_spooling.py           | 137 +++++++++++++++++++-
 46 files changed, 995 insertions(+), 268 deletions(-)

diff --git a/be/generated-sources/gen-cpp/CMakeLists.txt b/be/generated-sources/gen-cpp/CMakeLists.txt
index 5052a74..90b4930 100644
--- a/be/generated-sources/gen-cpp/CMakeLists.txt
+++ b/be/generated-sources/gen-cpp/CMakeLists.txt
@@ -85,6 +85,8 @@ set(SRC_FILES
   Planner_types.cpp
   parquet_constants.cpp
   parquet_types.cpp
+  ResourceProfile_constants.cpp
+  ResourceProfile_types.cpp
   RuntimeProfile_constants.cpp
   RuntimeProfile_types.cpp
   StatestoreService.cpp
diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc
index 0ede1a6..73b5346 100644
--- a/be/src/exec/analytic-eval-node.cc
+++ b/be/src/exec/analytic-eval-node.cc
@@ -190,7 +190,7 @@ Status AnalyticEvalNode::Open(RuntimeState* state) {
   input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(),
       buffer_pool_client(), resource_profile_.spillable_buffer_size,
       resource_profile_.spillable_buffer_size));
-  RETURN_IF_ERROR(input_stream_->Init(id(), true));
+  RETURN_IF_ERROR(input_stream_->Init(label(), true));
   bool success;
   RETURN_IF_ERROR(input_stream_->PrepareForReadWrite(true, &success));
   DCHECK(success) << "Had reservation: " << buffer_pool_client()->DebugString();
diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index 05ebd30..4396db0 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -36,6 +36,11 @@ BlockingPlanRootSink::BlockingPlanRootSink(
     TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
   : PlanRootSink(sink_id, row_desc, state) {}
 
+Status BlockingPlanRootSink::Prepare(
+    RuntimeState* state, MemTracker* parent_mem_tracker) {
+  return DataSink::Prepare(state, parent_mem_tracker);
+}
+
 Status BlockingPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
   PlanRootSink::ValidateCollectionSlots(*row_desc_, batch);
diff --git a/be/src/exec/blocking-plan-root-sink.h b/be/src/exec/blocking-plan-root-sink.h
index cb95da8..b6d6c23 100644
--- a/be/src/exec/blocking-plan-root-sink.h
+++ b/be/src/exec/blocking-plan-root-sink.h
@@ -50,6 +50,13 @@ class BlockingPlanRootSink : public PlanRootSink {
   BlockingPlanRootSink(
       TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
 
+  /// TODO: Currently, this does nothing, it just calls DataSink::Prepare. However, adding
+  /// it is necessary because BufferedPlanRootSink needs to use PlanRootSink::Prepare.
+  /// Once IMPALA-8825 (add counters to track how long the producer and consumer threads
+  /// block, and the rate at which rows are read / sent) is done, this should do the work
+  /// to initialize the necessary counters.
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
+
   /// Blocks until the consumer has consumed 'batch' by calling GetNext().
   virtual Status Send(RuntimeState* state, RowBatch* batch) override;
 
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index 4ba2f07..512cb8e 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -16,22 +16,35 @@
 // under the License.
 
 #include "exec/buffered-plan-root-sink.h"
-#include "runtime/deque-row-batch-queue.h"
 #include "service/query-result-set.h"
 
 #include "common/names.h"
 
 namespace impala {
 
-// The maximum number of row batches to queue before calls to Send() start to block.
-// After this many row batches have been added, Send() will block until GetNext() reads
-// RowBatches from the queue.
-const uint32_t MAX_QUEUED_ROW_BATCHES = 10;
-
-BufferedPlanRootSink::BufferedPlanRootSink(
-    TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
+BufferedPlanRootSink::BufferedPlanRootSink(TDataSinkId sink_id,
+    const RowDescriptor* row_desc, RuntimeState* state,
+    const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
   : PlanRootSink(sink_id, row_desc, state),
-    batch_queue_(new DequeRowBatchQueue(MAX_QUEUED_ROW_BATCHES)) {}
+    resource_profile_(resource_profile),
+    debug_options_(debug_options) {}
+
+Status BufferedPlanRootSink::Prepare(
+    RuntimeState* state, MemTracker* parent_mem_tracker) {
+  RETURN_IF_ERROR(DataSink::Prepare(state, parent_mem_tracker));
+  row_batches_send_wait_timer_ = ADD_TIMER(profile(), "RowBatchSendWaitTime");
+  row_batches_get_wait_timer_ = ADD_TIMER(profile(), "RowBatchGetWaitTime");
+  return Status::OK();
+}
+
+Status BufferedPlanRootSink::Open(RuntimeState* state) {
+  RETURN_IF_ERROR(DataSink::Open(state));
+  batch_queue_.reset(new SpillableRowBatchQueue(name_,
+      state->query_options().max_spilled_result_spooling_mem, state, mem_tracker(),
+      profile(), row_desc_, resource_profile_, debug_options_));
+  RETURN_IF_ERROR(batch_queue_->Open());
+  return Status::OK();
+}
 
 Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
@@ -45,11 +58,6 @@ Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   PlanRootSink::ValidateCollectionSlots(*row_desc_, batch);
   RETURN_IF_ERROR(PlanRootSink::UpdateAndCheckRowsProducedLimit(state, batch));
 
-  // Make a copy of the given RowBatch and place it on the queue.
-  unique_ptr<RowBatch> output_batch =
-      make_unique<RowBatch>(batch->row_desc(), batch->capacity(), mem_tracker());
-  batch->DeepCopyTo(output_batch.get());
-
   {
     // Add the copied batch to the RowBatch queue and wake up the consumer thread if it is
     // waiting for rows to process.
@@ -57,17 +65,14 @@ Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
 
     // If the queue is full, wait for the producer thread to read batches from it.
     while (!state->is_cancelled() && batch_queue_->IsFull()) {
+      SCOPED_TIMER(profile()->inactive_timer());
+      SCOPED_TIMER(row_batches_send_wait_timer_);
       batch_queue_has_capacity_.Wait(l);
     }
     RETURN_IF_CANCELLED(state);
 
     // Add the batch to the queue and then notify the consumer that rows are available.
-    if (!batch_queue_->AddBatch(move(output_batch))) {
-      // Adding a batch should always be successful because the queue should always be
-      // open when Send is called, and the call to batch_queue_has_capacity_.Wait(l)
-      // ensures space is available.
-      DCHECK(false) << "DequeueRowBatchQueue::AddBatch should never return false";
-    }
+    RETURN_IF_ERROR(batch_queue_->AddBatch(batch));
   }
   // Release the lock before calling notify so the consumer thread can immediately acquire
   // the lock.
@@ -84,7 +89,10 @@ Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
   // SenderState and return appropriately.
   rows_available_.NotifyAll();
   // Wait until the consumer has read all rows from the batch_queue_.
-  consumer_eos_.Wait(l);
+  {
+    SCOPED_TIMER(profile()->inactive_timer());
+    consumer_eos_.Wait(l);
+  }
   RETURN_IF_CANCELLED(state);
   return Status::OK();
 }
@@ -97,7 +105,7 @@ void BufferedPlanRootSink::Close(RuntimeState* state) {
   if (sender_state_ == SenderState::ROWS_PENDING) {
     sender_state_ = SenderState::CLOSED_NOT_EOS;
   }
-  batch_queue_->Close();
+  if (batch_queue_ != nullptr) batch_queue_->Close();
   // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
   // ensure that all sleeping threads are awoken. The call to NotifyAll() is not on the
   // fast path so any overhead from calling it should be negligible.
@@ -122,6 +130,7 @@ Status BufferedPlanRootSink::GetNext(
     unique_lock<mutex> l(lock_);
     while (batch_queue_->IsEmpty() && sender_state_ == SenderState::ROWS_PENDING
         && !state->is_cancelled()) {
+      SCOPED_TIMER(row_batches_get_wait_timer_);
       rows_available_.Wait(l);
     }
 
@@ -130,7 +139,9 @@ Status BufferedPlanRootSink::GetNext(
     // return. The queue could be empty if the sink was closed while waiting for rows to
     // become available, or if the sink was closed before the current call to GetNext.
     if (!state->is_cancelled() && !batch_queue_->IsEmpty()) {
-      unique_ptr<RowBatch> batch = batch_queue_->GetBatch();
+      unique_ptr<RowBatch> batch =
+          make_unique<RowBatch>(row_desc_, state->batch_size(), mem_tracker());
+      RETURN_IF_ERROR(batch_queue_->GetBatch(batch.get()));
       // TODO for now, if num_results < batch->num_rows(), we terminate returning results
       // early until we can properly handle fetch requests where
       // num_results < batch->num_rows().
@@ -138,11 +149,14 @@ Status BufferedPlanRootSink::GetNext(
         *eos = true;
         batch_queue_has_capacity_.NotifyOne();
         consumer_eos_.NotifyOne();
+        batch->Reset();
         return Status::Expected(TErrorCode::NOT_IMPLEMENTED_ERROR,
             "BufferedPlanRootSink does not support setting num_results < BATCH_SIZE");
       }
       RETURN_IF_ERROR(
           results->AddRows(output_expr_evals_, batch.get(), 0, batch->num_rows()));
+      // Prevent expr result allocations from accumulating.
+      expr_results_pool_->Clear();
       batch->Reset();
     }
     *eos = batch_queue_->IsEmpty() && sender_state_ == SenderState::EOS;
@@ -155,7 +169,7 @@ Status BufferedPlanRootSink::GetNext(
   // either FlushFinal was called or the query was cancelled. If FlushFinal was called
   // then the consumer thread has completed. If the query is cancelled, then we wake up
   // the consumer thread so it can check the cancellation status and return. Releasing
-  // the lock is safe because the consumer always loops until the queue is actually has
+  // the lock is safe because the consumer always loops until the queue actually has
   // space.
   batch_queue_has_capacity_.NotifyOne();
   return state->GetQueryStatus();
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index 939c2a0..d437b02 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include "exec/plan-root-sink.h"
+#include "runtime/spillable-row-batch-queue.h"
 #include "util/condition-variable.h"
 
 namespace impala {
@@ -25,7 +26,9 @@ namespace impala {
 class DequeRowBatchQueue;
 
 /// PlanRootSink that buffers RowBatches from the 'sender' (fragment) thread. RowBatches
-/// are buffered in memory until a max number of RowBatches are queued. Any subsequent
+/// are buffered in memory until the queue is full (the definition of 'full' depends on
+/// the queue being used; in the current implementation the SpillableRowBatchQueue is
+/// 'full' when the amount of spilled data exceeds the configured limit). Any subsequent
 /// calls to Send will block until the 'consumer' (coordinator) thread has read enough
 /// RowBatches to free up sufficient space in the queue. The blocking behavior follows
 /// the same semantics as BlockingPlanRootSink.
@@ -40,7 +43,17 @@ class DequeRowBatchQueue;
 class BufferedPlanRootSink : public PlanRootSink {
  public:
   BufferedPlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
-      RuntimeState* state);
+      RuntimeState* state, const TBackendResourceProfile& resource_profile,
+      const TDebugOptions& debug_options);
+
+  /// Initializes the row_batches_get_wait_timer_ and row_batches_send_wait_timer_
+  /// counters.
+  virtual Status Prepare(RuntimeState* state, MemTracker* parent_mem_tracker) override;
+
+  /// Creates and opens the SpillableRowBatchQueue, returns an error Status if the queue
+  /// could not be opened. Failure to open the queue could occur if the initial
+  /// reservation for the BufferedTupleStream could not be acquired.
+  virtual Status Open(RuntimeState* state) override;
 
   /// Creates a copy of the given RowBatch and adds it to the queue. The copy is
   /// necessary as the ownership of 'batch' remains with the sender.
@@ -50,10 +63,10 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// batches from the queue, or until the sink is either closed or cancelled.
   virtual Status FlushFinal(RuntimeState* state) override;
 
-  /// Release resources and unblocks consumer.
+  /// Releases resources and unblocks the consumer thread.
   virtual void Close(RuntimeState* state) override;
 
-  /// Blocks until rows are available for consumption
+  /// Blocks until rows are available for consumption.
   virtual Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos) override;
 
@@ -80,8 +93,31 @@ class BufferedPlanRootSink : public PlanRootSink {
   /// to unblock the producer.
   ConditionVariable batch_queue_has_capacity_;
 
-  /// A DequeRowBatchQueue that buffers RowBatches from the sender for consumption by
+  /// A SpillableRowBatchQueue that buffers RowBatches from the sender for consumption by
   /// the consumer. The queue is not thread safe and access is protected by 'lock_'.
-  std::unique_ptr<DequeRowBatchQueue> batch_queue_;
+  std::unique_ptr<SpillableRowBatchQueue> batch_queue_;
+
+  /// The TBackendResourceProfile created by the fe/ for the PlanRootSink. Passed to the
+  /// SpillableRowBatchQueue to impose the necessary memory limits.
+  const TBackendResourceProfile& resource_profile_;
+
+  /// Required by the SpillableRowBatchQueue's ReservationManager when claiming the
+  /// initial reservation.
+  const TDebugOptions& debug_options_;
+
+  /// Measures the amount of time spent by Impala waiting for the result spooling queue
+  /// to have more space. The queue may become full if Impala has produced enough rows to
+  /// fill up the queue, and the client has not consumed any rows, or is consuming
+  /// rows at a slower rate than Impala is producing them. Specifically, this counter
+  /// measures the amount of time spent waiting on 'batch_queue_has_capacity_' in the
+  /// 'Send' method.
+  RuntimeProfile::Counter* row_batches_send_wait_timer_ = nullptr;
+
+  /// Measures the amount of time spent by the client waiting for the result spooling
+  /// queue to have rows. The queue may be empty if the query has not produced any rows
+  /// yet or if the client is consuming rows at a faster rate than Impala is producing
+  /// them. Specifically, this counter measures the amount of time spent waiting on
+  /// 'rows_available_' in the 'GetNext' method.
+  RuntimeProfile::Counter* row_batches_get_wait_timer_ = nullptr;
 };
 }
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index e2d0ae9..68780e6 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -33,7 +33,6 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
-#include "runtime/deque-row-batch-queue.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
@@ -110,7 +109,9 @@ Status DataSink::Create(const TPlanFragmentCtx& fragment_ctx,
       break;
     case TDataSinkType::PLAN_ROOT_SINK:
       if (state->query_options().spool_query_results) {
-        *sink = pool->Add(new BufferedPlanRootSink(sink_id, row_desc, state));
+        *sink = pool->Add(new BufferedPlanRootSink(sink_id, row_desc, state,
+            fragment_ctx.fragment.output_sink.plan_root_sink.resource_profile,
+            fragment_instance_ctx.debug_options));
       } else {
         *sink = pool->Add(new BlockingPlanRootSink(sink_id, row_desc, state));
       }
diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index c046d83..45a38c3 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -364,6 +364,15 @@ void ExecNode::DebugString(int indentation_level, stringstream* out) const {
   }
 }
 
+string ExecNode::label() const {
+  map<int, const char*>::const_iterator i = _TPlanNodeType_VALUES_TO_NAMES.find(type_);
+  string node_type_name = "UNKNOWN";
+  if (i != _TPlanNodeType_VALUES_TO_NAMES.end()) {
+    node_type_name = i->second;
+  }
+  return Substitute("$0 (id=$1)", node_type_name, std::to_string(id_));
+}
+
 void ExecNode::CollectNodes(TPlanNodeType::type node_type, vector<ExecNode*>* nodes) {
   if (type_ == node_type) nodes->push_back(this);
   for (int i = 0; i < children_.size(); ++i) {
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 339e77c..a7c0004 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -191,6 +191,10 @@ class ExecNode {
   int id() const { return id_; }
   TPlanNodeType::type type() const { return type_; }
 
+  /// Returns a unique label for this ExecNode of the form "PLAN_NODE_TYPE (id=[int])",
+  /// for example, EXCHANGE_NODE (id=2).
+  std::string label() const;
+
   /// Returns the row descriptor for rows produced by this node. The RowDescriptor is
   /// constant for the lifetime of the fragment instance, and so is shared by reference
   /// across the plan tree, including in RowBatches. The lifetime of the descriptor is the
diff --git a/be/src/exec/grouping-aggregator-partition.cc b/be/src/exec/grouping-aggregator-partition.cc
index 8be932d..a968ef0 100644
--- a/be/src/exec/grouping-aggregator-partition.cc
+++ b/be/src/exec/grouping-aggregator-partition.cc
@@ -57,7 +57,7 @@ Status GroupingAggregator::Partition::InitStreams() {
       new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
           parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
           parent->resource_profile_.max_row_buffer_size, external_varlen_slots));
-  RETURN_IF_ERROR(aggregated_row_stream->Init(parent->id_, true));
+  RETURN_IF_ERROR(aggregated_row_stream->Init(parent->exec_node_->label(), true));
   bool got_buffer;
   RETURN_IF_ERROR(aggregated_row_stream->PrepareForWrite(&got_buffer));
   DCHECK(got_buffer) << "Buffer included in reservation " << parent->id_ << "\n"
@@ -69,7 +69,7 @@ Status GroupingAggregator::Partition::InitStreams() {
             parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
             parent->resource_profile_.max_row_buffer_size));
     // This stream is only used to spill, no need to ever have this pinned.
-    RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->id_, false));
+    RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->exec_node_->label(), false));
     // Save memory by waiting until we spill to allocate the write buffer for the
     // unaggregated row stream.
     DCHECK(!unaggregated_row_stream->has_write_iterator());
@@ -144,7 +144,7 @@ Status GroupingAggregator::Partition::SerializeStreamForSpilling() {
         new BufferedTupleStream(parent->state_, &parent->intermediate_row_desc_,
             parent->buffer_pool_client(), parent->resource_profile_.spillable_buffer_size,
             parent->resource_profile_.max_row_buffer_size));
-    status = parent->serialize_stream_->Init(parent->id_, false);
+    status = parent->serialize_stream_->Init(parent->exec_node_->label(), false);
     if (status.ok()) {
       bool got_buffer;
       status = parent->serialize_stream_->PrepareForWrite(&got_buffer);
diff --git a/be/src/exec/grouping-aggregator.cc b/be/src/exec/grouping-aggregator.cc
index 6f465f2..358919a 100644
--- a/be/src/exec/grouping-aggregator.cc
+++ b/be/src/exec/grouping-aggregator.cc
@@ -204,7 +204,7 @@ Status GroupingAggregator::Open(RuntimeState* state) {
       serialize_stream_.reset(new BufferedTupleStream(state, &intermediate_row_desc_,
           buffer_pool_client(), resource_profile_.spillable_buffer_size,
           resource_profile_.max_row_buffer_size));
-      RETURN_IF_ERROR(serialize_stream_->Init(id_, false));
+      RETURN_IF_ERROR(serialize_stream_->Init(exec_node_->label(), false));
       bool got_buffer;
       // Reserve the memory for 'serialize_stream_' so we don't need to scrounge up
       // another buffer during spilling.
diff --git a/be/src/exec/partial-sort-node.cc b/be/src/exec/partial-sort-node.cc
index ad0ed03..bf049b7 100644
--- a/be/src/exec/partial-sort-node.cc
+++ b/be/src/exec/partial-sort-node.cc
@@ -60,7 +60,7 @@ Status PartialSortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
       sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
-      resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), false));
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), false));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/exec/partitioned-hash-join-builder.cc b/be/src/exec/partitioned-hash-join-builder.cc
index 67b6843..83fb432 100644
--- a/be/src/exec/partitioned-hash-join-builder.cc
+++ b/be/src/exec/partitioned-hash-join-builder.cc
@@ -51,14 +51,16 @@ using strings::Substitute;
 
 const char* PhjBuilder::LLVM_CLASS_NAME = "class.impala::PhjBuilder";
 
-PhjBuilder::PhjBuilder(int join_node_id, TJoinOp::type join_op,
-    const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
-    RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client,
-    int64_t spillable_buffer_size, int64_t max_row_buffer_size)
+PhjBuilder::PhjBuilder(int join_node_id, const string& join_node_label,
+    TJoinOp::type join_op, const RowDescriptor* probe_row_desc,
+    const RowDescriptor* build_row_desc, RuntimeState* state,
+    BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
+    int64_t max_row_buffer_size)
   : DataSink(-1, build_row_desc,
         Substitute("Hash Join Builder (join_node_id=$0)", join_node_id), state),
     runtime_state_(state),
     join_node_id_(join_node_id),
+    join_node_label_(join_node_label),
     join_op_(join_op),
     probe_row_desc_(probe_row_desc),
     buffer_pool_client_(buffer_pool_client),
@@ -259,7 +261,7 @@ void PhjBuilder::Reset(RowBatch* row_batch) {
 Status PhjBuilder::CreateAndPreparePartition(int level, Partition** partition) {
   all_partitions_.emplace_back(new Partition(runtime_state_, this, level));
   *partition = all_partitions_.back().get();
-  RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_id_, true));
+  RETURN_IF_ERROR((*partition)->build_rows()->Init(join_node_label_, true));
   bool got_buffer;
   RETURN_IF_ERROR((*partition)->build_rows()->PrepareForWrite(&got_buffer));
   DCHECK(got_buffer)
@@ -418,7 +420,7 @@ Status PhjBuilder::InitSpilledPartitionProbeStreams() {
         make_unique<BufferedTupleStream>(runtime_state_, probe_row_desc_,
             buffer_pool_client_, spillable_buffer_size_, max_row_buffer_size_));
     BufferedTupleStream* probe_stream = spilled_partition_probe_streams_.back().get();
-    RETURN_IF_ERROR(probe_stream->Init(join_node_id_, false));
+    RETURN_IF_ERROR(probe_stream->Init(join_node_label_, false));
 
     // Loop until either the stream gets a buffer or all partitions are spilled (in which
     // case SpillPartition() returns an error).
diff --git a/be/src/exec/partitioned-hash-join-builder.h b/be/src/exec/partitioned-hash-join-builder.h
index 147504b..9acc7a7 100644
--- a/be/src/exec/partitioned-hash-join-builder.h
+++ b/be/src/exec/partitioned-hash-join-builder.h
@@ -72,10 +72,10 @@ class PhjBuilder : public DataSink {
  public:
   class Partition;
 
-  PhjBuilder(int join_node_id, TJoinOp::type join_op, const RowDescriptor* probe_row_desc,
-      const RowDescriptor* build_row_desc, RuntimeState* state,
-      BufferPool::ClientHandle* buffer_pool_client, int64_t spillable_buffer_size,
-      int64_t max_row_buffer_size);
+  PhjBuilder(int join_node_id, const std::string& join_node_label, TJoinOp::type join_op,
+      const RowDescriptor* probe_row_desc, const RowDescriptor* build_row_desc,
+      RuntimeState* state, BufferPool::ClientHandle* buffer_pool_client,
+      int64_t spillable_buffer_size, int64_t max_row_buffer_size);
 
   Status InitExprsAndFilters(RuntimeState* state,
       const std::vector<TEqJoinCondition>& eq_join_conjuncts,
@@ -370,11 +370,14 @@ class PhjBuilder : public DataSink {
 
   RuntimeState* const runtime_state_;
 
-  // The ID of the plan join node this is associated with.
-  // TODO: we may want to replace this with a sink ID once we progress further with
-  // multithreading.
+  /// The ID of the plan join node this is associated with.
+  /// TODO: we may want to replace this with a sink ID once we progress further with
+  /// multithreading.
   const int join_node_id_;
 
+  /// The label of the plan join node this is associated with.
+  const std::string join_node_label_;
+
   /// The join operation this is building for.
   const TJoinOp::type join_op_;
 
diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc
index 1b00f05..99931b4 100644
--- a/be/src/exec/partitioned-hash-join-node.cc
+++ b/be/src/exec/partitioned-hash-join-node.cc
@@ -75,7 +75,7 @@ Status PartitionedHashJoinNode::Init(const TPlanNode& tnode, RuntimeState* state
   // TODO: allow PhjBuilder to be the sink of a separate fragment. For now, PhjBuilder is
   // owned by this node, but duplicates some state (exprs, etc) in anticipation of it
   // being separated out further.
-  builder_.reset(new PhjBuilder(id(), join_op_, child(0)->row_desc(),
+  builder_.reset(new PhjBuilder(id(), label(), join_op_, child(0)->row_desc(),
       child(1)->row_desc(), state, buffer_pool_client(),
       resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size));
   RETURN_IF_ERROR(
@@ -837,7 +837,7 @@ Status PartitionedHashJoinNode::InitNullAwareProbePartition() {
   unique_ptr<BufferedTupleStream> probe_rows =
       make_unique<BufferedTupleStream>(state, child(0)->row_desc(), buffer_pool_client(),
           resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size);
-  Status status = probe_rows->Init(id(), true);
+  Status status = probe_rows->Init(label(), true);
   if (!status.ok()) goto error;
   bool got_buffer;
   status = probe_rows->PrepareForWrite(&got_buffer);
@@ -861,7 +861,7 @@ Status PartitionedHashJoinNode::InitNullProbeRows() {
       make_unique<BufferedTupleStream>(state, child(0)->row_desc(), buffer_pool_client(),
           resource_profile_.spillable_buffer_size, resource_profile_.max_row_buffer_size);
   // Start with stream pinned, unpin later if needed.
-  RETURN_IF_ERROR(null_probe_rows_->Init(id(), true));
+  RETURN_IF_ERROR(null_probe_rows_->Init(label(), true));
   bool got_buffer;
   RETURN_IF_ERROR(null_probe_rows_->PrepareForWrite(&got_buffer));
   DCHECK(got_buffer) << "Accounted in min reservation"
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index ae5651f..7d381ab 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -57,6 +57,11 @@ class PlanRootSink : public DataSink {
   PlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
   virtual ~PlanRootSink();
 
+  /// Called before Send(), Open(), or Close(). Performs any additional setup necessary,
+  /// such as initializing runtime counters.
+  virtual Status Prepare(
+      RuntimeState* state, MemTracker* parent_mem_tracker) override = 0;
+
   /// Sends a new batch. Ownership of 'batch' remains with the sender.
   virtual Status Send(RuntimeState* state, RowBatch* batch) override = 0;
 
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index b0b5b04..659e71c 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -56,7 +56,7 @@ Status SortNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecNode::Prepare(state));
   sorter_.reset(new Sorter(ordering_exprs_, is_asc_order_, nulls_first_,
       sort_tuple_exprs_, &row_descriptor_, mem_tracker(), buffer_pool_client(),
-      resource_profile_.spillable_buffer_size, runtime_profile(), state, id(), true));
+      resource_profile_.spillable_buffer_size, runtime_profile(), state, label(), true));
   RETURN_IF_ERROR(sorter_->Prepare(pool_));
   DCHECK_GE(resource_profile_.min_reservation, sorter_->ComputeMinReservation());
   state->CheckAndAddCodegenDisabledMessage(runtime_profile());
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 848ff37..0c8b11c 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -39,7 +39,6 @@ add_library(Runtime
   date-value.cc
   debug-options.cc
   descriptors.cc
-  deque-row-batch-queue.cc
   dml-exec-state.cc
   exec-env.cc
   fragment-instance-state.cc
@@ -72,6 +71,7 @@ add_library(Runtime
   sorted-run-merger.cc
   sorter.cc
   sorter-ir.cc
+  spillable-row-batch-queue.cc
   string-value.cc
   thread-resource-mgr.cc
   timestamp-parse-util.cc
diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc
index 6ff9805..85cceb0 100644
--- a/be/src/runtime/buffered-tuple-stream-test.cc
+++ b/be/src/runtime/buffered-tuple-stream-test.cc
@@ -320,7 +320,7 @@ class SimpleTupleStreamTest : public testing::Test {
 
     BufferedTupleStream stream(
         runtime_state_, desc, &client_, default_page_len, max_page_len);
-    ASSERT_OK(stream.Init(-1, true));
+    ASSERT_OK(stream.Init("SimpleTupleStreamTest", true));
     bool got_write_reservation;
     ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
     ASSERT_TRUE(got_write_reservation);
@@ -364,7 +364,7 @@ class SimpleTupleStreamTest : public testing::Test {
   void TestIntValuesInterleaved(int num_batches, int num_batches_before_read,
       bool unpin_stream, int64_t page_len = PAGE_LEN) {
     BufferedTupleStream stream(runtime_state_, int_desc_, &client_, page_len, page_len);
-    ASSERT_OK(stream.Init(-1, true));
+    ASSERT_OK(stream.Init("SimpleTupleStreamTest", true));
     bool got_reservation;
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
     ASSERT_TRUE(got_reservation);
@@ -622,7 +622,7 @@ void SimpleTupleStreamTest::TestUnpinPin(bool varlen_data, bool read_write) {
 
   BufferedTupleStream stream(
       runtime_state_, row_desc, &client_, buffer_size, buffer_size);
-  ASSERT_OK(stream.Init(-1, true));
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::TestUnpinPin", true));
   if (read_write) {
     bool got_reservation = false;
     ASSERT_OK(stream.PrepareForReadWrite(false, &got_reservation));
@@ -715,7 +715,7 @@ void SimpleTupleStreamTest::TestTransferMemory(bool pin_stream, bool read_write)
 
   BufferedTupleStream stream(
       runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
-  ASSERT_OK(stream.Init(-1, pin_stream));
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::TestTransferMemory", pin_stream));
   if (read_write) {
     bool got_reservation;
     ASSERT_OK(stream.PrepareForReadWrite(true, &got_reservation));
@@ -788,7 +788,7 @@ void SimpleTupleStreamTest::TestAttachMemory(bool pin_stream, bool attach_on_rea
 
   BufferedTupleStream stream(
       runtime_state_, int_desc_, &client_, buffer_size, buffer_size);
-  ASSERT_OK(stream.Init(-1, pin_stream));
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::TestAttachMemory", pin_stream));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
   ASSERT_TRUE(got_write_reservation);
@@ -882,7 +882,8 @@ void SimpleTupleStreamTest::TestFlushResourcesReadWrite(
 
   BufferedTupleStream stream(
       runtime_state_, int_desc_, &client_, BUFFER_SIZE, BUFFER_SIZE);
-  ASSERT_OK(stream.Init(-1, pin_stream));
+  ASSERT_OK(
+      stream.Init("SimpleTupleStreamTest::TestFlushResourcesReadWrite", pin_stream));
   bool got_reservation;
   ASSERT_OK(stream.PrepareForReadWrite(attach_on_read, &got_reservation));
   ASSERT_TRUE(got_reservation);
@@ -997,7 +998,7 @@ TEST_F(SimpleTupleStreamTest, StringsOutsideStream) {
 
   BufferedTupleStream stream(
       runtime_state_, string_desc_, &client_, buffer_size, buffer_size, external_slots);
-  ASSERT_OK(stream.Init(0, false));
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::StringsOutsideStream", false));
   bool got_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_reservation));
   ASSERT_TRUE(got_reservation);
@@ -1071,7 +1072,7 @@ TEST_F(SimpleTupleStreamTest, BigRow) {
   ASSERT_TRUE(nullable_big_row_desc_->IsAnyTupleNullable());
   BufferedTupleStream nullable_stream(
       runtime_state_, nullable_big_row_desc_, &client_, BIG_ROW_BYTES, BIG_ROW_BYTES);
-  ASSERT_OK(nullable_stream.Init(-1, true));
+  ASSERT_OK(nullable_stream.Init("SimpleTupleStreamTest::BigRow", true));
   bool got_reservation;
   ASSERT_OK(nullable_stream.PrepareForWrite(&got_reservation));
 
@@ -1095,7 +1096,7 @@ TEST_F(SimpleTupleStreamTest, BigRowMemoryUse) {
   Status status;
   BufferedTupleStream stream(
       runtime_state_, big_row_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
-  ASSERT_OK(stream.Init(-1, true));
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::BigRowMemoryUse", true));
   RowBatch* batch;
   bool got_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_reservation));
@@ -1136,7 +1137,7 @@ TEST_F(SimpleTupleStreamTest, BigStringReadWrite) {
   Status status;
   BufferedTupleStream stream(
       runtime_state_, string_desc_, &client_, DEFAULT_PAGE_LEN, BIG_ROW_BYTES * 2);
-  ASSERT_OK(stream.Init(-1, true));
+  ASSERT_OK(stream.Init("SimpleTupleStreamTest::BigStringReadWrite", true));
   RowBatch write_batch(string_desc_, 1024, &tracker_);
   RowBatch read_batch(string_desc_, 1024, &tracker_);
   bool got_reservation;
@@ -1302,7 +1303,7 @@ TEST_F(MultiTupleStreamTest, MultiTupleAddRowCustom) {
   int rows_added = 0;
   BufferedTupleStream stream(
       runtime_state_, string_desc_, &client_, buffer_size, buffer_size);
-  ASSERT_OK(stream.Init(-1, false));
+  ASSERT_OK(stream.Init("MultiTupleStreamTest::MultiTupleAddRowCustom", false));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
   ASSERT_TRUE(got_write_reservation);
@@ -1477,7 +1478,7 @@ TEST_F(ArrayTupleStreamTest, TestArrayDeepCopy) {
   int array_lens[] = {0, 1, 5, 10, 1000, 2, 49, 20};
   int num_array_lens = sizeof(array_lens) / sizeof(array_lens[0]);
   int array_len_index = 0;
-  ASSERT_OK(stream.Init(-1, false));
+  ASSERT_OK(stream.Init("ArrayTupleStreamTest::TestArrayDeepCopy", false));
   bool got_write_reservation;
   ASSERT_OK(stream.PrepareForWrite(&got_write_reservation));
   ASSERT_TRUE(got_write_reservation);
diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc
index 2b831b4..87c07eb 100644
--- a/be/src/runtime/buffered-tuple-stream.cc
+++ b/be/src/runtime/buffered-tuple-stream.cc
@@ -209,9 +209,9 @@ string BufferedTupleStream::Page::DebugString() const {
       handle.DebugString(), num_rows, retrieved_buffer, attached_to_output_batch);
 }
 
-Status BufferedTupleStream::Init(int node_id, bool pinned) {
+Status BufferedTupleStream::Init(const string& caller_label, bool pinned) {
   if (!pinned) UnpinStream(UNPIN_ALL_EXCEPT_CURRENT);
-  node_id_ = node_id;
+  caller_label_ = caller_label;
   return Status::OK();
 }
 
@@ -267,6 +267,7 @@ void BufferedTupleStream::Close(RowBatch* batch, FlushMode flush) {
   pages_.clear();
   num_pages_ = 0;
   bytes_pinned_ = 0;
+  bytes_unpinned_ = 0;
   closed_ = true;
 }
 
@@ -281,6 +282,8 @@ int64_t BufferedTupleStream::CalcBytesPinned() const {
 Status BufferedTupleStream::PinPage(Page* page) {
   RETURN_IF_ERROR(buffer_pool_->Pin(buffer_pool_client_, &page->handle));
   bytes_pinned_ += page->len();
+  bytes_unpinned_ -= page->len();
+  DCHECK_GE(bytes_unpinned_, 0);
   return Status::OK();
 }
 
@@ -304,6 +307,8 @@ void BufferedTupleStream::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
     DCHECK_EQ(new_pin_count, page->pin_count() - 1);
     buffer_pool_->Unpin(buffer_pool_client_, &page->handle);
     bytes_pinned_ -= page->len();
+    DCHECK_GE(bytes_pinned_, 0);
+    bytes_unpinned_ += page->len();
     if (page->pin_count() == 0) page->retrieved_buffer = false;
   }
 }
@@ -386,7 +391,7 @@ Status BufferedTupleStream::NewWritePage(int64_t page_len) noexcept {
 Status BufferedTupleStream::CalcPageLenForRow(int64_t row_size, int64_t* page_len) {
   if (UNLIKELY(row_size > max_page_len_)) {
     return Status(TErrorCode::MAX_ROW_SIZE,
-        PrettyPrinter::Print(row_size, TUnit::BYTES), node_id_,
+        PrettyPrinter::Print(row_size, TUnit::BYTES), caller_label_,
         PrettyPrinter::Print(state_->query_options().max_row_size, TUnit::BYTES));
   }
   *page_len = max(default_page_len_, BitUtil::RoundUpToPowerOfTwo(row_size));
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 4090ea8..b0cb8db 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -216,11 +216,11 @@ class BufferedTupleStream {
 
   ~BufferedTupleStream() { DCHECK(closed_); }
 
-  /// Initializes the tuple stream object on behalf of node 'node_id'. Must be called
-  /// once before any of the other APIs.
+  /// Initializes the tuple stream object on behalf of a caller uniquely identified by a
+  /// 'caller_label'. Must be called once before any of the other APIs.
   /// If 'pinned' is true, the tuple stream starts off pinned, otherwise it is unpinned.
-  /// 'node_id' is only used for error reporting.
-  Status Init(int node_id, bool pinned) WARN_UNUSED_RESULT;
+  /// 'caller_label' is only used for error reporting.
+  Status Init(const std::string& caller_label, bool pinned) WARN_UNUSED_RESULT;
 
   /// Prepares the stream for writing by saving enough reservation for a default-size
   /// write page. Tries to increase reservation if there is not enough unused reservation
@@ -366,6 +366,11 @@ class BufferedTupleStream {
     return bytes_pinned_;
   }
 
+  /// Returns the number of bytes currently unpinned by the stream.
+  int64_t bytes_unpinned() const {
+    return bytes_unpinned_;
+  }
+
   bool is_closed() const { return closed_; }
   bool is_pinned() const { return pinned_; }
   bool has_read_iterator() const { return has_read_iterator_; }
@@ -418,8 +423,10 @@ class BufferedTupleStream {
   /// Description of rows stored in the stream.
   const RowDescriptor* desc_;
 
-  /// Plan node ID, used for error reporting.
-  int node_id_ = -1;
+  /// Caller label, used for error reporting. A unique string identifying the class using
+  /// the BufferedTupleStream. For ExecNodes this is the node's label, but it could be
+  /// any unique string.
+  std::string caller_label_;
 
   /// The size of the fixed length portion for each tuple in the row.
   std::vector<int> fixed_tuple_sizes_;
@@ -505,6 +512,10 @@ class BufferedTupleStream {
   /// to compute it.
   int64_t bytes_pinned_ = 0;
 
+  /// Total bytes of unpinned pages in pages_, stored to avoid iterating over the list
+  /// to compute it.
+  int64_t bytes_unpinned_ = 0;
+
   /// Number of rows stored in the stream. Includes rows that were already deleted during
   /// a destructive 'attach_on_read' pass over the stream.
   int64_t num_rows_ = 0;
diff --git a/be/src/runtime/deque-row-batch-queue.cc b/be/src/runtime/deque-row-batch-queue.cc
deleted file mode 100644
index 6807b8b..0000000
--- a/be/src/runtime/deque-row-batch-queue.cc
+++ /dev/null
@@ -1,66 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "runtime/deque-row-batch-queue.h"
-#include "runtime/row-batch.h"
-
-#include "common/names.h"
-
-namespace impala {
-
-DequeRowBatchQueue::DequeRowBatchQueue(int max_batches)
-  : max_batches_(max_batches) {}
-
-DequeRowBatchQueue::~DequeRowBatchQueue() {
-  DCHECK(closed_);
-}
-
-bool DequeRowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
-  if (closed_ || IsFull()) return false;
-  batch_queue_.push_back(move(batch));
-  return true;
-}
-
-unique_ptr<RowBatch> DequeRowBatchQueue::GetBatch() {
-  if (closed_ || IsEmpty()) return unique_ptr<RowBatch>();
-  unique_ptr<RowBatch> result = move(batch_queue_.front());
-  batch_queue_.pop_front();
-  return result;
-}
-
-bool DequeRowBatchQueue::IsFull() const {
-  return batch_queue_.size() == max_batches_;
-}
-
-bool DequeRowBatchQueue::IsEmpty() const {
-  return batch_queue_.empty();
-}
-
-bool DequeRowBatchQueue::IsOpen() const {
-  return !closed_;
-}
-
-void DequeRowBatchQueue::Close() {
-  if (closed_) return;
-  while (!batch_queue_.empty()) {
-    unique_ptr<RowBatch> result = GetBatch();
-    result->Reset();
-  }
-  batch_queue_.clear();
-  closed_ = true;
-}
-}
diff --git a/be/src/runtime/deque-row-batch-queue.h b/be/src/runtime/deque-row-batch-queue.h
deleted file mode 100644
index 95cc2af..0000000
--- a/be/src/runtime/deque-row-batch-queue.h
+++ /dev/null
@@ -1,70 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#pragma once
-
-#include <queue>
-
-#include "runtime/row-batch.h"
-
-namespace impala {
-
-class RowBatch;
-
-/// A RowBatchQueue that provides non-blocking queue semantics. RowBatches are stored
-/// inside a std::deque. None of the methods block, this class is not thread safe. The
-/// size of the queue can be capped by the 'max_batches' parameter. Calls to AddBatch
-/// after the capacity has been reached will return false. Calls to GetBatch on an empty
-/// queue will return null.
-class DequeRowBatchQueue {
- public:
-  DequeRowBatchQueue(int max_batches);
-  ~DequeRowBatchQueue();
-
-  /// Adds the given RowBatch to the queue. Returns true if the batch was successfully
-  /// added, returns false if the queue is full or has already been closed. The ownership
-  /// of the given batch is transferred from the 'batch' to the queue.
-  bool AddBatch(std::unique_ptr<RowBatch> batch);
-
-  /// Returns and removes the RowBatch at the head of the queue. Returns a nullptr if the
-  /// queue is already closed or the queue is empty. The ownership of the returned batch
-  /// is transferred from the queue to the returned unique_ptr.
-  std::unique_ptr<RowBatch> GetBatch();
-
-  /// Returns true if the queue limit has been reached, false otherwise.
-  bool IsFull() const;
-
-  /// Returns true if the queue is empty, false otherwise.
-  bool IsEmpty() const;
-
-  /// Returns false if Close() has been called, true otherwise.
-  bool IsOpen() const;
-
-  /// Resets the remaining RowBatches in the queue and releases the queue memory.
-  void Close();
-
- private:
-  /// Queue of row batches.
-  std::deque<std::unique_ptr<RowBatch>> batch_queue_;
-
-  /// True if the queue has been closed, false otherwise.
-  bool closed_ = false;
-
-  /// The max size of the queue.
-  const int max_batches_;
-};
-}
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index ee210dd..7b4e3c2 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -197,7 +197,7 @@ Status Sorter::Run::AddBatchInternal(
         if (total_var_len > sorter_->page_len_) {
           int64_t max_row_size = sorter_->state_->query_options().max_row_size;
           return Status(TErrorCode::MAX_ROW_SIZE,
-              PrettyPrinter::Print(total_var_len, TUnit::BYTES), sorter_->node_id_,
+              PrettyPrinter::Print(total_var_len, TUnit::BYTES), sorter_->node_label_,
               PrettyPrinter::Print(max_row_size, TUnit::BYTES));
         }
       } else {
@@ -761,9 +761,9 @@ Sorter::Sorter(const std::vector<ScalarExpr*>& ordering_exprs,
       const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first,
     const vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
     MemTracker* mem_tracker, BufferPool::ClientHandle* buffer_pool_client,
-    int64_t page_len, RuntimeProfile* profile, RuntimeState* state, int node_id,
-    bool enable_spilling)
-  : node_id_(node_id),
+    int64_t page_len, RuntimeProfile* profile, RuntimeState* state,
+    const string& node_label, bool enable_spilling)
+  : node_label_(node_label),
     state_(state),
     expr_perm_pool_(mem_tracker),
     expr_results_pool_(mem_tracker),
@@ -806,7 +806,7 @@ Status Sorter::Prepare(ObjectPool* obj_pool) {
   TupleDescriptor* sort_tuple_desc = output_row_desc_->tuple_descriptors()[0];
   if (sort_tuple_desc->byte_size() > page_len_) {
     return Status(TErrorCode::MAX_ROW_SIZE,
-        PrettyPrinter::Print(sort_tuple_desc->byte_size(), TUnit::BYTES), node_id_,
+        PrettyPrinter::Print(sort_tuple_desc->byte_size(), TUnit::BYTES), node_label_,
         PrettyPrinter::Print(state_->query_options().max_row_size, TUnit::BYTES));
   }
   has_var_len_slots_ = sort_tuple_desc->HasVarlenSlots();
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index c01a54a..2040b46 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -95,7 +95,7 @@ class Sorter {
   /// 'sort_tuple_exprs' are the slot exprs used to materialize the tuples to be
   /// sorted. 'ordering_exprs', 'is_asc_order' and 'nulls_first' are parameters
   /// for the comparator for the sort tuples.
-  /// 'node_id' is the ID of the exec node using the sorter for error reporting.
+  /// 'node_label' is the label of the exec node using the sorter for error reporting.
   /// 'enable_spilling' should be set to false to reduce the number of requested buffers
   /// if the caller will use AddBatchNoSpill().
   ///
@@ -107,7 +107,7 @@ class Sorter {
       const std::vector<bool>& is_asc_order, const std::vector<bool>& nulls_first,
       const std::vector<ScalarExpr*>& sort_tuple_exprs, RowDescriptor* output_row_desc,
       MemTracker* mem_tracker, BufferPool::ClientHandle* client, int64_t page_len,
-      RuntimeProfile* profile, RuntimeState* state, int node_id,
+      RuntimeProfile* profile, RuntimeState* state, const std::string& node_label,
       bool enable_spilling);
   ~Sorter();
 
@@ -207,8 +207,8 @@ class Sorter {
   /// since the Sorter has started working on it's initial runs.
   void TryToIncreaseMemAllocationForMerge();
 
-  /// ID of the ExecNode that owns the sorter, used for error reporting.
-  const int node_id_;
+  /// Label of the ExecNode that owns the sorter, used for error reporting.
+  const std::string node_label_;
 
   /// Runtime state instance used to check for cancellation. Not owned.
   RuntimeState* const state_;
diff --git a/be/src/runtime/spillable-row-batch-queue.cc b/be/src/runtime/spillable-row-batch-queue.cc
new file mode 100644
index 0000000..a9be9f7
--- /dev/null
+++ b/be/src/runtime/spillable-row-batch-queue.cc
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/spillable-row-batch-queue.h"
+
+#include "runtime/query-state.h"
+#include "runtime/row-batch.h"
+#include "runtime/runtime-state.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+SpillableRowBatchQueue::SpillableRowBatchQueue(const string& name,
+    int64_t max_unpinned_bytes, RuntimeState* state, MemTracker* mem_tracker,
+    RuntimeProfile* profile, const RowDescriptor* row_desc,
+    const TBackendResourceProfile& resource_profile, const TDebugOptions& debug_options)
+  : name_(name),
+    state_(state),
+    mem_tracker_(mem_tracker),
+    profile_(profile),
+    row_desc_(row_desc),
+    resource_profile_(resource_profile),
+    debug_options_(debug_options),
+    max_unpinned_bytes_(max_unpinned_bytes) {}
+
+SpillableRowBatchQueue::~SpillableRowBatchQueue() {
+  DCHECK(closed_);
+}
+
+Status SpillableRowBatchQueue::Open() {
+  // Initialize the ReservationManager and then claim the initial reservation.
+  reservation_manager_.Init(name_, profile_, state_->instance_buffer_reservation(),
+      mem_tracker_, resource_profile_, debug_options_);
+  RETURN_IF_ERROR(reservation_manager_.ClaimBufferReservation(state_));
+
+  // Create the BufferedTupleStream, initialize it, and then create the read and write
+  // buffer pages.
+  batch_queue_ = make_unique<BufferedTupleStream>(state_, row_desc_,
+      reservation_manager_.buffer_pool_client(), resource_profile_.spillable_buffer_size,
+      resource_profile_.max_row_buffer_size);
+  RETURN_IF_ERROR(batch_queue_->Init(name_, true));
+  bool got_reservation = false;
+  RETURN_IF_ERROR(batch_queue_->PrepareForReadWrite(true, &got_reservation));
+  DCHECK(got_reservation) << "SpillableRowBatchQueue failed to get reservation using "
+                          << "buffer pool client: "
+                          << reservation_manager_.buffer_pool_client()->DebugString();
+  return Status::OK();
+}
+
+Status SpillableRowBatchQueue::AddBatch(RowBatch* batch) {
+  DCHECK(!IsFull()) << "Cannot AddBatch on a full SpillableRowBatchQueue";
+  DCHECK(!closed_) << "Cannot AddBatch on a closed SpillableRowBatchQueue";
+  Status status;
+  FOREACH_ROW(batch, 0, batch_itr) {
+    // AddRow should only return false if there was not enough unused reservation to
+    // allocate a page for the given row. If a row cannot be added to the batch_queue_
+    // then start spilling to disk by unpinning the stream. Once the stream is unpinned,
+    // adding the row to the stream should succeed unless the unpinned pages needed to
+    // be spilled and either (1) there was an error (e.g. IO error) when writing to disk,
+    // (2) there is no more scratch space left to write to disk, or (3) spilling to disk
+    // is disabled.
+    if (UNLIKELY(!batch_queue_->AddRow(batch_itr.Get(), &status))) {
+      RETURN_IF_ERROR(status);
+      // StartSpilling checks if spilling is disabled and returns an error if it is.
+      RETURN_IF_ERROR(state_->StartSpilling(mem_tracker_));
+
+      // The stream should be pinned at this point.
+      DCHECK(batch_queue_->is_pinned());
+      DCHECK_EQ(batch_queue_->bytes_unpinned(), 0);
+
+      // Unpin the stream and then add the row.
+      batch_queue_->UnpinStream(BufferedTupleStream::UNPIN_ALL_EXCEPT_CURRENT);
+
+      // Append "Spilled" to the "ExecOption" info string in the runtime profile.
+      profile_->AppendExecOption("Spilled");
+
+      if (!batch_queue_->AddRow(batch_itr.Get(), &status)) {
+        RETURN_IF_ERROR(status);
+        // If the row could not be added after the stream was unpinned, an error should
+        // have been set.
+        DCHECK(false) << "Rows should be added in unpinned mode unless an error occurred";
+      }
+    }
+  }
+  return Status::OK();
+}
+
+Status SpillableRowBatchQueue::GetBatch(RowBatch* batch) {
+  DCHECK(!IsEmpty()) << "Cannot GetBatch on an empty SpillableRowBatchQueue";
+  DCHECK(!closed_) << "Cannot GetBatch on a closed SpillableRowBatchQueue";
+  bool eos = false;
+  RETURN_IF_ERROR(batch_queue_->GetNext(batch, &eos));
+  // Validate that the value of eos is consistent with IsEmpty().
+  DCHECK_EQ(eos, IsEmpty());
+  return Status::OK();
+}
+
+bool SpillableRowBatchQueue::IsFull() const {
+  // The queue is considered full if the number of unpinned bytes is greater than or
+  // equal to the max number of unpinned bytes. The queue can only be full after the
+  // stream has been unpinned. The number of unpinned bytes in the stream may exceed the
+  // imposed limit because the entire stream is unpinned at once, without checking
+  // against the max_unpinned_bytes_ limit.
+  DCHECK(!closed_);
+  return batch_queue_->bytes_unpinned() >= max_unpinned_bytes_;
+}
+
+bool SpillableRowBatchQueue::IsEmpty() const {
+  // The batch_queue_ tracks how many rows have been added to the stream (regardless of
+  // whether those rows have already been removed) and how many rows have been read from
+  // the stream. If these values are equal, the queue is considered empty.
+  DCHECK(!closed_);
+  return batch_queue_->num_rows() == batch_queue_->rows_returned();
+}
+
+bool SpillableRowBatchQueue::IsOpen() const {
+  return !closed_;
+}
+
+void SpillableRowBatchQueue::Close() {
+  if (closed_) return;
+  if (batch_queue_ != nullptr) {
+    batch_queue_->Close(nullptr, RowBatch::FlushMode::NO_FLUSH_RESOURCES);
+  }
+  reservation_manager_.Close(state_);
+  closed_ = true;
+}
+} // namespace impala
diff --git a/be/src/runtime/spillable-row-batch-queue.h b/be/src/runtime/spillable-row-batch-queue.h
new file mode 100644
index 0000000..97c60d8
--- /dev/null
+++ b/be/src/runtime/spillable-row-batch-queue.h
@@ -0,0 +1,138 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <queue>
+
+#include "runtime/buffered-tuple-stream.h"
+#include "runtime/reservation-manager.h"
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// A RowBatchQueue that provides non-blocking queue semantics. RowBatches are stored
+/// inside a BufferedTupleStream. None of the methods block, and this class is not
+/// thread safe. The amount of unpinned memory used by the queue can be limited by the
+/// parameter 'max_unpinned_bytes'. It is not valid to call AddBatch after the capacity
+/// has been reached or to call GetBatch on an empty queue; both cases are checked with
+/// DCHECKs rather than reported through an error Status.
+///
+/// In order to manage the reservation used by the BufferedTupleStream, this class uses
+/// a ReservationManager that creates the BufferPool::ClientHandle used by the
+/// BufferedTupleStream. The ReservationManager uses a ResourceProfile created by the fe/
+/// to limit the amount of reserved memory used by the stream.
+///
+/// 'name' is a unique name which is purely used for error reporting by the
+/// BufferedTupleStream.
+///
+/// 'max_unpinned_bytes' limits the maximum number of bytes that can be unpinned in the
+/// underlying BufferedTupleStream. The limit is only a soft limit: it might be exceeded
+/// during AddBatch when unpinning the stream.
+///
+/// 'resource_profile' is created in the fe/ by PlanRootSink and specifies the min and max
+/// amount of reserved memory the BufferedTupleStream can use as well as the size of the
+/// default and max page length used by the stream.
+///
+/// The remaining parameters are used to initialize the ReservationManager and
+/// BufferedTupleStream.
+class SpillableRowBatchQueue {
+ public:
+  SpillableRowBatchQueue(const std::string& name, int64_t max_unpinned_bytes,
+      RuntimeState* state, MemTracker* mem_tracker, RuntimeProfile* profile,
+      const RowDescriptor* row_desc, const TBackendResourceProfile& resource_profile,
+      const TDebugOptions& debug_options);
+  ~SpillableRowBatchQueue();
+
+  /// Creates and initializes the ReservationManager and BufferedTupleStream. Returns an
+  /// error Status if either could not be initialized. The ReservationManager may fail
+  /// to initialize if it cannot claim the initial buffer reservation. The
+  /// BufferedTupleStream may fail to initialize if it could not create the read and
+  /// write buffers.
+  Status Open();
+
+  /// Adds the given RowBatch to the queue. Returns Status::OK() if the batch was
+  /// successfully added, returns an error if there was an issue when adding the batch to
+  /// the BufferedTupleStream. If adding the batch to the BufferedTupleStream cannot be
+  /// achieved because there is no more available reserved memory, this method will unpin
+  /// the stream and then add the RowBatch. If the batch still cannot be added, this
+  /// method returns an error Status. It is not valid to call this method if the queue is
+  /// full or closed.
+  Status AddBatch(RowBatch* batch);
+
+  /// Returns and removes the RowBatch at the head of the queue. Returns Status::OK() if
+  /// the batch was successfully read from the queue. It is not valid to call this method
+  /// if the queue is empty or has already been closed.
+  Status GetBatch(RowBatch* batch);
+
+  /// Returns true if the queue limit has been reached, false otherwise. It is not valid
+  /// to call this method if the queue is already closed.
+  bool IsFull() const;
+
+  /// Returns true if the queue is empty, false otherwise. It is not valid to call this
+  /// method if the queue is already closed.
+  bool IsEmpty() const;
+
+  /// Returns false if Close() has been called, true otherwise.
+  bool IsOpen() const;
+
+  /// Resets the remaining RowBatches in the queue and releases the queue memory.
+  void Close();
+
+ private:
+  /// BufferedTupleStream that stores all RowBatches.
+  std::unique_ptr<BufferedTupleStream> batch_queue_;
+
+  /// ReservationManager that manages the reserved memory and BufferPool::ClientHandle
+  /// used by the BufferedTupleStream.
+  ReservationManager reservation_manager_;
+
+  /// A unique name used by the BufferedTupleStream, used purely for debugging purposes.
+  const std::string& name_;
+
+  /// Used to initialize and manage the BufferedTupleStream and ReservationManager.
+  RuntimeState* state_;
+
+  /// The MemTracker to use in the BufferedTupleStream.
+  MemTracker* mem_tracker_;
+
+  /// Used by the BufferPool::Client created for the BufferedTupleStream.
+  RuntimeProfile* profile_;
+
+  /// Used by the BufferedTupleStream, must match the RowDescriptor of the RowBatches
+  /// stored in the queue.
+  const RowDescriptor* row_desc_;
+
+  /// Used by the ReservationManager to set the min and max reservation that can be
+  /// used by the BufferedTupleStream. Used by the BufferedTupleStream to set the default
+  /// and max page lengths.
+  const TBackendResourceProfile& resource_profile_;
+
+  /// Used by the ReservationManager for the SET_DENY_RESERVATION_PROBABILITY debug
+  /// action.
+  const TDebugOptions& debug_options_;
+
+  /// The max number of bytes that can be unpinned in the BufferedTupleStream. Set by the
+  /// query option MAX_SPILLED_RESULT_SPOOLING_MEM.
+  const int64_t max_unpinned_bytes_;
+
+  /// True if the queue has been closed, false otherwise.
+  bool closed_ = false;
+};
+} // namespace impala
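
The header comment above describes a non-blocking queue whose 'max_unpinned_bytes' limit is soft. The following is a minimal standalone sketch of that idea only, not the SpillableRowBatchQueue implementation: `FakeBatch`, `SoftLimitQueue`, and `pinned_capacity` are hypothetical names, and the rule that all buffered bytes count as unpinned once the pinned capacity is exceeded is an assumption made for illustration.

```cpp
#include <cassert>
#include <cstdint>
#include <deque>

// Hypothetical stand-in for a RowBatch: only its size in bytes matters here.
struct FakeBatch {
  int64_t bytes;
};

// Toy model of a non-blocking queue with a soft cap on unpinned bytes.
// None of these names are Impala APIs.
class SoftLimitQueue {
 public:
  SoftLimitQueue(int64_t pinned_capacity, int64_t max_unpinned_bytes)
    : pinned_capacity_(pinned_capacity), max_unpinned_bytes_(max_unpinned_bytes) {}

  // Never blocks: returns false if the queue is already full.
  bool AddBatch(const FakeBatch& batch) {
    if (IsFull()) return false;
    // Assumption: once the pinned capacity is exceeded, everything buffered is
    // treated as unpinned (spillable) from then on.
    if (total_bytes_ + batch.bytes > pinned_capacity_) unpinned_ = true;
    batches_.push_back(batch);
    total_bytes_ += batch.bytes;
    return true;
  }

  // Never blocks: returns false if the queue is empty.
  bool GetBatch(FakeBatch* out) {
    if (IsEmpty()) return false;
    *out = batches_.front();
    batches_.pop_front();
    total_bytes_ -= out->bytes;
    return true;
  }

  // The limit is checked before an add, not during it, so a single batch may push
  // the unpinned byte count past 'max_unpinned_bytes_'. That makes the limit soft.
  bool IsFull() const { return unpinned_ && total_bytes_ >= max_unpinned_bytes_; }
  bool IsEmpty() const { return batches_.empty(); }
  int64_t total_bytes() const { return total_bytes_; }

 private:
  std::deque<FakeBatch> batches_;
  const int64_t pinned_capacity_;
  const int64_t max_unpinned_bytes_;
  int64_t total_bytes_ = 0;
  bool unpinned_ = false;
};
```

A caller in this model is expected to check IsFull() or IsEmpty() before calling AddBatch or GetBatch and to do its own waiting, which is why the sketch needs no locks or condition variables.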
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index 76b6a75..ab0d895 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -499,6 +499,7 @@ Status ImpalaServer::QueryToTQueryContext(const Query& query,
       RETURN_IF_ERROR(ParseQueryOptions(option, &overlay, &overlay_mask));
     }
     OverlayQueryOptions(overlay, overlay_mask, &query_ctx->client_request.query_options);
+    RETURN_IF_ERROR(ValidateQueryOptions(&overlay));
     set_query_options_mask |= overlay_mask;
   }
 
diff --git a/be/src/service/impala-server.cc b/be/src/service/impala-server.cc
index 9c4c732..b878dbb 100644
--- a/be/src/service/impala-server.cc
+++ b/be/src/service/impala-server.cc
@@ -1050,6 +1050,12 @@ void ImpalaServer::AddPoolConfiguration(TQueryCtx* ctx,
            << " set_pool_mask=" << set_pool_options_mask.to_string()
            << " overlay_mask=" << overlay_mask.to_string();
   OverlayQueryOptions(pool_options, overlay_mask, &ctx->client_request.query_options);
+
+  status = ValidateQueryOptions(&pool_options);
+  if (!status.ok()) {
+    VLOG_QUERY << "Ignoring errors while validating default query options for pool="
+               << resolved_pool << ", message: " << status.GetDetail();
+  }
 }
 
 Status ImpalaServer::Execute(TQueryCtx* query_ctx,
@@ -1515,6 +1521,7 @@ void ImpalaServer::InitializeConfigVariables() {
   QueryOptionsMask set_query_options; // unused
   Status status = ParseQueryOptions(FLAGS_default_query_options,
       &default_query_options_, &set_query_options);
+  status.MergeStatus(ValidateQueryOptions(&default_query_options_));
   if (!status.ok()) {
     // Log error and exit if the default query options are invalid.
     CLEAN_EXIT_WITH_ERROR(Substitute("Invalid default query options. Please check "
diff --git a/be/src/service/query-options-test.cc b/be/src/service/query-options-test.cc
index 715422d..30d7791 100644
--- a/be/src/service/query-options-test.cc
+++ b/be/src/service/query-options-test.cc
@@ -516,3 +516,46 @@ TEST(QueryOptions, CompressionCodec) {
 #undef ENTRIES
 #undef ENTRY
 }
+
+// Tests for setting of MAX_RESULT_SPOOLING_MEM and
+// MAX_SPILLED_RESULT_SPOOLING_MEM. Setting of these options must maintain the
+// condition 'MAX_RESULT_SPOOLING_MEM <= MAX_SPILLED_RESULT_SPOOLING_MEM'.
+// A value of 0 for each of these parameters means the memory is unbounded.
+TEST(QueryOptions, ResultSpooling) {
+  {
+    TQueryOptions options;
+    // Setting the memory to 0 (unbounded) should fail with the default spilled value.
+    options.__set_max_result_spooling_mem(0);
+    EXPECT_FALSE(ValidateQueryOptions(&options).ok());
+    // Setting the spilled memory to 0 (unbounded) should always work.
+    options.__set_max_spilled_result_spooling_mem(0);
+    EXPECT_TRUE(ValidateQueryOptions(&options).ok());
+    // Setting the memory to 0 (unbounded) should work if the spilled memory is unbounded
+    // as well.
+    options.__set_max_result_spooling_mem(0);
+    EXPECT_TRUE(ValidateQueryOptions(&options).ok());
+    // Setting the spilled memory to a bounded value should fail if memory is unbounded.
+    options.__set_max_spilled_result_spooling_mem(1);
+    EXPECT_FALSE(ValidateQueryOptions(&options).ok());
+  }
+
+  {
+    TQueryOptions options;
+    // Setting the spilled memory to a value lower than the memory should fail.
+    options.__set_max_result_spooling_mem(2);
+    EXPECT_TRUE(ValidateQueryOptions(&options).ok());
+    options.__set_max_spilled_result_spooling_mem(1);
+    EXPECT_FALSE(ValidateQueryOptions(&options).ok());
+  }
+
+  {
+    TQueryOptions options;
+    // Setting the memory to a value higher than the spilled memory should fail.
+    options.__set_max_result_spooling_mem(1);
+    EXPECT_TRUE(ValidateQueryOptions(&options).ok());
+    options.__set_max_spilled_result_spooling_mem(2);
+    EXPECT_TRUE(ValidateQueryOptions(&options).ok());
+    options.__set_max_result_spooling_mem(3);
+    EXPECT_FALSE(ValidateQueryOptions(&options).ok());
+  }
+}
diff --git a/be/src/service/query-options.cc b/be/src/service/query-options.cc
index 99b7b27..9b6b91d 100644
--- a/be/src/service/query-options.cc
+++ b/be/src/service/query-options.cc
@@ -808,6 +808,22 @@ Status impala::SetQueryOption(const string& key, const string& value,
         query_options->__set_spool_query_results(IsTrue(value));
         break;
       }
+      case TImpalaQueryOptions::MAX_RESULT_SPOOLING_MEM: {
+        int64_t max_result_spooling_mem;
+        RETURN_IF_ERROR(ParseMemValue(value, "max result spooling memory",
+            &max_result_spooling_mem));
+        query_options->__set_max_result_spooling_mem(
+            max_result_spooling_mem);
+        break;
+      }
+      case TImpalaQueryOptions::MAX_SPILLED_RESULT_SPOOLING_MEM: {
+        int64_t max_spilled_result_spooling_mem;
+        RETURN_IF_ERROR(ParseMemValue(value, "max spilled result spooling memory",
+            &max_spilled_result_spooling_mem));
+        query_options->__set_max_spilled_result_spooling_mem(
+            max_spilled_result_spooling_mem);
+        break;
+      }
       case TImpalaQueryOptions::DEFAULT_TRANSACTIONAL_TYPE: {
         TTransactionalType::type enum_type;
         RETURN_IF_ERROR(GetThriftEnum(value, "default transactional type",
@@ -889,6 +905,24 @@ Status impala::ParseQueryOptions(const string& options, TQueryOptions* query_opt
   return Status::OK();
 }
 
+Status impala::ValidateQueryOptions(TQueryOptions* query_options) {
+  // Validate that max_result_spooling_mem <=
+  // max_spilled_result_spooling_mem (a value of 0 means memory is unbounded).
+  int64_t max_mem = query_options->max_result_spooling_mem;
+  int64_t max_spilled_mem = query_options->max_spilled_result_spooling_mem;
+  if (max_mem == 0 && max_spilled_mem != 0) {
+    return Status("If max_result_spooling_mem is set to 0 (unbounded) "
+                  "max_spilled_result_spooling_mem must be set to 0 (unbounded) as "
+                  "well.");
+  }
+  if (max_spilled_mem != 0 && max_spilled_mem < max_mem) {
+    return Status(Substitute("max_spilled_result_spooling_mem '$0' must be greater than "
+                             "or equal to max_result_spooling_mem '$1'",
+        max_spilled_mem, max_mem));
+  }
+  return Status::OK();
+}
+
 void impala::PopulateQueryOptionLevels(QueryOptionLevels* query_option_levels)
 {
 #define QUERY_OPT_FN(NAME, ENUM, LEVEL)\
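
The two checks in ValidateQueryOptions can be tried in isolation. The sketch below restates them as a free function that returns an empty string on success; `CheckSpoolingLimits` is a hypothetical name and the error strings are paraphrased, so treat it as an illustration of the rule rather than the committed code. As in the diff, only 0 is treated as unbounded here.

```cpp
#include <cassert>
#include <cstdint>
#include <string>

// Returns "" if the pair of limits is valid, otherwise a short error message.
// A value of 0 means "unbounded" for either limit.
std::string CheckSpoolingLimits(int64_t max_mem, int64_t max_spilled_mem) {
  // Unbounded pinned memory could never be fully unpinned into a bounded
  // spill budget, so the spill budget must be unbounded too.
  if (max_mem == 0 && max_spilled_mem != 0) {
    return "max_spilled_result_spooling_mem must be 0 when "
           "max_result_spooling_mem is 0";
  }
  // A bounded spill budget must be able to hold all of the pinned memory.
  if (max_spilled_mem != 0 && max_spilled_mem < max_mem) {
    return "max_spilled_result_spooling_mem must be >= max_result_spooling_mem";
  }
  return "";
}
```

With the defaults from the diff (100 MB and 1 GB) the pair is valid; lowering only the spilled limit below the memory limit, or setting only the memory limit to 0, is rejected.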
diff --git a/be/src/service/query-options.h b/be/src/service/query-options.h
index e388f78..72c9f81 100644
--- a/be/src/service/query-options.h
+++ b/be/src/service/query-options.h
@@ -47,7 +47,7 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
 // time we add or remove a query option to/from the enum TImpalaQueryOptions.
 #define QUERY_OPTS_TABLE\
   DCHECK_EQ(_TImpalaQueryOptions_VALUES_TO_NAMES.size(),\
-      TImpalaQueryOptions::DISABLE_DATA_CACHE + 1);\
+      TImpalaQueryOptions::MAX_SPILLED_RESULT_SPOOLING_MEM + 1);\
   REMOVED_QUERY_OPT_FN(abort_on_default_limit_exceeded, ABORT_ON_DEFAULT_LIMIT_EXCEEDED)\
   QUERY_OPT_FN(abort_on_error, ABORT_ON_ERROR, TQueryOptionLevel::REGULAR)\
   REMOVED_QUERY_OPT_FN(allow_unsupported_formats, ALLOW_UNSUPPORTED_FORMATS)\
@@ -180,7 +180,11 @@ typedef std::unordered_map<string, beeswax::TQueryOptionLevel::type>
   QUERY_OPT_FN(max_statement_length_bytes, MAX_STATEMENT_LENGTH_BYTES,\
       TQueryOptionLevel::REGULAR)\
   QUERY_OPT_FN(disable_data_cache, DISABLE_DATA_CACHE,\
-      TQueryOptionLevel::ADVANCED)
+      TQueryOptionLevel::ADVANCED)\
+  QUERY_OPT_FN(max_result_spooling_mem, MAX_RESULT_SPOOLING_MEM,\
+      TQueryOptionLevel::DEVELOPMENT)\
+  QUERY_OPT_FN(max_spilled_result_spooling_mem, MAX_SPILLED_RESULT_SPOOLING_MEM,\
+      TQueryOptionLevel::DEVELOPMENT)
   ;
 
 /// Enforce practical limits on some query options to avoid undesired query state.
@@ -221,6 +225,12 @@ void OverlayQueryOptions(const TQueryOptions& src, const QueryOptionsMask& mask,
 Status SetQueryOption(const std::string& key, const std::string& value,
     TQueryOptions* query_options, QueryOptionsMask* set_query_options_mask);
 
+/// Validates the query options after they have all been set. Returns a Status indicating
+/// the results of running the validation rules. The majority of the query options
+/// validation is done in SetQueryOption. However, more complex validation rules (e.g.
+/// validating that one config is greater than another config) are run here.
+Status ValidateQueryOptions(TQueryOptions* query_options);
+
 /// Parse a "," separated key=value pair of query options and set it in 'query_options'.
 /// If the same query option is specified more than once, the last one wins. The
 /// set_query_options_mask bitmask is updated to reflect the query options which were
diff --git a/common/thrift/CMakeLists.txt b/common/thrift/CMakeLists.txt
index 958cfac..eb9489d 100644
--- a/common/thrift/CMakeLists.txt
+++ b/common/thrift/CMakeLists.txt
@@ -228,6 +228,7 @@ set (SRC_FILES
   Planner.thrift
   Partitions.thrift
   parquet.thrift
+  ResourceProfile.thrift
   Results.thrift
   RuntimeProfile.thrift
   StatestoreService.thrift
diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift
index 1cce0eb..3c26c20 100644
--- a/common/thrift/DataSinks.thrift
+++ b/common/thrift/DataSinks.thrift
@@ -23,6 +23,7 @@ include "Exprs.thrift"
 include "Types.thrift"
 include "Descriptors.thrift"
 include "Partitions.thrift"
+include "ResourceProfile.thrift"
 
 enum TDataSinkType {
   DATA_STREAM_SINK = 0
@@ -100,6 +101,10 @@ struct TJoinBuildSink {
   2: required list<Exprs.TExpr> build_exprs
 }
 
+struct TPlanRootSink {
+  1: required ResourceProfile.TBackendResourceProfile resource_profile
+}
+
 // Union type of all table sinks.
 struct TTableSink {
   1: required Types.TTableId target_table_id
@@ -115,10 +120,11 @@ struct TDataSink {
   2: optional TDataStreamSink stream_sink
   3: optional TTableSink table_sink
   4: optional TJoinBuildSink join_build_sink
+  5: optional TPlanRootSink plan_root_sink
 
   // A human-readable label for the sink.
-  5: optional string label
+  6: optional string label
 
   // Estimated execution stats generated by the planner.
-  6: optional ExecStats.TExecStats estimated_stats
+  7: optional ExecStats.TExecStats estimated_stats
 }
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 1d1cd21..3dbc2c8 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -382,6 +382,12 @@ struct TQueryOptions {
 
   // If true, skip using the data cache for this query session.
   90: optional bool disable_data_cache = false;
+
+  // See comment in ImpalaService.thrift
+  91: optional i64 max_result_spooling_mem = 104857600;
+
+  // See comment in ImpalaService.thrift
+  92: optional i64 max_spilled_result_spooling_mem = 1073741824;
 }
 
 // Impala currently has two types of sessions: Beeswax and HiveServer2
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 5ef5a4b..f5c979f 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -408,7 +408,7 @@ enum TImpalaQueryOptions {
   // Enable spooling of query results. If true, query results will be spooled in
   // memory up to a specified memory limit. If the memory limit is hit, the
   // coordinator fragment will block until the client has consumed enough rows to free
-  // up more memory. If false, client consumption driven backpressure controls the rate
+  // up more memory. If false, client consumption driven back-pressure controls the rate
   // at which rows are materialized by the execution tree.
   SPOOL_QUERY_RESULTS = 85
 
@@ -432,6 +432,22 @@ enum TImpalaQueryOptions {
 
   // Disable the data cache.
   DISABLE_DATA_CACHE = 89
+
+  // The maximum amount of memory used when spooling query results. If this value is
+  // exceeded when spooling results, all memory will be unpinned and most likely spilled
+  // to disk. Set to 100 MB by default. Only applicable if SPOOL_QUERY_RESULTS
+  // is true. Setting this to 0 or -1 means the memory is unbounded. Cannot be set to
+  // values below -1.
+  MAX_RESULT_SPOOLING_MEM = 90
+
+  // The maximum amount of memory that can be spilled when spooling query results. Must be
+  // greater than or equal to MAX_RESULT_SPOOLING_MEM to allow unpinning all pinned memory
+  // if the amount of spooled results exceeds MAX_RESULT_SPOOLING_MEM. If this value is
+  // exceeded, the coordinator fragment will block until the client has consumed enough
+  // rows to free up more memory. Set to 1 GB by default. Only applicable if
+  // SPOOL_QUERY_RESULTS is true. Setting this to 0 or -1 means the memory is unbounded.
+  // Cannot be set to values below -1.
+  MAX_SPILLED_RESULT_SPOOLING_MEM = 91
 }
 
 // The summary of a DML statement.
diff --git a/common/thrift/PlanNodes.thrift b/common/thrift/PlanNodes.thrift
index eb780e2..b308a72 100644
--- a/common/thrift/PlanNodes.thrift
+++ b/common/thrift/PlanNodes.thrift
@@ -29,6 +29,7 @@ include "ExecStats.thrift"
 include "Exprs.thrift"
 include "Types.thrift"
 include "ExternalDataSource.thrift"
+include "ResourceProfile.thrift"
 
 enum TPlanNodeType {
   HDFS_SCAN_NODE = 0
@@ -371,25 +372,6 @@ struct TNestedLoopJoinNode {
   2: optional list<Exprs.TExpr> join_conjuncts
 }
 
-// This contains all of the information computed by the plan as part of the resource
-// profile that is needed by the backend to execute.
-struct TBackendResourceProfile {
-  // The minimum reservation for this plan node in bytes.
-  1: required i64 min_reservation
-
-  // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively
-  // unlimited.
-  2: required i64 max_reservation
-
-  // The spillable buffer size in bytes to use for this node, chosen by the planner.
-  // Set iff the node uses spillable buffers.
-  3: optional i64 spillable_buffer_size
-
-  // The buffer size in bytes that is large enough to fit the largest row to be processed.
-  // Set if the node allocates buffers for rows from the buffer pool.
-  4: optional i64 max_row_buffer_size
-}
-
 struct TAggregator {
   1: optional list<Exprs.TExpr> grouping_exprs
   // aggregate exprs. The root of each expr is the aggregate function. The
@@ -410,7 +392,7 @@ struct TAggregator {
   // Set to true to use the streaming preagg algorithm. Node must be a preaggregation.
   6: required bool use_streaming_preaggregation
 
-  7: required TBackendResourceProfile resource_profile
+  7: required ResourceProfile.TBackendResourceProfile resource_profile
 }
 
 struct TAggregationNode {
@@ -632,7 +614,7 @@ struct TPlanNode {
   24: optional list<TRuntimeFilterDesc> runtime_filters
 
   // Resource profile for this plan node.
-  25: required TBackendResourceProfile resource_profile
+  25: required ResourceProfile.TBackendResourceProfile resource_profile
 
   26: optional TCardinalityCheckNode cardinality_check_node
 }
diff --git a/common/thrift/ResourceProfile.thrift b/common/thrift/ResourceProfile.thrift
new file mode 100644
index 0000000..be76edd
--- /dev/null
+++ b/common/thrift/ResourceProfile.thrift
@@ -0,0 +1,40 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Resource Profiles are computed by the planner. They are passed to backend ExecNodes
+// and DataSinks to configure their memory usage. See ResourceProfile.java for more
+// details.
+
+namespace cpp impala
+namespace java org.apache.impala.thrift
+
+struct TBackendResourceProfile {
+  // The minimum reservation for this plan node in bytes.
+  1: required i64 min_reservation
+
+  // The maximum reservation for this plan node in bytes. MAX_INT64 means effectively
+  // unlimited.
+  2: required i64 max_reservation
+
+  // The spillable buffer size in bytes to use for this node, chosen by the planner.
+  // Set iff the node uses spillable buffers.
+  3: optional i64 spillable_buffer_size
+
+  // The buffer size in bytes that is large enough to fit the largest row to be processed.
+  // Set if the node allocates buffers for rows from the buffer pool.
+  4: optional i64 max_row_buffer_size
+}
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 9dee6e2..8dd5b06 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -320,8 +320,8 @@ error_codes = (
    "Kudu table '$0' column '$1' contains an out of range timestamp. "
    "The valid date range is 1400-01-01..9999-12-31."),
 
-  ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized in plan node with "
-    "id $1. Increase the max_row_size query option (currently $2) to process larger rows."),
+  ("MAX_ROW_SIZE", 104, "Row of size $0 could not be materialized by $1. Increase the "
+   "max_row_size query option (currently $2) to process larger rows."),
 
   ("IR_VERIFY_FAILED", 105,
    "Failed to verify generated IR function $0, see log for more details."),
diff --git a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
index 64abb78..f3f44b4 100644
--- a/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
+++ b/fe/src/main/java/org/apache/impala/planner/PlanRootSink.java
@@ -20,8 +20,10 @@ package org.apache.impala.planner;
 import org.apache.impala.thrift.TDataSink;
 import org.apache.impala.thrift.TDataSinkType;
 import org.apache.impala.thrift.TExplainLevel;
+import org.apache.impala.thrift.TPlanRootSink;
 import org.apache.impala.thrift.TQueryOptions;
 
+
 /**
  * Sink for the root of a query plan that produces result rows. Allows coordination
  * between the sender which produces those rows, and the consumer which sends them to the
@@ -29,6 +31,11 @@ import org.apache.impala.thrift.TQueryOptions;
  */
 public class PlanRootSink extends DataSink {
 
+  // The default estimated memory consumption is 10 MB. Only used if statistics are not
+  // available. 10 MB should be sufficient to buffer results from most queries. See
+  // IMPALA-4268 for details on how this value was chosen.
+  private static final long DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY = 10 * 1024 * 1024;
+
   @Override
   public void appendSinkExplainString(String prefix, String detailPrefix,
       TQueryOptions queryOptions, TExplainLevel explainLevel, StringBuilder output) {
@@ -40,14 +47,60 @@ public class PlanRootSink extends DataSink {
     return "ROOT";
   }
 
+  /**
+   * Computes and sets the {@link ResourceProfile} for this PlanRootSink. If result
+   * spooling is disabled, a ResourceProfile is returned with no reservation or buffer
+   * sizes, and the estimated memory consumption is 0. Without result spooling, no rows
+   * get buffered, and only a single RowBatch is passed to the client at a time. Given
+   * that RowBatch memory is currently unreserved, no reservation is necessary. If
+   * SPOOL_QUERY_RESULTS is true, then the ResourceProfile sets a min/max reservation,
+   * estimated memory consumption, max buffer size, and spillable buffer size. The
+   * 'memEstimateBytes' (estimated memory consumption in bytes) is set by taking the
+   * estimated number of input rows into the sink and multiplying it by the estimated
+   * average row size. The estimated number of input rows is derived from the cardinality
+   * of the associated fragment's root node. If the cardinality or the average row size
+   * are not available, a default value is used. The minimum reservation is set to 2x
+   * the default spillable buffer size to account for the read and write page in the
+   * BufferedTupleStream used by the backend plan-root-sink. The maximum reservation is
+   * set to the query-level config MAX_RESULT_SPOOLING_MEM.
+   */
   @Override
   public void computeResourceProfile(TQueryOptions queryOptions) {
-    // TODO: add a memory estimate
-    resourceProfile_ = ResourceProfile.noReservation(0);
+    if (queryOptions.isSpool_query_results()) {
+      long bufferSize = queryOptions.getDefault_spillable_buffer_size();
+      long maxRowBufferSize = PlanNode.computeMaxSpillableBufferSize(
+          bufferSize, queryOptions.getMax_row_size());
+      long minMemReservationBytes = 2 * bufferSize;
+      long maxMemReservationBytes = Math.max(
+          queryOptions.getMax_result_spooling_mem(), minMemReservationBytes);
+
+      PlanNode inputNode = fragment_.getPlanRoot();
+
+      long memEstimateBytes;
+      if (inputNode.getCardinality() == -1 || inputNode.getAvgRowSize() == -1) {
+        memEstimateBytes = DEFAULT_RESULT_SPOOLING_ESTIMATED_MEMORY;
+      } else {
+        long inputCardinality = Math.max(1L, inputNode.getCardinality());
+        memEstimateBytes = (long) Math.ceil(inputCardinality * inputNode.getAvgRowSize());
+      }
+      memEstimateBytes = Math.min(memEstimateBytes, maxMemReservationBytes);
+
+      resourceProfile_ = new ResourceProfileBuilder()
+                             .setMemEstimateBytes(memEstimateBytes)
+                             .setMinMemReservationBytes(minMemReservationBytes)
+                             .setMaxMemReservationBytes(maxMemReservationBytes)
+                             .setMaxRowBufferBytes(maxRowBufferSize)
+                             .setSpillableBufferBytes(bufferSize)
+                             .build();
+    } else {
+      resourceProfile_ = ResourceProfile.noReservation(0);
+    }
   }
 
   @Override
   protected void toThriftImpl(TDataSink tsink) {
+    TPlanRootSink tPlanRootSink = new TPlanRootSink(resourceProfile_.toThrift());
+    tsink.setPlan_root_sink(tPlanRootSink);
   }
 
   @Override
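
The arithmetic in computeResourceProfile can be checked by hand. The sketch below re-expresses it in C++ for illustration; `SinkProfile` and `ComputeSinkProfile` are hypothetical names, and the final clamp to the minimum reservation mirrors the `Math.max(memEstimateBytes, minMemReservationBytes)` step in the ResourceProfile constructor shown later in this diff. The planner itself is Java, so this is a model of the calculation, not the planner code.

```cpp
#include <algorithm>
#include <cassert>
#include <cmath>
#include <cstdint>

// Default estimate used when cardinality or average row size is unknown (10 MB).
constexpr int64_t kDefaultEstimateBytes = 10LL * 1024 * 1024;

struct SinkProfile {
  int64_t min_reservation;
  int64_t max_reservation;
  int64_t mem_estimate;
};

// 'cardinality' and 'avg_row_size' use -1 to mean "not available".
SinkProfile ComputeSinkProfile(int64_t buffer_size, int64_t max_result_spooling_mem,
    int64_t cardinality, double avg_row_size) {
  SinkProfile p;
  // One read page plus one write page.
  p.min_reservation = 2 * buffer_size;
  p.max_reservation = std::max(max_result_spooling_mem, p.min_reservation);

  int64_t estimate;
  if (cardinality == -1 || avg_row_size == -1) {
    estimate = kDefaultEstimateBytes;
  } else {
    int64_t rows = std::max<int64_t>(1, cardinality);
    estimate = static_cast<int64_t>(std::ceil(rows * avg_row_size));
  }
  // Never estimate more than the sink is allowed to reserve...
  estimate = std::min(estimate, p.max_reservation);
  // ...and never less than the minimum reservation.
  p.mem_estimate = std::max(estimate, p.min_reservation);
  return p;
}
```

With a 2 MB spillable buffer and the 100 MB default limit, 10 rows of 89 bytes give a 4 MB estimate, and roughly 6 million rows of 231 bytes are capped at 100 MB, which agrees with the PLAN-ROOT SINK lines in result-spooling.test.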
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
index 5cf4d87..4d01b1e 100644
--- a/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfile.java
@@ -33,10 +33,8 @@ public class ResourceProfile {
   private final boolean isValid_;
 
   // Estimated memory consumption in bytes. Guaranteed to be >= minReservationBytes_ if
-  // both are set (the constructor ensures this).
-  // TODO: IMPALA-5013: currently we are inconsistent about how these estimates are
-  // derived or what they mean. Re-evaluate what they mean and either deprecate or
-  // fix them.
+  // both are set and guaranteed to be <= maxMemReservationBytes_ if both are set (the
+  // constructor ensures both of these conditions).
   private final long memEstimateBytes_;
 
   // Minimum memory reservation required to execute in bytes.
@@ -71,6 +69,7 @@ public class ResourceProfile {
     Preconditions.checkArgument(maxRowBufferBytes == -1
         || LongMath.isPowerOfTwo(maxRowBufferBytes));
     Preconditions.checkArgument(!isValid || threadReservation >= 0, threadReservation);
+    Preconditions.checkArgument(maxMemReservationBytes >= minMemReservationBytes);
     isValid_ = isValid;
     memEstimateBytes_ = (minMemReservationBytes != -1) ?
         Math.max(memEstimateBytes, minMemReservationBytes) : memEstimateBytes;
diff --git a/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java b/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
index 394e5dc..60c231b 100644
--- a/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
+++ b/fe/src/main/java/org/apache/impala/planner/ResourceProfileBuilder.java
@@ -50,7 +50,12 @@ public class ResourceProfileBuilder {
    */
   public ResourceProfileBuilder setMinMemReservationBytes(long minMemReservationBytes) {
     minMemReservationBytes_ = minMemReservationBytes;
-    maxMemReservationBytes_ = Long.MAX_VALUE;
+    if (maxMemReservationBytes_ == 0) maxMemReservationBytes_ = Long.MAX_VALUE;
+    return this;
+  }
+
+  public ResourceProfileBuilder setMaxMemReservationBytes(long maxMemReservationBytes) {
+    maxMemReservationBytes_ = maxMemReservationBytes;
     return this;
   }
 
diff --git a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
index 5cbcc37..606dbbe 100644
--- a/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
+++ b/fe/src/test/java/org/apache/impala/planner/PlannerTest.java
@@ -937,4 +937,20 @@ public class PlannerTest extends PlannerTestBase {
     runPlannerTestFile(
         "scan-node-fs-scheme", ImmutableSet.of(PlannerTestOption.VALIDATE_SCAN_FS));
   }
+
+  /**
+   * Validate the resource requirements of the PLAN-ROOT SINK when result spooling is
+   * enabled.
+   */
+  @Test
+  public void testResultSpooling() {
+    TQueryOptions options = defaultQueryOptions();
+    options.setExplain_level(TExplainLevel.EXTENDED);
+    options.setSpool_query_results(true);
+    options.setNum_scanner_threads(1); // Required so that output doesn't vary by machine
+    runPlannerTestFile(
+        "result-spooling", options, ImmutableSet.of(PlannerTestOption.EXTENDED_EXPLAIN,
+                                        PlannerTestOption.INCLUDE_RESOURCE_HEADER,
+                                        PlannerTestOption.VALIDATE_RESOURCES));
+  }
 }
diff --git a/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
new file mode 100644
index 0000000..6c29f59
--- /dev/null
+++ b/testdata/workloads/functional-planner/queries/PlannerTest/result-spooling.test
@@ -0,0 +1,135 @@
+# Validate the minimum memory reservation for PLAN-ROOT SINK is bounded by the spillable
+# buffer size.
+select * from functional.alltypes order by id limit 10
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=4.03MB Threads=3
+Per-Host Resource Estimates: Memory=20MB
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: id ASC
+|  limit: 10
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=89B cardinality=10
+|  in pipelines: 01(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=2
+01:TOP-N [LIMIT=10]
+|  order by: id ASC
+|  mem-estimate=890B mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=89B cardinality=10
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   stored statistics:
+     table: rows=7.30K size=478.45KB
+     partitions: 24/24 rows=7.30K
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=310
+   mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=1
+   tuple-ids=0 row-size=89B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=4.06MB Threads=3
+Per-Host Resource Estimates: Memory=36MB
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=4.02MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=4.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: id ASC
+|  limit: 10
+|  mem-estimate=16.00KB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=89B cardinality=10
+|  in pipelines: 01(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+Per-Host Resources: mem-estimate=32.00MB mem-reservation=64.00KB thread-reservation=2
+01:TOP-N [LIMIT=10]
+|  order by: id ASC
+|  mem-estimate=890B mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=89B cardinality=10
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [functional.alltypes, RANDOM]
+   HDFS partitions=24/24 files=24 size=478.45KB
+   stored statistics:
+     table: rows=7.30K size=478.45KB
+     partitions: 24/24 rows=7.30K
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=310
+   mem-estimate=16.00MB mem-reservation=32.00KB thread-reservation=0
+   tuple-ids=0 row-size=89B cardinality=7.30K
+   in pipelines: 00(GETNEXT)
+====
+# Validate that the maximum memory reservation for PLAN-ROOT SINK is bounded by
+# MAX_PINNED_RESULT_SPOOLING_MEMORY.
+select * from tpch.lineitem order by l_orderkey
+---- DISTRIBUTEDPLAN
+Max Per-Host Resource Reservation: Memory=24.00MB Threads=3
+Per-Host Resource Estimates: Memory=257MB
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=130.69MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=100.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: l_orderkey ASC
+|  mem-estimate=30.69MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  in pipelines: 01(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=3
+Per-Host Resources: mem-estimate=126.00MB mem-reservation=20.00MB thread-reservation=2
+01:SORT
+|  order by: l_orderkey ASC
+|  mem-estimate=38.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [tpch.lineitem, RANDOM]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   stored statistics:
+     table: rows=6.00M size=718.94MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=1.07M
+   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=1
+   tuple-ids=0 row-size=231B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
+---- PARALLELPLANS
+Max Per-Host Resource Reservation: Memory=44.00MB Threads=3
+Per-Host Resource Estimates: Memory=413MB
+F01:PLAN FRAGMENT [UNPARTITIONED] hosts=1 instances=1
+|  Per-Host Resources: mem-estimate=161.38MB mem-reservation=4.00MB thread-reservation=1
+PLAN-ROOT SINK
+|  mem-estimate=100.00MB mem-reservation=4.00MB spill-buffer=2.00MB thread-reservation=0
+|
+02:MERGING-EXCHANGE [UNPARTITIONED]
+|  order by: l_orderkey ASC
+|  mem-estimate=61.38MB mem-reservation=0B thread-reservation=0
+|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  in pipelines: 01(GETNEXT)
+|
+F00:PLAN FRAGMENT [RANDOM] hosts=3 instances=6
+Per-Host Resources: mem-estimate=252.00MB mem-reservation=40.00MB thread-reservation=2
+01:SORT
+|  order by: l_orderkey ASC
+|  mem-estimate=38.00MB mem-reservation=12.00MB spill-buffer=2.00MB thread-reservation=0
+|  tuple-ids=1 row-size=231B cardinality=6.00M
+|  in pipelines: 01(GETNEXT), 00(OPEN)
+|
+00:SCAN HDFS [tpch.lineitem, RANDOM]
+   HDFS partitions=1/1 files=1 size=718.94MB
+   stored statistics:
+     table: rows=6.00M size=718.94MB
+     columns: all
+   extrapolated-rows=disabled max-scan-range-rows=1.07M
+   mem-estimate=88.00MB mem-reservation=8.00MB thread-reservation=0
+   tuple-ids=0 row-size=231B cardinality=6.00M
+   in pipelines: 00(GETNEXT)
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
index cdd6f21..f1db129 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-large-rows.test
@@ -15,7 +15,7 @@ select id, count(distinct bigstr)
 from bigstrs
 group by id
 ---- CATCH
-Row of size 9.54 MB could not be materialized in plan node
+Row of size 9.54 MB could not be materialized by AGGREGATION_NODE
 ====
 ---- QUERY
 # Agg should be able to process the large strings if we increase the row size.
@@ -60,7 +60,7 @@ from functional.alltypes atp
   join bigstrs bs on repeat(atp.string_col, 10000) = substring(bs.bigstr, 5000000, 10000) and atp.id = bs.id
 where atp.id < 100
 ---- CATCH
-Row of size 9.54 MB could not be materialized in plan node with id 2. Increase the max_row_size query option (currently 512.00 KB) to process larger rows.
+Row of size 9.54 MB could not be materialized by HASH_JOIN_NODE (id=2). Increase the max_row_size query option (currently 512.00 KB) to process larger rows.
 ====
 ---- QUERY
 # Row is too big to process in right side of hash join.
@@ -104,7 +104,7 @@ select id, substr(bigstr, 1, 5)
 from bigstrs
 order by bigstr, id
 ---- CATCH
-Row of size 9.54 MB could not be materialized in plan node with id 1. Increase the max_row_size query option (currently 512.00 KB) to process larger rows.
+Row of size 9.54 MB could not be materialized by SORT_NODE (id=1). Increase the max_row_size query option (currently 512.00 KB) to process larger rows.
 ====
 ---- QUERY
 # Sort should be able to process the large strings if we increase the row size.
@@ -143,7 +143,7 @@ FROM (
   ) a
 ORDER BY id
 ---- CATCH
-Row of size 9.54 MB could not be materialized in plan node with id 1. Increase the max_row_size query option (currently 512.00 KB) to process larger rows.
+Row of size 9.54 MB could not be materialized by SORT_NODE (id=1). Increase the max_row_size query option (currently 512.00 KB) to process larger rows.
 ====
 ---- QUERY
 # Sort and analytic should be able to process the large strings if we increase the row
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 0ce541c..8022bab 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -15,16 +15,21 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import re
+import time
+import threading
+
 from time import sleep
+from tests.common.errors import Timeout
 from tests.common.impala_test_suite import ImpalaTestSuite
 from tests.common.test_vector import ImpalaTestDimension
 from tests.util.cancel_util import cancel_query_and_validate_state
 
 # Queries to execute, use the TPC-H dataset because tables are large so queries take some
 # time to execute.
-CANCELLATION_QUERIES = ['select l_returnflag from tpch_parquet.lineitem',
-                        'select * from tpch_parquet.lineitem limit 50',
-                        'select * from tpch_parquet.lineitem order by l_orderkey']
+CANCELLATION_QUERIES = ["select l_returnflag from tpch_parquet.lineitem",
+                        "select * from tpch_parquet.lineitem limit 50",
+                        "select * from tpch_parquet.lineitem order by l_orderkey"]
 
 # Time to sleep between issuing query and canceling.
 CANCEL_DELAY_IN_SECONDS = [0, 0.01, 0.1, 1, 4]
@@ -34,6 +39,8 @@ class TestResultSpooling(ImpalaTestSuite):
   @classmethod
   def add_test_dimensions(cls):
     super(TestResultSpooling, cls).add_test_dimensions()
+    # Result spooling should be independent of file format, so only test against
+    # Parquet files.
     cls.ImpalaTestMatrix.add_constraint(lambda v:
         v.get_value('table_format').file_format == 'parquet')
 
@@ -48,10 +55,126 @@ class TestResultSpooling(ImpalaTestSuite):
     """Validates that reading multiple row batches works when result spooling is
     enabled."""
     vector.get_value('exec_option')['batch_size'] = 10
-    self.validate_query("select id from functional_parquet.alltypes order by id "
-                        "limit 1000", vector.get_value('exec_option'))
+    self.__validate_query("select id from functional_parquet.alltypes order by id "
+                          "limit 1000", vector.get_value('exec_option'))
+
+  def test_spilling(self, vector):
+    """Tests that query results which don't fully fit into memory are spilled to disk.
+    The test runs a query asynchronously and waits for the PeakUnpinnedBytes counter in
+    the PLAN_ROOT_SINK section of the runtime profile to reach a non-zero value. Then
+    it fetches all the results and validates them."""
+    query = "select * from functional.alltypes order by id limit 1500"
+    exec_options = vector.get_value('exec_option')
+
+    # Set lower values for spill-to-disk configs to force the above query to spill
+    # spooled results.
+    exec_options['min_spillable_buffer_size'] = 8 * 1024
+    exec_options['default_spillable_buffer_size'] = 8 * 1024
+    exec_options['max_result_spooling_mem'] = 32 * 1024
+
+    # Execute the query without result spooling and save the results for later validation
+    base_result = self.execute_query(query, exec_options)
+    assert base_result.success, "Failed to run {0} when result spooling is disabled" \
+                                .format(query)
+
+    exec_options['spool_query_results'] = 'true'
+
+    # Amount of time to wait for the PeakUnpinnedBytes counter in the PLAN_ROOT_SINK
+    # section of the profile to reach a non-zero value.
+    timeout = 10
+
+    # Regexes to look for in the runtime profiles.
+    # PeakUnpinnedBytes can show up in exec nodes as well, so we only look for the
+    # PeakUnpinnedBytes metrics in the PLAN_ROOT_SINK section of the profile.
+    unpinned_bytes_regex = r"PLAN_ROOT_SINK[\s\S]*?PeakUnpinnedBytes.*\([1-9][0-9]*\)"
+    # The PLAN_ROOT_SINK should have 'Spilled' in the 'ExecOption' info string.
+    spilled_exec_option_regex = "ExecOption:.*Spilled"
+
+    # Fetch the runtime profile every 0.5 seconds until either the timeout is hit, or
+    # PeakUnpinnedBytes shows up in the profile.
+    start_time = time.time()
+    handle = self.execute_query_async(query, exec_options)
+    try:
+      while re.search(unpinned_bytes_regex, self.client.get_runtime_profile(handle)) \
+          is None and time.time() - start_time < timeout:
+        time.sleep(0.5)
+      profile = self.client.get_runtime_profile(handle)
+      if re.search(unpinned_bytes_regex, profile) is None:
+        raise Timeout("Query {0} did not spill spooled results within the timeout {1}"
+                      .format(query, timeout))
+      # At this point PLAN_ROOT_SINK must have spilled, so spilled_exec_option_regex
+      # should be in the profile as well.
+      assert re.search(spilled_exec_option_regex, profile)
+      result = self.client.fetch(query, handle)
+      assert result.data == base_result.data
+    finally:
+      self.client.close_query(handle)
+
+  def test_full_queue(self, vector):
+    """Tests result spooling when there is no more space to buffer query results (the
+    queue is full), and the client hasn't fetched any results. Validates that
+    RowBatchSendWaitTime (amount of time Impala blocks waiting for the client to read
+    buffered results and clear up space in the queue) is updated properly."""
+    query = "select * from functional.alltypes order by id limit 1500"
+    exec_options = vector.get_value('exec_option')
+
+    # Set lower values for spill-to-disk and result spooling configs so that the queue
+    # gets full when selecting a small number of rows.
+    exec_options['min_spillable_buffer_size'] = 8 * 1024
+    exec_options['default_spillable_buffer_size'] = 8 * 1024
+    exec_options['max_result_spooling_mem'] = 32 * 1024
+    exec_options['max_spilled_result_spooling_mem'] = 32 * 1024
+    exec_options['spool_query_results'] = 'true'
+
+    # Amount of time to wait for the query to reach a running state before throwing a
+    # Timeout exception.
+    timeout = 10
+    # Regex to look for in the runtime profile.
+    send_wait_time_regex = "RowBatchSendWaitTime: [1-9]"
+
+    # Execute the query asynchronously, wait a bit for the result spooling queue to fill
+    # up, start fetching results, and then validate that RowBatchSendWaitTime shows a
+    # non-zero value in the profile.
+    handle = self.execute_query_async(query, exec_options)
+    try:
+      self.wait_for_any_state(handle, [self.client.QUERY_STATES['RUNNING'],
+          self.client.QUERY_STATES['FINISHED']], timeout)
+      time.sleep(5)
+      self.client.fetch(query, handle)
+      assert re.search(send_wait_time_regex, self.client.get_runtime_profile(handle)) \
+          is not None
+    finally:
+      self.client.close_query(handle)
+
+  def test_slow_query(self, vector):
+    """Tests result spooling when the client is blocked waiting for Impala to add more
+    results to the queue. Validates that RowBatchGetWaitTime (amount of time the client
+    spends waiting for Impala to buffer query results) is updated properly."""
+    query = "select id from functional.alltypes order by id limit 10"
+
+    # Add a delay to the EXCHANGE_NODE in the query above to simulate a "slow" query. The
+    # delay should give the client enough time to issue a fetch request and block until
+    # Impala produces results.
+    vector.get_value('exec_option')['debug_action'] = '2:GETNEXT:DELAY'
+    vector.get_value('exec_option')['spool_query_results'] = 'true'
+
+    # Regex to look for in the runtime profile.
+    get_wait_time_regex = "RowBatchGetWaitTime: [1-9]"
+
+    # Execute the query, start a thread to fetch results, wait for the query to finish,
+    # and then validate that RowBatchGetWaitTime shows a non-zero value in the profile.
+    handle = self.execute_query_async(query, vector.get_value('exec_option'))
+    try:
+      thread = threading.Thread(target=lambda:
+          self.create_impala_client().fetch(query, handle))
+      thread.start()
+      self.wait_for_state(handle, self.client.QUERY_STATES['FINISHED'], 10)
+      assert re.search(get_wait_time_regex, self.client.get_runtime_profile(handle)) \
+          is not None
+    finally:
+      self.client.close_query(handle)
 
-  def validate_query(self, query, exec_options):
+  def __validate_query(self, query, exec_options):
     """Compares the results of the given query with and without result spooling
     enabled."""
     exec_options = exec_options.copy()
@@ -114,6 +237,6 @@ class TestResultSpoolingCancellation(ImpalaTestSuite):
       sleep(vector.get_value('cancel_delay'))
       cancel_result = self.client.cancel(handle)
       assert cancel_result.status_code == 0,\
-          'Unexpected status code from cancel request: {0}'.format(cancel_result)
+          "Unexpected status code from cancel request: {0}".format(cancel_result)
     finally:
       if handle: self.client.close_query(handle)