You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 22:51:35 UTC

[08/15] impala git commit: IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation

IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation

In preparation for switching the I/O mgr to the buffer pool, this
removes and cleans up a lot of code so that the switchover patch starts
from a cleaner slate.

* Remove the free buffer cache (which will be replaced by buffer pool's
  own caching).
* Make memory-limit-exceeded error checking synchronous (in anticipation
  of having to propagate buffer pool errors synchronously).
* Simplify error propagation - remove the (ineffectual) code that
  enqueued BufferDescriptors containing error statuses.
* Document locking scheme better in a few places, make it part of the
  function signature when it seemed reasonable.
* Move ReturnBuffer() to ScanRange, because it is intrinsically
  connected with the lifecycle of a scan range.
* Separate external ReturnBuffer() and internal CleanUpBuffer()
  interfaces - previously callers of ReturnBuffer() were fudging
  the num_buffers_in_reader accounting to make the external interface work.
* Eliminate redundant state in ScanRange: 'eosr_returned_' and
  'is_cancelled_'.
* Clarify the logic around calling Close() for the last
  BufferDescriptor.
  -> There appeared to be an implicit assumption that buffers would be
     freed in the order they were returned from the scan range, so that
     the "eos" buffer was returned last. Instead just count the number
     of outstanding buffers to detect the last one.
  -> Touching the is_cancelled_ field without holding a lock was hard to
     reason about - it violated the locking rules and it was unclear
     whether it was race-free.
* Remove DiskIoMgr::Read() to simplify the interface. It is trivial to
  inline at the callsites.

This will probably regress performance somewhat because of the cache
removal, so my plan is to merge it around the same time as switching
the I/O mgr to allocate from the buffer pool. I'm keeping the patches
separate to make reviewing easier.

Testing:
* Ran exhaustive tests
* Ran the disk-io-mgr-stress-test overnight

Change-Id: If5cb42437d11c13bc4a55c3ab426b66777332bd1
Reviewed-on: http://gerrit.cloudera.org:8080/8414
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/9415


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/3b3bf871
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/3b3bf871
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/3b3bf871

Branch: refs/heads/2.x
Commit: 3b3bf8710d88458c4204fd4151aa79239ffe4488
Parents: 7c4689c
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Sun Oct 29 12:38:47 2017 -0700
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Fri Feb 23 22:51:01 2018 +0000

----------------------------------------------------------------------
 be/src/exec/hdfs-parquet-scanner.cc     |   5 +-
 be/src/exec/hdfs-scan-node-base.cc      |  31 +-
 be/src/exec/hdfs-scan-node.cc           |   5 +-
 be/src/exec/scanner-context.cc          |   9 +-
 be/src/runtime/exec-env.cc              |   7 +-
 be/src/runtime/io/disk-io-mgr-stress.cc |  20 +-
 be/src/runtime/io/disk-io-mgr-test.cc   | 171 +++------
 be/src/runtime/io/disk-io-mgr.cc        | 504 +++++----------------------
 be/src/runtime/io/disk-io-mgr.h         | 124 +------
 be/src/runtime/io/request-context.cc    | 148 ++++++--
 be/src/runtime/io/request-context.h     | 187 ++++++----
 be/src/runtime/io/request-ranges.h      | 107 +++---
 be/src/runtime/io/scan-range.cc         | 142 ++++----
 be/src/runtime/mem-tracker.h            |   1 -
 be/src/runtime/test-env.cc              |   2 +-
 be/src/runtime/tmp-file-mgr-test.cc     |   3 +-
 be/src/runtime/tmp-file-mgr.cc          |   2 +-
 be/src/util/impalad-metrics.cc          |  13 +-
 be/src/util/impalad-metrics.h           |   9 -
 19 files changed, 579 insertions(+), 911 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/exec/hdfs-parquet-scanner.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index e279369..bb3d091 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1440,11 +1440,12 @@ Status HdfsParquetScanner::ProcessFooter() {
 
     unique_ptr<BufferDescriptor> io_buffer;
     RETURN_IF_ERROR(
-        io_mgr->Read(scan_node_->reader_context(), metadata_range, &io_buffer));
+        io_mgr->AddScanRange(scan_node_->reader_context(), metadata_range, true));
+    RETURN_IF_ERROR(metadata_range->GetNext(&io_buffer));
     DCHECK_EQ(io_buffer->buffer(), metadata_buffer.buffer());
     DCHECK_EQ(io_buffer->len(), metadata_size);
     DCHECK(io_buffer->eosr());
-    io_mgr->ReturnBuffer(move(io_buffer));
+    metadata_range->ReturnBuffer(move(io_buffer));
   }
 
   // Deserialize file header

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/exec/hdfs-scan-node-base.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node-base.cc b/be/src/exec/hdfs-scan-node-base.cc
index 2f3d0c5..0660b9b 100644
--- a/be/src/exec/hdfs-scan-node-base.cc
+++ b/be/src/exec/hdfs-scan-node-base.cc
@@ -352,13 +352,10 @@ Status HdfsScanNodeBase::Open(RuntimeState* state) {
   num_scanner_threads_started_counter_ =
       ADD_COUNTER(runtime_profile(), NUM_SCANNER_THREADS_STARTED, TUnit::UNIT);
 
-  runtime_state_->io_mgr()->set_bytes_read_counter(
-      reader_context_.get(), bytes_read_counter());
-  runtime_state_->io_mgr()->set_read_timer(reader_context_.get(), read_timer());
-  runtime_state_->io_mgr()->set_active_read_thread_counter(
-      reader_context_.get(), &active_hdfs_read_thread_counter_);
-  runtime_state_->io_mgr()->set_disks_access_bitmap(
-      reader_context_.get(), &disks_accessed_bitmap_);
+  reader_context_->set_bytes_read_counter(bytes_read_counter());
+  reader_context_->set_read_timer(read_timer());
+  reader_context_->set_active_read_thread_counter(&active_hdfs_read_thread_counter_);
+  reader_context_->set_disks_accessed_bitmap(&disks_accessed_bitmap_);
 
   average_scanner_thread_concurrency_ = runtime_profile()->AddSamplingCounter(
       AVERAGE_SCANNER_THREAD_CONCURRENCY, &active_scanner_thread_counter_);
@@ -824,20 +821,14 @@ void HdfsScanNodeBase::StopAndFinalizeCounters() {
       Substitute("Codegen enabled: $0 out of $1", num_enabled, total));
 
   if (reader_context_ != nullptr) {
-    bytes_read_local_->Set(
-        runtime_state_->io_mgr()->bytes_read_local(reader_context_.get()));
-    bytes_read_short_circuit_->Set(
-        runtime_state_->io_mgr()->bytes_read_short_circuit(reader_context_.get()));
-    bytes_read_dn_cache_->Set(
-        runtime_state_->io_mgr()->bytes_read_dn_cache(reader_context_.get()));
-    num_remote_ranges_->Set(static_cast<int64_t>(
-        runtime_state_->io_mgr()->num_remote_ranges(reader_context_.get())));
-    unexpected_remote_bytes_->Set(
-        runtime_state_->io_mgr()->unexpected_remote_bytes(reader_context_.get()));
-    cached_file_handles_hit_count_->Set(
-        runtime_state_->io_mgr()->cached_file_handles_hit_count(reader_context_.get()));
+    bytes_read_local_->Set(reader_context_->bytes_read_local());
+    bytes_read_short_circuit_->Set(reader_context_->bytes_read_short_circuit());
+    bytes_read_dn_cache_->Set(reader_context_->bytes_read_dn_cache());
+    num_remote_ranges_->Set(reader_context_->num_remote_ranges());
+    unexpected_remote_bytes_->Set(reader_context_->unexpected_remote_bytes());
+    cached_file_handles_hit_count_->Set(reader_context_->cached_file_handles_hit_count());
     cached_file_handles_miss_count_->Set(
-        runtime_state_->io_mgr()->cached_file_handles_miss_count(reader_context_.get()));
+        reader_context_->cached_file_handles_miss_count());
 
     if (unexpected_remote_bytes_->value() >= UNEXPECTED_REMOTE_BYTES_WARN_THRESHOLD) {
       runtime_state_->LogError(ErrorMsg(TErrorCode::GENERAL, Substitute(

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/exec/hdfs-scan-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/hdfs-scan-node.cc b/be/src/exec/hdfs-scan-node.cc
index 710a8af..b32a743 100644
--- a/be/src/exec/hdfs-scan-node.cc
+++ b/be/src/exec/hdfs-scan-node.cc
@@ -26,6 +26,7 @@
 #include "exec/scanner-context.h"
 #include "runtime/descriptors.h"
 #include "runtime/fragment-instance-state.h"
+#include "runtime/io/request-context.h"
 #include "runtime/runtime-filter.inline.h"
 #include "runtime/runtime-state.h"
 #include "runtime/mem-tracker.h"
@@ -549,9 +550,7 @@ Status HdfsScanNode::ProcessSplit(const vector<FilterContext>& filter_ctxs,
 void HdfsScanNode::SetDoneInternal() {
   if (done_) return;
   done_ = true;
-  if (reader_context_ != nullptr) {
-    runtime_state_->io_mgr()->CancelContext(reader_context_.get());
-  }
+  if (reader_context_ != nullptr) reader_context_->Cancel();
   materialized_row_batches_->Shutdown();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/exec/scanner-context.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/scanner-context.cc b/be/src/exec/scanner-context.cc
index abdde07..79e7a85 100644
--- a/be/src/exec/scanner-context.cc
+++ b/be/src/exec/scanner-context.cc
@@ -143,8 +143,10 @@ Status ScannerContext::Stream::GetNextBuffer(int64_t read_past_size) {
     ScanRange* range = parent_->scan_node_->AllocateScanRange(
         scan_range_->fs(), filename(), read_past_buffer_size, offset, partition_id,
         scan_range_->disk_id(), false, BufferOpts::Uncached());
-    RETURN_IF_ERROR(parent_->state_->io_mgr()->Read(
-        parent_->scan_node_->reader_context(), range, &io_buffer_));
+    RETURN_IF_ERROR(parent_->state_->io_mgr()->AddScanRange(
+        parent_->scan_node_->reader_context(), range, true));
+    RETURN_IF_ERROR(range->GetNext(&io_buffer_));
+    DCHECK(io_buffer_->eosr());
   }
 
   DCHECK(io_buffer_ != nullptr);
@@ -324,7 +326,8 @@ Status ScannerContext::Stream::CopyIoToBoundary(int64_t num_bytes) {
 
 void ScannerContext::Stream::ReturnIoBuffer() {
   DCHECK(io_buffer_ != nullptr);
-  ExecEnv::GetInstance()->disk_io_mgr()->ReturnBuffer(move(io_buffer_));
+  ScanRange* range = io_buffer_->scan_range();
+  range->ReturnBuffer(move(io_buffer_));
   io_buffer_pos_ = nullptr;
   io_buffer_bytes_left_ = 0;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/exec-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/exec-env.cc b/be/src/runtime/exec-env.cc
index 51eece5..2f19786 100644
--- a/be/src/runtime/exec-env.cc
+++ b/be/src/runtime/exec-env.cc
@@ -36,9 +36,9 @@
 #include "runtime/client-cache.h"
 #include "runtime/coordinator.h"
 #include "runtime/data-stream-mgr.h"
-#include "runtime/io/disk-io-mgr.h"
 #include "runtime/hbase-table-factory.h"
 #include "runtime/hdfs-fs-cache.h"
+#include "runtime/io/disk-io-mgr.h"
 #include "runtime/krpc-data-stream-mgr.h"
 #include "runtime/lib-cache.h"
 #include "runtime/mem-tracker.h"
@@ -367,10 +367,7 @@ Status ExecEnv::Init() {
   LOG(INFO) << "Buffer pool limit: "
             << PrettyPrinter::Print(buffer_pool_limit, TUnit::BYTES);
 
-  RETURN_IF_ERROR(disk_io_mgr_->Init(mem_tracker_.get()));
-
-  mem_tracker_->AddGcFunction(
-      [this](int64_t bytes_to_free) { disk_io_mgr_->GcIoBuffers(bytes_to_free); });
+  RETURN_IF_ERROR(disk_io_mgr_->Init());
 
   // Start services in order to ensure that dependencies between them are met
   if (enable_webserver_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 8815357..cfe71ab 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -77,7 +77,7 @@ DiskIoMgrStress::DiskIoMgrStress(int num_disks, int num_threads_per_disk,
 
   io_mgr_.reset(new DiskIoMgr(num_disks, num_threads_per_disk, num_threads_per_disk,
       MIN_READ_BUFFER_SIZE, MAX_READ_BUFFER_SIZE));
-  Status status = io_mgr_->Init(&mem_tracker_);
+  Status status = io_mgr_->Init();
   CHECK(status.ok());
 
   // Initialize some data files.  It doesn't really matter how many there are.
@@ -137,7 +137,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 
         // Copy the bytes from this read into the result buffer.
         memcpy(read_buffer + file_offset, buffer->buffer(), buffer->len());
-        io_mgr_->ReturnBuffer(move(buffer));
+        range->ReturnBuffer(move(buffer));
         bytes_read += len;
 
         CHECK_GE(bytes_read, 0);
@@ -159,6 +159,7 @@ void DiskIoMgrStress::ClientThread(int client_id) {
     // Unregister the old client and get a new one
     unique_lock<mutex> lock(client->lock);
     io_mgr_->UnregisterContext(client->reader.get());
+    client->reader.reset();
     NewClient(client_id);
   }
 
@@ -170,11 +171,9 @@ void DiskIoMgrStress::ClientThread(int client_id) {
 // Cancel a random reader
 void DiskIoMgrStress::CancelRandomReader() {
   if (!includes_cancellation_) return;
-
-  int rand_client = rand() % num_clients_;
-
-  unique_lock<mutex> lock(clients_[rand_client].lock);
-  io_mgr_->CancelContext(clients_[rand_client].reader.get());
+  Client* rand_client = &clients_[rand() % num_clients_];
+  unique_lock<mutex> lock(rand_client->lock);
+  rand_client->reader->Cancel();
 }
 
 void DiskIoMgrStress::Run(int sec) {
@@ -199,10 +198,12 @@ void DiskIoMgrStress::Run(int sec) {
 
   for (int i = 0; i < num_clients_; ++i) {
     unique_lock<mutex> lock(clients_[i].lock);
-    if (clients_[i].reader != NULL) io_mgr_->CancelContext(clients_[i].reader.get());
+    if (clients_[i].reader != NULL) clients_[i].reader->Cancel();
   }
-
   readers_.join_all();
+
+  for (unique_ptr<MemTracker>& mem_tracker : client_mem_trackers_) mem_tracker->Close();
+  mem_tracker_.Close();
 }
 
 // Initialize a client to read one of the files at random.  The scan ranges are
@@ -240,6 +241,7 @@ void DiskIoMgrStress::NewClient(int i) {
     assigned_len += range_len;
   }
 
+  if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
   client_mem_trackers_[i].reset(new MemTracker(-1, "", &mem_tracker_));
   client.reader = io_mgr_->RegisterContext(client_mem_trackers_[i].get());
   Status status = io_mgr_->AddScanRanges(client.reader.get(), client.scan_ranges);

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index b03ec31..e099285 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -22,11 +22,13 @@
 
 #include "codegen/llvm-codegen.h"
 #include "common/init.h"
-#include "runtime/io/request-context.h"
 #include "runtime/io/disk-io-mgr-stress.h"
 #include "runtime/io/disk-io-mgr.h"
+#include "runtime/io/request-context.h"
 #include "runtime/mem-tracker.h"
+#include "runtime/test-env.h"
 #include "runtime/thread-resource-mgr.h"
+#include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
@@ -49,7 +51,10 @@ namespace io {
 class DiskIoMgrTest : public testing::Test {
  public:
 
-  virtual void SetUp() {}
+  virtual void SetUp() {
+    test_env_.reset(new TestEnv);
+    ASSERT_OK(test_env_->Init());
+  }
 
   virtual void TearDown() {
     pool_.Clear();
@@ -117,13 +122,14 @@ class DiskIoMgrTest : public testing::Test {
   static void ValidateSyncRead(DiskIoMgr* io_mgr, RequestContext* reader,
       ScanRange* range, const char* expected, int expected_len = -1) {
     unique_ptr<BufferDescriptor> buffer;
-    ASSERT_OK(io_mgr->Read(reader, range, &buffer));
+    ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
+    ASSERT_OK(range->GetNext(&buffer));
     ASSERT_TRUE(buffer != nullptr);
     EXPECT_EQ(buffer->len(), range->len());
     if (expected_len < 0) expected_len = strlen(expected);
     int cmp = memcmp(buffer->buffer(), expected, expected_len);
     EXPECT_TRUE(cmp == 0);
-    io_mgr->ReturnBuffer(move(buffer));
+    range->ReturnBuffer(move(buffer));
   }
 
   static void ValidateScanRange(DiskIoMgr* io_mgr, ScanRange* range,
@@ -136,13 +142,13 @@ class DiskIoMgrTest : public testing::Test {
       Status status = range->GetNext(&buffer);
       ASSERT_TRUE(status.ok() || status.code() == expected_status.code());
       if (buffer == nullptr || !status.ok()) {
-        if (buffer != nullptr) io_mgr->ReturnBuffer(move(buffer));
+        if (buffer != nullptr) range->ReturnBuffer(move(buffer));
         break;
       }
       ASSERT_LE(buffer->len(), expected_len);
       memcpy(result + range->offset() + buffer->scan_range_offset(),
           buffer->buffer(), buffer->len());
-      io_mgr->ReturnBuffer(move(buffer));
+      range->ReturnBuffer(move(buffer));
     }
     ValidateEmptyOrCorrect(expected, result, expected_len);
   }
@@ -177,6 +183,8 @@ class DiskIoMgrTest : public testing::Test {
     return range;
   }
 
+  boost::scoped_ptr<TestEnv> test_env_;
+
   ObjectPool pool_;
 
   mutex written_mutex_;
@@ -204,14 +212,13 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
 
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
-  ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  unique_ptr<RequestContext> reader =
-      read_io_mgr->RegisterContext(&reader_mem_tracker);
+  ASSERT_OK(read_io_mgr->Init());
+  unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
       for (int i = 0; i < num_ranges; ++i) {
         int32_t* data = pool_.Add(new int32_t);
@@ -247,7 +254,7 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   num_ranges_written_ = 0;
   string tmp_file = "/non-existent/file.txt";
   DiskIoMgr io_mgr(1, 1, 1, 1, 10);
-  ASSERT_OK(io_mgr.Init(&mem_tracker));
+  ASSERT_OK(io_mgr.Init());
   unique_ptr<RequestContext> writer = io_mgr.RegisterContext(nullptr);
   int32_t* data = pool_.Add(new int32_t);
   *data = rand();
@@ -308,19 +315,18 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
 
   scoped_ptr<DiskIoMgr> read_io_mgr(new DiskIoMgr(1, 1, 1, 1, 10));
   MemTracker reader_mem_tracker(LARGE_MEM_LIMIT);
-  ASSERT_OK(read_io_mgr->Init(&reader_mem_tracker));
-  unique_ptr<RequestContext> reader =
-      read_io_mgr->RegisterContext(&reader_mem_tracker);
+  ASSERT_OK(read_io_mgr->Init());
+  unique_ptr<RequestContext> reader = read_io_mgr->RegisterContext(&reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
       pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       unique_ptr<RequestContext> writer = io_mgr.RegisterContext(&mem_tracker);
       Status validate_status = Status::OK();
       for (int i = 0; i < num_ranges; ++i) {
         if (i == num_ranges_before_cancel) {
-          io_mgr.CancelContext(writer.get());
+          writer->Cancel();
           validate_status = Status::CANCELLED;
         }
         int32_t* data = pool_.Add(new int32_t);
@@ -375,10 +381,9 @@ TEST_F(DiskIoMgrTest, SingleReader) {
         if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
         DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
-        ASSERT_OK(io_mgr.Init(&mem_tracker));
+        ASSERT_OK(io_mgr.Init());
         MemTracker reader_mem_tracker;
-        unique_ptr<RequestContext> reader =
-            io_mgr.RegisterContext(&reader_mem_tracker);
+        unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
         vector<ScanRange*> ranges;
         for (int i = 0; i < len; ++i) {
@@ -426,10 +431,9 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
       if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader =
-          io_mgr.RegisterContext(&reader_mem_tracker);
+      unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
       vector<ScanRange*> ranges_first_half;
       vector<ScanRange*> ranges_second_half;
@@ -496,11 +500,9 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
           MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader =
-          io_mgr.RegisterContext(&reader_mem_tracker);
-
+      unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
       ScanRange* complete_range =
           InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime);
 
@@ -563,10 +565,9 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
       if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
-      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      ASSERT_OK(io_mgr.Init());
       MemTracker reader_mem_tracker;
-      unique_ptr<RequestContext> reader =
-          io_mgr.RegisterContext(&reader_mem_tracker);
+      unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
       vector<ScanRange*> ranges;
       for (int i = 0; i < len; ++i) {
@@ -591,11 +592,11 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
             strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
       }
 
-      io_mgr.CancelContext(reader.get());
+      reader->Cancel();
       sched_yield();
 
       threads.join_all();
-      EXPECT_TRUE(io_mgr.context_status(reader.get()).IsCancelled());
+      EXPECT_TRUE(reader->IsCancelled());
       io_mgr.UnregisterContext(reader.get());
       EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
@@ -621,7 +622,7 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
     DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-    ASSERT_OK(io_mgr.Init(&root_mem_tracker));
+    ASSERT_OK(io_mgr.Init());
     MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
     unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
@@ -632,12 +633,13 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     ASSERT_OK(io_mgr.AddScanRanges(reader.get(), ranges));
 
     // Don't return buffers to force memory pressure
-    vector<unique_ptr<BufferDescriptor>> buffers;
+    vector<pair<ScanRange*, unique_ptr<BufferDescriptor>>> buffers;
 
     AtomicInt32 num_ranges_processed;
     ScanRangeThread(&io_mgr, reader.get(), data, strlen(data), Status::MemLimitExceeded(),
         1, &num_ranges_processed);
 
+    bool hit_mem_limit_exceeded = false;
     char result[strlen(data) + 1];
     // Keep reading new ranges without returning buffers. This forces us
     // to go over the limit eventually.
@@ -646,25 +648,27 @@ TEST_F(DiskIoMgrTest, MemLimits) {
       ScanRange* range = nullptr;
       Status status = io_mgr.GetNextRange(reader.get(), &range);
       ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+      hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
       if (range == nullptr) break;
 
       while (true) {
         unique_ptr<BufferDescriptor> buffer;
         Status status = range->GetNext(&buffer);
         ASSERT_TRUE(status.ok() || status.IsMemLimitExceeded());
+        hit_mem_limit_exceeded |= status.IsMemLimitExceeded();
         if (buffer == nullptr) break;
         memcpy(result + range->offset() + buffer->scan_range_offset(),
             buffer->buffer(), buffer->len());
-        buffers.push_back(move(buffer));
+        buffers.emplace_back(range, move(buffer));
       }
       ValidateEmptyOrCorrect(data, result, strlen(data));
     }
 
     for (int i = 0; i < buffers.size(); ++i) {
-      io_mgr.ReturnBuffer(move(buffers[i]));
+      buffers[i].first->ReturnBuffer(move(buffers[i].second));
     }
 
-    EXPECT_TRUE(io_mgr.context_status(reader.get()).IsMemLimitExceeded());
+    EXPECT_TRUE(hit_mem_limit_exceeded) << "Should have run out of memory";
     io_mgr.UnregisterContext(reader.get());
     EXPECT_EQ(reader_mem_tracker.consumption(), 0);
   }
@@ -689,7 +693,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   {
     DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-    ASSERT_OK(io_mgr.Init(&mem_tracker));
+    ASSERT_OK(io_mgr.Init());
     MemTracker reader_mem_tracker;
     unique_ptr<RequestContext> reader = io_mgr.RegisterContext(&reader_mem_tracker);
 
@@ -762,7 +766,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
       for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
         DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
             MAX_BUFFER_SIZE);
-        ASSERT_OK(io_mgr.Init(&mem_tracker));
+        ASSERT_OK(io_mgr.Init());
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           contexts[file_index] = io_mgr.RegisterContext(&mem_tracker);
         }
@@ -809,7 +813,6 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
             threads.join_all();
           } // for (int context_index
         } // while (read_offset < file_size)
-
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           io_mgr.UnregisterContext(contexts[file_index].get());
         }
@@ -874,7 +877,7 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
 
         DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
             MAX_BUFFER_SIZE);
-        EXPECT_OK(io_mgr.Init(&mem_tracker));
+        ASSERT_OK(io_mgr.Init());
 
         for (int i = 0; i < NUM_READERS; ++i) {
           readers[i] = io_mgr.RegisterContext(&mem_tracker);
@@ -915,76 +918,6 @@ TEST_F(DiskIoMgrTest, StressTest) {
   test.Run(2); // In seconds
 }
 
-TEST_F(DiskIoMgrTest, Buffers) {
-  // Test default min/max buffer size
-  int min_buffer_size = 1024;
-  int max_buffer_size = 8 * 1024 * 1024; // 8 MB
-  MemTracker root_mem_tracker(max_buffer_size * 2);
-
-  DiskIoMgr io_mgr(1, 1, 1, min_buffer_size, max_buffer_size);
-  ASSERT_OK(io_mgr.Init(&root_mem_tracker));
-  ASSERT_EQ(root_mem_tracker.consumption(), 0);
-
-  MemTracker reader_mem_tracker(-1, "Reader", &root_mem_tracker);
-  unique_ptr<RequestContext> reader;
-  reader = io_mgr.RegisterContext(&reader_mem_tracker);
-
-  ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0);
-
-  // buffer length should be rounded up to min buffer size
-  int64_t buffer_len = 1;
-  unique_ptr<BufferDescriptor> buffer_desc;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
-  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
-
-  // reuse buffer
-  buffer_len = min_buffer_size;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(min_buffer_size, buffer_desc->buffer_len());
-  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-  EXPECT_EQ(min_buffer_size, root_mem_tracker.consumption());
-
-  // bump up to next buffer size
-  buffer_len = min_buffer_size + 1;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(min_buffer_size * 2, buffer_desc->buffer_len());
-  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
-  EXPECT_EQ(min_buffer_size * 3, root_mem_tracker.consumption());
-
-  // gc unused buffer
-  io_mgr.GcIoBuffers();
-  EXPECT_EQ(1, io_mgr.num_allocated_buffers_.Load());
-  EXPECT_EQ(min_buffer_size * 2, root_mem_tracker.consumption());
-
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-
-  // max buffer size
-  buffer_len = max_buffer_size;
-  buffer_desc = io_mgr.GetFreeBuffer(reader.get(), dummy_range, buffer_len);
-  EXPECT_TRUE(buffer_desc->buffer() != nullptr);
-  EXPECT_EQ(max_buffer_size, buffer_desc->buffer_len());
-  EXPECT_EQ(2, io_mgr.num_allocated_buffers_.Load());
-  io_mgr.FreeBufferMemory(buffer_desc.get());
-  io_mgr.ReturnBuffer(move(buffer_desc));
-  EXPECT_EQ(min_buffer_size * 2 + max_buffer_size, root_mem_tracker.consumption());
-
-  // gc buffers
-  io_mgr.GcIoBuffers();
-  EXPECT_EQ(io_mgr.num_allocated_buffers_.Load(), 0);
-  EXPECT_EQ(root_mem_tracker.consumption(), 0);
-  io_mgr.UnregisterContext(reader.get());
-}
-
 // IMPALA-2366: handle partial read where range goes past end of file.
 TEST_F(DiskIoMgrTest, PartialRead) {
   MemTracker mem_tracker(LARGE_MEM_LIMIT);
@@ -1000,7 +933,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
 
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
-  ASSERT_OK(io_mgr->Init(&mem_tracker));
+  ASSERT_OK(io_mgr->Init());
   MemTracker reader_mem_tracker;
   unique_ptr<RequestContext> reader;
   reader = io_mgr->RegisterContext(&reader_mem_tracker);
@@ -1008,11 +941,12 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   // We should not read past the end of file.
   ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
   unique_ptr<BufferDescriptor> buffer;
-  ASSERT_OK(io_mgr->Read(reader.get(), range, &buffer));
+  ASSERT_OK(io_mgr->AddScanRange(reader.get(), range, true));
+  ASSERT_OK(range->GetNext(&buffer));
   ASSERT_TRUE(buffer->eosr());
   ASSERT_EQ(len, buffer->len());
   ASSERT_TRUE(memcmp(buffer->buffer(), data, len) == 0);
-  io_mgr->ReturnBuffer(move(buffer));
+  range->ReturnBuffer(move(buffer));
 
   io_mgr->UnregisterContext(reader.get());
   pool_.Clear();
@@ -1032,7 +966,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
 
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, read_len, read_len));
 
-  ASSERT_OK(io_mgr->Init(&mem_tracker));
+  ASSERT_OK(io_mgr->Init());
   // Reader doesn't need to provide mem tracker if it's providing buffers.
   MemTracker* reader_mem_tracker = nullptr;
   unique_ptr<RequestContext> reader;
@@ -1055,7 +989,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
 
     // DiskIoMgr should not have allocated memory.
     EXPECT_EQ(mem_tracker.consumption(), 0);
-    io_mgr->ReturnBuffer(move(io_buffer));
+    range->ReturnBuffer(move(io_buffer));
   }
 
   io_mgr->UnregisterContext(reader.get());
@@ -1072,7 +1006,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
 
   scoped_ptr<DiskIoMgr> io_mgr(new DiskIoMgr(1, 1, 1, SCAN_LEN, SCAN_LEN));
 
-  ASSERT_OK(io_mgr->Init(&mem_tracker));
+  ASSERT_OK(io_mgr->Init());
   // Reader doesn't need to provide mem tracker if it's providing buffers.
   MemTracker* reader_mem_tracker = nullptr;
   unique_ptr<RequestContext> reader;
@@ -1086,7 +1020,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
 
     /// Also test the cancellation path. Run multiple iterations since it is racy whether
     /// the read fails before the cancellation.
-    if (i >= 1) io_mgr->CancelContext(reader.get());
+    if (i >= 1) reader->Cancel();
 
     unique_ptr<BufferDescriptor> io_buffer;
     ASSERT_FALSE(range->GetNext(&io_buffer).ok());
@@ -1114,7 +1048,7 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
   const int num_io_threads_per_rotational_or_ssd = 2;
   DiskIoMgr io_mgr(1, num_io_threads_per_rotational_or_ssd,
       num_io_threads_per_rotational_or_ssd, 1, 10);
-  ASSERT_OK(io_mgr.Init(&mem_tracker));
+  ASSERT_OK(io_mgr.Init());
   const int num_io_threads = io_mgr.disk_thread_group_.Size();
   ASSERT_TRUE(num_io_threads ==
       num_io_threads_per_rotational_or_ssd + num_io_threads_for_remote_disks);
@@ -1125,5 +1059,6 @@ TEST_F(DiskIoMgrTest, VerifyNumThreadsParameter) {
 int main(int argc, char** argv) {
   ::testing::InitGoogleTest(&argc, argv);
   impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  impala::InitFeSupport();
   return RUN_ALL_TESTS();
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index f3d69f1..8c00ef8 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -157,34 +157,6 @@ string DiskIoMgr::DebugString() {
   return ss.str();
 }
 
-BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
-    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
-    int64_t buffer_len, MemTracker* mem_tracker)
-  : io_mgr_(io_mgr),
-    reader_(reader),
-    mem_tracker_(mem_tracker),
-    scan_range_(scan_range),
-    buffer_(buffer),
-    buffer_len_(buffer_len) {
-  DCHECK(io_mgr != nullptr);
-  DCHECK(scan_range != nullptr);
-  DCHECK(buffer != nullptr);
-  DCHECK_GE(buffer_len, 0);
-  DCHECK_NE(scan_range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER,
-      mem_tracker == nullptr);
-}
-
-void BufferDescriptor::TransferOwnership(MemTracker* dst) {
-  DCHECK(dst != nullptr);
-  DCHECK(!is_client_buffer());
-  // Memory of cached buffers is not tracked against a tracker.
-  if (is_cached()) return;
-  DCHECK(mem_tracker_ != nullptr);
-  dst->Consume(buffer_len_);
-  mem_tracker_->Release(buffer_len_);
-  mem_tracker_ = dst;
-}
-
 WriteRange::WriteRange(
     const string& file, int64_t file_offset, int disk_id, WriteDoneCallback callback)
   : RequestRange(RequestType::WRITE), callback_(callback) {
@@ -235,8 +207,6 @@ DiskIoMgr::DiskIoMgr() :
         FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
   DCHECK_LE(READ_SIZE_MIN_VALUE, FLAGS_read_size);
-  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
-  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   int num_local_disks = DiskInfo::num_disks();
   if (FLAGS_num_disks < 0 || FLAGS_num_disks > DiskInfo::num_disks()) {
     LOG(WARNING) << "Number of disks specified should be between 0 and the number of "
@@ -250,7 +220,7 @@ DiskIoMgr::DiskIoMgr() :
 }
 
 DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
-    int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size) :
+    int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size) :
     num_io_threads_per_rotational_disk_(threads_per_rotational_disk),
     num_io_threads_per_solid_state_disk_(threads_per_solid_state_disk),
     max_buffer_size_(max_buffer_size),
@@ -262,8 +232,6 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_rotational_disk,
         FileSystemUtil::MaxNumFileHandles()),
         FLAGS_num_file_handle_cache_partitions,
         FLAGS_unused_file_handle_timeout_sec) {
-  int64_t max_buffer_size_scaled = BitUtil::Ceil(max_buffer_size_, min_buffer_size_);
-  free_buffers_.resize(BitUtil::Log2Ceiling64(max_buffer_size_scaled) + 1);
   if (num_local_disks == 0) num_local_disks = DiskInfo::num_disks();
   disk_queues_.resize(num_local_disks + REMOTE_NUM_DISKS);
   CheckSseSupport();
@@ -287,37 +255,22 @@ DiskIoMgr::~DiskIoMgr() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     if (disk_queues_[i] == nullptr) continue;
     int disk_id = disk_queues_[i]->disk_id;
-    for (list<RequestContext*>::iterator it = disk_queues_[i]->request_contexts.begin();
-        it != disk_queues_[i]->request_contexts.end(); ++it) {
-      DCHECK_EQ((*it)->disk_states_[disk_id].num_threads_in_op(), 0);
-      DCHECK((*it)->disk_states_[disk_id].done());
-      (*it)->DecrementDiskRefCount();
+    for (RequestContext* context : disk_queues_[i]->request_contexts) {
+      unique_lock<mutex> context_lock(context->lock_);
+      DCHECK_EQ(context->disk_states_[disk_id].num_threads_in_op(), 0);
+      DCHECK(context->disk_states_[disk_id].done());
+      context->DecrementDiskRefCount(context_lock);
     }
   }
 
-  DCHECK_EQ(num_buffers_in_readers_.Load(), 0);
-
-  // Delete all allocated buffers
-  int num_free_buffers = 0;
-  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    num_free_buffers += free_buffers_[idx].size();
-  }
-  DCHECK_EQ(num_allocated_buffers_.Load(), num_free_buffers);
-  GcIoBuffers();
-
   for (int i = 0; i < disk_queues_.size(); ++i) {
     delete disk_queues_[i];
   }
 
-  if (free_buffer_mem_tracker_ != nullptr) free_buffer_mem_tracker_->Close();
   if (cached_read_options_ != nullptr) hadoopRzOptionsFree(cached_read_options_);
 }
 
-Status DiskIoMgr::Init(MemTracker* process_mem_tracker) {
-  DCHECK(process_mem_tracker != nullptr);
-  free_buffer_mem_tracker_.reset(
-      new MemTracker(-1, "Free Disk IO Buffers", process_mem_tracker, false));
-
+Status DiskIoMgr::Init() {
   for (int i = 0; i < disk_queues_.size(); ++i) {
     disk_queues_[i] = new DiskQueue(i);
     int num_threads_per_disk;
@@ -372,88 +325,6 @@ void DiskIoMgr::UnregisterContext(RequestContext* reader) {
   reader->CancelAndMarkInactive();
 }
 
-// Cancellation requires coordination from multiple threads.  Each thread that currently
-// has a reference to the request context must notice the cancel and remove it from its
-// tracking structures.  The last thread to touch the context should deallocate (aka
-// recycle) the request context object.  Potential threads are:
-//  1. Disk threads that are currently reading for this reader.
-//  2. Caller threads that are waiting in GetNext.
-//
-// The steps are:
-// 1. Cancel will immediately set the context in the Cancelled state.  This prevents any
-// other thread from adding more ready buffers to the context (they all take a lock and
-// check the state before doing so), or any write ranges to the context.
-// 2. Cancel will call cancel on each ScanRange that is not yet complete, unblocking
-// any threads in GetNext(). The reader will see the cancelled Status returned. Cancel
-// also invokes the callback for the WriteRanges with the cancelled state.
-// 3. Disk threads notice the context is cancelled either when picking the next context
-// to process or when they try to enqueue a ready buffer.  Upon noticing the cancelled
-// state, removes the context from the disk queue.  The last thread per disk with an
-// outstanding reference to the context decrements the number of disk queues the context
-// is on.
-void DiskIoMgr::CancelContext(RequestContext* context) {
-  context->Cancel(Status::CANCELLED);
-}
-
-void DiskIoMgr::set_read_timer(RequestContext* r, RuntimeProfile::Counter* c) {
-  r->read_timer_ = c;
-}
-
-void DiskIoMgr::set_bytes_read_counter(RequestContext* r, RuntimeProfile::Counter* c) {
-  r->bytes_read_counter_ = c;
-}
-
-void DiskIoMgr::set_active_read_thread_counter(RequestContext* r,
-    RuntimeProfile::Counter* c) {
-  r->active_read_thread_counter_ = c;
-}
-
-void DiskIoMgr::set_disks_access_bitmap(RequestContext* r,
-    RuntimeProfile::Counter* c) {
-  r->disks_accessed_bitmap_ = c;
-}
-
-int64_t DiskIoMgr::queue_size(RequestContext* reader) const {
-  return reader->num_ready_buffers_.Load();
-}
-
-Status DiskIoMgr::context_status(RequestContext* context) const {
-  unique_lock<mutex> lock(context->lock_);
-  return context->status_;
-}
-
-int64_t DiskIoMgr::bytes_read_local(RequestContext* reader) const {
-  return reader->bytes_read_local_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_short_circuit(RequestContext* reader) const {
-  return reader->bytes_read_short_circuit_.Load();
-}
-
-int64_t DiskIoMgr::bytes_read_dn_cache(RequestContext* reader) const {
-  return reader->bytes_read_dn_cache_.Load();
-}
-
-int DiskIoMgr::num_remote_ranges(RequestContext* reader) const {
-  return reader->num_remote_ranges_.Load();
-}
-
-int64_t DiskIoMgr::unexpected_remote_bytes(RequestContext* reader) const {
-  return reader->unexpected_remote_bytes_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_hit_count(RequestContext* reader) const {
-  return reader->cached_file_handles_hit_count_.Load();
-}
-
-int DiskIoMgr::cached_file_handles_miss_count(RequestContext* reader) const {
-  return reader->cached_file_handles_miss_count_.Load();
-}
-
-int64_t DiskIoMgr::GetReadThroughput() {
-  return RuntimeProfile::UnitsPerSecond(&total_bytes_read_counter_, &read_timer_);
-}
-
 Status DiskIoMgr::ValidateScanRange(ScanRange* range) {
   int disk_id = range->disk_id_;
   if (disk_id < 0 || disk_id >= disk_queues_.size()) {
@@ -485,10 +356,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
   unique_lock<mutex> reader_lock(reader->lock_);
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
-  if (reader->state_ == RequestContext::Cancelled) {
-    DCHECK(!reader->status_.ok());
-    return reader->status_;
-  }
+  if (reader->state_ == RequestContext::Cancelled) return Status::CANCELLED;
 
   // Add each range to the queue of the disk the range is on
   for (int i = 0; i < ranges.size(); ++i) {
@@ -507,7 +375,7 @@ Status DiskIoMgr::AddScanRanges(RequestContext* reader,
         continue;
       }
     }
-    reader->AddRequestRange(range, schedule_immediately);
+    reader->AddRequestRange(reader_lock, range, schedule_immediately);
   }
   DCHECK(reader->Validate()) << endl << reader->DebugString();
 
@@ -533,8 +401,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
 
   while (true) {
     if (reader->state_ == RequestContext::Cancelled) {
-      DCHECK(!reader->status_.ok());
-      status = reader->status_;
+      status = Status::CANCELLED;
       break;
     }
 
@@ -553,7 +420,7 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new range.
-      reader->AddRequestRange(*range, false);
+      reader->AddRequestRange(reader_lock, *range, false);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       *range = nullptr;
       continue;
@@ -569,185 +436,13 @@ Status DiskIoMgr::GetNextRange(RequestContext* reader, ScanRange** range) {
       // Set this to nullptr, the next time this disk runs for this reader, it will
       // get another range ready.
       reader->disk_states_[disk_id].set_next_scan_range_to_start(nullptr);
-      reader->ScheduleScanRange(*range);
+      reader->ScheduleScanRange(reader_lock, *range);
       break;
     }
   }
   return status;
 }
 
-Status DiskIoMgr::Read(RequestContext* reader,
-    ScanRange* range, std::unique_ptr<BufferDescriptor>* buffer) {
-  DCHECK(range != nullptr);
-  DCHECK(buffer != nullptr);
-  *buffer = nullptr;
-
-  if (range->len() > max_buffer_size_
-      && range->external_buffer_tag_ != ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
-    return Status(TErrorCode::DISK_IO_ERROR, Substitute("Internal error: cannot "
-        "perform sync read of '$0' bytes that is larger than the max read buffer size "
-        "'$1'.", range->len(), max_buffer_size_));
-  }
-
-  vector<ScanRange*> ranges;
-  ranges.push_back(range);
-  RETURN_IF_ERROR(AddScanRanges(reader, ranges, true));
-  RETURN_IF_ERROR(range->GetNext(buffer));
-  DCHECK((*buffer) != nullptr);
-  DCHECK((*buffer)->eosr());
-  return Status::OK();
-}
-
-void DiskIoMgr::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
-  DCHECK(buffer_desc != nullptr);
-  if (!buffer_desc->status_.ok()) DCHECK(buffer_desc->buffer_ == nullptr);
-
-  RequestContext* reader = buffer_desc->reader_;
-  if (buffer_desc->buffer_ != nullptr) {
-    if (!buffer_desc->is_cached() && !buffer_desc->is_client_buffer()) {
-      // Buffers the were not allocated by DiskIoMgr don't need to be freed.
-      FreeBufferMemory(buffer_desc.get());
-    }
-    buffer_desc->buffer_ = nullptr;
-    num_buffers_in_readers_.Add(-1);
-    reader->num_buffers_in_reader_.Add(-1);
-  } else {
-    // A nullptr buffer means there was an error in which case there is no buffer
-    // to return.
-  }
-
-  if (buffer_desc->eosr_ || buffer_desc->scan_range_->is_cancelled_) {
-    // Need to close the scan range if returning the last buffer or the scan range
-    // has been cancelled (and the caller might never get the last buffer).
-    // Close() is idempotent so multiple cancelled buffers is okay.
-    buffer_desc->scan_range_->Close();
-  }
-}
-
-unique_ptr<BufferDescriptor> DiskIoMgr::GetFreeBuffer(
-    RequestContext* reader, ScanRange* range, int64_t buffer_size) {
-  DCHECK_LE(buffer_size, max_buffer_size_);
-  DCHECK_GT(buffer_size, 0);
-  buffer_size = min(static_cast<int64_t>(max_buffer_size_), buffer_size);
-  int idx = free_buffers_idx(buffer_size);
-  // Quantize buffer size to nearest power of 2 greater than the specified buffer size and
-  // convert to bytes
-  buffer_size = (1LL << idx) * min_buffer_size_;
-
-  // Track memory against the reader. This is checked the next time we start
-  // a read for the next reader in DiskIoMgr::GetNextScanRange().
-  DCHECK(reader->mem_tracker_ != nullptr);
-  reader->mem_tracker_->Consume(buffer_size);
-
-  uint8_t* buffer = nullptr;
-  {
-    unique_lock<mutex> lock(free_buffers_lock_);
-    if (free_buffers_[idx].empty()) {
-      num_allocated_buffers_.Add(1);
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(1L);
-      }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(buffer_size);
-      }
-      // We already tracked this memory against the reader's MemTracker.
-      buffer = new uint8_t[buffer_size];
-    } else {
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-1L);
-      }
-      buffer = free_buffers_[idx].front();
-      free_buffers_[idx].pop_front();
-      free_buffer_mem_tracker_->Release(buffer_size);
-      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
-    }
-  }
-
-  // Validate more invariants.
-  DCHECK(range != nullptr);
-  DCHECK(reader != nullptr);
-  DCHECK(buffer != nullptr);
-  return unique_ptr<BufferDescriptor>(new BufferDescriptor(
-      this, reader, range, buffer, buffer_size, reader->mem_tracker_));
-}
-
-void DiskIoMgr::GcIoBuffers(int64_t bytes_to_free) {
-  unique_lock<mutex> lock(free_buffers_lock_);
-  int buffers_freed = 0;
-  int bytes_freed = 0;
-  // Free small-to-large to avoid retaining many small buffers and fragmenting memory.
-  for (int idx = 0; idx < free_buffers_.size(); ++idx) {
-    deque<uint8_t*>* free_buffers = &free_buffers_[idx];
-    while (
-        !free_buffers->empty() && (bytes_to_free == -1 || bytes_freed <= bytes_to_free)) {
-      uint8_t* buffer = free_buffers->front();
-      free_buffers->pop_front();
-      int64_t buffer_size = (1LL << idx) * min_buffer_size_;
-      ASAN_UNPOISON_MEMORY_REGION(buffer, buffer_size);
-      delete[] buffer;
-      free_buffer_mem_tracker_->Release(buffer_size);
-      num_allocated_buffers_.Add(-1);
-
-      ++buffers_freed;
-      bytes_freed += buffer_size;
-    }
-    if (bytes_to_free != -1 && bytes_freed >= bytes_to_free) break;
-  }
-
-  if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-buffers_freed);
-  }
-  if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-    ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-bytes_freed);
-  }
-  if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(-buffers_freed);
-  }
-}
-
-void DiskIoMgr::FreeBufferMemory(BufferDescriptor* desc) {
-  DCHECK(!desc->is_cached());
-  DCHECK(!desc->is_client_buffer());
-  uint8_t* buffer = desc->buffer_;
-  int64_t buffer_size = desc->buffer_len_;
-  int idx = free_buffers_idx(buffer_size);
-  DCHECK_EQ(BitUtil::Ceil(buffer_size, min_buffer_size_) & ~(1LL << idx), 0)
-      << "buffer_size_ / min_buffer_size_ should be power of 2, got buffer_size = "
-      << buffer_size << ", min_buffer_size_ = " << min_buffer_size_;
-
-  {
-    unique_lock<mutex> lock(free_buffers_lock_);
-    if (!FLAGS_disable_mem_pools &&
-        free_buffers_[idx].size() < FLAGS_max_free_io_buffers) {
-      // Poison buffers stored in cache.
-      ASAN_POISON_MEMORY_REGION(buffer, buffer_size);
-      free_buffers_[idx].push_back(buffer);
-      if (ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_UNUSED_BUFFERS->Increment(1L);
-      }
-      // This consume call needs to be protected by 'free_buffers_lock_' to avoid a race
-      // with a Release() call for the same buffer that could make consumption negative.
-      // Note: we can't use TryConsume(), which can indirectly call GcIoBuffers().
-      // TODO: after IMPALA-3200 is completed, we should be able to leverage the buffer
-      // pool's free lists, and remove these free lists.
-      free_buffer_mem_tracker_->Consume(buffer_size);
-    } else {
-      num_allocated_buffers_.Add(-1);
-      delete[] buffer;
-      if (ImpaladMetrics::IO_MGR_NUM_BUFFERS != nullptr) {
-        ImpaladMetrics::IO_MGR_NUM_BUFFERS->Increment(-1L);
-      }
-      if (ImpaladMetrics::IO_MGR_TOTAL_BYTES != nullptr) {
-        ImpaladMetrics::IO_MGR_TOTAL_BYTES->Increment(-buffer_size);
-      }
-    }
-  }
-
-  // We transferred the buffer ownership from the BufferDescriptor to the DiskIoMgr.
-  desc->mem_tracker_->Release(buffer_size);
-  desc->buffer_ = nullptr;
-}
-
 // This function gets the next RequestRange to work on for this disk. It checks for
 // cancellation and
 // a) Updates ready_to_start_ranges if there are no scan ranges queued for this disk.
@@ -785,7 +480,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       disk_queue->request_contexts.pop_front();
       DCHECK(*request_context != nullptr);
       request_disk_state = &((*request_context)->disk_states_[disk_id]);
-      request_disk_state->IncrementRequestThreadAndDequeue();
+      request_disk_state->IncrementDiskThreadAndDequeue();
     }
 
     // NOTE: no locks were taken in between.  We need to be careful about what state
@@ -793,27 +488,13 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
     // There are some invariants here.  Only one disk thread can have the
     // same reader here (the reader is removed from the queue).  There can be
     // other disk threads operating on this reader in other functions though.
-
-    // We just picked a reader. Before we may allocate a buffer on its behalf, check that
-    // it has not exceeded any memory limits (e.g. the query or process limit).
-    // TODO: once IMPALA-3200 is fixed, we should be able to remove the free lists and
-    // move these memory limit checks to GetFreeBuffer().
-    // Note that calling AnyLimitExceeded() can result in a call to GcIoBuffers().
-    // TODO: IMPALA-3209: we should not force a reader over its memory limit by
-    // pushing more buffers to it. Most readers can make progress and operate within
-    // a fixed memory limit.
-    if ((*request_context)->mem_tracker_ != nullptr
-        && (*request_context)->mem_tracker_->AnyLimitExceeded()) {
-      (*request_context)->Cancel(Status::MemLimitExceeded());
-    }
-
     unique_lock<mutex> request_lock((*request_context)->lock_);
     VLOG_FILE << "Disk (id=" << disk_id << ") reading for "
         << (*request_context)->DebugString();
 
     // Check if reader has been cancelled
     if ((*request_context)->state_ == RequestContext::Cancelled) {
-      request_disk_state->DecrementRequestThreadAndCheckDone(*request_context);
+      request_disk_state->DecrementDiskThread(request_lock, *request_context);
       continue;
     }
 
@@ -856,7 +537,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
 
     // There are no inflight ranges, nothing to do.
     if (request_disk_state->in_flight_ranges()->empty()) {
-      request_disk_state->DecrementRequestThread();
+      request_disk_state->DecrementDiskThread(request_lock, *request_context);
       continue;
     }
     DCHECK_GT(request_disk_state->num_remaining_ranges(), 0);
@@ -865,7 +546,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
 
     // Now that we've picked a request range, put the context back on the queue so
     // another thread can pick up another request range for this context.
-    request_disk_state->ScheduleContext(*request_context, disk_id);
+    request_disk_state->ScheduleContext(request_lock, *request_context, disk_id);
     DCHECK((*request_context)->Validate()) << endl << (*request_context)->DebugString();
     return true;
   }
@@ -879,81 +560,67 @@ void DiskIoMgr::HandleWriteFinished(
   // Copy disk_id before running callback: the callback may modify write_range.
   int disk_id = write_range->disk_id_;
 
-  // Execute the callback before decrementing the thread count. Otherwise CancelContext()
-  // that waits for the disk ref count to be 0 will return, creating a race, e.g. see
-  // IMPALA-1890.
+  // Execute the callback before decrementing the thread count. Otherwise
+  // RequestContext::Cancel() that waits for the disk ref count to be 0 will
+  // return, creating a race, e.g. see IMPALA-1890.
   // The status of the write does not affect the status of the writer context.
   write_range->callback_(write_status);
   {
     unique_lock<mutex> writer_lock(writer->lock_);
     DCHECK(writer->Validate()) << endl << writer->DebugString();
     RequestContext::PerDiskState& state = writer->disk_states_[disk_id];
-    if (writer->state_ == RequestContext::Cancelled) {
-      state.DecrementRequestThreadAndCheckDone(writer);
-    } else {
-      state.DecrementRequestThread();
-    }
+    state.DecrementDiskThread(writer_lock, writer);
     --state.num_remaining_ranges();
   }
 }
 
 void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-    unique_ptr<BufferDescriptor> buffer) {
+    Status read_status, unique_ptr<BufferDescriptor> buffer) {
   unique_lock<mutex> reader_lock(reader->lock_);
 
-  RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
+  RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id];
   DCHECK(reader->Validate()) << endl << reader->DebugString();
-  DCHECK_GT(state.num_threads_in_op(), 0);
-  DCHECK(buffer->buffer_ != nullptr);
-
-  if (reader->state_ == RequestContext::Cancelled) {
-    state.DecrementRequestThreadAndCheckDone(reader);
-    DCHECK(reader->Validate()) << endl << reader->DebugString();
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
-    buffer->buffer_ = nullptr;
-    ScanRange* scan_range = buffer->scan_range_;
-    scan_range->Cancel(reader->status_);
-    // Enqueue the buffer to use the scan range's buffer cleanup path.
-    scan_range->EnqueueBuffer(reader_lock, move(buffer));
-    return;
-  }
-
-  DCHECK_EQ(reader->state_, RequestContext::Active);
+  DCHECK_GT(disk_state->num_threads_in_op(), 0);
   DCHECK(buffer->buffer_ != nullptr);
+  DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code path.";
 
-  // Update the reader's scan ranges.  There are a three cases here:
-  //  1. Read error
-  //  2. End of scan range
-  //  3. Middle of scan range
-  if (!buffer->status_.ok()) {
-    // Error case
-    if (!buffer->is_client_buffer()) FreeBufferMemory(buffer.get());
-    buffer->buffer_ = nullptr;
-    buffer->eosr_ = true;
-    --state.num_remaining_ranges();
-    buffer->scan_range_->Cancel(buffer->status_);
-  } else if (buffer->eosr_) {
-    --state.num_remaining_ranges();
-  }
-
-  // After calling EnqueueBuffer(), it is no longer valid to read from buffer.
+  // After calling EnqueueBuffer() below, it is no longer valid to read from buffer.
   // Store the state we need before calling EnqueueBuffer().
   bool eosr = buffer->eosr_;
+
+  // TODO: IMPALA-4249: it is safe to touch 'scan_range' until DecrementDiskThread()
+  // is called because all clients of DiskIoMgr keep ScanRange objects alive until
+  // they unregister their RequestContext.
   ScanRange* scan_range = buffer->scan_range_;
-  bool is_cached = buffer->is_cached();
-  bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
-  if (eosr) {
-    // For cached buffers, we can't close the range until the cached buffer is returned.
-    // Close() is called from DiskIoMgr::ReturnBuffer().
-    if (!is_cached) scan_range->Close();
-  } else {
-    if (queue_full) {
-      reader->blocked_ranges_.Enqueue(scan_range);
-    } else {
-      reader->ScheduleScanRange(scan_range);
+  bool scan_range_done = eosr;
+  if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
+    DCHECK_EQ(reader->state_, RequestContext::Active);
+    // Read successfully - update the reader's scan ranges.  There are two cases here:
+    //  1. End of scan range
+    //  2. Middle of scan range
+    bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
+    if (!eosr) {
+      if (queue_full) {
+        reader->blocked_ranges_.Enqueue(scan_range);
+      } else {
+        reader->ScheduleScanRange(reader_lock, scan_range);
+      }
     }
+  } else {
+    // The scan range will be cancelled, either because we hit an error or because the
+    // request context was cancelled.  The buffer is not needed - we must free it.
+    reader->FreeBuffer(buffer.get());
+    reader->num_used_buffers_.Add(-1);
+    // Propagate the error or cancellation by cancelling the scan range.
+    scan_range->Cancel(read_status.ok() ? Status::CANCELLED : read_status);
+    scan_range_done = true;
+  }
+  if (scan_range_done) {
+    scan_range->Close();
+    --disk_state->num_remaining_ranges();
   }
-  state.DecrementRequestThread();
+  DCHECK(reader->Validate()) << endl << reader->DebugString();
+  disk_state->DecrementDiskThread(reader_lock, reader);
 }
 
 void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
@@ -997,10 +664,13 @@ void DiskIoMgr::ReadRange(
   unique_ptr<BufferDescriptor> buffer_desc;
   if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
     buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range,
-        range->client_buffer_.data, range->client_buffer_.len, nullptr));
+        range->client_buffer_.data, range->client_buffer_.len));
   } else {
+    DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+        << "This code path does not handle other buffer types, i.e. HDFS cache"
+        << static_cast<int>(range->external_buffer_tag_);
     // Need to allocate a buffer to read into.
-    int64_t buffer_size = ::min(bytes_remaining, static_cast<int64_t>(max_buffer_size_));
+    int64_t buffer_size = min(bytes_remaining, max_buffer_size_);
     buffer_desc = TryAllocateNextBufferForRange(disk_queue, reader, range, buffer_size);
     if (buffer_desc == nullptr) return;
   }
@@ -1008,8 +678,8 @@ void DiskIoMgr::ReadRange(
 
   // No locks in this section.  Only working on local vars.  We don't want to hold a
   // lock across the read call.
-  buffer_desc->status_ = range->Open(detail::is_file_handle_caching_enabled());
-  if (buffer_desc->status_.ok()) {
+  Status read_status = range->Open(detail::is_file_handle_caching_enabled());
+  if (read_status.ok()) {
     // Update counters.
     if (reader->active_read_thread_counter_) {
       reader->active_read_thread_counter_->Add(1L);
@@ -1021,7 +691,7 @@ void DiskIoMgr::ReadRange(
     SCOPED_TIMER(&read_timer_);
     SCOPED_TIMER(reader->read_timer_);
 
-    buffer_desc->status_ = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+    read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
         &buffer_desc->len_, &buffer_desc->eosr_);
     buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
 
@@ -1036,29 +706,25 @@ void DiskIoMgr::ReadRange(
   }
 
   // Finished read, update reader/disk based on the results
-  HandleReadFinished(disk_queue, reader, move(buffer_desc));
+  HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
 }
 
 unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
     DiskQueue* disk_queue, RequestContext* reader, ScanRange* range,
     int64_t buffer_size) {
   DCHECK(reader->mem_tracker_ != nullptr);
+  // TODO: replace this with reservation check (if needed at all).
   bool enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-  if (!enough_memory) {
-    // Low memory, GC all the buffers and try again.
-    GcIoBuffers();
-    enough_memory = reader->mem_tracker_->SpareCapacity() > LOW_MEMORY;
-  }
 
+  RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id];
   if (!enough_memory) {
-    RequestContext::PerDiskState& state = reader->disk_states_[disk_queue->disk_id];
     unique_lock<mutex> reader_lock(reader->lock_);
 
     // Just grabbed the reader lock, check for cancellation.
     if (reader->state_ == RequestContext::Cancelled) {
       DCHECK(reader->Validate()) << endl << reader->DebugString();
-      state.DecrementRequestThreadAndCheckDone(reader);
-      range->Cancel(reader->status_);
+      disk_state->DecrementDiskThread(reader_lock, reader);
+      range->Cancel(Status::CANCELLED);
       DCHECK(reader->Validate()) << endl << reader->DebugString();
       return nullptr;
     }
@@ -1068,7 +734,7 @@ unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
       // (it already has one queued). Skip this range and pick it up later.
       range->blocked_on_queue_ = true;
       reader->blocked_ranges_.Enqueue(range);
-      state.DecrementRequestThread();
+      disk_state->DecrementDiskThread(reader_lock, reader);
       return nullptr;
     } else {
       // We need to get a buffer anyway since there are none queued. The query
@@ -1076,7 +742,18 @@ unique_ptr<BufferDescriptor> DiskIoMgr::TryAllocateNextBufferForRange(
       // now.
     }
   }
-  unique_ptr<BufferDescriptor> buffer_desc = GetFreeBuffer(reader, range, buffer_size);
+  unique_ptr<BufferDescriptor> buffer_desc;
+  Status status = reader->AllocBuffer(range, buffer_size, &buffer_desc);
+  if (!status.ok()) {
+    // Hit memory limit - cancel range.
+    range->Cancel(status);
+    {
+      unique_lock<mutex> reader_lock(reader->lock_);
+      disk_state->DecrementDiskThread(reader_lock, reader);
+      DCHECK(reader->Validate()) << endl << reader->DebugString();
+    }
+    return nullptr;
+  }
   DCHECK(buffer_desc != nullptr);
   return buffer_desc;
 }
@@ -1139,23 +816,10 @@ Status DiskIoMgr::WriteRangeHelper(FILE* file_handle, WriteRange* write_range) {
   return Status::OK();
 }
 
-int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
-  int64_t buffer_size_scaled = BitUtil::Ceil(buffer_size, min_buffer_size_);
-  int idx = BitUtil::Log2Ceiling64(buffer_size_scaled);
-  DCHECK_GE(idx, 0);
-  DCHECK_LT(idx, free_buffers_.size());
-  return idx;
-}
-
 Status DiskIoMgr::AddWriteRange(RequestContext* writer, WriteRange* write_range) {
   unique_lock<mutex> writer_lock(writer->lock_);
-
-  if (writer->state_ == RequestContext::Cancelled) {
-    DCHECK(!writer->status_.ok());
-    return writer->status_;
-  }
-
-  writer->AddRequestRange(write_range, false);
+  if (writer->state_ == RequestContext::Cancelled) return Status::CANCELLED;
+  writer->AddRequestRange(writer_lock, write_range, false);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 4fa078b..20f5d44 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -144,8 +144,8 @@ namespace io {
 ///
 /// As a caller reads from a scan range, these buffers are wrapped in BufferDescriptors
 /// and returned to the caller. The caller must always call ReturnBuffer() on the buffer
-/// descriptor to allow recycling of the associated buffer (if there is an
-/// IoMgr-allocated or HDFS cached buffer).
+/// descriptor to allow freeing of the associated buffer (if there is an IoMgr-allocated
+/// or HDFS cached buffer).
 ///
 /// Caching support:
 /// Scan ranges contain metadata on whether or not it is cached on the DN. In that
@@ -207,7 +207,7 @@ class DiskIoMgr : public CacheLineAligned {
   ///  - min_buffer_size: minimum io buffer size (in bytes)
   ///  - max_buffer_size: maximum io buffer size (in bytes). Also the max read size.
   DiskIoMgr(int num_disks, int threads_per_rotational_disk,
-      int threads_per_solid_state_disk, int min_buffer_size, int max_buffer_size);
+      int threads_per_solid_state_disk, int64_t min_buffer_size, int64_t max_buffer_size);
 
   /// Create DiskIoMgr with default configs.
   DiskIoMgr();
@@ -217,7 +217,8 @@ class DiskIoMgr : public CacheLineAligned {
   ~DiskIoMgr();
 
   /// Initialize the IoMgr. Must be called once before any of the other APIs.
-  Status Init(MemTracker* process_mem_tracker) WARN_UNUSED_RESULT;
+  Status Init() WARN_UNUSED_RESULT;
+
 
   /// Allocates tracking structure for a request context.
   /// Register a new request context and return it to the caller. The caller must call
@@ -236,12 +237,6 @@ class DiskIoMgr : public CacheLineAligned {
   /// up.
   void UnregisterContext(RequestContext* context);
 
-  /// This function cancels the context asychronously. All outstanding requests
-  /// are aborted and tracking structures cleaned up. This does not need to be
-  /// called if the context finishes normally.
-  /// This will also fail any outstanding GetNext()/Read requests.
-  void CancelContext(RequestContext* context);
-
   /// Adds the scan ranges to the queues. This call is non-blocking. The caller must
   /// not deallocate the scan range pointers before UnregisterContext().
   /// If schedule_immediately, the ranges are immediately put on the read queue
@@ -266,52 +261,14 @@ class DiskIoMgr : public CacheLineAligned {
   /// This call is blocking.
   Status GetNextRange(RequestContext* reader, ScanRange** range) WARN_UNUSED_RESULT;
 
-  /// Reads the range and returns the result in buffer.
-  /// This behaves like the typical synchronous read() api, blocking until the data
-  /// is read. This can be called while there are outstanding ScanRanges and is
-  /// thread safe. Multiple threads can be calling Read() per reader at a time.
-  /// range *cannot* have already been added via AddScanRanges.
-  /// This can only be used if the scan range fits in a single IO buffer (i.e. is smaller
-  /// than max_read_buffer_size()) or if reading into a client-provided buffer.
-  Status Read(RequestContext* reader, ScanRange* range,
-      std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
-
-  /// Returns the buffer to the IoMgr. This must be called for every buffer
-  /// returned by GetNext()/Read() that did not return an error. This is non-blocking.
-  /// After calling this, the buffer descriptor is invalid and cannot be accessed.
-  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
-
   /// Determine which disk queue this file should be assigned to.  Returns an index into
   /// disk_queues_.  The disk_id is the volume ID for the local disk that holds the
   /// files, or -1 if unknown.  Flag expected_local is true iff this impalad is
   /// co-located with the datanode for this file.
   int AssignQueue(const char* file, int disk_id, bool expected_local);
 
-  /// TODO: The functions below can be moved to RequestContext.
-  /// Returns the current status of the context.
-  Status context_status(RequestContext* context) const WARN_UNUSED_RESULT;
-
-  void set_bytes_read_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_read_timer(RequestContext*, RuntimeProfile::Counter*);
-  void set_active_read_thread_counter(RequestContext*, RuntimeProfile::Counter*);
-  void set_disks_access_bitmap(RequestContext*, RuntimeProfile::Counter*);
-
-  int64_t queue_size(RequestContext* reader) const;
-  int64_t bytes_read_local(RequestContext* reader) const;
-  int64_t bytes_read_short_circuit(RequestContext* reader) const;
-  int64_t bytes_read_dn_cache(RequestContext* reader) const;
-  int num_remote_ranges(RequestContext* reader) const;
-  int64_t unexpected_remote_bytes(RequestContext* reader) const;
-  int cached_file_handles_hit_count(RequestContext* reader) const;
-  int cached_file_handles_miss_count(RequestContext* reader) const;
-
-  /// Returns the read throughput across all readers.
-  /// TODO: should this be a sliding window?  This should report metrics for the
-  /// last minute, hour and since the beginning.
-  int64_t GetReadThroughput();
-
   /// Returns the maximum read buffer size
-  int max_read_buffer_size() const { return max_buffer_size_; }
+  int64_t max_read_buffer_size() const { return max_buffer_size_; }
 
   /// Returns the total number of disk queues (both local and remote).
   int num_total_disks() const { return disk_queues_.size(); }
@@ -361,10 +318,6 @@ class DiskIoMgr : public CacheLineAligned {
   Status ReopenCachedHdfsFileHandle(const hdfsFS& fs, std::string* fname, int64_t mtime,
       CachedHdfsFileHandle** fid);
 
-  /// Garbage collect unused I/O buffers up to 'bytes_to_free', or all the buffers if
-  /// 'bytes_to_free' is -1.
-  void GcIoBuffers(int64_t bytes_to_free = -1);
-
   /// The maximum number of ready buffers that can be queued in a scan range. Having two
   /// queued buffers (plus the buffer that is returned to the client) gives good
   /// performance in most scenarios:
@@ -403,14 +356,6 @@ class DiskIoMgr : public CacheLineAligned {
   friend class DiskIoMgrTest_Buffers_Test;
   friend class DiskIoMgrTest_VerifyNumThreadsParameter_Test;
 
-  /// Memory tracker for unused I/O buffers owned by DiskIoMgr.
-  boost::scoped_ptr<MemTracker> free_buffer_mem_tracker_;
-
-  /// Memory tracker for I/O buffers where the RequestContext has no MemTracker.
-  /// TODO: once IMPALA-3200 is fixed, there should be no more cases where readers don't
-  /// provide a MemTracker.
-  boost::scoped_ptr<MemTracker> unowned_buffer_mem_tracker_;
-
   /// Number of worker(read) threads per rotational disk. Also the max depth of queued
   /// work to the disk.
   const int num_io_threads_per_rotational_disk_;
@@ -420,10 +365,10 @@ class DiskIoMgr : public CacheLineAligned {
   const int num_io_threads_per_solid_state_disk_;
 
   /// Maximum read size. This is also the maximum size of each allocated buffer.
-  const int max_buffer_size_;
+  const int64_t max_buffer_size_;
 
   /// The minimum size of each read buffer.
-  const int min_buffer_size_;
+  const int64_t min_buffer_size_;
 
   /// Thread group containing all the worker threads.
   ThreadGroup disk_thread_group_;
@@ -441,28 +386,6 @@ class DiskIoMgr : public CacheLineAligned {
   /// Total time spent in hdfs reading
   RuntimeProfile::Counter read_timer_;
 
-  /// Protects free_buffers_
-  boost::mutex free_buffers_lock_;
-
-  /// Free buffers that can be handed out to clients. There is one list for each buffer
-  /// size, indexed by the Log2 of the buffer size in units of min_buffer_size_. The
-  /// maximum buffer size is max_buffer_size_, so the maximum index is
-  /// Log2(max_buffer_size_ / min_buffer_size_).
-  //
-  /// E.g. if min_buffer_size_ = 1024 bytes:
-  ///  free_buffers_[0]  => list of free buffers with size 1024 B
-  ///  free_buffers_[1]  => list of free buffers with size 2048 B
-  ///  free_buffers_[10] => list of free buffers with size 1 MB
-  ///  free_buffers_[13] => list of free buffers with size 8 MB
-  ///  free_buffers_[n]  => list of free buffers with size 2^n * 1024 B
-  std::vector<std::deque<uint8_t*>> free_buffers_;
-
-  /// Total number of allocated buffers, used for debugging.
-  AtomicInt32 num_allocated_buffers_;
-
-  /// Total number of buffers in readers
-  AtomicInt32 num_buffers_in_readers_;
-
   /// Per disk queues. This is static and created once at Init() time.  One queue is
   /// allocated for each local disk on the system and for each remote filesystem type.
   /// It is indexed by disk id.
@@ -478,23 +401,6 @@ class DiskIoMgr : public CacheLineAligned {
   // handles are closed.
   FileHandleCache file_handle_cache_;
 
-  /// Returns the index into free_buffers_ for a given buffer size
-  int free_buffers_idx(int64_t buffer_size);
-
-  /// Returns a buffer to read into with size between 'buffer_size' and
-  /// 'max_buffer_size_', If there is an appropriately-sized free buffer in the
-  /// 'free_buffers_', that is returned, otherwise a new one is allocated.
-  /// The returned *buffer_size must be between 0 and 'max_buffer_size_'.
-  /// The buffer memory is tracked against reader's mem tracker, or
-  /// 'unowned_buffer_mem_tracker_' if the reader does not have one.
-  std::unique_ptr<BufferDescriptor> GetFreeBuffer(
-      RequestContext* reader, ScanRange* range, int64_t buffer_size);
-
-  /// Disassociates the desc->buffer_ memory from 'desc' (which cannot be nullptr), either
-  /// freeing it or returning it to 'free_buffers_'. Memory tracking is updated to
-  /// reflect the transfer of ownership from desc->mem_tracker_ to the disk I/O mgr.
-  void FreeBufferMemory(BufferDescriptor* desc);
-
   /// Disk worker thread loop. This function retrieves the next range to process on
   /// the disk queue and invokes ReadRange() or Write() depending on the type of Range().
   /// There can be multiple threads per disk running this loop.
@@ -508,10 +414,12 @@ class DiskIoMgr : public CacheLineAligned {
   bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       RequestContext** request_context);
 
-  /// Updates disk queue and reader state after a read is complete. The read result
-  /// is captured in the buffer descriptor.
+  /// Updates disk queue and reader state after a read is complete. If the read
+  /// was successful, 'read_status' is ok and 'buffer' contains the result of the
+  /// read. If the read failed with an error, 'read_status' contains the error and
+  /// 'buffer' has the buffer that was meant to hold the result of the read.
   void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-      std::unique_ptr<BufferDescriptor> buffer);
+      Status read_status, std::unique_ptr<BufferDescriptor> buffer);
 
   /// Invokes write_range->callback_  after the range has been written and
   /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
@@ -534,13 +442,15 @@ class DiskIoMgr : public CacheLineAligned {
   /// Does not open or close the file that is written.
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;
 
-  /// Reads the specified scan range and calls HandleReadFinished when done.
+  /// Reads the specified scan range and calls HandleReadFinished() when done.
   void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range);
 
   /// Try to allocate the next buffer for the scan range, returning the new buffer
   /// if successful. If 'reader' is cancelled, cancels the range and returns nullptr.
   /// If there is memory pressure and buffers are already queued, adds the range
-  /// to the blocked ranges and returns nullptr.
+  /// to the blocked ranges and returns nullptr. If buffers are not queued and no more
+  /// buffers can be allocated, cancels the range with a MEM_LIMIT_EXCEEDED error and
+  /// returns nullptr.
   std::unique_ptr<BufferDescriptor> TryAllocateNextBufferForRange(DiskQueue* disk_queue,
       RequestContext* reader, ScanRange* range, int64_t buffer_size);
 };

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 287f53a..031b976 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -22,32 +22,100 @@
 using namespace impala;
 using namespace impala::io;
 
-void RequestContext::Cancel(const Status& status) {
-  DCHECK(!status.ok());
+BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
+    RequestContext* reader, ScanRange* scan_range, uint8_t* buffer,
+    int64_t buffer_len)
+  : io_mgr_(io_mgr),
+    reader_(reader),
+    scan_range_(scan_range),
+    buffer_(buffer),
+    buffer_len_(buffer_len) {
+  DCHECK(io_mgr != nullptr);
+  DCHECK(scan_range != nullptr);
+  DCHECK(buffer != nullptr);
+  DCHECK_GE(buffer_len, 0);
+}
+
+Status RequestContext::AllocBuffer(ScanRange* range, int64_t buffer_size,
+    unique_ptr<BufferDescriptor>* buffer_desc) {
+  DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+      << static_cast<int>(range->external_buffer_tag_);
+  DCHECK_LE(buffer_size, parent_->max_buffer_size_);
+  DCHECK_GT(buffer_size, 0);
+  buffer_size = BitUtil::RoundUpToPowerOfTwo(
+      max(parent_->min_buffer_size_, min(parent_->max_buffer_size_, buffer_size)));
+
+  DCHECK(mem_tracker_ != nullptr);
+  if (!mem_tracker_->TryConsume(buffer_size)) {
+    return mem_tracker_->MemLimitExceeded(nullptr, "disk I/O buffer", buffer_size);
+  }
+
+  uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size));
+  if (buffer == nullptr) {
+    mem_tracker_->Release(buffer_size);
+    return Status(TErrorCode::INTERNAL_ERROR,
+        Substitute("Could not malloc buffer of $0 bytes"));
+  }
+  buffer_desc->reset(new BufferDescriptor(parent_, this, range, buffer, buffer_size));
+  return Status::OK();
+}
+
+void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
+  DCHECK(buffer->buffer_ != nullptr);
+  if (!buffer->is_cached() && !buffer->is_client_buffer()) {
+    // Only buffers that were allocated by DiskIoMgr need to have memory freed.
+    free(buffer->buffer_);
+    mem_tracker_->Release(buffer->buffer_len_);
+  }
+  buffer->buffer_ = nullptr;
+}
 
+// Cancellation of a RequestContext requires coordination from multiple threads that may
+// hold references to the context:
+//  1. Disk threads that are currently processing a range for this context.
+//  2. Caller threads that are waiting in GetNext().
+//
+// Each thread that currently has a reference to the request context must notice the
+// cancel, cancel any pending operations involving the context and remove the context from
+// tracking structures. Once no more operations are pending on the context and no more
+// I/O mgr threads hold references to the context, the context can be marked inactive
+// (see CancelAndMarkInactive()), after which the owner of the context object can free
+// it.
+//
+// The steps are:
+// 1. Cancel() will immediately set the context in the Cancelled state. This prevents any
+// other thread from adding more ready buffers to the context (they all take a lock and
+// check the state before doing so), or any write ranges to the context.
+// 2. Cancel() will call Cancel() on each ScanRange that is not yet complete, unblocking
+// any threads in GetNext(). If there was no prior error for a scan range, any reads from
+// that scan range will return a CANCELLED Status. Cancel() also invokes callbacks for
+// all WriteRanges with a CANCELLED Status.
+// 3. Disk threads notice the context is cancelled either when picking the next context
+// to process or when they try to enqueue a ready buffer. Upon noticing the cancelled
+// state, they remove the context from the disk queue. The last thread per disk then calls
+// DecrementDiskRefCount(). After the last disk thread has called DecrementDiskRefCount(),
+// cancellation is done and it is safe to unregister the context.
+void RequestContext::Cancel() {
   // Callbacks are collected in this vector and invoked while no lock is held.
   vector<WriteRange::WriteDoneCallback> write_callbacks;
   {
-    lock_guard<mutex> lock(lock_);
+    unique_lock<mutex> lock(lock_);
     DCHECK(Validate()) << endl << DebugString();
 
     // Already being cancelled
     if (state_ == RequestContext::Cancelled) return;
 
-    DCHECK(status_.ok());
-    status_ = status;
-
     // The reader will be put into a cancelled state until call cleanup is complete.
     state_ = RequestContext::Cancelled;
 
     // Cancel all scan ranges for this reader. Each range could be one one of
     // four queues.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
-      RequestRange* range = NULL;
-      while ((range = state.in_flight_ranges()->Dequeue()) != NULL) {
+      PerDiskState& state = disk_states_[i];
+      RequestRange* range = nullptr;
+      while ((range = state.in_flight_ranges()->Dequeue()) != nullptr) {
         if (range->request_type() == RequestType::READ) {
-          static_cast<ScanRange*>(range)->Cancel(status);
+          static_cast<ScanRange*>(range)->Cancel(Status::CANCELLED);
         } else {
           DCHECK(range->request_type() == RequestType::WRITE);
           write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
@@ -55,36 +123,36 @@ void RequestContext::Cancel(const Status& status) {
       }
 
       ScanRange* scan_range;
-      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != NULL) {
-        scan_range->Cancel(status);
+      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != nullptr) {
+        scan_range->Cancel(Status::CANCELLED);
       }
       WriteRange* write_range;
-      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != NULL) {
+      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != nullptr) {
         write_callbacks.push_back(write_range->callback_);
       }
     }
 
-    ScanRange* range = NULL;
-    while ((range = ready_to_start_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
+    ScanRange* range = nullptr;
+    while ((range = ready_to_start_ranges_.Dequeue()) != nullptr) {
+      range->Cancel(Status::CANCELLED);
     }
-    while ((range = blocked_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
+    while ((range = blocked_ranges_.Dequeue()) != nullptr) {
+      range->Cancel(Status::CANCELLED);
     }
-    while ((range = cached_ranges_.Dequeue()) != NULL) {
-      range->Cancel(status);
+    while ((range = cached_ranges_.Dequeue()) != nullptr) {
+      range->Cancel(Status::CANCELLED);
     }
 
-    // Schedule reader on all disks. The disks will notice it is cancelled and do any
-    // required cleanup
+    // Ensure that the reader is scheduled on all disks (it may already be scheduled on
+    // some). The disk threads will notice that the context is cancelled and do any
+    // required cleanup for the disk state.
     for (int i = 0; i < disk_states_.size(); ++i) {
-      RequestContext::PerDiskState& state = disk_states_[i];
-      state.ScheduleContext(this, i);
+      disk_states_[i].ScheduleContext(lock, this, i);
     }
   }
 
   for (const WriteRange::WriteDoneCallback& write_callback: write_callbacks) {
-    write_callback(status_);
+    write_callback(Status::CANCELLED);
   }
 
   // Signal reader and unblock the GetNext/Read thread.  That read will fail with
@@ -93,7 +161,7 @@ void RequestContext::Cancel(const Status& status) {
 }
 
 void RequestContext::CancelAndMarkInactive() {
-  Cancel(Status::CANCELLED);
+  Cancel();
 
   boost::unique_lock<boost::mutex> l(lock_);
   DCHECK_NE(state_, Inactive);
@@ -105,14 +173,22 @@ void RequestContext::CancelAndMarkInactive() {
   // Validate that no buffers were leaked from this context.
   DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
   DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
+
+  // Validate that no threads are active and the context is not queued.
+  for (const PerDiskState& disk_state : disk_states_) {
+    DCHECK_EQ(0, disk_state.in_flight_ranges()->size()) << endl << DebugString();
+    DCHECK_EQ(0, disk_state.unstarted_scan_ranges()->size()) << endl << DebugString();
+    DCHECK_EQ(0, disk_state.num_threads_in_op()) << endl << DebugString();
+    DCHECK(!disk_state.is_on_queue()) << endl << DebugString();
+  }
   DCHECK(Validate()) << endl << DebugString();
   state_ = Inactive;
 }
 
-void RequestContext::AddRequestRange(
+void RequestContext::AddRequestRange(const unique_lock<mutex>& lock,
     RequestRange* range, bool schedule_immediately) {
-  // DCHECK(lock_.is_locked()); // TODO: boost should have this API
-  RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  PerDiskState& state = disk_states_[range->disk_id()];
   if (state.done()) {
     DCHECK_EQ(state.num_remaining_ranges(), 0);
     state.set_done(false);
@@ -123,7 +199,7 @@ void RequestContext::AddRequestRange(
   if (range->request_type() == RequestType::READ) {
     ScanRange* scan_range = static_cast<ScanRange*>(range);
     if (schedule_immediately) {
-      ScheduleScanRange(scan_range);
+      ScheduleScanRange(lock, scan_range);
     } else {
       state.unstarted_scan_ranges()->Enqueue(scan_range);
       num_unstarted_scan_ranges_.Add(1);
@@ -143,7 +219,7 @@ void RequestContext::AddRequestRange(
     schedule_context = true;
   }
 
-  if (schedule_context) state.ScheduleContext(this, range->disk_id());
+  if (schedule_context) state.ScheduleContext(lock, this, range->disk_id());
   ++state.num_remaining_ranges();
 }
 
@@ -159,8 +235,7 @@ string RequestContext::DebugString() const {
   if (state_ == RequestContext::Cancelled) ss << "Cancelled";
   if (state_ == RequestContext::Active) ss << "Active";
   if (state_ != RequestContext::Inactive) {
-    ss << " status_=" << (status_.ok() ? "OK" : status_.GetDetail())
-       << " #ready_buffers=" << num_ready_buffers_.Load()
+    ss << " #ready_buffers=" << num_ready_buffers_.Load()
        << " #used_buffers=" << num_used_buffers_.Load()
        << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
        << " #finished_scan_ranges=" << num_finished_ranges_.Load()
@@ -284,10 +359,11 @@ bool RequestContext::Validate() const {
   return true;
 }
 
-void RequestContext::PerDiskState::ScheduleContext(
+void RequestContext::PerDiskState::ScheduleContext(const unique_lock<mutex>& context_lock,
     RequestContext* context, int disk_id) {
-  if (!is_on_queue_ && !done_) {
-    is_on_queue_ = true;
+  DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
+  if (is_on_queue_.Load() == 0 && !done_) {
+    is_on_queue_.Store(1);
     context->parent_->disk_queues_[disk_id]->EnqueueContext(context);
   }
 }