You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by st...@apache.org on 2019/08/01 20:16:16 UTC

[impala] 01/05: IMPALA-8779, IMPALA-8780: RowBatchQueue re-factoring and BufferedPRS impl

This is an automated email from the ASF dual-hosted git repository.

stakiar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/impala.git

commit 699450aadbf45f36617472b7c777dc2d9aad066a
Author: Sahil Takiar <ta...@gmail.com>
AuthorDate: Wed Jul 17 10:53:25 2019 -0700

    IMPALA-8779, IMPALA-8780: RowBatchQueue re-factoring and BufferedPRS impl
    
    Improves the encapsulation of RowBatchQueue by doing the following
    re-factoring:
    * Renames RowBatchQueue to BlockingRowBatchQueue which is more
    indicative of what the queue does
    * Re-factors the timers managed by the scan-node into the
    BlockingRowBatchQueue implementation
    * Favors composition over inheritance by re-factoring
    BlockingRowBatchQueue to own a BlockingQueue rather than extending one
    
    The re-factoring lays the groundwork for introducing a generic
    RowBatchQueue that all RowBatch queues inherit from.
    
    Adds a new DequeRowBatchQueue which is a simple wrapper around a
    std::deque that (1) stores unique_ptr to queued RowBatch-es and (2)
    has a maximum capacity.
    
    Implements BufferedPlanRootSink using the new DequeRowBatchQueue.
    DequeRowBatchQueue is generic enough that replacing it with a
    SpillableQueue (queue backed by a BufferedTupleStream) should be
    straightforward. BufferedPlanRootSink is synchronized to protect access
    to DequeRowBatchQueue since the queue is not thread safe.
    
    BufferedPlanRootSink FlushFinal blocks until the consumer thread has
    processed all RowBatches. This ensures that the coordinator fragment
    stays alive until all results are fetched, but allows all other
    fragments to be shutdown immediately.
    
    Testing:
    * Running core tests
    * Updated tests/query_test/test_result_spooling.py
    
    Change-Id: I9b1bb4b9c6f6e92c70e8fbee6ccdf48c2f85b7be
    Reviewed-on: http://gerrit.cloudera.org:8080/13883
    Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
    Tested-by: Impala Public Jenkins <im...@cloudera.com>
---
 be/src/exec/blocking-plan-root-sink.cc             |  15 +--
 be/src/exec/blocking-plan-root-sink.h              |   3 -
 be/src/exec/buffered-plan-root-sink.cc             | 126 ++++++++++++++++++++-
 be/src/exec/buffered-plan-root-sink.h              |  53 ++++++++-
 be/src/exec/data-sink.cc                           |   1 +
 be/src/exec/hdfs-scan-node.cc                      |   4 +-
 be/src/exec/kudu-scan-node.cc                      |   4 +-
 be/src/exec/plan-root-sink.cc                      |  15 +++
 be/src/exec/plan-root-sink.h                       |  10 ++
 be/src/exec/scan-node.cc                           |  10 +-
 be/src/exec/scan-node.h                            |   6 +-
 be/src/exec/scanner-context.cc                     |   1 -
 be/src/runtime/CMakeLists.txt                      |   3 +-
 ...-batch-queue.cc => blocking-row-batch-queue.cc} |  39 +++++--
 be/src/runtime/blocking-row-batch-queue.h          | 114 +++++++++++++++++++
 be/src/runtime/deque-row-batch-queue.cc            |  66 +++++++++++
 be/src/runtime/deque-row-batch-queue.h             |  70 ++++++++++++
 be/src/runtime/row-batch-queue.h                   |  80 -------------
 be/src/util/blocking-queue.h                       |  47 ++++----
 tests/query_test/test_result_spooling.py           |   7 +-
 20 files changed, 514 insertions(+), 160 deletions(-)

diff --git a/be/src/exec/blocking-plan-root-sink.cc b/be/src/exec/blocking-plan-root-sink.cc
index 8e65176..05ebd30 100644
--- a/be/src/exec/blocking-plan-root-sink.cc
+++ b/be/src/exec/blocking-plan-root-sink.cc
@@ -38,21 +38,10 @@ BlockingPlanRootSink::BlockingPlanRootSink(
 
 Status BlockingPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
   SCOPED_TIMER(profile()->total_time_counter());
-  ValidateCollectionSlots(*row_desc_, batch);
+  PlanRootSink::ValidateCollectionSlots(*row_desc_, batch);
+  RETURN_IF_ERROR(PlanRootSink::UpdateAndCheckRowsProducedLimit(state, batch));
   int current_batch_row = 0;
 
-  // Check to ensure that the number of rows produced by query execution does not exceed
-  // rows_returned_limit_. Since the BlockingPlanRootSink has a single producer, the
-  // num_rows_returned_ value can be verified without acquiring the lock_.
-  num_rows_produced_ += batch->num_rows();
-  if (num_rows_produced_limit_ > 0 && num_rows_produced_ > num_rows_produced_limit_) {
-    Status err = Status::Expected(TErrorCode::ROWS_PRODUCED_LIMIT_EXCEEDED,
-        PrintId(state->query_id()),
-        PrettyPrinter::Print(num_rows_produced_limit_, TUnit::NONE));
-    VLOG_QUERY << err.msg().msg();
-    return err;
-  }
-
   // Don't enter the loop if batch->num_rows() == 0; no point triggering the consumer with
   // 0 rows to return. Be wary of ever returning 0-row batches to the client; some poorly
   // written clients may not cope correctly with them. See IMPALA-4335.
diff --git a/be/src/exec/blocking-plan-root-sink.h b/be/src/exec/blocking-plan-root-sink.h
index 71af942..cb95da8 100644
--- a/be/src/exec/blocking-plan-root-sink.h
+++ b/be/src/exec/blocking-plan-root-sink.h
@@ -90,8 +90,5 @@ class BlockingPlanRootSink : public PlanRootSink {
 
   /// Set by GetNext() to indicate to Send() how many rows it should write to results_.
   int num_rows_requested_ = 0;
-
-  /// Updated by Send() to indicate the total number of rows produced by query execution.
-  int64_t num_rows_produced_ = 0;
 };
 }
diff --git a/be/src/exec/buffered-plan-root-sink.cc b/be/src/exec/buffered-plan-root-sink.cc
index a7f2467..4ba2f07 100644
--- a/be/src/exec/buffered-plan-root-sink.cc
+++ b/be/src/exec/buffered-plan-root-sink.cc
@@ -16,30 +16,148 @@
 // under the License.
 
 #include "exec/buffered-plan-root-sink.h"
+#include "runtime/deque-row-batch-queue.h"
+#include "service/query-result-set.h"
+
+#include "common/names.h"
 
 namespace impala {
 
+// The maximum number of row batches to queue before calls to Send() start to block.
+// After this many row batches have been added, Send() will block until GetNext() reads
+// RowBatches from the queue.
+const uint32_t MAX_QUEUED_ROW_BATCHES = 10;
+
 BufferedPlanRootSink::BufferedPlanRootSink(
     TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state)
-  : PlanRootSink(sink_id, row_desc, state) {}
+  : PlanRootSink(sink_id, row_desc, state),
+    batch_queue_(new DequeRowBatchQueue(MAX_QUEUED_ROW_BATCHES)) {}
 
 Status BufferedPlanRootSink::Send(RuntimeState* state, RowBatch* batch) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  // If the batch is empty, we have nothing to do so just return Status::OK().
+  if (batch->num_rows() == 0) return Status::OK();
+
+  // Close should only be called by the producer thread, no RowBatches should be sent
+  // after the sink is closed.
+  DCHECK(!closed_);
+  DCHECK(batch_queue_->IsOpen());
+  PlanRootSink::ValidateCollectionSlots(*row_desc_, batch);
+  RETURN_IF_ERROR(PlanRootSink::UpdateAndCheckRowsProducedLimit(state, batch));
+
+  // Make a copy of the given RowBatch and place it on the queue.
+  unique_ptr<RowBatch> output_batch =
+      make_unique<RowBatch>(batch->row_desc(), batch->capacity(), mem_tracker());
+  batch->DeepCopyTo(output_batch.get());
+
+  {
+    // Add the copied batch to the RowBatch queue and wake up the consumer thread if it is
+    // waiting for rows to process.
+    unique_lock<mutex> l(lock_);
+
+    // If the queue is full, wait for the consumer thread to read batches from it.
+    while (!state->is_cancelled() && batch_queue_->IsFull()) {
+      batch_queue_has_capacity_.Wait(l);
+    }
+    RETURN_IF_CANCELLED(state);
+
+    // Add the batch to the queue and then notify the consumer that rows are available.
+    if (!batch_queue_->AddBatch(move(output_batch))) {
+      // Adding a batch should always be successful because the queue should always be
+      // open when Send is called, and the call to batch_queue_has_capacity_.Wait(l)
+      // ensures space is available.
+      DCHECK(false) << "DequeueRowBatchQueue::AddBatch should never return false";
+    }
+  }
+  // Release the lock before calling notify so the consumer thread can immediately acquire
+  // the lock.
+  rows_available_.NotifyOne();
   return Status::OK();
 }
 
 Status BufferedPlanRootSink::FlushFinal(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  DCHECK(!closed_);
+  unique_lock<mutex> l(lock_);
+  sender_state_ = SenderState::EOS;
+  // If no batches are ever added, wake up the consumer thread so it can check the
+  // SenderState and return appropriately.
+  rows_available_.NotifyAll();
+  // Wait until the consumer has read all rows from the batch_queue_.
+  consumer_eos_.Wait(l);
+  RETURN_IF_CANCELLED(state);
   return Status::OK();
 }
 
 void BufferedPlanRootSink::Close(RuntimeState* state) {
+  SCOPED_TIMER(profile()->total_time_counter());
+  unique_lock<mutex> l(lock_);
+  // FlushFinal() won't have been called when the fragment instance encounters an error
+  // before sending all rows.
+  if (sender_state_ == SenderState::ROWS_PENDING) {
+    sender_state_ = SenderState::CLOSED_NOT_EOS;
+  }
+  batch_queue_->Close();
+  // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
+  // ensure that all sleeping threads are awoken. The call to NotifyAll() is not on the
+  // fast path so any overhead from calling it should be negligible.
+  rows_available_.NotifyAll();
   DataSink::Close(state);
 }
 
-void BufferedPlanRootSink::Cancel(RuntimeState* state) {}
+void BufferedPlanRootSink::Cancel(RuntimeState* state) {
+  DCHECK(state->is_cancelled());
+  // Wake up all sleeping threads so they can check the cancellation state.
+  // While it should be safe to call NotifyOne() here, prefer to use NotifyAll() to
+  // ensure that all sleeping threads are awoken. The calls to NotifyAll() are not on the
+  // fast path so any overhead from calling it should be negligible.
+  rows_available_.NotifyAll();
+  consumer_eos_.NotifyAll();
+  batch_queue_has_capacity_.NotifyAll();
+}
 
 Status BufferedPlanRootSink::GetNext(
     RuntimeState* state, QueryResultSet* results, int num_results, bool* eos) {
-  *eos = true;
-  return Status::OK();
+  {
+    unique_lock<mutex> l(lock_);
+    while (batch_queue_->IsEmpty() && sender_state_ == SenderState::ROWS_PENDING
+        && !state->is_cancelled()) {
+      rows_available_.Wait(l);
+    }
+
+    // If the query was cancelled while the sink was waiting for rows to become available,
+    // or if the query was cancelled before the current call to GetNext, set eos and then
+    // return. The queue could be empty if the sink was closed while waiting for rows to
+    // become available, or if the sink was closed before the current call to GetNext.
+    if (!state->is_cancelled() && !batch_queue_->IsEmpty()) {
+      unique_ptr<RowBatch> batch = batch_queue_->GetBatch();
+      // TODO for now, if num_results < batch->num_rows(), we terminate returning results
+      // early until we can properly handle fetch requests where
+      // num_results < batch->num_rows().
+      if (num_results > 0 && num_results < batch->num_rows()) {
+        *eos = true;
+        batch_queue_has_capacity_.NotifyOne();
+        consumer_eos_.NotifyOne();
+        return Status::Expected(TErrorCode::NOT_IMPLEMENTED_ERROR,
+            "BufferedPlanRootSink does not support setting num_results < BATCH_SIZE");
+      }
+      RETURN_IF_ERROR(
+          results->AddRows(output_expr_evals_, batch.get(), 0, batch->num_rows()));
+      batch->Reset();
+    }
+    *eos = batch_queue_->IsEmpty() && sender_state_ == SenderState::EOS;
+    if (*eos) consumer_eos_.NotifyOne();
+  }
+  // Release the lock before calling notify so the producer thread can immediately
+  // acquire the lock. It is safe to call notify batch_queue_has_capacity_ regardless of
+  // whether a RowBatch is read. Either (1) a RowBatch is read and the queue is no longer
+  // full, so notify the producer thread or (2) a RowBatch was not read, which means
+  // either FlushFinal was called or the query was cancelled. If FlushFinal was called
+  // then the producer thread has finished sending. If the query is cancelled, then we
+  // wake up the producer thread so it can check the cancellation status and return.
+  // Releasing the lock is safe because the producer always loops until the queue
+  // actually has space.
+  batch_queue_has_capacity_.NotifyOne();
+  return state->GetQueryStatus();
 }
 }
diff --git a/be/src/exec/buffered-plan-root-sink.h b/be/src/exec/buffered-plan-root-sink.h
index f875988..939c2a0 100644
--- a/be/src/exec/buffered-plan-root-sink.h
+++ b/be/src/exec/buffered-plan-root-sink.h
@@ -18,27 +18,70 @@
 #pragma once
 
 #include "exec/plan-root-sink.h"
+#include "util/condition-variable.h"
 
 namespace impala {
 
+class DequeRowBatchQueue;
+
 /// PlanRootSink that buffers RowBatches from the 'sender' (fragment) thread. RowBatches
-/// are buffered in memory until a memory limit is hit. Any subsequent calls to Send will
-/// block until the 'consumer' (coordinator) thread has read enough RowBatches to free up
-/// sufficient memory.
+/// are buffered in memory until a max number of RowBatches are queued. Any subsequent
+/// calls to Send will block until the 'consumer' (coordinator) thread has read enough
+/// RowBatches to free up sufficient space in the queue. The blocking behavior follows
+/// the same semantics as BlockingPlanRootSink.
+///
+/// FlushFinal() blocks until the consumer has read all RowBatches from the queue or
+/// until the sink is either closed or cancelled. This ensures that the coordinator
+/// fragment stays alive until the client fetches all results, but allows all other
+/// fragments to complete and release their resources.
+///
+/// The sink assumes a non-thread safe RowBatchQueue is injected and uses a single lock to
+/// synchronize access to the queue.
 class BufferedPlanRootSink : public PlanRootSink {
  public:
-  BufferedPlanRootSink(
-      TDataSinkId sink_id, const RowDescriptor* row_desc, RuntimeState* state);
+  BufferedPlanRootSink(TDataSinkId sink_id, const RowDescriptor* row_desc,
+      RuntimeState* state);
 
+  /// Creates a copy of the given RowBatch and adds it to the queue. The copy is
+  /// necessary as the ownership of 'batch' remains with the sender.
   virtual Status Send(RuntimeState* state, RowBatch* batch) override;
 
+  /// Notifies the consumer of producer eos and blocks until the consumer has read all
+  /// batches from the queue, or until the sink is either closed or cancelled.
   virtual Status FlushFinal(RuntimeState* state) override;
 
+  /// Release resources and unblocks consumer.
   virtual void Close(RuntimeState* state) override;
 
+  /// Blocks until rows are available for consumption
   virtual Status GetNext(
       RuntimeState* state, QueryResultSet* result_set, int num_rows, bool* eos) override;
 
+  /// Notifies both consumer and producer threads so they can check the cancellation
+  /// status.
   virtual void Cancel(RuntimeState* state) override;
+
+ private:
+  /// Protects the RowBatchQueue and all ConditionVariables.
+  boost::mutex lock_;
+
+  /// Waited on by the consumer inside GetNext() until rows are available for consumption.
+  /// Signaled when the producer adds a RowBatch to the queue. Also signaled by
+  /// FlushFinal(), Close() and Cancel() to unblock the consumer.
+  ConditionVariable rows_available_;
+
+  /// Waited on by the producer inside FlushFinal() until the consumer has hit eos.
+  /// Signaled when the consumer reads all RowBatches from the queue. Also signaled in
+  /// Cancel() to unblock the producer.
+  ConditionVariable consumer_eos_;
+
+  /// Waited on by the producer inside Send() if the RowBatchQueue is full. Signaled
+  /// when the consumer reads a batch from the RowBatchQueue. Also signaled in Cancel()
+  /// to unblock the producer.
+  ConditionVariable batch_queue_has_capacity_;
+
+  /// A DequeRowBatchQueue that buffers RowBatches from the sender for consumption by
+  /// the consumer. The queue is not thread safe and access is protected by 'lock_'.
+  std::unique_ptr<DequeRowBatchQueue> batch_queue_;
 };
 }
diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index f9bec90..e2d0ae9 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -33,6 +33,7 @@
 #include "gen-cpp/ImpalaInternalService_constants.h"
 #include "gen-cpp/ImpalaInternalService_types.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/deque-row-batch-queue.h"
 #include "runtime/krpc-data-stream-sender.h"
 #include "runtime/mem-tracker.h"
 #include "util/container-util.h"
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 2c64f97..f839bc4 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -25,12 +25,12 @@
 #include "exec/exec-node-util.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/scanner-context.h"
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/descriptors.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/io/request-context.h"
 #include "runtime/mem-tracker.h"
 #include "runtime/query-state.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
@@ -305,7 +305,7 @@ void HdfsScanNode::ThreadTokenAvailableCb(ThreadResourcePool* pool) {
 
     if (!first_thread) {
       // Cases 5, 6 and 7.
-      if (thread_state_.batch_queue()->AtCapacity()) break;
+      if (thread_state_.batch_queue()->IsFull()) break;
       if (!scanner_mem_limiter->ClaimMemoryForScannerThread(this, est_mem)) {
         COUNTER_ADD(thread_state_.scanner_thread_mem_unavailable_counter(), 1);
         break;
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 12b5365..fe08731 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -24,10 +24,10 @@
 #include "exec/kudu-util.h"
 #include "exprs/scalar-expr.h"
 #include "gutil/gscoped_ptr.h"
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/fragment-instance-state.h"
 #include "runtime/mem-pool.h"
 #include "runtime/query-state.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-state.h"
 #include "runtime/scanner-mem-limiter.h"
@@ -149,7 +149,7 @@ void KuduScanNode::ThreadAvailableCb(ThreadResourcePool* pool) {
     // * Don't start up a thread if there is not enough memory available for the
     //    estimated memory consumption (include reservation and non-reserved memory).
     if (!first_thread) {
-      if (thread_state_.batch_queue()->AtCapacity()) break;
+      if (thread_state_.batch_queue()->IsFull()) break;
       if (!mem_limiter->ClaimMemoryForScannerThread(
               this, EstimateScannerThreadMemConsumption())) {
         COUNTER_ADD(thread_state_.scanner_thread_mem_unavailable_counter(), 1);
diff --git a/be/src/exec/plan-root-sink.cc b/be/src/exec/plan-root-sink.cc
index 6f9015e..43afdd0 100644
--- a/be/src/exec/plan-root-sink.cc
+++ b/be/src/exec/plan-root-sink.cc
@@ -60,4 +60,19 @@ void PlanRootSink::ValidateCollectionSlots(
   }
 #endif
 }
+
+Status PlanRootSink::UpdateAndCheckRowsProducedLimit(
+    RuntimeState* state, RowBatch* batch) {
+  // Since the PlanRootSink has a single producer, the
+  // num_rows_produced_ value can be verified without acquiring any locks.
+  num_rows_produced_ += batch->num_rows();
+  if (num_rows_produced_limit_ > 0 && num_rows_produced_ > num_rows_produced_limit_) {
+    Status err = Status::Expected(TErrorCode::ROWS_PRODUCED_LIMIT_EXCEEDED,
+        PrintId(state->query_id()),
+        PrettyPrinter::Print(num_rows_produced_limit_, TUnit::NONE));
+    VLOG_QUERY << err.msg().msg();
+    return err;
+  }
+  return Status::OK();
+}
 }
diff --git a/be/src/exec/plan-root-sink.h b/be/src/exec/plan-root-sink.h
index 7a40b8c..ae5651f 100644
--- a/be/src/exec/plan-root-sink.h
+++ b/be/src/exec/plan-root-sink.h
@@ -88,6 +88,11 @@ class PlanRootSink : public DataSink {
   /// SubplanNode with respect to setting collection-slots to NULL.
   void ValidateCollectionSlots(const RowDescriptor& row_desc, RowBatch* batch);
 
+  /// Check to ensure that the number of rows produced by query execution does not exceed
+  /// the NUM_ROWS_PRODUCED_LIMIT query option. Returns an error Status if the given
+  /// batch causes the limit to be exceeded. Updates the value of num_rows_produced_.
+  Status UpdateAndCheckRowsProducedLimit(RuntimeState* state, RowBatch* batch);
+
   /// State of the sender:
   /// - ROWS_PENDING: the sender is still producing rows; the only non-terminal state
   /// - EOS: the sender has passed all rows to Send()
@@ -96,8 +101,13 @@ class PlanRootSink : public DataSink {
   enum class SenderState { ROWS_PENDING, EOS, CLOSED_NOT_EOS };
   SenderState sender_state_ = SenderState::ROWS_PENDING;
 
+ private:
   /// Limit on the number of rows produced by this query, initialized by the constructor.
   const int64_t num_rows_produced_limit_;
+
+  /// Updated by UpdateAndCheckRowsProducedLimit() to indicate the total number of rows
+  /// produced by query execution.
+  int64_t num_rows_produced_ = 0;
 };
 }
 
diff --git a/be/src/exec/scan-node.cc b/be/src/exec/scan-node.cc
index ab05432..710d1ea 100644
--- a/be/src/exec/scan-node.cc
+++ b/be/src/exec/scan-node.cc
@@ -21,9 +21,9 @@
 #include <boost/bind.hpp>
 
 #include "exprs/scalar-expr.h"
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/query-state.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/row-batch.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
@@ -259,8 +259,8 @@ void ScanNode::ScannerThreadState::Open(
   VLOG(2) << "Max row batch queue size for scan node '" << parent->id()
           << "' in fragment instance '" << PrintId(state->fragment_instance_id())
           << "': " << max_row_batches;
-  batch_queue_.reset(
-      new RowBatchQueue(max_row_batches, FLAGS_max_queued_row_batch_bytes));
+  batch_queue_.reset(new BlockingRowBatchQueue(max_row_batches,
+      FLAGS_max_queued_row_batch_bytes, row_batches_get_timer_, row_batches_put_timer_));
 
   // Start measuring the scanner thread concurrency only once the node is opened.
   average_concurrency_ = parent->runtime_profile()->AddSamplingCounter(
@@ -303,7 +303,7 @@ bool ScanNode::ScannerThreadState::EnqueueBatchWithTimeout(
   // Transfer memory ownership before enqueueing. If the caller retries, this transfer
   // is idempotent.
   (*row_batch)->SetMemTracker(row_batches_mem_tracker_);
-  if (!batch_queue_->BlockingPutWithTimeout(move(*row_batch), timeout_micros)) {
+  if (!batch_queue_->AddBatchWithTimeout(move(*row_batch), timeout_micros)) {
     return false;
   }
   COUNTER_ADD(row_batches_enqueued_, 1);
@@ -319,8 +319,6 @@ void ScanNode::ScannerThreadState::Close(ScanNode* parent) {
   scanner_threads_.JoinAll();
   DCHECK_EQ(num_active_.Load(), 0) << "There should be no active threads";
   if (batch_queue_ != nullptr) {
-    row_batches_put_timer_->Set(batch_queue_->total_put_wait_time());
-    row_batches_get_timer_->Set(batch_queue_->total_get_wait_time());
     row_batches_peak_mem_consumption_->Set(row_batches_mem_tracker_->peak_consumption());
     batch_queue_->Cleanup();
   }
diff --git a/be/src/exec/scan-node.h b/be/src/exec/scan-node.h
index 68a3fad..7c3a800 100644
--- a/be/src/exec/scan-node.h
+++ b/be/src/exec/scan-node.h
@@ -26,7 +26,7 @@
 
 namespace impala {
 
-class RowBatchQueue;
+class BlockingRowBatchQueue;
 class TScanRange;
 
 /// Abstract base class of all scan nodes. Subclasses support different storage layers
@@ -253,7 +253,7 @@ class ScanNode : public ExecNode {
     bool EnqueueBatchWithTimeout(std::unique_ptr<RowBatch>* row_batch,
         int64_t timeout_micros);
 
-    RowBatchQueue* batch_queue() { return batch_queue_.get(); }
+    BlockingRowBatchQueue* batch_queue() { return batch_queue_.get(); }
     RuntimeProfile::ThreadCounters* thread_counters() const { return thread_counters_; }
     int max_num_scanner_threads() const { return max_num_scanner_threads_; }
     int64_t estimated_per_thread_mem() const { return estimated_per_thread_mem_; }
@@ -282,7 +282,7 @@ class ScanNode : public ExecNode {
     /// Outgoing row batches queue. Row batches are produced asynchronously by the scanner
     /// threads and consumed by the main fragment thread that calls GetNext() on the scan
     /// node.
-    boost::scoped_ptr<RowBatchQueue> batch_queue_;
+    boost::scoped_ptr<BlockingRowBatchQueue> batch_queue_;
 
     /// The number of scanner threads currently running.
     AtomicInt32 num_active_{0};
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index a8b5bb2..7b44200 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -26,7 +26,6 @@
 #include "runtime/exec-env.h"
 #include "runtime/mem-pool.h"
 #include "runtime/row-batch.h"
-#include "runtime/row-batch-queue.h"
 #include "runtime/runtime-state.h"
 #include "runtime/string-buffer.h"
 #include "util/debug-util.h"
diff --git a/be/src/runtime/CMakeLists.txt b/be/src/runtime/CMakeLists.txt
index 242f02b..848ff37 100644
--- a/be/src/runtime/CMakeLists.txt
+++ b/be/src/runtime/CMakeLists.txt
@@ -28,6 +28,7 @@ set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/runtime")
 set_source_files_properties(${ROW_BATCH_PROTO_SRCS} PROPERTIES GENERATED TRUE)
 
 add_library(Runtime
+  blocking-row-batch-queue.cc
   buffered-tuple-stream.cc
   client-cache.cc
   collection-value.cc
@@ -38,6 +39,7 @@ add_library(Runtime
   date-value.cc
   debug-options.cc
   descriptors.cc
+  deque-row-batch-queue.cc
   dml-exec-state.cc
   exec-env.cc
   fragment-instance-state.cc
@@ -62,7 +64,6 @@ add_library(Runtime
   reservation-manager.cc
   row-batch.cc
   ${ROW_BATCH_PROTO_SRCS}
-  row-batch-queue.cc
   runtime-filter.cc
   runtime-filter-bank.cc
   runtime-filter-ir.cc
diff --git a/be/src/runtime/row-batch-queue.cc b/be/src/runtime/blocking-row-batch-queue.cc
similarity index 51%
rename from be/src/runtime/row-batch-queue.cc
rename to be/src/runtime/blocking-row-batch-queue.cc
index e694338..b711c9e 100644
--- a/be/src/runtime/row-batch-queue.cc
+++ b/be/src/runtime/blocking-row-batch-queue.cc
@@ -15,35 +15,54 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "runtime/row-batch-queue.h"
-
+#include "runtime/blocking-row-batch-queue.h"
 #include "runtime/row-batch.h"
+#include "util/runtime-profile-counters.h"
 
 #include "common/names.h"
 
+using namespace std;
+
 namespace impala {
 
-RowBatchQueue::RowBatchQueue(int max_batches, int64_t max_bytes)
-  : BlockingQueue<unique_ptr<RowBatch>,RowBatchBytesFn>(max_batches, max_bytes) {}
+BlockingRowBatchQueue::BlockingRowBatchQueue(int max_batches, int64_t max_bytes,
+    RuntimeProfile::Counter* get_batch_wait_timer,
+    RuntimeProfile::Counter* add_batch_wait_timer)
+  : batch_queue_(new BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>(
+        max_batches, max_bytes, get_batch_wait_timer, add_batch_wait_timer)) {}
 
-RowBatchQueue::~RowBatchQueue() {
+BlockingRowBatchQueue::~BlockingRowBatchQueue() {
   DCHECK(cleanup_queue_.empty());
 }
 
-void RowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
-  if (!BlockingPut(move(batch))) {
+void BlockingRowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  if (!batch_queue_->BlockingPut(move(batch))) {
     lock_guard<SpinLock> l(lock_);
     cleanup_queue_.push_back(move(batch));
   }
 }
 
-unique_ptr<RowBatch> RowBatchQueue::GetBatch() {
+bool BlockingRowBatchQueue::AddBatchWithTimeout(
+    unique_ptr<RowBatch>&& batch, int64_t timeout_micros) {
+  return batch_queue_->BlockingPutWithTimeout(
+      forward<unique_ptr<RowBatch>>(batch), timeout_micros);
+}
+
+unique_ptr<RowBatch> BlockingRowBatchQueue::GetBatch() {
   unique_ptr<RowBatch> result;
-  if (BlockingGet(&result)) return result;
+  if (batch_queue_->BlockingGet(&result)) return result;
   return unique_ptr<RowBatch>();
 }
 
-void RowBatchQueue::Cleanup() {
+bool BlockingRowBatchQueue::IsFull() const {
+  return batch_queue_->AtCapacity();
+}
+
+void BlockingRowBatchQueue::Shutdown() {
+  batch_queue_->Shutdown();
+}
+
+void BlockingRowBatchQueue::Cleanup() {
   unique_ptr<RowBatch> batch = nullptr;
   while ((batch = GetBatch()) != nullptr) {
     batch.reset();
diff --git a/be/src/runtime/blocking-row-batch-queue.h b/be/src/runtime/blocking-row-batch-queue.h
new file mode 100644
index 0000000..f7fb63c
--- /dev/null
+++ b/be/src/runtime/blocking-row-batch-queue.h
@@ -0,0 +1,114 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <list>
+#include <memory>
+
+#include "runtime/row-batch.h"
+#include "util/blocking-queue.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// Functor that returns the bytes in MemPool chunks for a row batch.
+/// Note that we don't include attached BufferPool::BufferHandle objects because this
+/// queue is only used in scan nodes that don't attach buffers.
+struct RowBatchBytesFn {
+  int64_t operator()(const std::unique_ptr<RowBatch>& batch) {
+    return batch->tuple_data_pool()->total_reserved_bytes();
+  }
+};
+
+/// Provides blocking queue semantics for row batches. Row batches have a property that
+/// they must be processed in the order they were produced, even in cancellation
+/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
+/// and we need to make sure those ptrs stay valid.
+///
+/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
+/// queue, which can be cleaned up during Close().
+///
+/// The queue supports limiting the capacity in terms of bytes enqueued and number of
+/// batches to be enqueued.
+///
+/// The queue takes in two counters: 'get_batch_wait_timer' and 'add_batch_wait_timer'.
+/// 'get_batch_wait_timer' tracks how long GetBatch spends blocking waiting for batches
+/// to be added to the queue. 'add_batch_wait_timer' tracks how long AddBatch spends
+/// blocking waiting for space to be available in the queue.
+///
+/// All functions are thread safe.
+class BlockingRowBatchQueue {
+ public:
+  /// 'max_batches' is the maximum number of row batches that can be queued.
+  /// 'max_bytes' is the maximum number of bytes of row batches that can be queued (-1
+  /// means no limit).
+  /// 'get_batch_wait_timer' tracks how long GetBatch blocks waiting for batches.
+  /// 'add_batch_wait_timer' tracks how long AddBatch blocks waiting for space in the
+  /// queue.
+  /// When the queue is full, producers will block.
+  BlockingRowBatchQueue(int max_batches, int64_t max_bytes,
+      RuntimeProfile::Counter* get_batch_wait_timer,
+      RuntimeProfile::Counter* add_batch_wait_timer);
+  ~BlockingRowBatchQueue();
+
+  /// Adds a batch to the queue. This is blocking if the queue is full.
+  void AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Adds a batch to the queue waiting for the specified amount of time for space to
+  /// be available in the queue. Returns true if the batch was successfully added to the
+  /// queue, false otherwise. 'batch' is passed by r-value reference because this method
+  /// does not transfer ownership of the 'batch'. This is necessary because this method
+  /// may or may not successfully add 'batch' to the queue (depending on if the timeout
+  /// was hit).
+  bool AddBatchWithTimeout(std::unique_ptr<RowBatch>&& batch, int64_t timeout_micros);
+
+  /// Gets a row batch from the queue, blocks if the queue is empty. Returns NULL if
+  /// the queue has already been shutdown.
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Returns true if the queue is full, false otherwise. Does not account for the current
+  /// size of the cleanup queue. A queue is considered full if it either contains the max
+  /// number of row batches specified in the constructor, or it contains the max number
+  /// of bytes specified in the constructor.
+  bool IsFull() const;
+
+  /// Shuts down the underlying BlockingQueue. Future calls to AddBatch will put the
+  /// RowBatch on the cleanup queue. Future calls to GetBatch will continue to return
+  /// RowBatches from the BlockingQueue.
+  void Shutdown();
+
+  /// Resets all RowBatches currently in the queue and clears the cleanup_queue_. Not
+  /// valid to call AddBatch() after this is called. Finalizes all counters started for
+  /// this queue.
+  void Cleanup();
+
+ private:
+  /// Lock protecting cleanup_queue_
+  SpinLock lock_;
+
+  /// Queue of orphaned row batches enqueued after the RowBatchQueue has been closed.
+  /// They need to exist as preceding row batches may reference buffers owned by row
+  /// batches in this queue.
+  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
+
+  /// BlockingQueue that stores the RowBatches
+  BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn>* batch_queue_;
+};
+}
diff --git a/be/src/runtime/deque-row-batch-queue.cc b/be/src/runtime/deque-row-batch-queue.cc
new file mode 100644
index 0000000..6807b8b
--- /dev/null
+++ b/be/src/runtime/deque-row-batch-queue.cc
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/deque-row-batch-queue.h"
+#include "runtime/row-batch.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+// Creates an open, empty queue whose capacity is capped at 'max_batches' row batches.
+DequeRowBatchQueue::DequeRowBatchQueue(int max_batches) : max_batches_(max_batches) {}
+
+// The queue is expected to have been shut down through Close() before it is destroyed;
+// the DCHECK below flags callers that forgot to do so.
+DequeRowBatchQueue::~DequeRowBatchQueue() { DCHECK(closed_); }
+
+// Appends 'batch' to the tail of the queue and reports whether it was accepted. The
+// batch is rejected when the queue has been closed or IsFull() is true; since 'batch' is
+// taken by value, a rejected batch is destroyed together with the parameter.
+bool DequeRowBatchQueue::AddBatch(unique_ptr<RowBatch> batch) {
+  const bool accepted = !closed_ && !IsFull();
+  if (accepted) batch_queue_.push_back(move(batch));
+  return accepted;
+}
+
+// Removes the batch at the head of the queue and hands its ownership to the caller.
+// Yields a null pointer when the queue has been closed or holds no batches.
+unique_ptr<RowBatch> DequeRowBatchQueue::GetBatch() {
+  unique_ptr<RowBatch> head;
+  if (!closed_ && !IsEmpty()) {
+    head = move(batch_queue_.front());
+    batch_queue_.pop_front();
+  }
+  return head;
+}
+
+// Returns true once the number of queued batches has reached the 'max_batches_' cap.
+bool DequeRowBatchQueue::IsFull() const {
+  // Spell out the int -> size_t conversion instead of relying on the implicit one in a
+  // mixed signed/unsigned comparison, and use '>=' so the queue still reports full if
+  // the cap were ever exceeded.
+  return batch_queue_.size() >= static_cast<size_t>(max_batches_);
+}
+
+// Returns true when no row batches are currently queued.
+bool DequeRowBatchQueue::IsEmpty() const {
+  return batch_queue_.size() == 0;
+}
+
+// Reports whether the queue is still usable, i.e. Close() has not been called yet.
+bool DequeRowBatchQueue::IsOpen() const { return !closed_; }
+
+// Resets and discards every batch still in the queue, then marks the queue as closed.
+// Calling Close() on an already closed queue does nothing.
+void DequeRowBatchQueue::Close() {
+  if (closed_) return;
+  // Drain from the head so that batches are reset and destroyed in the order they were
+  // queued. 'closed_' is still false at this point, so GetBatch() keeps returning the
+  // queued batches; each one is destroyed right after its Reset() call.
+  while (!IsEmpty()) GetBatch()->Reset();
+  batch_queue_.clear();
+  closed_ = true;
+}
+}
diff --git a/be/src/runtime/deque-row-batch-queue.h b/be/src/runtime/deque-row-batch-queue.h
new file mode 100644
index 0000000..95cc2af
--- /dev/null
+++ b/be/src/runtime/deque-row-batch-queue.h
@@ -0,0 +1,70 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <queue>
+
+#include "runtime/row-batch.h"
+
+namespace impala {
+
+class RowBatch;
+
+/// A RowBatchQueue that provides non-blocking, bounded FIFO semantics. RowBatches are
+/// stored inside a std::deque. None of the methods block and this class is not thread
+/// safe, so callers have to synchronize access themselves. The size of the queue is
+/// capped by the 'max_batches' constructor parameter: calls to AddBatch() after the
+/// capacity has been reached return false, and calls to GetBatch() on an empty queue
+/// return null. Close() must be called before the queue is destroyed.
+class DequeRowBatchQueue {
+ public:
+  /// Creates an open, empty queue that holds at most 'max_batches' RowBatches. Marked
+  /// explicit so that an int is never implicitly converted into a queue.
+  explicit DequeRowBatchQueue(int max_batches);
+
+  /// DCHECKs that Close() has been called.
+  ~DequeRowBatchQueue();
+
+  /// Adds the given RowBatch to the tail of the queue. Returns true if the batch was
+  /// successfully added, returns false if the queue is full or has already been closed.
+  /// Because 'batch' is taken by value, the caller gives up ownership of the batch in
+  /// either case; a batch that is not added is destroyed when this method returns.
+  bool AddBatch(std::unique_ptr<RowBatch> batch);
+
+  /// Returns and removes the RowBatch at the head of the queue. Returns a nullptr if the
+  /// queue is already closed or the queue is empty. The ownership of the returned batch
+  /// is transferred from the queue to the returned unique_ptr.
+  std::unique_ptr<RowBatch> GetBatch();
+
+  /// Returns true if the queue limit ('max_batches') has been reached, false otherwise.
+  bool IsFull() const;
+
+  /// Returns true if the queue contains no RowBatches, false otherwise.
+  bool IsEmpty() const;
+
+  /// Returns false if Close() has been called, true otherwise.
+  bool IsOpen() const;
+
+  /// Resets and removes the remaining RowBatches in the queue and marks the queue as
+  /// closed. Calling Close() more than once is allowed; later calls are no-ops.
+  void Close();
+
+ private:
+  /// Queue of row batches. The head of the deque is the next batch handed out by
+  /// GetBatch().
+  std::deque<std::unique_ptr<RowBatch>> batch_queue_;
+
+  /// True if the queue has been closed, false otherwise.
+  bool closed_ = false;
+
+  /// The maximum number of RowBatches the queue may hold at any time.
+  const int max_batches_;
+};
+}
diff --git a/be/src/runtime/row-batch-queue.h b/be/src/runtime/row-batch-queue.h
deleted file mode 100644
index 79e8293..0000000
--- a/be/src/runtime/row-batch-queue.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_RUNTIME_BLOCKING_QUEUE_H
-#define IMPALA_RUNTIME_BLOCKING_QUEUE_H
-
-#include <list>
-#include <memory>
-
-#include "runtime/row-batch.h"
-#include "util/blocking-queue.h"
-#include "util/spinlock.h"
-
-namespace impala {
-
-class RowBatch;
-
-/// Functor that returns the bytes in MemPool chunks for a row batch.
-/// Note that we don't include attached BufferPool::BufferHandle objects because this
-/// queue is only used in scan nodes that don't attach buffers.
-struct RowBatchBytesFn {
-  int64_t operator()(const std::unique_ptr<RowBatch>& batch) {
-    return batch->tuple_data_pool()->total_reserved_bytes();
-  }
-};
-
-/// Extends blocking queue for row batches. Row batches have a property that
-/// they must be processed in the order they were produced, even in cancellation
-/// paths. Preceding row batches can contain ptrs to memory in subsequent row batches
-/// and we need to make sure those ptrs stay valid.
-/// Row batches that are added after Shutdown() are queued in a separate "cleanup"
-/// queue, which can be cleaned up during Close().
-///
-/// The queue supports limiting the capacity in terms of bytes enqueued.
-///
-/// All functions are thread safe.
-class RowBatchQueue : public BlockingQueue<std::unique_ptr<RowBatch>, RowBatchBytesFn> {
- public:
-  /// 'max_batches' is the maximum number of row batches that can be queued.
-  /// 'max_bytes' is the maximum number of bytes of row batches that can be queued (-1
-  /// means no limit).
-  /// When the queue is full, producers will block.
-  RowBatchQueue(int max_batches, int64_t max_bytes);
-  ~RowBatchQueue();
-
-  /// Adds a batch to the queue. This is blocking if the queue is full.
-  void AddBatch(std::unique_ptr<RowBatch> batch);
-
-  /// Gets a row batch from the queue. Returns NULL if there are no more.
-  /// This function blocks.
-  /// Returns NULL after Shutdown().
-  std::unique_ptr<RowBatch> GetBatch();
-
-  /// Deletes all row batches in cleanup_queue_. Not valid to call AddBatch()
-  /// after this is called.
-  void Cleanup();
-
- private:
-  /// Lock protecting cleanup_queue_
-  SpinLock lock_;
-
-  /// Queue of orphaned row batches
-  std::list<std::unique_ptr<RowBatch>> cleanup_queue_;
-};
-}
-#endif
diff --git a/be/src/util/blocking-queue.h b/be/src/util/blocking-queue.h
index 34c2453..811e029 100644
--- a/be/src/util/blocking-queue.h
+++ b/be/src/util/blocking-queue.h
@@ -29,6 +29,7 @@
 #include "common/compiler-util.h"
 #include "util/aligned-new.h"
 #include "util/condition-variable.h"
+#include "util/runtime-profile.h"
 #include "util/stopwatch.h"
 #include "util/time.h"
 
@@ -59,15 +60,21 @@ struct ByteLimitDisabledFn {
 /// held before 'put_lock_'. When the 'get_list_' is empty, the caller of BlockingGet()
 /// will atomically swap the 'put_list_' with 'get_list_'. The swapping happens with both
 /// the 'get_lock_' and 'put_lock_' held.
+///
+/// The queue supports two optional RuntimeProfile::Counters. One to track the amount
+/// of time spent blocking in BlockingGet() and the other to track the amount of time
+/// spent in BlockingPut().
 template <typename T, typename ElemBytesFn = ByteLimitDisabledFn<T>>
 class BlockingQueue : public CacheLineAligned {
  public:
-  BlockingQueue(size_t max_elements, int64_t max_bytes = -1)
+  BlockingQueue(size_t max_elements, int64_t max_bytes = -1,
+      RuntimeProfile::Counter* get_wait_timer = nullptr,
+      RuntimeProfile::Counter* put_wait_timer = nullptr)
     : shutdown_(false),
       max_elements_(max_elements),
-      total_put_wait_time_(0),
+      put_wait_timer_(put_wait_timer),
       get_list_size_(0),
-      total_get_wait_time_(0),
+      get_wait_timer_(get_wait_timer),
       max_bytes_(max_bytes) {
     DCHECK(max_bytes == -1 || max_bytes > 0) << max_bytes;
     DCHECK_GT(max_elements_, 0);
@@ -98,15 +105,15 @@ class BlockingQueue : public CacheLineAligned {
         put_cv_.NotifyOne();
         // Sleep with 'get_lock_' held to block off other readers which cannot
         // make progress anyway.
-        timer.Start();
+        if (get_wait_timer_ != nullptr) timer.Start();
         get_cv_.Wait(write_lock);
-        timer.Stop();
+        if (get_wait_timer_ != nullptr) timer.Stop();
       }
       DCHECK(!put_list_.empty());
       put_list_.swap(get_list_);
       get_list_size_.Store(get_list_.size());
       write_lock.unlock();
-      total_get_wait_time_ += timer.ElapsedTime();
+      if (get_wait_timer_ != nullptr) get_wait_timer_->Add(timer.ElapsedTime());
     }
 
     DCHECK(!get_list_.empty());
@@ -143,11 +150,11 @@ class BlockingQueue : public CacheLineAligned {
     DCHECK_GE(val_bytes, 0);
     boost::unique_lock<boost::mutex> write_lock(put_lock_);
     while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_) {
-      timer.Start();
+      if (put_wait_timer_ != nullptr) timer.Start();
       put_cv_.Wait(write_lock);
-      timer.Stop();
+      if (put_wait_timer_ != nullptr) timer.Stop();
     }
-    total_put_wait_time_ += timer.ElapsedTime();
+    if (put_wait_timer_ != nullptr) put_wait_timer_->Add(timer.ElapsedTime());
     if (UNLIKELY(shutdown_)) return false;
 
     DCHECK_LT(put_list_.size(), max_elements_);
@@ -173,12 +180,12 @@ class BlockingQueue : public CacheLineAligned {
     TimeFromNowMicros(timeout_micros, &abs_time);
     bool notified = true;
     while (!HasCapacityInternal(write_lock, val_bytes) && !shutdown_ && notified) {
-      timer.Start();
+      if (put_wait_timer_ != nullptr) timer.Start();
       // Wait until we're notified or until the timeout expires.
       notified = put_cv_.WaitUntil(write_lock, abs_time);
-      timer.Stop();
+      if (put_wait_timer_ != nullptr) timer.Stop();
     }
-    total_put_wait_time_ += timer.ElapsedTime();
+    if (put_wait_timer_ != nullptr) put_wait_timer_->Add(timer.ElapsedTime());
     // If the list is still full or if the the queue has been shut down, return false.
     // NOTE: We don't check 'notified' here as it appears that pthread condition variables
     // have a weird behavior in which they can return ETIMEDOUT from timed_wait even if
@@ -215,18 +222,6 @@ class BlockingQueue : public CacheLineAligned {
     return SizeLocked(write_lock) >= max_elements_;
   }
 
-  int64_t total_get_wait_time() const {
-    // Hold lock to make sure the value read is consistent (i.e. no torn read).
-    boost::lock_guard<boost::mutex> read_lock(get_lock_);
-    return total_get_wait_time_;
-  }
-
-  int64_t total_put_wait_time() const {
-    // Hold lock to make sure the value read is consistent (i.e. no torn read).
-    boost::lock_guard<boost::mutex> write_lock(put_lock_);
-    return total_put_wait_time_;
-  }
-
  private:
 
   uint32_t ALWAYS_INLINE SizeLocked(const boost::unique_lock<boost::mutex>& lock) const {
@@ -277,7 +272,7 @@ class BlockingQueue : public CacheLineAligned {
   ConditionVariable put_cv_;
 
   /// Total amount of time threads blocked in BlockingPut(). Guarded by 'put_lock_'.
-  int64_t total_put_wait_time_;
+  RuntimeProfile::Counter* put_wait_timer_ = nullptr;
 
   /// Running counter for bytes enqueued, incremented through the producer thread.
   /// Decremented by transferring value from 'get_bytes_dequeued_'.
@@ -300,7 +295,7 @@ class BlockingQueue : public CacheLineAligned {
   /// Total amount of time a thread blocked in BlockingGet(). Guarded by 'get_lock_'.
   /// Note that a caller of BlockingGet() may sleep with 'get_lock_' held and this
   /// variable doesn't include the time which other threads block waiting for 'get_lock_'.
-  int64_t total_get_wait_time_;
+  RuntimeProfile::Counter* get_wait_timer_ = nullptr;
 
   /// Running count of bytes dequeued. Decremented from 'put_bytes_enqueued_' when it
   /// exceeds the queue capacity. Kept separate from 'put_bytes_enqueued_' so that
diff --git a/tests/query_test/test_result_spooling.py b/tests/query_test/test_result_spooling.py
index 5f4c5e7..40b55b6 100644
--- a/tests/query_test/test_result_spooling.py
+++ b/tests/query_test/test_result_spooling.py
@@ -24,10 +24,9 @@ class TestResultSpooling(ImpalaTestSuite):
     return 'functional-query'
 
   def test_result_spooling(self):
-    """Test that setting SPOOL_QUERY_RESULTS does not crash Impala. The implementation
-    of query result spooling has not been completed yet, so queries that run when
-    SPOOL_QUERY_RESULTS = true, will return no results."""
+    """Tests that setting SPOOL_QUERY_RESULTS = true for simple queries returns the
+    correct number of results."""
     query_opts = {"spool_query_results": "true"}
     query = "select * from functional.alltypes limit 10"
     result = self.execute_query_expect_success(self.client, query, query_opts)
-    assert(len(result.data) == 0)
+    assert(len(result.data) == 10)