You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by tm...@apache.org on 2018/05/30 01:04:02 UTC

impala git commit: IMPALA-6953: part 1: clean up DiskIoMgr

Repository: impala
Updated Branches:
  refs/heads/2.x 0e7b07592 -> 564687265


IMPALA-6953: part 1: clean up DiskIoMgr

There should be no behavioural changes as a result of
this refactoring.

Make DiskQueue an encapsulated class.

Remove friend classes where possible, either by using public
methods or moving code between classes.

Move methods into protected sections in some cases.

Split GetNextRequestRange() into two methods,
DiskQueue::GetNextRequestRange() and
RequestContext::GetNextRequestRange(), each of which operates
only on the state of the class it belongs to.

Reduce transitive #include dependencies to hopefully help
with build time.

Testing:
Ran core tests.

Change-Id: I50b448834b832a0ee0dc5d85541691cd2f308e12
Reviewed-on: http://gerrit.cloudera.org:8080/10538
Reviewed-by: Thomas Marshall <th...@cmu.edu>
Tested-by: Thomas Marshall <th...@cmu.edu>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/56468726
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/56468726
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/56468726

Branch: refs/heads/2.x
Commit: 564687265247d957fb8cda26bcf86fcf6a80f87a
Parents: 0e7b075
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sat Apr 28 11:27:53 2018 -0700
Committer: Thomas Marshall <th...@cmu.edu>
Committed: Wed May 30 01:03:39 2018 +0000

----------------------------------------------------------------------
 be/src/exec/blocking-join-node.cc        |   1 +
 be/src/exec/hdfs-orc-scanner.h           |   1 +
 be/src/exec/hdfs-parquet-table-writer.cc |   2 +
 be/src/exec/hdfs-scan-node-mt.cc         |   1 +
 be/src/exec/hdfs-scan-node-mt.h          |   2 +-
 be/src/exec/hdfs-scan-node.h             |   1 -
 be/src/exec/hdfs-scanner.h               |   1 +
 be/src/exec/kudu-scan-node.cc            |   1 +
 be/src/exec/kudu-table-sink.cc           |   4 +-
 be/src/exec/parquet-column-readers.cc    |   1 +
 be/src/runtime/io/disk-io-mgr-internal.h |  76 +++--
 be/src/runtime/io/disk-io-mgr.cc         | 246 ++++++----------
 be/src/runtime/io/disk-io-mgr.h          |  58 ++--
 be/src/runtime/io/error-converter.cc     |   4 +
 be/src/runtime/io/local-file-system.cc   |   6 +-
 be/src/runtime/io/request-context.cc     | 123 +++++---
 be/src/runtime/io/request-context.h      | 390 ++++++++++++++------------
 be/src/runtime/io/request-ranges.h       | 137 +++++----
 be/src/runtime/io/scan-range.cc          |  51 +++-
 be/src/runtime/row-batch.h               |   1 -
 20 files changed, 582 insertions(+), 525 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/blocking-join-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/blocking-join-node.cc b/be/src/exec/blocking-join-node.cc
index 57b7723..88ca007 100644
--- a/be/src/exec/blocking-join-node.cc
+++ b/be/src/exec/blocking-join-node.cc
@@ -29,6 +29,7 @@
 #include "runtime/thread-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/runtime-profile-counters.h"
+#include "util/thread.h"
 #include "util/time.h"
 
 #include "gen-cpp/PlanNodes_types.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/hdfs-orc-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 5bdf4e7..132442e 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -21,6 +21,7 @@
 
 #include <orc/OrcFile.hh>
 
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/runtime-state.h"
 #include "exec/hdfs-scanner.h"
 #include "exec/hdfs-scan-node.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/hdfs-parquet-table-writer.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc
index 6277a99..585459d 100644
--- a/be/src/exec/hdfs-parquet-table-writer.cc
+++ b/be/src/exec/hdfs-parquet-table-writer.cc
@@ -17,6 +17,8 @@
 
 #include "exec/hdfs-parquet-table-writer.h"
 
+#include <boost/unordered_set.hpp>
+
 #include "common/version.h"
 #include "exec/hdfs-table-sink.h"
 #include "exec/parquet-column-stats.inline.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/hdfs-scan-node-mt.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.cc b/be/src/exec/hdfs-scan-node-mt.cc
index 4573978..b675b20 100644
--- a/be/src/exec/hdfs-scan-node-mt.cc
+++ b/be/src/exec/hdfs-scan-node-mt.cc
@@ -19,6 +19,7 @@
 
 #include <sstream>
 
+#include "exec/scanner-context.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"
 #include "util/debug-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/hdfs-scan-node-mt.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-mt.h b/be/src/exec/hdfs-scan-node-mt.h
index 3502b18..3db232f 100644
--- a/be/src/exec/hdfs-scan-node-mt.h
+++ b/be/src/exec/hdfs-scan-node-mt.h
@@ -23,7 +23,6 @@
 
 #include "exec/hdfs-scanner.h"
 #include "exec/hdfs-scan-node-base.h"
-#include "exec/scanner-context.h"
 
 namespace impala {
 
@@ -31,6 +30,7 @@ class DescriptorTbl;
 class ObjectPool;
 class RuntimeState;
 class RowBatch;
+class ScannerContext;
 class TPlanNode;
 
 /// Scan node that materializes tuples, evaluates conjuncts and runtime filters

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/hdfs-scan-node.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.h b/be/src/exec/hdfs-scan-node.h
index 53d5c71..d35a40c 100644
--- a/be/src/exec/hdfs-scan-node.h
+++ b/be/src/exec/hdfs-scan-node.h
@@ -30,7 +30,6 @@
 #include "common/atomic.h"
 #include "exec/filter-context.h"
 #include "exec/hdfs-scan-node-base.h"
-#include "runtime/io/disk-io-mgr.h"
 #include "util/counting-barrier.h"
 #include "util/thread.h"
 

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/hdfs-scanner.h
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scanner.h b/be/src/exec/hdfs-scanner.h
index 73b8b70..d642313 100644
--- a/be/src/exec/hdfs-scanner.h
+++ b/be/src/exec/hdfs-scanner.h
@@ -30,6 +30,7 @@
 #include "common/status.h"
 #include "exec/hdfs-scan-node-base.h"
 #include "exec/scanner-context.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/row-batch.h"
 #include "runtime/tuple.h"
 

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/kudu-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-scan-node.cc b/be/src/exec/kudu-scan-node.cc
index 9ae5710..19f1d5c 100644
--- a/be/src/exec/kudu-scan-node.cc
+++ b/be/src/exec/kudu-scan-node.cc
@@ -24,6 +24,7 @@
 #include "exprs/scalar-expr.h"
 #include "gutil/gscoped_ptr.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
 #include "runtime/runtime-state.h"
 #include "runtime/row-batch.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/kudu-table-sink.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 67bb86e..32ba2bb 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -17,8 +17,10 @@
 
 #include "exec/kudu-table-sink.h"
 
-#include <kudu/client/write_op.h>
 #include <sstream>
+
+#include <boost/bind.hpp>
+#include <kudu/client/write_op.h>
 #include <thrift/protocol/TDebugProtocol.h>
 
 #include "exec/kudu-util.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/exec/parquet-column-readers.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/parquet-column-readers.cc b/be/src/exec/parquet-column-readers.cc
index f403ce6..f9e67dd 100644
--- a/be/src/exec/parquet-column-readers.cc
+++ b/be/src/exec/parquet-column-readers.cc
@@ -31,6 +31,7 @@
 #include "rpc/thrift-util.h"
 #include "runtime/collection-value-builder.h"
 #include "runtime/exec-env.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/tuple-row.h"
 #include "runtime/tuple.h"
 #include "runtime/runtime-state.h"

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 475456a..3aa43b4 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -21,17 +21,11 @@
 #include <unistd.h>
 #include <queue>
 #include <boost/thread/locks.hpp>
-#include <gutil/strings/substitute.h>
 
 #include "common/logging.h"
 #include "runtime/io/request-context.h"
 #include "runtime/io/disk-io-mgr.h"
-#include "runtime/mem-tracker.h"
 #include "util/condition-variable.h"
-#include "util/cpu-info.h"
-#include "util/debug-util.h"
-#include "util/disk-info.h"
-#include "util/filesystem-util.h"
 #include "util/hdfs-util.h"
 #include "util/impalad-metrics.h"
 #include "util/runtime-profile-counters.h"
@@ -63,36 +57,66 @@ static inline bool is_file_handle_caching_enabled() {
   return FLAGS_max_cached_file_handles > 0;
 }
 
-/// Per disk state
-struct DiskQueue {
+/// Global queue of requests for a disk. One or more disk threads pull requests off
+/// a given queue. RequestContexts are scheduled in round-robin order to provide some
+/// level of fairness between RequestContexts.
+class DiskQueue {
+ public:
+  DiskQueue(int disk_id) : disk_id_(disk_id) {}
+  // Destructor is only run in backend tests - in a daemon the singleton DiskIoMgr
+  // is not destroyed.
+  ~DiskQueue();
+
+  /// Disk worker thread loop. This function retrieves the next range to process on
+  /// the disk queue and invokes ScanRange::DoRead() or Write() depending on the type
+  /// of Range. There can be multiple threads per disk running this loop.
+  void DiskThreadLoop(DiskIoMgr* io_mgr);
+
+  /// Enqueue the request context to the disk queue.
+  void EnqueueContext(RequestContext* worker) {
+    {
+      boost::unique_lock<boost::mutex> disk_lock(lock_);
+      // Check that the reader is not already on the queue
+      DCHECK(find(request_contexts_.begin(), request_contexts_.end(), worker) ==
+          request_contexts_.end());
+      request_contexts_.push_back(worker);
+    }
+    work_available_.NotifyAll();
+  }
+
+  /// Signals that disk threads for this queue should stop processing new work and
+  /// terminate once done.
+  void ShutDown();
+
+  /// Append debug string to 'ss'. Acquires the DiskQueue lock.
+  void DebugString(std::stringstream* ss);
+
+ private:
+  /// Called from the disk thread to get the next range to process. Wait until a scan
+  /// is available to process, a write range is available, or 'shut_down_' is set to
+  /// true. Returns the range to process and the RequestContext that the range belongs
+  /// to. Only returns NULL if the disk thread should be shut down.
+  RequestRange* GetNextRequestRange(RequestContext** request_context);
+
   /// Disk id (0-based)
-  int disk_id;
+  const int disk_id_;
 
-  /// Lock that protects access to 'request_contexts' and 'work_available'
-  boost::mutex lock;
+  /// Lock that protects below members.
+  boost::mutex lock_;
 
   /// Condition variable to signal the disk threads that there is work to do or the
   /// thread should shut down.  A disk thread will be woken up when there is a reader
   /// added to the queue. A reader is only on the queue when it has at least one
   /// scan range that is not blocked on available buffers.
-  ConditionVariable work_available;
+  ConditionVariable work_available_;
 
   /// list of all request contexts that have work queued on this disk
-  std::list<RequestContext*> request_contexts;
-
-  /// Enqueue the request context to the disk queue.  The DiskQueue lock must not be taken.
-  inline void EnqueueContext(RequestContext* worker) {
-    {
-      boost::unique_lock<boost::mutex> disk_lock(lock);
-      /// Check that the reader is not already on the queue
-      DCHECK(find(request_contexts.begin(), request_contexts.end(), worker) ==
-          request_contexts.end());
-      request_contexts.push_back(worker);
-    }
-    work_available.NotifyAll();
-  }
+  std::list<RequestContext*> request_contexts_;
 
-  DiskQueue(int id) : disk_id(id) {}
+  /// True if the IoMgr should be torn down. Worker threads check this when dequeueing
+  /// from 'request_contexts_' and terminate themselves once it is true. Only used in
+  /// backend tests - in a daemon the singleton DiskIOMgr is never shut down.
+  bool shut_down_ = false;
 };
 }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 28f2293..ff4c8f8 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -27,6 +27,8 @@
 
 #include "gutil/strings/substitute.h"
 #include "util/bit-util.h"
+#include "util/disk-info.h"
+#include "util/filesystem-util.h"
 #include "util/hdfs-util.h"
 #include "util/time.h"
 
@@ -89,9 +91,7 @@ DEFINE_int32(num_adls_io_threads, 16, "Number of ADLS I/O threads");
 
 DECLARE_int64(min_buffer_size);
 
-// With 1024B through 8MB buffers, this is up to ~2GB of buffers.
-DEFINE_int32(max_free_io_buffers, 128,
-    "For each io buffer size, the maximum number of buffers the IoMgr will hold onto");
+DEFINE_int32_hidden(max_free_io_buffers, 128, "Deprecated - has no effect.");
 
 // The number of cached file handles defines how much memory can be used per backend for
 // caching frequently used file handles. Measurements indicate that a single file handle
@@ -131,15 +131,8 @@ AtomicInt32 DiskIoMgr::next_disk_id_;
 string DiskIoMgr::DebugString() {
   stringstream ss;
   ss << "Disks: " << endl;
-  for (int i = 0; i < disk_queues_.size(); ++i) {
-    unique_lock<mutex> lock(disk_queues_[i]->lock);
-    ss << "  " << (void*) disk_queues_[i] << ":" ;
-    if (!disk_queues_[i]->request_contexts.empty()) {
-      ss << " Readers: ";
-      for (RequestContext* req_context: disk_queues_[i]->request_contexts) {
-        ss << (void*)req_context;
-      }
-    }
+  for (DiskQueue* disk_queue : disk_queues_) {
+    disk_queue->DebugString(&ss);
     ss << endl;
   }
   return ss.str();
@@ -187,7 +180,6 @@ DiskIoMgr::DiskIoMgr() :
         THREADS_PER_SOLID_STATE_DISK)),
     max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(FLAGS_read_size)),
     min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(FLAGS_min_buffer_size)),
-    shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
@@ -214,7 +206,6 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
     num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
     max_buffer_size_(BitUtil::RoundUpToPowerOfTwo(max_buffer_size)),
     min_buffer_size_(BitUtil::RoundDownToPowerOfTwo(min_buffer_size)),
-    shut_down_(false),
     total_bytes_read_counter_(TUnit::BYTES),
     read_timer_(TUnit::TIME_NS),
     file_handle_cache_(min(FLAGS_max_cached_file_handles,
@@ -228,35 +219,12 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
 }
 
 DiskIoMgr::~DiskIoMgr() {
-  shut_down_ = true;
-  // Notify all worker threads and shut them down.
-  for (int i = 0; i < disk_queues_.size(); ++i) {
-    if (disk_queues_[i] == nullptr) continue;
-    {
-      // This lock is necessary to properly use the condition var to notify
-      // the disk worker threads.  The readers also grab this lock so updates
-      // to shut_down_ are protected.
-      unique_lock<mutex> disk_lock(disk_queues_[i]->lock);
-    }
-    disk_queues_[i]->work_available.NotifyAll();
+  // Signal all threads to shut down, then wait for them to do so.
+  for (DiskQueue* disk_queue : disk_queues_) {
+    if (disk_queue != nullptr) disk_queue->ShutDown();
   }
   disk_thread_group_.JoinAll();
-
-  for (int i = 0; i < disk_queues_.size(); ++i) {
-    if (disk_queues_[i] == nullptr) continue;
-    int disk_id = disk_queues_[i]->disk_id;
-    for (RequestContext* context : disk_queues_[i]->request_contexts) {
-      unique_lock<mutex> context_lock(context->lock_);
-      DCHECK_EQ(context->disk_states_[disk_id].num_threads_in_op(), 0);
-      DCHECK(context->disk_states_[disk_id].done());
-      context->DecrementDiskRefCount(context_lock);
-    }
-  }
-
-  for (int i = 0; i < disk_queues_.size(); ++i) {
-    delete disk_queues_[i];
-  }
-
+  for (DiskQueue* disk_queue : disk_queues_) delete disk_queue;
   if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_);
 }
 
@@ -287,8 +255,8 @@ Status DiskIoMgr::Init() {
       stringstream ss;
       ss << "work-loop(Disk: " << device_name << ", Thread: " << j << ")";
       std::unique_ptr<Thread> t;
-      RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskIoMgr::WorkLoop,
-          this, disk_queues_[i], &t));
+      RETURN_IF_ERROR(Thread::Create("disk-io-mgr", ss.str(), &DiskQueue::DiskThreadLoop,
+          disk_queues_[i], this, &t));
       disk_thread_group_.AddThread(move(t));
     }
   }
@@ -315,18 +283,18 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
 }
 
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
-  int disk_id = range->disk_id_;
+  int disk_id = range->disk_id();
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
     return Status(TErrorCode::DISK_IO_ERROR,
         Substitute("Invalid scan range.  Bad disk id: $0", disk_id));
   }
-  if (range->offset_ < 0) {
+  if (range->offset() < 0) {
     return Status(TErrorCode::DISK_IO_ERROR,
-        Substitute("Invalid scan range. Negative offset $0", range->offset_));
+        Substitute("Invalid scan range. Negative offset $0", range->offset()));
   }
-  if (range->len_ <= 0) {
+  if (range->len() <= 0) {
     return Status(TErrorCode::DISK_IO_ERROR,
-        Substitute("Invalid scan range. Non-positive length $0", range->len_));
+        Substitute("Invalid scan range. Non-positive length $0", range->len()));
   }
   return Status::OK();
 }
@@ -350,7 +318,7 @@ Status DiskIoMgr::AddScanRanges(
     // Don't add empty ranges.
     DCHECK_NE(range->len(), 0);
     reader->AddActiveScanRangeLocked(reader_lock, range);
-    if (range->try_cache_) {
+    if (range->try_cache()) {
       reader->cached_ranges_.Enqueue(range);
     } else {
       reader->AddRangeToDisk(reader_lock, range, ScheduleMode::UPON_GETNEXT);
@@ -370,7 +338,7 @@ Status DiskIoMgr::StartScanRange(RequestContext* reader, ScanRange* range,
   if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
 
   DCHECK_NE(range->len(), 0);
-  if (range->try_cache_) {
+  if (range->try_cache()) {
     bool cached_read_succeeded;
     RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
     if (cached_read_succeeded) {
@@ -381,7 +349,8 @@ Status DiskIoMgr::StartScanRange(RequestContext* reader, ScanRange* range,
     // Cached read failed, fall back to normal read path.
   }
   // If we don't have a buffer yet, the caller must allocate buffers for the range.
-  *needs_buffers = range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER;
+  *needs_buffers =
+      range->external_buffer_tag() == ScanRange::ExternalBufferTag::NO_BUFFER;
   if (*needs_buffers) range->SetBlockedOnBuffer();
   reader->AddActiveScanRangeLocked(reader_lock, range);
   reader->AddRangeToDisk(reader_lock, range,
@@ -414,7 +383,7 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** rang
     if (!reader->cached_ranges_.empty()) {
       // We have a cached range.
       *range = reader->cached_ranges_.Dequeue();
-      DCHECK((*range)->try_cache_);
+      DCHECK((*range)->try_cache());
       bool cached_read_succeeded;
       RETURN_IF_ERROR((*range)->ReadFromCache(reader_lock, &cached_read_succeeded));
       if (cached_read_succeeded) return Status::OK();
@@ -436,7 +405,7 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** rang
       // Set this to nullptr, the next time this disk runs for this reader, it will
       // get another range ready.
       reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
-      ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag_;
+      ScanRange::ExternalBufferTag buffer_tag = (*range)->external_buffer_tag();
       if (buffer_tag == ScanRange::ExternalBufferTag::NO_BUFFER) {
         // We can't schedule this range until the client gives us buffers. The context
         // must be rescheduled regardless to ensure that 'next_scan_range_to_start' is
@@ -455,8 +424,8 @@ Status DiskIoMgr::GetNextUnstartedRange(RequestContext* reader, ScanRange** rang
 Status DiskIoMgr::AllocateBuffersForRange(RequestContext* reader,
     BufferPool::ClientHandle* bp_client, ScanRange* range, int64_t max_bytes) {
   DCHECK_GE(max_bytes, min_buffer_size_);
-  DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
-     << static_cast<int>(range->external_buffer_tag_) << " invalid to allocate buffers "
+  DCHECK(range->external_buffer_tag() == ScanRange::ExternalBufferTag::NO_BUFFER)
+     << static_cast<int>(range->external_buffer_tag()) << " invalid to allocate buffers "
      << "when already reading into an external buffer";
   BufferPool* bp = ExecEnv::GetInstance()->buffer_pool();
   Status status;
@@ -518,128 +487,53 @@ int64_t DiskIoMgr::ComputeIdealBufferReservation(int64_t scan_range_len) {
   }
 }
 
-// This function gets the next RequestRange to work on for this disk. It checks for
-// cancellation and
-// a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.
-// b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
-//    immediately if there are no preceding scan ranges in in_flight_ranges_
-// It blocks until work is available or the thread is shut down.
+// This function gets the next RequestRange to work on for this disk. It blocks until
+// work is available or the thread is shut down.
 // Work is available if there is a RequestContext with
 //  - A ScanRange with a buffer available, or
 //  - A WriteRange in unstarted_write_ranges_.
-bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-    RequestContext** request_context) {
-  int disk_id = disk_queue->disk_id;
-  *range = nullptr;
-
+RequestRange* DiskQueue::GetNextRequestRange(RequestContext** request_context) {
   // This loops returns either with work to do or when the disk IoMgr shuts down.
   while (true) {
     *request_context = nullptr;
-    RequestContext::PerDiskState* request_disk_state = nullptr;
     {
-      unique_lock<mutex> disk_lock(disk_queue->lock);
-
-      while (!shut_down_ && disk_queue->request_contexts.empty()) {
+      unique_lock<mutex> disk_lock(lock_);
+      while (!shut_down_ && request_contexts_.empty()) {
         // wait if there are no readers on the queue
-        disk_queue->work_available.Wait(disk_lock);
+        work_available_.Wait(disk_lock);
       }
       if (shut_down_) break;
-      DCHECK(!disk_queue->request_contexts.empty());
+      DCHECK(!request_contexts_.empty());
 
       // Get the next reader and remove the reader so that another disk thread
-      // can't pick it up.  It will be enqueued before issuing the read to HDFS
+      // can't pick it up. It will be enqueued before issuing the read to HDFS
       // so this is not a big deal (i.e. multiple disk threads can read for the
       // same reader).
-      // TODO: revisit.
-      *request_context = disk_queue->request_contexts.front();
-      disk_queue->request_contexts.pop_front();
+      *request_context = request_contexts_.front();
+      request_contexts_.pop_front();
       DCHECK(*request_context != nullptr);
-      request_disk_state = &((*request_context)->disk_states_[disk_id]);
-      request_disk_state->IncrementDiskThreadAfterDequeue();
-    }
-
-    // NOTE: no locks were taken in between.  We need to be careful about what state
-    // could have changed to the reader and disk in between.
-    // There are some invariants here.  Only one disk thread can have the
-    // same reader here (the reader is removed from the queue).  There can be
-    // other disk threads operating on this reader in other functions though.
-    unique_lock<mutex> request_lock((*request_context)->lock_);
-    VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
-        << (*request_context)->DebugString();
-
-    // Check if reader has been cancelled
-    if ((*request_context)->state_ == RequestContext::Cancelled) {
-      request_disk_state->DecrementDiskThread(request_lock, *request_context);
-      continue;
-    }
-
-    DCHECK_EQ((*request_context)->state_, RequestContext::Active)
-        << (*request_context)->DebugString();
-
-    if (request_disk_state->next_scan_range_to_start() == nullptr &&
-        !request_disk_state->unstarted_scan_ranges()->empty()) {
-      // We don't have a range queued for this disk for what the caller should
-      // read next. Populate that.  We want to have one range waiting to minimize
-      // wait time in GetNextUnstartedRange().
-      ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
-      (*request_context)->num_unstarted_scan_ranges_.Add(-1);
-      (*request_context)->ready_to_start_ranges_.Enqueue(new_range);
-      request_disk_state->set_next_scan_range_to_start(new_range);
-
-      if ((*request_context)->num_unstarted_scan_ranges_.Load() == 0) {
-        // All the ranges have been started, notify everyone blocked on
-        // GetNextUnstartedRange(). Only one of them will get work so make sure to return
-        // nullptr to the other caller threads.
-        (*request_context)->ready_to_start_ranges_cv_.NotifyAll();
-      } else {
-        (*request_context)->ready_to_start_ranges_cv_.NotifyOne();
-      }
+      // Must increment refcount to keep RequestContext after dropping 'disk_lock'
+      (*request_context)->IncrementDiskThreadAfterDequeue(disk_id_);
     }
-
-    // Always enqueue a WriteRange to be processed into in_flight_ranges_.
-    // This is done so in_flight_ranges_ does not exclusively contain ScanRanges.
-    // For now, enqueuing a WriteRange on each invocation of GetNextRequestRange()
-    // does not flood in_flight_ranges() with WriteRanges because the entire
-    // WriteRange is processed and removed from the queue after GetNextRequestRange()
-    // returns. (A DCHECK is used to ensure that writes do not exceed 8MB).
-    if (!request_disk_state->unstarted_write_ranges()->empty()) {
-      WriteRange* write_range = request_disk_state->unstarted_write_ranges()->Dequeue();
-      request_disk_state->in_flight_ranges()->Enqueue(write_range);
-    }
-
-    // Get the next scan range to work on from the reader. Only in_flight_ranges
-    // are eligible since the disk threads do not start new ranges on their own.
-
-    // There are no inflight ranges, nothing to do.
-    if (request_disk_state->in_flight_ranges()->empty()) {
-      request_disk_state->DecrementDiskThread(request_lock, *request_context);
-      continue;
-    }
-    DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
-    *range = request_disk_state->in_flight_ranges()->Dequeue();
-    DCHECK(*range != nullptr);
-
-    // Now that we've picked a request range, put the context back on the queue so
-    // another thread can pick up another request range for this context.
-    request_disk_state->ScheduleContext(request_lock, *request_context, disk_id);
-    DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
-    return true;
+    // Get the next range to process for this reader. If this context does not have a
+    // range, rinse and repeat.
+    RequestRange* range = (*request_context)->GetNextRequestRange(disk_id_);
+    if (range != nullptr) return range;
   }
-
   DCHECK(shut_down_);
-  return false;
+  return nullptr;
 }
 
 void DiskIoMgr::HandleWriteFinished(
     RequestContext* writer, WriteRange* write_range, const Status& write_status) {
   // Copy disk_id before running callback: the callback may modify write_range.
-  int disk_id = write_range->disk_id_;
+  int disk_id = write_range->disk_id();
 
   // Execute the callback before decrementing the thread count. Otherwise
   // RequestContext::Cancel() that waits for the disk ref count to be 0 will
   // return, creating a race, e.g. see IMPALA-1890.
   // The status of the write does not affect the status of the writer context.
-  write_range->callback_(write_status);
+  write_range->callback()(write_status);
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
@@ -649,32 +543,24 @@ void DiskIoMgr::HandleWriteFinished(
   }
 }
 
-void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
-  // The thread waits until there is work or the entire system is being shut down.
-  // If there is work, performs the read or write requested and re-enqueues the
-  // requesting context.
-  // Locks are not taken when reading from or writing to disk.
-  // The main loop has three parts:
-  //   1. GetNextRequestContext(): get the next request context (read or write) to
-  //      process and dequeue it.
-  //   2. For the dequeued request, gets the next scan- or write-range to process and
-  //      re-enqueues the request.
-  //   3. Perform the read or write as specified.
-  // Cancellation checking needs to happen in both steps 1 and 3.
+void DiskQueue::DiskThreadLoop(DiskIoMgr* io_mgr) {
+  // The thread waits until there is work or the queue is shut down. If there is work,
+  // performs the read or write requested. Locks are not taken when reading from or
+  // writing to disk.
   while (true) {
     RequestContext* worker_context = nullptr;
-    RequestRange* range = nullptr;
-    if (!GetNextRequestRange(disk_queue, &range, &worker_context)) {
+    RequestRange* range = GetNextRequestRange(&worker_context);
+    if (range == nullptr) {
       DCHECK(shut_down_);
       return;
     }
     if (range->request_type() == RequestType::READ) {
       ScanRange* scan_range = static_cast<ScanRange*>(range);
-      ReadOutcome outcome = scan_range->DoRead(disk_queue->disk_id);
-      worker_context->ReadDone(disk_queue->disk_id, outcome, scan_range);
+      ReadOutcome outcome = scan_range->DoRead(disk_id_);
+      worker_context->ReadDone(disk_id_, outcome, scan_range);
     } else {
       DCHECK(range->request_type() == RequestType::WRITE);
-      Write(worker_context, static_cast<WriteRange*>(range));
+      io_mgr->Write(worker_context, static_cast<WriteRange*>(range));
     }
   }
 }
@@ -708,7 +594,7 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
 #endif
   RETURN_IF_ERROR(local_file_system_->Fwrite(file_handle, write_range));
 
-  ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len_);
+  ImpaladMetrics::IO_MGR_BYTES_WRITTEN->Increment(write_range->len());
   return Status::OK();
 }
 
@@ -806,3 +692,29 @@ Status DiskIoMgr::ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fnam
   DCHECK(!cache_hit);
   return Status::OK();
 }
+
+void DiskQueue::ShutDown() {
+  {
+    unique_lock<mutex> disk_lock(lock_);
+    shut_down_ = true;
+  }
+  // All waiting threads should exit, so wake them all up.
+  work_available_.NotifyAll();
+}
+
+void DiskQueue::DebugString(stringstream* ss) {
+  unique_lock<mutex> lock(lock_);
+  *ss << "DiskQueue id=" << disk_id_ << " ptr=" << static_cast<void*>(this) << ":" ;
+  if (!request_contexts_.empty()) {
+    *ss << " Readers: ";
+    for (RequestContext* req_context: request_contexts_) {
+      *ss << static_cast<void*>(req_context);
+    }
+  }
+}
+
+DiskQueue::~DiskQueue() {
+  for (RequestContext* context : request_contexts_) {
+    context->UnregisterDiskQueue(disk_id_);
+  }
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index cc1bb37..586b339 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -18,26 +18,18 @@
 #ifndef IMPALA_RUNTIME_IO_DISK_IO_MGR_H
 #define IMPALA_RUNTIME_IO_DISK_IO_MGR_H
 
-#include <deque>
-#include <functional>
 #include <vector>
 
-#include <boost/scoped_ptr.hpp>
-#include <boost/unordered_set.hpp>
 #include <boost/thread/mutex.hpp>
 
 #include "common/atomic.h"
 #include "common/hdfs.h"
-#include "common/object-pool.h"
 #include "common/status.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/io/handle-cache.h"
 #include "runtime/io/local-file-system.h"
 #include "runtime/io/request-ranges.h"
 #include "util/aligned-new.h"
-#include "util/bit-util.h"
-#include "util/condition-variable.h"
-#include "util/error-util.h"
 #include "util/runtime-profile.h"
 #include "util/thread.h"
 
@@ -45,7 +37,7 @@ namespace impala {
 
 namespace io {
 
-struct DiskQueue;
+class DiskQueue;
 /// Manager object that schedules IO for all queries on all disks and remote filesystems
 /// (such as S3). Each query maps to one or more RequestContext objects, each of which
 /// has its own queue of scan ranges and/or write ranges.
@@ -394,12 +386,30 @@ class DiskIoMgr : public CacheLineAligned {
   /// See "Buffer Management" in the class comment for explanation.
   static const int64_t IDEAL_MAX_SIZED_BUFFERS_PER_SCAN_RANGE = 3;
 
- private:
-  friend class BufferDescriptor;
+ protected:
+  // Protected methods are used by other classes in io::.
+  friend class DiskQueue;
+  friend class ScanRange;
   friend class RequestContext;
-  // TODO: remove io:: prefix - it is required for the "using ScanRange" workaround above.
-  friend class io::ScanRange;
 
+  /// Write the specified range to disk and calls HandleWriteFinished() when done.
+  /// Responsible for opening and closing the file that is written.
+  void Write(RequestContext* writer_context, WriteRange* write_range);
+
+  DiskQueue* GetDiskQueue(int disk_id) {
+    DCHECK_GE(disk_id, 0);
+    DCHECK_LT(disk_id, disk_queues_.size());
+    return disk_queues_[disk_id];
+  }
+
+  RuntimeProfile::Counter* total_bytes_read_counter() {
+    return &total_bytes_read_counter_;
+  }
+  RuntimeProfile::Counter* read_timer() { return &read_timer_; }
+  struct hadoopRzOptions* cached_read_options() { return cached_read_options_; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(DiskIoMgr);
   friend class DiskIoMgrTest_Buffers_Test;
   friend class DiskIoMgrTest_BufferSizeSelection_Test;
   friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
@@ -427,10 +437,6 @@ class DiskIoMgr : public CacheLineAligned {
   /// Options object for cached hdfs reads. Set on startup and never modified.
   struct hadoopRzOptions* cached_read_options_ = nullptr;
 
-  /// True if the IoMgr should be torn down. Worker threads watch for this to
-  /// know to terminate. This variable is read/written to by different threads.
-  volatile bool shut_down_;
-
   /// Total bytes read by the IoMgr.
   RuntimeProfile::Counter total_bytes_read_counter_;
 
@@ -452,35 +458,17 @@ class DiskIoMgr : public CacheLineAligned {
   // handles are closed.
   FileHandleCache file_handle_cache_;
 
-  /// Disk worker thread loop. This function retrieves the next range to process on
-  /// the disk queue and invokes ScanRange::DoRead() or Write() depending on the type
-  /// of Range. There can be multiple threads per disk running this loop.
-  void WorkLoop(DiskQueue* queue);
-
-  /// This is called from the disk thread to get the next range to process. It will
-  /// wait until a scan range and buffer are available, or a write range is available.
-  /// This functions returns the range to process.
-  /// Only returns false if the disk thread should be shut down.
-  /// No locks should be taken before this function call and none are left taken after.
-  bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
-      RequestContext** request_context);
-
   /// Invokes write_range->callback_  after the range has been written and
   /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
   /// etc. is passed via write_status and to the callback.
   /// The write_status does not affect the writer->status_. That is, an write error does
   /// not cancel the writer context - that decision is left to the callback handler.
-  /// TODO: On the read path, consider not canceling the reader context on error.
   void HandleWriteFinished(
       RequestContext* writer, WriteRange* write_range, const Status& write_status);
 
   /// Validates that range is correctly initialized
   Status ValidateScanRange(ScanRange* range) WARN_UNUSED_RESULT;
 
-  /// Write the specified range to disk and calls HandleWriteFinished when done.
-  /// Responsible for opening and closing the file that is written.
-  void Write(RequestContext* writer_context, WriteRange* write_range);
-
   /// Helper method to write a range using the specified FILE handle. Returns Status:OK
   /// if the write succeeded, or a RUNTIME_ERROR with an appropriate message otherwise.
   /// Does not open or close the file that is written.

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/error-converter.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/error-converter.cc b/be/src/runtime/io/error-converter.cc
index 0ca25b7..966558d 100644
--- a/be/src/runtime/io/error-converter.cc
+++ b/be/src/runtime/io/error-converter.cc
@@ -16,7 +16,11 @@
 // under the License.
 
 #include "runtime/io/error-converter.h"
+
 #include "gutil/strings/substitute.h"
+#include "util/debug-util.h"
+#include "util/error-util.h"
+
 #include "common/names.h"
 
 namespace impala {

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/local-file-system.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/local-file-system.cc b/be/src/runtime/io/local-file-system.cc
index 3ad6aa4..63aae52 100644
--- a/be/src/runtime/io/local-file-system.cc
+++ b/be/src/runtime/io/local-file-system.cc
@@ -73,15 +73,15 @@ Status LocalFileSystem::Fwrite(FILE* file_handle, const WriteRange* write_range)
   DCHECK(file_handle != nullptr);
   DCHECK(write_range != nullptr);
   int64_t bytes_written = FwriteAux(file_handle, write_range);
-  if (bytes_written < write_range->len_) {
+  if (bytes_written < write_range->len()) {
     return ErrorConverter::GetErrorStatusFromErrno("fwrite()", write_range->file(),
-        errno, {{"range_length", SimpleItoa(write_range->len_)}});
+        errno, {{"range_length", SimpleItoa(write_range->len())}});
   }
   return Status::OK();
 }
 
 size_t LocalFileSystem::FwriteAux(FILE* file_handle, const WriteRange* write_range) {
-  return fwrite(write_range->data_, 1, write_range->len_, file_handle);
+  return fwrite(write_range->data(), 1, write_range->len(), file_handle);
 }
 
 Status LocalFileSystem::Fclose(FILE* file_handle, const WriteRange* write_range) {

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index dd8ff24..6c56b8c 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -24,46 +24,6 @@
 using namespace impala;
 using namespace impala::io;
 
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
-    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
-    int64_t buffer_len)
-  : io_mgr_(io_mgr),
-    reader_(reader),
-    scan_range_(scan_range),
-    buffer_(buffer),
-    buffer_len_(buffer_len) {
-  DCHECK(io_mgr != nullptr);
-  DCHECK(scan_range != nullptr);
-  DCHECK(buffer != nullptr);
-  DCHECK_GE(buffer_len, 0);
-}
-
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
-    ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
-    BufferPool::BufferHandle handle) :
-  io_mgr_(io_mgr),
-  reader_(reader),
-  scan_range_(scan_range),
-  buffer_(handle.data()),
-  buffer_len_(handle.len()),
-  bp_client_(bp_client),
-  handle_(move(handle)) {
-  DCHECK(io_mgr != nullptr);
-  DCHECK(scan_range != nullptr);
-  DCHECK(bp_client_->is_registered());
-  DCHECK(handle_.is_open());
-}
-
-void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
-  DCHECK(buffer->buffer_ != nullptr);
-  if (!buffer->is_cached() && !buffer->is_client_buffer()) {
-    // Only buffers that were allocated by DiskIoMgr need to be freed.
-    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(
-        buffer->bp_client_, &buffer->handle_);
-  }
-  buffer->buffer_ = nullptr;
-}
-
 void RequestContext::ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range) {
   // TODO: IMPALA-4249: it is safe to touch 'range' until DecrementDiskThread() is
   // called because all clients of DiskIoMgr keep ScanRange objects alive until they
@@ -140,13 +100,13 @@ void RequestContext::Cancel() {
       RequestRange* range;
       while ((range = disk_state.in_flight_ranges()->Dequeue()) != nullptr) {
         if (range->request_type() == RequestType::WRITE) {
-          write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
+          write_callbacks.push_back(static_cast<WriteRange*>(range)->callback());
         }
       }
       while (disk_state.unstarted_scan_ranges()->Dequeue() != nullptr);
       WriteRange* write_range;
       while ((write_range = disk_state.unstarted_write_ranges()->Dequeue()) != nullptr) {
-        write_callbacks.push_back(write_range->callback_);
+        write_callbacks.push_back(write_range->callback());
       }
     }
     // Clear out the lists of scan ranges.
@@ -248,6 +208,76 @@ void RequestContext::RemoveActiveScanRangeLocked(
   active_scan_ranges_.erase(range);
 }
 
+// This function gets the next RequestRange to work on for this RequestContext and
+// disk combination. It checks for cancellation and:
+// a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.
+// b) Adds an unstarted write range to in_flight_ranges_. The write range is processed
+//    immediately if there are no preceding scan ranges in in_flight_ranges_
+RequestRange* RequestContext::GetNextRequestRange(int disk_id) {
+  PerDiskState* request_disk_state = &disk_states_[disk_id];
+  // NOTE: no locks are held, so other threads could have modified the state of the reader
+  // and disk state since this context was pulled off the queue. Only one disk thread can
+  // be in this function for this reader, since the reader was removed from the queue and
+  // has not been re-added. Other disk threads may be operating on this reader in other
+  // functions though.
+  unique_lock<mutex> request_lock(lock_);
+  VLOG_FILE << "Disk (id=" << disk_id << ") reading for " << DebugString();
+
+  // Check if reader has been cancelled
+  if (state_ == RequestContext::Cancelled) {
+    request_disk_state->DecrementDiskThread(request_lock, this);
+    return nullptr;
+  }
+  DCHECK_EQ(state_, RequestContext::Active) << DebugString();
+  if (request_disk_state->next_scan_range_to_start() == nullptr &&
+      !request_disk_state->unstarted_scan_ranges()->empty()) {
+    // We don't have a range queued for this disk for what the caller should
+    // read next. Populate that.  We want to have one range waiting to minimize
+    // wait time in GetNextUnstartedRange().
+    ScanRange* new_range = request_disk_state->unstarted_scan_ranges()->Dequeue();
+    num_unstarted_scan_ranges_.Add(-1);
+    ready_to_start_ranges_.Enqueue(new_range);
+    request_disk_state->set_next_scan_range_to_start(new_range);
+
+    if (num_unstarted_scan_ranges_.Load() == 0) {
+      // All the ranges have been started, notify everyone blocked on
+      // GetNextUnstartedRange(). Only one of them will get work so make sure to return
+      // nullptr to the other caller threads.
+      ready_to_start_ranges_cv_.NotifyAll();
+    } else {
+      ready_to_start_ranges_cv_.NotifyOne();
+    }
+  }
+
+  // Always enqueue a WriteRange to be processed into in_flight_ranges_.
+  // This is done so in_flight_ranges_ does not exclusively contain ScanRanges.
+  // For now, enqueuing a WriteRange on each invocation of GetNextRequestRange()
+  // does not flood in_flight_ranges() with WriteRanges because the entire
+  // WriteRange is processed and removed from the queue after GetNextRequestRange()
+  // returns.
+  if (!request_disk_state->unstarted_write_ranges()->empty()) {
+    WriteRange* write_range = request_disk_state->unstarted_write_ranges()->Dequeue();
+    request_disk_state->in_flight_ranges()->Enqueue(write_range);
+  }
+
+  // Get the next scan range to work on from the reader. Only in_flight_ranges
+  // are eligible since the disk threads do not start new ranges on their own.
+  if (request_disk_state->in_flight_ranges()->empty()) {
+    // There are no inflight ranges, nothing to do.
+    request_disk_state->DecrementDiskThread(request_lock, this);
+    return nullptr;
+  }
+  DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
+  RequestRange* range = request_disk_state->in_flight_ranges()->Dequeue();
+  DCHECK(range != nullptr);
+
+  // Now that we've picked a request range, put the context back on the queue so
+  // another thread can pick up another request range for this context.
+  request_disk_state->ScheduleContext(request_lock, this, disk_id);
+  DCHECK(Validate()) << endl << DebugString();
+  return range;
+}
+
 RequestContext::RequestContext(DiskIoMgr* parent, int num_disks)
   : parent_(parent), disk_states_(num_disks) {}
 
@@ -278,6 +308,13 @@ string RequestContext::DebugString() const {
   return ss.str();
 }
 
+void RequestContext::UnregisterDiskQueue(int disk_id) {
+  unique_lock<mutex> lock(lock_);
+  DCHECK_EQ(disk_states_[disk_id].num_threads_in_op(), 0);
+  DCHECK(disk_states_[disk_id].done());
+  DecrementDiskRefCount(lock);
+}
+
 bool RequestContext::Validate() const {
   if (state_ == RequestContext::Inactive) {
     LOG(WARNING) << "state_ == RequestContext::Inactive";
@@ -375,6 +412,6 @@ void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& con
   DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
   if (is_on_queue_.Load() == 0 && !done_) {
     is_on_queue_.Store(1);
-    context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
+    context->parent_->GetDiskQueue(disk_id)->EnqueueContext(context);
   }
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index adfe6d5..2687ee1 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -18,6 +18,8 @@
 #ifndef IMPALA_RUNTIME_IO_REQUEST_CONTEXT_H
 #define IMPALA_RUNTIME_IO_REQUEST_CONTEXT_H
 
+#include <boost/unordered_set.hpp>
+
 #include "runtime/io/disk-io-mgr.h"
 #include "util/condition-variable.h"
 
@@ -141,13 +143,12 @@ class RequestContext {
     disks_accessed_bitmap_ = disks_accessed_bitmap;
   }
 
- private:
-  DISALLOW_COPY_AND_ASSIGN(RequestContext);
+ protected:
+  // Protected methods are accessed by other classes in io:: but not external classes.
+  friend class DiskQueue;
   friend class DiskIoMgr;
   friend class ScanRange;
 
-  class PerDiskState;
-
   enum State {
     /// Reader is initialized and maps to a client
     Active,
@@ -164,23 +165,20 @@ class RequestContext {
 
   RequestContext(DiskIoMgr* parent, int num_disks);
 
-  /// Cleans up a buffer. If the buffer was allocated with AllocateBuffersForRange(),
-  /// frees the buffer. Otherwise (e.g. a client or HDFS cache buffer), just prepares the
-  /// descriptor to be destroyed. After this is called, buffer->buffer() is NULL.
-  /// Does not acquire 'lock_'.
-  void FreeBuffer(BufferDescriptor* buffer);
+  /// Called when dequeueing this RequestContext from the disk queue to increment the
+  /// count of disk threads with a reference to this context for 'disk_id'. These threads
+  /// do not hold any locks while reading from HDFS, so we need to prevent the
+  /// RequestContext from being destroyed underneath them.
+  void IncrementDiskThreadAfterDequeue(int disk_id);
+
+  /// Called when the disk queue for disk 'disk_id' shuts down. Only used in backend
+  /// tests - disk queues are not shut down for the singleton DiskIoMgr in a daemon.
+  void UnregisterDiskQueue(int disk_id);
 
   /// Decrements the number of active disks for this reader.  If the disk count
   /// goes to 0, the disk complete condition variable is signaled.
   /// 'lock_' must be held via 'lock'.
-  void DecrementDiskRefCount(const boost::unique_lock<boost::mutex>& lock) {
-    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-    DCHECK_GT(num_disks_with_ranges_, 0);
-    if (--num_disks_with_ranges_ == 0) {
-      disks_complete_cond_var_.NotifyAll();
-    }
-    DCHECK(Validate()) << std::endl << DebugString();
-  }
+  void DecrementDiskRefCount(const boost::unique_lock<boost::mutex>& lock);
 
   /// Reader & Disk Scheduling: Readers that currently can't do work are not on
   /// the disk's queue. These readers are ones that don't have any ranges in the
@@ -192,14 +190,14 @@ class RequestContext {
   /// Adds range to in_flight_ranges, scheduling this reader on the disk threads
   /// if necessary.
   /// 'lock_' must be held via 'lock'. Only valid to call if this context is active.
-  void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) {
-    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-    DCHECK_EQ(state_, Active);
-    DCHECK(range != nullptr);
-    RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
-    state.in_flight_ranges()->Enqueue(range);
-    state.ScheduleContext(lock, this, range->disk_id());
-  }
+  void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
+
+  // Called from the disk thread for 'disk_id' to get the next request range to process
+  // for this context for the disk. Returns nullptr if there are no ranges currently
+  // available. The calling disk thread should hold no locks and must hold a refcount
+  // obtained from IncrementDiskThreadAfterDequeue() to ensure that the context is not
+  // destroyed while executing this function.
+  RequestRange* GetNextRequestRange(int disk_id);
 
   /// Called from a disk thread when a read completes. Decrements the disk thread count
   /// and other bookkeeping and re-schedules 'range' if there are more reads to do.
@@ -239,14 +237,7 @@ class RequestContext {
   void RemoveActiveScanRangeLocked(
       const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
 
-  /// Validates invariants of reader.  Reader lock must be taken beforehand.
-  bool Validate() const;
-
-  /// Dumps out reader information.  Lock should be taken by caller
-  std::string DebugString() const;
-
-  /// Parent object
-  DiskIoMgr* const parent_;
+  // Counters are updated by other classes - expose to other io:: classes for convenience.
 
   /// Total bytes read for this reader
   RuntimeProfile::Counter* bytes_read_counter_ = nullptr;
@@ -291,6 +282,18 @@ class RequestContext {
   /// Total number of file handle opens where the file handle was not in the cache
   AtomicInt32 cached_file_handles_miss_count_{0};
 
+ private:
+  DISALLOW_COPY_AND_ASSIGN(RequestContext);
+
+  /// Validates invariants of reader. 'lock_' must be held by caller.
+  bool Validate() const;
+
+  /// Dumps out reader information. 'lock_' must be held by caller.
+  std::string DebugString() const;
+
+  /// Parent object
+  DiskIoMgr* const parent_;
+
   /// All fields below are accessed by multiple threads and the lock needs to be
   /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
   /// are held simultaneously.
@@ -332,166 +335,189 @@ class RequestContext {
   /// Condition variable for UnregisterContext() to wait for all disks to complete
   ConditionVariable disks_complete_cond_var_;
 
-  /// Struct containing state per disk. See comments in the disk read loop on how
-  /// they are used.
-  class PerDiskState {
-   public:
-    bool done() const { return done_; }
-    void set_done(bool b) { done_ = b; }
+ private:
+  /// Per disk states to synchronize multiple disk threads accessing the same request
+  /// context.
+  class PerDiskState;
+  std::vector<PerDiskState> disk_states_;
+};
+
+/// Struct containing state per disk. See comments in the disk read loop on how
+/// they are used.
+class RequestContext::PerDiskState {
+ public:
+  bool done() const { return done_; }
+  void set_done(bool b) { done_ = b; }
 
-    int num_remaining_ranges() const { return num_remaining_ranges_; }
-    int& num_remaining_ranges() { return num_remaining_ranges_; }
+  int num_remaining_ranges() const { return num_remaining_ranges_; }
+  int& num_remaining_ranges() { return num_remaining_ranges_; }
 
-    ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
-    void set_next_scan_range_to_start(ScanRange* range) {
-      next_scan_range_to_start_ = range;
-    }
+  ScanRange* next_scan_range_to_start() { return next_scan_range_to_start_; }
+  void set_next_scan_range_to_start(ScanRange* range) {
+    next_scan_range_to_start_ = range;
+  }
 
-    bool is_on_queue() const { return is_on_queue_.Load() != 0; }
+  bool is_on_queue() const { return is_on_queue_.Load() != 0; }
 
-    int num_threads_in_op() const { return num_threads_in_op_.Load(); }
+  int num_threads_in_op() const { return num_threads_in_op_.Load(); }
 
-    const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
-      return &unstarted_scan_ranges_;
-    }
-    const InternalQueue<WriteRange>* unstarted_write_ranges() const {
-      return &unstarted_write_ranges_;
-    }
-    const InternalQueue<RequestRange>* in_flight_ranges() const {
-      return &in_flight_ranges_;
-    }
+  const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
+    return &unstarted_scan_ranges_;
+  }
+  const InternalQueue<WriteRange>* unstarted_write_ranges() const {
+    return &unstarted_write_ranges_;
+  }
+  const InternalQueue<RequestRange>* in_flight_ranges() const {
+    return &in_flight_ranges_;
+  }
+
+  InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; }
+  InternalQueue<WriteRange>* unstarted_write_ranges() {
+    return &unstarted_write_ranges_;
+  }
+  InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
+
+  /// Schedules the request context on this disk if it's not already on the queue.
+  /// context->lock_ must be held by the caller via 'context_lock'.
+  void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
+      RequestContext* context, int disk_id);
+
+  /// See RequestContext::IncrementDiskThreadAfterDequeue() comment for usage.
+  ///
+  /// The caller does not need to hold 'lock_', so this can execute concurrently with
+  /// itself and DecrementDiskThread().
+  void IncrementDiskThreadAfterDequeue() {
+    /// Incrementing 'num_threads_in_op_' first so that there is no window when other
+    /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
+    /// references left to this context.
+    num_threads_in_op_.Add(1);
+    is_on_queue_.Store(0);
+  }
 
-    InternalQueue<ScanRange>* unstarted_scan_ranges() { return &unstarted_scan_ranges_; }
-    InternalQueue<WriteRange>* unstarted_write_ranges() {
-      return &unstarted_write_ranges_;
+  /// Decrement the count of disk threads with a reference to this context. Does final
+  /// cleanup if the context is cancelled and this is the last thread for the disk.
+  /// context->lock_ must be held by the caller via 'context_lock'.
+  void DecrementDiskThread(const boost::unique_lock<boost::mutex>& context_lock,
+      RequestContext* context) {
+    DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
+    num_threads_in_op_.Add(-1);
+
+    if (context->state_ != Cancelled) {
+      DCHECK_EQ(context->state_, Active);
+      return;
     }
-    InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
-
-    /// Schedules the request context on this disk if it's not already on the queue.
-    /// context->lock_ must be held by the caller via 'context_lock'.
-    void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
-        RequestContext* context, int disk_id);
-
-    /// Called when dequeueing this RequestContext from the disk queue to increment the
-    /// count of disk threads with a reference to this context. These threads do not hold
-    /// any locks while reading from HDFS, so we need to prevent the RequestContext from
-    /// being destroyed underneath them.
-    ///
-    /// The caller does not need to hold 'lock_', so this can execute concurrently with
-    /// itself and DecrementDiskThread().
-    void IncrementDiskThreadAfterDequeue() {
-      /// Incrementing 'num_threads_in_op_' first so that there is no window when other
-      /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
-      /// references left to this context.
-      num_threads_in_op_.Add(1);
-      is_on_queue_.Store(0);
+    // The state is cancelled, check to see if we're the last thread to touch the
+    // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_'
+    // in this order to avoid a race with IncrementDiskThreadAfterDequeue().
+    if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) {
+      context->DecrementDiskRefCount(context_lock);
+      done_ = true;
     }
+  }
 
-    /// Decrement the count of disks threads with a reference to this context. Does final
-    /// cleanup if the context is cancelled and this is the last thread for the disk.
-    /// context->lock_ must be held by the caller via 'context_lock'.
-    void DecrementDiskThread(const boost::unique_lock<boost::mutex>& context_lock,
-        RequestContext* context) {
-      DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
-      num_threads_in_op_.Add(-1);
-
-      if (context->state_ != Cancelled) {
-        DCHECK_EQ(context->state_, Active);
-        return;
-      }
-      // The state is cancelled, check to see if we're the last thread to touch the
-      // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_'
-      // in this order to avoid a race with IncrementDiskThreadAfterDequeue().
-      if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) {
-        context->DecrementDiskRefCount(context_lock);
-        done_ = true;
-      }
-    }
+ private:
+  /// If true, this disk is all done for this request context, including any cleanup.
+  /// If done is true, it means that this request must not be on this disk's queue
+  /// *AND* there are no threads currently working on this context. To satisfy
+  /// this, only the last thread (per disk) can set this to true.
+  bool done_ = true;
+
+  /// For each disk, keeps track if the context is on this disk's queue, indicating
+  /// the disk must do some work for this context. 1 means that the context is on the
+  /// disk queue, 0 means that it's not on the queue (either because it has no ranges
+  /// active for the disk or because a disk thread dequeued the context and is
+  /// currently processing a request).
+  ///
+  /// The disk needs to do work in 4 cases:
+  ///  1) in_flight_ranges is not empty, the disk needs to read for this reader.
+  ///  2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
+  ///     read next.
+  ///  3) the reader has been cancelled and this disk needs to participate in the
+  ///     cleanup.
+  ///  4) A write range is added to queue.
+  /// In general, we only want to put a context on the disk queue if there is something
+  /// useful that can be done. If there's nothing useful, the disk queue will wake up
+  /// and then remove the reader from the queue. Doing this causes thrashing of the
+  /// threads.
+  ///
+  /// This variable is important during context cancellation because it indicates
+  /// whether a queue has a reference to the context that must be released before
+  /// the context is considered unregistered. Atomically set to false after
+  /// incrementing 'num_threads_in_op_' when dequeueing so that there is no window
+  /// when other threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there
+  /// are no references left to this context.
+  /// TODO: this could be combined with 'num_threads_in_op_' to be a single refcount.
+  AtomicInt32 is_on_queue_{0};
+
+  /// For each disk, the number of request ranges that have not been fully read.
+  /// In the non-cancellation path, this will hit 0, and done will be set to true
+  /// by the disk thread. This is undefined in the cancellation path (the various
+  /// threads notice by looking at the RequestContext's state_).
+  int num_remaining_ranges_ = 0;
+
+  /// Queue of ranges that have not started being read.  This list is exclusive
+  /// with in_flight_ranges.
+  InternalQueue<ScanRange> unstarted_scan_ranges_;
+
+  /// Queue of pending IO requests for this disk in the order that they will be
+  /// processed. A ScanRange is added to this queue when it is returned in
+  /// GetNextUnstartedRange(), or when it is added with schedule_mode == IMMEDIATELY.
+  /// A WriteRange is added to this queue from unstarted_write_ranges_ for each
+  /// invocation of GetNextRequestRange() from the disk thread loop.
+  /// The size of this queue is always less than or equal to num_remaining_ranges.
+  InternalQueue<RequestRange> in_flight_ranges_;
+
+  /// The next range to start for this reader on this disk. Each disk (for each reader)
+  /// picks the next range to start. The range is set here and also added to the
+  /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO order,
+  /// so the ranges from different disks are round-robined. When the range is pulled
+  /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, so the disk
+  /// knows to populate it again and add it to ready_to_start_ranges_ i.e. it is used
+  /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to add another
+  /// range to ready_to_start_ranges_.
+  ScanRange* next_scan_range_to_start_ = nullptr;
+
+  /// For each disk, the number of disk threads issuing the underlying read/write on
+  /// behalf of this context. There are a few places where we release the context lock,
+  /// do some work, and then grab the lock again.  Because we don't hold the lock for
+  /// the entire operation, we need this ref count to keep track of which thread should
+  /// do final resource cleanup during cancellation.
+  /// Only the thread that sees the count at 0 should do the final cleanup.
+  AtomicInt32 num_threads_in_op_{0};
+
+  /// Queue of write ranges to process for this disk. A write range is always added
+  /// to in_flight_ranges_ in GetNextRequestRange(). There are separate
+  /// unstarted_scan_ranges_ and unstarted_write_ranges_ queues so that the disk can
+  /// alternate between reads and writes. (Otherwise, since next_scan_range_to_start_
+  /// is set in GetNextRequestRange() whenever it is null, repeated calls to
+  /// GetNextRequestRange() and GetNextUnstartedRange() may result in only reads being
+  /// processed.)
+  InternalQueue<WriteRange> unstarted_write_ranges_;
+};
 
-   private:
-    /// If true, this disk is all done for this request context, including any cleanup.
-    /// If done is true, it means that this request must not be on this disk's queue
-    /// *AND* there are no threads currently working on this context. To satisfy
-    /// this, only the last thread (per disk) can set this to true.
-    bool done_ = true;
-
-    /// For each disk, keeps track if the context is on this disk's queue, indicating
-    /// the disk must do some work for this context. 1 means that the context is on the
-    /// disk queue, 0 means that it's not on the queue (either because it has on ranges
-    /// active for the disk or because a disk thread dequeued the context and is
-    /// currently processing a request).
-    ///
-    /// The disk needs to do work in 4 cases:
-    ///  1) in_flight_ranges is not empty, the disk needs to read for this reader.
-    ///  2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
-    ///     read next.
-    ///  3) the reader has been cancelled and this disk needs to participate in the
-    ///     cleanup.
-    ///  4) A write range is added to queue.
-    /// In general, we only want to put a context on the disk queue if there is something
-    /// useful that can be done. If there's nothing useful, the disk queue will wake up
-    /// and then remove the reader from the queue. Doing this causes thrashing of the
-    /// threads.
-    ///
-    /// This variable is important during context cancellation because it indicates
-    /// whether a queue has a reference to the context that must be released before
-    /// the context is considered unregistered. Atomically set to false after
-    /// incrementing 'num_threads_in_op_' when dequeueing so that there is no window
-    /// when other threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there
-    /// are no references left to this context.
-    /// TODO: this could be combined with 'num_threads_in_op_' to be a single refcount.
-    AtomicInt32 is_on_queue_{0};
-
-    /// For each disks, the number of request ranges that have not been fully read.
-    /// In the non-cancellation path, this will hit 0, and done will be set to true
-    /// by the disk thread. This is undefined in the cancellation path (the various
-    /// threads notice by looking at the RequestContext's state_).
-    int num_remaining_ranges_ = 0;
-
-    /// Queue of ranges that have not started being read.  This list is exclusive
-    /// with in_flight_ranges.
-    InternalQueue<ScanRange> unstarted_scan_ranges_;
-
-    /// Queue of pending IO requests for this disk in the order that they will be
-    /// processed. A ScanRange is added to this queue when it is returned in
-    /// GetNextUnstartedRange(), or when it is added with schedule_mode == IMMEDIATELY.
-    /// A WriteRange is added to this queue from unstarted_write_ranges_ for each
-    /// invocation of GetNextRequestRange() in WorkLoop().
-    /// The size of this queue is always less than or equal to num_remaining_ranges.
-    InternalQueue<RequestRange> in_flight_ranges_;
-
-    /// The next range to start for this reader on this disk. Each disk (for each reader)
-    /// picks the next range to start. The range is set here and also added to the
-    /// ready_to_start_ranges_ queue. The reader pulls from the queue in FIFO order,
-    /// so the ranges from different disks are round-robined. When the range is pulled
-    /// off the ready_to_start_ranges_ queue, it sets this variable to NULL, so the disk
-    /// knows to populate it again and add it to ready_to_start_ranges_ i.e. it is used
-    /// as a flag by DiskIoMgr::GetNextScanRange to determine if it needs to add another
-    /// range to ready_to_start_ranges_.
-    ScanRange* next_scan_range_to_start_ = nullptr;
-
-    /// For each disk, the number of disk threads issuing the underlying read/write on
-    /// behalf of this context. There are a few places where we release the context lock,
-    /// do some work, and then grab the lock again.  Because we don't hold the lock for
-    /// the entire operation, we need this ref count to keep track of which thread should
-    /// do final resource cleanup during cancellation.
-    /// Only the thread that sees the count at 0 should do the final cleanup.
-    AtomicInt32 num_threads_in_op_{0};
-
-    /// Queue of write ranges to process for this disk. A write range is always added
-    /// to in_flight_ranges_ in GetNextRequestRange(). There is a separate
-    /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate between reads
-    /// and writes. (Otherwise, since next_scan_range_to_start is set
-    /// in GetNextRequestRange() whenever it is null, repeated calls to
-    /// GetNextRequestRange() and GetNextUnstartedRange() may result in only reads being
-    /// processed)
-    InternalQueue<WriteRange> unstarted_write_ranges_;
-  };
+/// Forwards to PerDiskState::IncrementDiskThreadAfterDequeue() on the state that
+/// this context keeps for the disk identified by 'disk_id'.
+inline void RequestContext::IncrementDiskThreadAfterDequeue(int disk_id) {
+  RequestContext::PerDiskState& disk_state = disk_states_[disk_id];
+  disk_state.IncrementDiskThreadAfterDequeue();
+}
 
-  /// Per disk states to synchronize multiple disk threads accessing the same request
-  /// context.
-  std::vector<PerDiskState> disk_states_;
-};
+/// Decrements num_disks_with_ranges_ and, once it reaches zero, notifies every
+/// waiter on disks_complete_cond_var_. 'lock' must be a held lock on 'lock_'.
+inline void RequestContext::DecrementDiskRefCount(
+    const boost::unique_lock<boost::mutex>& lock) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  DCHECK_GT(num_disks_with_ranges_, 0);
+  --num_disks_with_ranges_;
+  const bool no_disks_left = (num_disks_with_ranges_ == 0);
+  if (no_disks_left) disks_complete_cond_var_.NotifyAll();
+  DCHECK(Validate()) << std::endl << DebugString();
+}
+
+/// Enqueues 'range' on the in-flight queue of the disk it belongs to and then calls
+/// ScheduleContext() on that disk's state for this context. 'lock' must be a held
+/// lock on 'lock_', the context must be Active and 'range' must be non-null.
+inline void RequestContext::ScheduleScanRange(
+    const boost::unique_lock<boost::mutex>& lock, ScanRange* range) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  DCHECK_EQ(state_, Active);
+  DCHECK(range != nullptr);
+  const auto target_disk = range->disk_id();
+  RequestContext::PerDiskState& disk_state = disk_states_[target_disk];
+  disk_state.in_flight_ranges()->Enqueue(range);
+  disk_state.ScheduleContext(lock, this, target_disk);
+}
 }
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 3e266d6..0d2ba85 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <deque>
+#include <functional>
 
 #include <boost/thread/mutex.hpp>
 
@@ -29,12 +30,11 @@
 #include "runtime/bufferpool/buffer-pool.h"
 #include "util/condition-variable.h"
 #include "util/internal-queue.h"
-#include "util/mem-range.h"
 
 namespace impala {
 namespace io {
 class DiskIoMgr;
-struct DiskQueue;
+class DiskQueue;
 class ExclusiveHdfsFileHandle;
 class RequestContext;
 class ScanRange;
@@ -57,10 +57,9 @@ class BufferDescriptor {
   /// Returns the offset within the scan range that this buffer starts at
   int64_t scan_range_offset() const { return scan_range_offset_; }
 
- private:
+ protected:
+  // Protected methods are called from other classes in io::.
   friend class DiskIoMgr;
-  friend class ScanRange;
-  friend class RequestContext;
 
   /// Create a buffer descriptor for a new reader, range and data buffer.
   BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
@@ -71,6 +70,12 @@ class BufferDescriptor {
       ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
       BufferPool::BufferHandle handle);
 
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferDescriptor);
+
+  /// This class is tightly coupled with ScanRange. Making them friends is easiest.
+  friend class ScanRange;
+
   /// Return true if this is a cached buffer owned by HDFS.
   bool is_cached() const;
 
@@ -78,6 +83,12 @@ class BufferDescriptor {
   /// constructing the scan range.
   bool is_client_buffer() const;
 
+  /// Releases memory resources for this buffer. If the buffer was allocated with
+  /// DiskIoMgr::AllocateBuffersForRange(), frees the buffer. Otherwise (e.g. a client
+  /// or HDFS cache buffer), just prepares this descriptor to be destroyed. After this
+  /// is called, buffer() is NULL. Does not acquire 'lock_'.
+  void Free();
+
   DiskIoMgr* const io_mgr_;
 
   /// Reader that this buffer is for.
@@ -86,8 +97,8 @@ class BufferDescriptor {
   /// Scan range that this buffer is for. Non-NULL when initialised.
   ScanRange* const scan_range_;
 
-  /// Buffer for the read contents. Must be set to NULL in RequestContext::FreeBuffer()
-  /// before destruction of the descriptor.
+  /// Buffer for the read contents. Must be set to NULL in Free() before destruction
+  /// of a descriptor.
   uint8_t* buffer_;
 
   /// length of buffer_. For buffers from cached reads, the length is 0.
@@ -264,24 +275,72 @@ class ScanRange : public RequestRange {
 
   int64_t mtime() const { return mtime_; }
 
- private:
+ protected:
+  // Protected methods are called from other classes in io::.
   friend class BufferDescriptor;
   friend class DiskIoMgr;
+  friend class DiskQueue;
   friend class RequestContext;
 
+  // Tag for the buffer associated with range. See external_buffer_tag_ for details.
+  enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER };
+
   /// Initialize internal fields
   void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
 
+  /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
+  /// and *read_succeeded to true.
+  /// If the data is not cached, returns ok() and *read_succeeded is set to false.
+  /// Returns a non-ok status if it ran into a non-continuable error.
+  ///  The reader lock must be held by the caller.
+  Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
+      bool* read_succeeded) WARN_UNUSED_RESULT;
+
+  /// Add buffers for the range to read data into and schedule the range if blocked.
+  /// If 'returned' is true, these are buffers from GetNext() that are being recycled
+  /// via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to be added.
+  void AddUnusedBuffers(
+      std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned);
+
+  /// Called from a disk I/O thread to read the next buffer of data for this range. The
+  /// returned ReadOutcome describes what the result of the read was. 'disk_id' is the
+  /// ID of the disk queue. Caller must not hold 'lock_'.
+  ReadOutcome DoRead(int disk_id);
+
+  /// Cleans up a buffer that was not returned to the client.
+  /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
+  /// The caller must hold 'lock_' via 'scan_range_lock'.
+  /// This function may acquire 'hdfs_lock_'
+  void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock,
+      std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not
+  /// hold 'lock_'.
+  void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers);
+
+  /// Same as Cancel() except doesn't remove the scan range from
+  /// reader_->active_scan_ranges_ or wait for in-flight reads to finish.
+  /// This is invoked by RequestContext::Cancel(), which removes the range itself
+  /// to avoid invalidating its active_scan_ranges_ iterator. If 'read_error' is
+  /// true, this is being called from a disk thread to propagate a read error, so
+  /// 'read_in_flight_' is set to false and threads in WaitForInFlightRead() are
+  /// woken up.
+  void CancelInternal(const Status& status, bool read_error);
+
+  /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
+  void SetBlockedOnBuffer();
+
+  ExternalBufferTag external_buffer_tag() const { return external_buffer_tag_; }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(ScanRange);
+
   /// Enqueues a ready buffer with valid data for this range. This does not block.
   /// The caller passes ownership of buffer to the scan range and it is not
   /// valid to access buffer after this call. Returns false if the scan range was
   /// cancelled.
   bool EnqueueReadyBuffer(std::unique_ptr<BufferDescriptor> buffer);
 
-  /// Validates the internal state of this range. lock_ must be taken
-  /// before calling this.
-  bool Validate();
-
   /// Maximum length in bytes for hdfsRead() calls.
   int64_t MaxReadChunkSize() const;
 
@@ -318,62 +377,20 @@ class ScanRange : public RequestRange {
   /// pointer.
   void GetHdfsStatistics(hdfsFile fh);
 
-  /// Reads from the DN cache. On success, sets cached_buffer_ to the DN buffer
-  /// and *read_succeeded to true.
-  /// If the data is not cached, returns ok() and *read_succeeded is set to false.
-  /// Returns a non-ok status if it ran into a non-continuable error.
-  ///  The reader lock must be held by the caller.
-  Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
-      bool* read_succeeded) WARN_UNUSED_RESULT;
-
-  /// Add buffers for the range to read data into and schedule the range if blocked.
-  /// If 'returned' is true, the buffers returned from GetNext() that are being recycled
-  /// via ReturnBuffer(). Otherwise the buffers are newly allocated buffers to be added.
-  void AddUnusedBuffers(
-      std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned);
-
   /// Remove a buffer from 'unused_iomgr_buffers_' and update
   /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, return NULL.
   /// 'lock_' must be held by the caller via 'scan_range_lock'.
   std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
       const boost::unique_lock<boost::mutex>& scan_range_lock);
 
-  /// Called from a disk I/O thread to read the next buffer of data for this range. The
-  /// returned ReadOutcome describes what the result of the read was. 'disk_id' is the
-  /// ID of the disk queue. Caller must not hold 'lock_'.
-  ReadOutcome DoRead(int disk_id);
-
-  /// Cleans up a buffer that was not returned to the client.
-  /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
-  /// The caller must hold 'lock_' via 'scan_range_lock'.
-  /// This function may acquire 'hdfs_lock_'
-  void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock,
-      std::unique_ptr<BufferDescriptor> buffer);
-
-  /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not
-  /// hold 'lock_'.
-  void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers);
-
   /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when the scan
   /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'.
   void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock);
 
-  /// Same as Cancel() except doesn't remove the scan range from
-  /// reader_->active_scan_ranges_ or wait for in-flight reads to finish.
-  /// This is invoked by RequestContext::Cancel(), which removes the range itself
-  /// to avoid invalidating its active_scan_ranges_ iterator. If 'read_error' is
-  /// true, this is being called from a disk thread to propagate a read error, so
-  /// 'read_in_flight_' is set to false and threads in WaitForInFlightRead() are
-  /// woken up.
-  void CancelInternal(const Status& status, bool read_error);
-
   /// Waits for any in-flight read to complete. Called after CancelInternal() to ensure
   /// no more reads will occur for the scan range.
   void WaitForInFlightRead();
 
-  /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
-  void SetBlockedOnBuffer();
-
   /// Returns true if no more buffers will be returned to clients in the future,
   /// either because of hitting eosr or cancellation.
   bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const {
@@ -381,6 +398,10 @@ class ScanRange : public RequestRange {
     return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
   }
 
+  /// Validates the internal state of this range. lock_ must be taken
+  /// before calling this.
+  bool Validate();
+
   /// Pointer to caller specified metadata. This is untouched by the io manager
   /// and the caller can put whatever auxiliary data in here.
   void* meta_data_ = nullptr;
@@ -429,7 +450,6 @@ class ScanRange : public RequestRange {
 
   /// Tagged union that holds a buffer for the cases when there is a buffer allocated
   /// externally from DiskIoMgr that is associated with the scan range.
-  enum class ExternalBufferTag { CLIENT_BUFFER, CACHED_BUFFER, NO_BUFFER };
   ExternalBufferTag external_buffer_tag_;
   union {
     /// Valid if the 'external_buffer_tag_' is CLIENT_BUFFER.
@@ -548,13 +568,10 @@ class WriteRange : public RequestRange {
   void SetData(const uint8_t* buffer, int64_t len);
 
   const uint8_t* data() const { return data_; }
+  WriteDoneCallback callback() const { return callback_; }
 
  private:
-  friend class DiskIoMgr;
-  friend class RequestContext;
-  friend class ScanRange;
-  friend class LocalFileSystem;
-
+  DISALLOW_COPY_AND_ASSIGN(WriteRange);
   /// Data to be written. RequestRange::len_ contains the length of data
   /// to be written.
   const uint8_t* data_;

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 409e743..163da28 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "runtime/exec-env.h"
 #include "runtime/io/disk-io-mgr.h"
 #include "runtime/io/disk-io-mgr-internal.h"
 #include "util/error-util.h"
@@ -206,7 +207,7 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
     buffer_desc->scan_range_offset_ = bytes_read_ - buffer_desc->len_;
 
     COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
-    COUNTER_ADD(&io_mgr_->total_bytes_read_counter_, buffer_desc->len_);
+    COUNTER_ADD(io_mgr_->total_bytes_read_counter(), buffer_desc->len_);
     COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
   }
 
@@ -215,7 +216,7 @@ ReadOutcome ScanRange::DoRead(int disk_id) {
   if (!read_status.ok()) {
     // Free buffer to release resources before we cancel the range so that all buffers
     // are freed at cancellation.
-    reader_->FreeBuffer(buffer_desc.get());
+    buffer_desc->Free();
     buffer_desc.reset();
 
     // Propagate 'read_status' to the scan range. This will also wake up any waiting
@@ -250,7 +251,7 @@ void ScanRange::CleanUpBuffer(
   DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
   DCHECK(buffer_desc != nullptr);
   DCHECK_EQ(this, buffer_desc->scan_range_);
-  buffer_desc->reader_->FreeBuffer(buffer_desc.get());
+  buffer_desc->Free();
 
   if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() == 0) {
     // Close the scan range if there are no more buffers in the reader and no more buffers
@@ -593,7 +594,7 @@ Status ScanRange::Read(
     int64_t max_chunk_size = MaxReadChunkSize();
     Status status = Status::OK();
     {
-      ScopedTimer<MonotonicStopWatch> io_mgr_read_timer(&io_mgr_->read_timer_);
+      ScopedTimer<MonotonicStopWatch> io_mgr_read_timer(io_mgr_->read_timer());
       ScopedTimer<MonotonicStopWatch> req_context_read_timer(reader_->read_timer_);
       while (*bytes_read < bytes_to_read) {
         int chunk_size = min(bytes_to_read - *bytes_read, max_chunk_size);
@@ -716,7 +717,7 @@ Status ScanRange::ReadFromCache(
     DCHECK(exclusive_hdfs_fh_ != nullptr);
     DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
     cached_buffer_ =
-      hadoopReadZero(exclusive_hdfs_fh_->file(), io_mgr_->cached_read_options_, len());
+      hadoopReadZero(exclusive_hdfs_fh_->file(), io_mgr_->cached_read_options(), len());
     if (cached_buffer_ != nullptr) {
       external_buffer_tag_ = ExternalBufferTag::CACHED_BUFFER;
     }
@@ -775,3 +776,43 @@ void ScanRange::GetHdfsStatistics(hdfsFile hdfs_file) {
     hdfsFileClearReadStatistics(hdfs_file);
   }
 }
+
+/// Creates a descriptor for the raw data buffer 'buffer' of 'buffer_len' bytes that
+/// belongs to 'scan_range'. This overload takes no buffer pool client or handle.
+/// 'io_mgr', 'scan_range' and 'buffer' must be non-null.
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+    ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len) :
+  io_mgr_(io_mgr),
+  reader_(reader),
+  scan_range_(scan_range),
+  buffer_(buffer),
+  buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
+}
+
+/// Creates a descriptor around the buffer pool buffer 'handle', which is moved into
+/// this descriptor. The data pointer and length are read from 'handle' before it is
+/// moved. 'bp_client' must be registered and 'handle' must be open.
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
+    ScanRange* scan_range, BufferPool::ClientHandle* bp_client,
+    BufferPool::BufferHandle handle)
+  : io_mgr_(io_mgr),
+    reader_(reader),
+    scan_range_(scan_range),
+    buffer_(handle.data()),
+    buffer_len_(handle.len()),
+    bp_client_(bp_client),
+    handle_(std::move(handle)) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(bp_client_->is_registered());
+  DCHECK(handle_.is_open());
+}
+
+/// Releases the memory behind this descriptor and sets buffer_ to null. A buffer that
+/// is neither cached nor client-provided is handed back to the buffer pool; for the
+/// other kinds only the pointer is cleared. Must not be called when buffer_ is null.
+void BufferDescriptor::Free() {
+  DCHECK(buffer_ != nullptr);
+  // Only buffers that were allocated by DiskIoMgr need to be freed.
+  const bool allocated_by_io_mgr = !(is_cached() || is_client_buffer());
+  if (allocated_by_io_mgr) {
+    ExecEnv::GetInstance()->buffer_pool()->FreeBuffer(bp_client_, &handle_);
+  }
+  buffer_ = nullptr;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/56468726/be/src/runtime/row-batch.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/row-batch.h b/be/src/runtime/row-batch.h
index 270318a..d8cbe7c 100644
--- a/be/src/runtime/row-batch.h
+++ b/be/src/runtime/row-batch.h
@@ -29,7 +29,6 @@
 #include "kudu/util/slice.h"
 #include "runtime/bufferpool/buffer-pool.h"
 #include "runtime/descriptors.h"
-#include "runtime/io/disk-io-mgr.h"
 #include "runtime/mem-pool.h"
 
 namespace kudu {