Posted to commits@impala.apache.org by bh...@apache.org on 2017/09/15 04:33:39 UTC

[2/4] incubator-impala git commit: IMPALA-5417: make I/O buffer queue fixed-size

IMPALA-5417: make I/O buffer queue fixed-size

This removes the dynamically-varying queue size behaviour in the I/O
manager. The motivation is to bound resource consumption of scans
and make it possible to reserve memory for I/O buffers upfront.

Also does some cleanup and documentation of the locking policy, and fixes
some cases in ScanRange::GetNext() where members documented as being
protected by ScanRange::lock_ were accessed without holding it. I think the
races were either benign or prevented in practice by holding
DiskIoRequestContext::lock_.
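
For illustration, the shape of the change can be sketched as follows. This
is a simplified stand-alone model, not the actual Impala code: the
per-range ready-buffer queue moves from a dynamically adjusted capacity to
a fixed compile-time limit, with the producer backing off when the queue
fills.

    #include <cstddef>
    #include <deque>
    #include <memory>

    struct Buffer {};  // stand-in for DiskIoMgr::BufferDescriptor

    class ReadyBufferQueueSketch {
     public:
      // Analogous to SCAN_RANGE_READY_BUFFER_LIMIT in the patch.
      static const std::size_t kLimit = 2;

      // Returns true if the queue is now full, i.e. the producer should
      // stop scheduling reads for this range (back pressure).
      bool Enqueue(std::unique_ptr<Buffer> buffer) {
        queue_.push_back(std::move(buffer));
        return queue_.size() == kLimit;
      }

      // Precondition: queue not empty.
      std::unique_ptr<Buffer> Dequeue() {
        std::unique_ptr<Buffer> buffer = std::move(queue_.front());
        queue_.pop_front();
        return buffer;
      }

     private:
      std::deque<std::unique_ptr<Buffer>> queue_;
    };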

Testing:
Ran exhaustive build.

Perf:
Ran the full set of workloads (TPC-H, TPC-DS, targeted) on a 16-node
cluster. Everything was within normal variance.

Change-Id: If7cc3f7199f5320db00b7face97a96cdadb6f83f
Reviewed-on: http://gerrit.cloudera.org:8080/7408
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/50d603d3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/50d603d3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/50d603d3

Branch: refs/heads/master
Commit: 50d603d306c58ef42605eeb715356e1003dedeb4
Parents: 3e73ce2
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jun 14 08:43:58 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Fri Sep 15 01:59:29 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/disk-io-mgr-internal.h        |  23 +-
 be/src/runtime/disk-io-mgr-reader-context.cc |   2 -
 be/src/runtime/disk-io-mgr-scan-range.cc     |  70 ++--
 be/src/runtime/disk-io-mgr-test.cc           | 445 ++++++++++------------
 be/src/runtime/disk-io-mgr.cc                |  10 +-
 be/src/runtime/disk-io-mgr.h                 |  61 +--
 6 files changed, 279 insertions(+), 332 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-internal.h b/be/src/runtime/disk-io-mgr-internal.h
index 30bcd60..a9acca4 100644
--- a/be/src/runtime/disk-io-mgr-internal.h
+++ b/be/src/runtime/disk-io-mgr-internal.h
@@ -180,17 +180,13 @@ class DiskIoRequestContext {
     state.ScheduleContext(this, range->disk_id());
   }
 
-  /// Cancels the context with status code 'status'.
+  /// Cancels the context with status code 'status'
   void Cancel(const Status& status);
 
   /// Adds request range to disk queue for this request context. Currently,
  /// schedule_immediately must be false if RequestRange is a write range.
   void AddRequestRange(RequestRange* range, bool schedule_immediately);
 
-  /// Returns the default queue capacity for scan ranges. This is updated
-  /// as the reader processes ranges.
-  int initial_scan_range_queue_capacity() const { return initial_queue_capacity_; }
-
   /// Validates invariants of reader.  Reader lock must be taken beforehand.
   bool Validate() const;
 
@@ -265,22 +261,9 @@ class DiskIoRequestContext {
   /// This is the sum of all queued buffers in all ranges for this reader context.
   AtomicInt32 num_ready_buffers_;
 
-  /// The total (sum) of queue capacities for finished scan ranges. This value
-  /// divided by num_finished_ranges_ is the average for finished ranges and
-  /// used to seed the starting queue capacity for future ranges. The assumption
-  /// is that if previous ranges were fast, new ones will be fast too. The scan
-  /// range adjusts the queue capacity dynamically so a rough approximation will do.
-  AtomicInt32 total_range_queue_capacity_;
-
-  /// The initial queue size for new scan ranges. This is always
-  /// total_range_queue_capacity_ / num_finished_ranges_ but stored as a separate
-  /// variable to allow reading this value without taking a lock. Doing the division
-  /// at read time (with no lock) could lead to a race where only
-  /// total_range_queue_capacity_ or num_finished_ranges_ was updated.
-  int initial_queue_capacity_;
-
   /// All fields below are accessed by multiple threads and the lock needs to be
-  /// taken before accessing them.
+  /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
+  /// are held simultaneously.
   boost::mutex lock_;
 
   /// Current state of the reader
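
The lock-ordering rule added above (DiskIoRequestContext::lock_ must be
acquired before ScanRange::lock_ when both are held) can be shown with a
minimal sketch; the types below are hypothetical stand-ins, not the real
Impala classes:

    #include <mutex>

    struct ScanRangeLike { std::mutex lock_; };
    struct RequestContextLike { std::mutex lock_; };

    void UpdateBoth(RequestContextLike* context, ScanRangeLike* range) {
      // Always the context lock first, then the per-range lock, so two
      // threads can never deadlock by acquiring them in opposite order.
      std::unique_lock<std::mutex> context_lock(context->lock_);
      std::unique_lock<std::mutex> range_lock(range->lock_);
      // ... mutate state protected by both locks ...
    }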

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-reader-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-reader-context.cc b/be/src/runtime/disk-io-mgr-reader-context.cc
index 6f3fe77..77f332a 100644
--- a/be/src/runtime/disk-io-mgr-reader-context.cc
+++ b/be/src/runtime/disk-io-mgr-reader-context.cc
@@ -157,7 +157,6 @@ void DiskIoRequestContext::Reset(MemTracker* tracker) {
   num_used_buffers_.Store(0);
   num_buffers_in_reader_.Store(0);
   num_ready_buffers_.Store(0);
-  total_range_queue_capacity_.Store(0);
   num_finished_ranges_.Store(0);
   num_remote_ranges_.Store(0);
   bytes_read_local_.Store(0);
@@ -166,7 +165,6 @@ void DiskIoRequestContext::Reset(MemTracker* tracker) {
   unexpected_remote_bytes_.Store(0);
   cached_file_handles_hit_count_.Store(0);
   cached_file_handles_miss_count_.Store(0);
-  initial_queue_capacity_ = DiskIoMgr::DEFAULT_QUEUE_CAPACITY;
 
   DCHECK(ready_to_start_ranges_.empty());
   DCHECK(blocked_ranges_.empty());

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-scan-range.cc b/be/src/runtime/disk-io-mgr-scan-range.cc
index 8ed5138..c5c9514 100644
--- a/be/src/runtime/disk-io-mgr-scan-range.cc
+++ b/be/src/runtime/disk-io-mgr-scan-range.cc
@@ -24,11 +24,6 @@
 
 using namespace impala;
 
-// A very large max value to prevent things from going out of control. Not
-// expected to ever hit this value (1GB of buffered data per range).
-const int MAX_QUEUE_CAPACITY = 128;
-const int MIN_QUEUE_CAPACITY = 2;
-
 DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRead() "
     "when performing HDFS read operations. This is necessary to use HDFS hedged reads "
     "(assuming the HDFS client is configured to do so).");
@@ -46,8 +41,9 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u
 // any time and only one thread will remove from the queue. This is to guarantee
 // that buffers are queued and read in file order.
 
-// This must be called with the reader lock taken.
-bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) {
+bool DiskIoMgr::ScanRange::EnqueueBuffer(
+    const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
+  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
   {
     unique_lock<mutex> scan_range_lock(lock_);
     DCHECK(Validate()) << DebugString();
@@ -67,13 +63,8 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) {
     eosr_queued_ = buffer->eosr();
     ready_buffers_.emplace_back(move(buffer));
 
-    blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_;
-    if (blocked_on_queue_ && ready_buffers_capacity_ > MIN_QUEUE_CAPACITY) {
-      // We have filled the queue, indicating we need back pressure on
-      // the producer side (i.e. we are pushing buffers faster than they
-      // are pulled off, throttle this range more).
-      --ready_buffers_capacity_;
-    }
+    DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT);
+    blocked_on_queue_ = ready_buffers_.size() == SCAN_RANGE_READY_BUFFER_LIMIT;
   }
 
   buffer_ready_cv_.notify_one();
@@ -83,18 +74,12 @@ bool DiskIoMgr::ScanRange::EnqueueBuffer(unique_ptr<BufferDescriptor> buffer) {
 
 Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
   DCHECK(*buffer == nullptr);
+  bool eosr;
   {
     unique_lock<mutex> scan_range_lock(lock_);
     if (eosr_returned_) return Status::OK();
     DCHECK(Validate()) << DebugString();
 
-    if (ready_buffers_.empty()) {
-      // The queue is empty indicating this thread could use more
-      // IO. Increase the capacity to allow for more queueing.
-      ++ready_buffers_capacity_ ;
-      ready_buffers_capacity_ = ::min(ready_buffers_capacity_, MAX_QUEUE_CAPACITY);
-    }
-
     while (ready_buffers_.empty() && !is_cancelled_) {
       buffer_ready_cv_.wait(scan_range_lock);
     }
@@ -106,9 +91,11 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
 
     // Remove the first ready buffer from the queue and return it
     DCHECK(!ready_buffers_.empty());
+    DCHECK_LE(ready_buffers_.size(), SCAN_RANGE_READY_BUFFER_LIMIT);
     *buffer = move(ready_buffers_.front());
     ready_buffers_.pop_front();
     eosr_returned_ = (*buffer)->eosr();
+    eosr = (*buffer)->eosr();
   }
 
   // Update tracking counters. The buffer has now moved from the IoMgr to the
@@ -117,6 +104,7 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
   reader_->num_buffers_in_reader_.Add(1);
   reader_->num_ready_buffers_.Add(-1);
   reader_->num_used_buffers_.Add(-1);
+  if (eosr) reader_->num_finished_ranges_.Add(1);
 
   Status status = (*buffer)->status_;
   if (!status.ok()) {
@@ -125,12 +113,6 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
   }
 
   unique_lock<mutex> reader_lock(reader_->lock_);
-  if (eosr_returned_) {
-    reader_->total_range_queue_capacity_.Add(ready_buffers_capacity_);
-    reader_->num_finished_ranges_.Add(1);
-    reader_->initial_queue_capacity_ = reader_->total_range_queue_capacity_.Load() /
-        reader_->num_finished_ranges_.Load();
-  }
 
   DCHECK(reader_->Validate()) << endl << reader_->DebugString();
   if (reader_->state_ == DiskIoRequestContext::Cancelled) {
@@ -140,13 +122,19 @@ Status DiskIoMgr::ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
     return status_;
   }
 
-  bool was_blocked = blocked_on_queue_;
-  blocked_on_queue_ = ready_buffers_.size() >= ready_buffers_capacity_;
-  if (was_blocked && !blocked_on_queue_ && !eosr_queued_) {
-    // This scan range was blocked and is no longer, add it to the reader
-    // queue again.
-    reader_->blocked_ranges_.Remove(this);
-    reader_->ScheduleScanRange(this);
+  {
+    // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
+    // may have been called after we released 'lock_' above so we need to re-check
+    // whether the queue is full.
+    unique_lock<mutex> scan_range_lock(lock_);
+    if (blocked_on_queue_ && ready_buffers_.size() < SCAN_RANGE_READY_BUFFER_LIMIT
+        && !eosr_queued_) {
+      blocked_on_queue_ = false;
+      // This scan range was blocked and is no longer, add it to the reader
+      // queue again.
+      reader_->blocked_ranges_.Remove(this);
+      reader_->ScheduleScanRange(this);
+    }
   }
   return Status::OK();
 }
@@ -191,7 +179,6 @@ string DiskIoMgr::ScanRange::DebugString() const {
   ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
      << " len=" << len_ << " bytes_read=" << bytes_read_
      << " buffer_queue=" << ready_buffers_.size()
-     << " capacity=" << ready_buffers_capacity_
      << " hdfs_file=" << exclusive_hdfs_fh_;
   return ss.str();
 }
@@ -211,11 +198,10 @@ bool DiskIoMgr::ScanRange::Validate() {
   return true;
 }
 
-DiskIoMgr::ScanRange::ScanRange(int capacity)
+DiskIoMgr::ScanRange::ScanRange()
   : RequestRange(RequestType::READ),
     num_remote_bytes_(0),
     external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
-    ready_buffers_capacity_(capacity),
     mtime_(-1) {}
 
 DiskIoMgr::ScanRange::~ScanRange() {
@@ -269,10 +255,6 @@ void DiskIoMgr::ScanRange::InitInternal(DiskIoMgr* io_mgr, DiskIoRequestContext*
   eosr_queued_= false;
   eosr_returned_= false;
   blocked_on_queue_ = false;
-  if (ready_buffers_capacity_ <= 0) {
-    ready_buffers_capacity_ = reader->initial_scan_range_queue_capacity();
-    DCHECK_GE(ready_buffers_capacity_, MIN_QUEUE_CAPACITY);
-  }
   DCHECK(Validate()) << DebugString();
 }
 
@@ -525,7 +507,9 @@ Status DiskIoMgr::ScanRange::Read(
   return Status::OK();
 }
 
-Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
+Status DiskIoMgr::ScanRange::ReadFromCache(
+    const unique_lock<mutex>& reader_lock, bool* read_succeeded) {
+  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
   DCHECK(try_cache_);
   DCHECK_EQ(bytes_read_, 0);
   *read_succeeded = false;
@@ -580,7 +564,7 @@ Status DiskIoMgr::ScanRange::ReadFromCache(bool* read_succeeded) {
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;
   bytes_read_ = bytes_read;
-  EnqueueBuffer(move(desc));
+  EnqueueBuffer(reader_lock, move(desc));
   if (reader_->bytes_read_counter_ != nullptr) {
     COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
   }
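
The rewritten GetNext() above drops 'lock_' while it updates the reader
counters, so EnqueueBuffer() can run in that window; the range must re-take
the lock and re-check queue fullness before unblocking itself. A compressed
sketch of that pattern, with stand-in names and an int in place of a
buffer descriptor:

    #include <condition_variable>
    #include <cstddef>
    #include <deque>
    #include <mutex>

    class RangeQueueSketch {
     public:
      // cf. SCAN_RANGE_READY_BUFFER_LIMIT in the patch.
      static const std::size_t kLimit = 2;

      void Enqueue(int buffer) {
        {
          std::unique_lock<std::mutex> l(lock_);
          ready_.push_back(buffer);
          blocked_ = ready_.size() == kLimit;  // hit the fixed cap
        }
        buffer_ready_.notify_one();
      }

      int GetNext() {
        int buffer;
        {
          std::unique_lock<std::mutex> l(lock_);
          while (ready_.empty()) buffer_ready_.wait(l);
          buffer = ready_.front();
          ready_.pop_front();
        }
        // Lock released: another thread may enqueue here and refill the
        // queue, so the fullness check below must be redone under the lock.
        {
          std::unique_lock<std::mutex> l(lock_);
          if (blocked_ && ready_.size() < kLimit) {
            blocked_ = false;
            // ... reschedule the range with the producer ...
          }
        }
        return buffer;
      }

     private:
      std::mutex lock_;
      std::condition_variable buffer_ready_;
      std::deque<int> ready_;
      bool blocked_ = false;
    };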

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr-test.cc b/be/src/runtime/disk-io-mgr-test.cc
index 7c60efa..a6a719f 100644
--- a/be/src/runtime/disk-io-mgr-test.cc
+++ b/be/src/runtime/disk-io-mgr-test.cc
@@ -48,12 +48,10 @@ namespace impala {
 class DiskIoMgrTest : public testing::Test {
  public:
 
-  virtual void SetUp() {
-    pool_.reset(new ObjectPool);
-  }
+  virtual void SetUp() {}
 
   virtual void TearDown() {
-    pool_.reset();
+    pool_.Clear();
   }
   void WriteValidateCallback(int num_writes, DiskIoMgr::WriteRange** written_range,
       DiskIoMgr* io_mgr, DiskIoRequestContext* reader, int32_t* data,
@@ -64,7 +62,7 @@ class DiskIoMgrTest : public testing::Test {
       EXPECT_EQ(status.code(), expected_status.code());
     }
     if (status.ok()) {
-      DiskIoMgr::ScanRange* scan_range = pool_->Add(new DiskIoMgr::ScanRange());
+      DiskIoMgr::ScanRange* scan_range = pool_.Add(new DiskIoMgr::ScanRange());
       scan_range->Reset(NULL, (*written_range)->file(), (*written_range)->len(),
           (*written_range)->offset(), 0, false, DiskIoMgr::BufferOpts::Uncached());
       ValidateSyncRead(io_mgr, reader, scan_range, reinterpret_cast<const char*>(data),
@@ -165,21 +163,20 @@ class DiskIoMgrTest : public testing::Test {
     }
   }
 
-  DiskIoMgr::ScanRange* AllocateRange(int num_buffers) {
-    return pool_->Add(new DiskIoMgr::ScanRange(num_buffers));
+  DiskIoMgr::ScanRange* AllocateRange() {
+    return pool_.Add(new DiskIoMgr::ScanRange);
   }
 
-  DiskIoMgr::ScanRange* InitRange(int num_buffers, const char* file_path, int offset,
-      int len, int disk_id, int64_t mtime, void* meta_data = NULL,
-      bool is_cached = false) {
-    DiskIoMgr::ScanRange* range = AllocateRange(num_buffers);
+  DiskIoMgr::ScanRange* InitRange(const char* file_path, int offset, int len,
+      int disk_id, int64_t mtime, void* meta_data = NULL, bool is_cached = false) {
+    DiskIoMgr::ScanRange* range = AllocateRange();
     range->Reset(NULL, file_path, len, offset, disk_id, true,
         DiskIoMgr::BufferOpts(is_cached, mtime), meta_data);
     EXPECT_EQ(mtime, range->mtime());
     return range;
   }
 
-  scoped_ptr<ObjectPool> pool_;
+  ObjectPool pool_;
 
   mutex written_mutex_;
   condition_variable writes_done_;
@@ -211,19 +208,19 @@ TEST_F(DiskIoMgrTest, SingleWriter) {
   read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      pool_.reset(new ObjectPool);
+      pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       DiskIoRequestContext* writer;
       io_mgr.RegisterContext(&writer, &mem_tracker);
       for (int i = 0; i < num_ranges; ++i) {
-        int32_t* data = pool_->Add(new int32_t);
+        int32_t* data = pool_.Add(new int32_t);
         *data = rand();
-        DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+        DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
         DiskIoMgr::WriteRange::WriteDoneCallback callback =
             bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, num_ranges,
                 new_range, read_io_mgr.get(), reader, data, Status::OK(), _1);
-        *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
+        *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
             num_ranges % num_disks, callback));
         (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
         EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
@@ -253,16 +250,16 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
   ASSERT_OK(io_mgr.Init(&mem_tracker));
   DiskIoRequestContext* writer;
   io_mgr.RegisterContext(&writer, NULL);
-  int32_t* data = pool_->Add(new int32_t);
+  int32_t* data = pool_.Add(new int32_t);
   *data = rand();
 
   // Write to file in non-existent directory.
-  DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+  DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
   DiskIoMgr::WriteRange::WriteDoneCallback callback =
       bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2, new_range,
           (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL, data,
           Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
-  *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
+  *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, rand(), 0, callback));
 
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
   EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
@@ -275,12 +272,12 @@ TEST_F(DiskIoMgrTest, InvalidWrite) {
     EXPECT_TRUE(false);
   }
 
-  new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+  new_range = pool_.Add(new DiskIoMgr::WriteRange*);
   callback = bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this, 2,
       new_range, (DiskIoMgr*)NULL, (DiskIoRequestContext*)NULL,
       data, Status(TErrorCode::DISK_IO_ERROR, "Test Failure"), _1);
 
-  *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
+  *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, -1, 0, callback));
   (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
   EXPECT_OK(io_mgr.AddWriteRange(writer, *new_range));
 
@@ -317,7 +314,7 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
   read_io_mgr->RegisterContext(&reader, &reader_mem_tracker);
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      pool_.reset(new ObjectPool);
+      pool_.Clear(); // Destroy scan ranges from previous iterations.
       DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 10);
       ASSERT_OK(io_mgr.Init(&mem_tracker));
       DiskIoRequestContext* writer;
@@ -328,14 +325,14 @@ TEST_F(DiskIoMgrTest, SingleWriterCancel) {
           io_mgr.CancelContext(writer);
           validate_status = Status::CANCELLED;
         }
-        int32_t* data = pool_->Add(new int32_t);
+        int32_t* data = pool_.Add(new int32_t);
         *data = rand();
-        DiskIoMgr::WriteRange** new_range = pool_->Add(new DiskIoMgr::WriteRange*);
+        DiskIoMgr::WriteRange** new_range = pool_.Add(new DiskIoMgr::WriteRange*);
         DiskIoMgr::WriteRange::WriteDoneCallback callback =
             bind(mem_fn(&DiskIoMgrTest::WriteValidateCallback), this,
                 num_ranges_before_cancel, new_range, read_io_mgr.get(), reader, data,
                 Status::CANCELLED, _1);
-        *new_range = pool_->Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
+        *new_range = pool_.Add(new DiskIoMgr::WriteRange(tmp_file, cur_offset,
             num_ranges % num_disks, callback));
         (*new_range)->SetData(reinterpret_cast<uint8_t*>(data), sizeof(int32_t));
         cur_offset += sizeof(int32_t);
@@ -372,41 +369,39 @@ TEST_F(DiskIoMgrTest, SingleReader) {
   int64_t iters = 0;
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
-        for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
-          pool_.reset(new ObjectPool);
-          LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
-                    << " num_disk=" << num_disks << " num_buffers=" << num_buffers
-                    << " num_read_threads=" << num_read_threads;
-
-          if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-          DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+      for (int num_read_threads = 1; num_read_threads <= 5; ++num_read_threads) {
+        ObjectPool pool;
+        LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+                  << " num_disk=" << num_disks
+                  << " num_read_threads=" << num_read_threads;
 
-          ASSERT_OK(io_mgr.Init(&mem_tracker));
-          MemTracker reader_mem_tracker;
-          DiskIoRequestContext* reader;
-          io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+        if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
-          vector<DiskIoMgr::ScanRange*> ranges;
-          for (int i = 0; i < len; ++i) {
-            int disk_id = i % num_disks;
-            ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
-                stat_val.st_mtime));
-          }
-          ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+        ASSERT_OK(io_mgr.Init(&mem_tracker));
+        MemTracker reader_mem_tracker;
+        DiskIoRequestContext* reader;
+        io_mgr.RegisterContext(&reader, &reader_mem_tracker);
 
-          AtomicInt32 num_ranges_processed;
-          thread_group threads;
-          for (int i = 0; i < num_read_threads; ++i) {
-            threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
-                len, Status::OK(), 0, &num_ranges_processed));
-          }
-          threads.join_all();
+        vector<DiskIoMgr::ScanRange*> ranges;
+        for (int i = 0; i < len; ++i) {
+          int disk_id = i % num_disks;
+          ranges.push_back(
+              InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+        }
+        ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
-          EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
-          io_mgr.UnregisterContext(reader);
-          EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+        AtomicInt32 num_ranges_processed;
+        thread_group threads;
+        for (int i = 0; i < num_read_threads; ++i) {
+          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+              len, Status::OK(), 0, &num_ranges_processed));
         }
+        threads.join_all();
+
+        EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
+        io_mgr.UnregisterContext(reader);
+        EXPECT_EQ(reader_mem_tracker.consumption(), 0);
       }
     }
   }
@@ -428,56 +423,53 @@ TEST_F(DiskIoMgrTest, AddScanRangeTest) {
   int64_t iters = 0;
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
-        pool_.reset(new ObjectPool);
-        LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
-                  << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
-
-        if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+                << " num_disk=" << num_disks;
 
-        ASSERT_OK(io_mgr.Init(&mem_tracker));
-        MemTracker reader_mem_tracker;
-        DiskIoRequestContext* reader;
-        io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+      if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
-        vector<DiskIoMgr::ScanRange*> ranges_first_half;
-        vector<DiskIoMgr::ScanRange*> ranges_second_half;
-        for (int i = 0; i < len; ++i) {
-          int disk_id = i % num_disks;
-          if (i > len / 2) {
-            ranges_second_half.push_back(
-                InitRange(num_buffers, tmp_file, i, 1, disk_id,
-                stat_val.st_mtime));
-          } else {
-            ranges_first_half.push_back(InitRange(num_buffers, tmp_file, i, 1, disk_id,
-                stat_val.st_mtime));
-          }
+      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      MemTracker reader_mem_tracker;
+      DiskIoRequestContext* reader;
+      io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+
+      vector<DiskIoMgr::ScanRange*> ranges_first_half;
+      vector<DiskIoMgr::ScanRange*> ranges_second_half;
+      for (int i = 0; i < len; ++i) {
+        int disk_id = i % num_disks;
+        if (i > len / 2) {
+          ranges_second_half.push_back(
+              InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
+        } else {
+          ranges_first_half.push_back(
+              InitRange(tmp_file, i, 1, disk_id, stat_val.st_mtime));
         }
-        AtomicInt32 num_ranges_processed;
-
-        // Issue first half the scan ranges.
-        ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half));
+      }
+      AtomicInt32 num_ranges_processed;
 
-        // Read a couple of them
-        ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2,
-            &num_ranges_processed);
+      // Issue first half the scan ranges.
+      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_first_half));
 
-        // Issue second half
-        ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half));
+      // Read a couple of them
+      ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 2,
+          &num_ranges_processed);
 
-        // Start up some threads and then cancel
-        thread_group threads;
-        for (int i = 0; i < 3; ++i) {
-          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
-              strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
-        }
+      // Issue second half
+      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges_second_half));
 
-        threads.join_all();
-        EXPECT_EQ(num_ranges_processed.Load(), len);
-        io_mgr.UnregisterContext(reader);
-        EXPECT_EQ(reader_mem_tracker.consumption(), 0);
+      // Start up some threads and then cancel
+      thread_group threads;
+      for (int i = 0; i < 3; ++i) {
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+            strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
       }
+
+      threads.join_all();
+      EXPECT_EQ(num_ranges_processed.Load(), len);
+      io_mgr.UnregisterContext(reader);
+      EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
   }
   EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -500,57 +492,55 @@ TEST_F(DiskIoMgrTest, SyncReadTest) {
   int64_t iters = 0;
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
-        pool_.reset(new ObjectPool);
-        LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
-                  << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
+      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+                << " num_disk=" << num_disks;
 
-        if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
-            MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+      if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk,
+          MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
-        ASSERT_OK(io_mgr.Init(&mem_tracker));
-        MemTracker reader_mem_tracker;
-        DiskIoRequestContext* reader;
-        io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      MemTracker reader_mem_tracker;
+      DiskIoRequestContext* reader;
+      io_mgr.RegisterContext(&reader, &reader_mem_tracker);
 
-        DiskIoMgr::ScanRange* complete_range = InitRange(1, tmp_file, 0, strlen(data), 0,
-            stat_val.st_mtime);
+      DiskIoMgr::ScanRange* complete_range = InitRange(tmp_file, 0, strlen(data), 0,
+          stat_val.st_mtime);
 
-        // Issue some reads before the async ones are issued
-        ValidateSyncRead(&io_mgr, reader, complete_range, data);
-        ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      // Issue some reads before the async ones are issued
+      ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader, complete_range, data);
 
-        vector<DiskIoMgr::ScanRange*> ranges;
-        for (int i = 0; i < len; ++i) {
-          int disk_id = i % num_disks;
-          ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
-              stat_val.st_mtime));
-        }
-        ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+      vector<DiskIoMgr::ScanRange*> ranges;
+      for (int i = 0; i < len; ++i) {
+        int disk_id = i % num_disks;
+        ranges.push_back(InitRange(tmp_file, 0, len, disk_id,
+            stat_val.st_mtime));
+      }
+      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
-        AtomicInt32 num_ranges_processed;
-        thread_group threads;
-        for (int i = 0; i < 5; ++i) {
-          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
-              strlen(data), Status::OK(), 0, &num_ranges_processed));
-        }
+      AtomicInt32 num_ranges_processed;
+      thread_group threads;
+      for (int i = 0; i < 5; ++i) {
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+            strlen(data), Status::OK(), 0, &num_ranges_processed));
+      }
 
-        // Issue some more sync ranges
-        for (int i = 0; i < 5; ++i) {
-          sched_yield();
-          ValidateSyncRead(&io_mgr, reader, complete_range, data);
-        }
+      // Issue some more sync ranges
+      for (int i = 0; i < 5; ++i) {
+        sched_yield();
+        ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      }
 
-        threads.join_all();
+      threads.join_all();
 
-        ValidateSyncRead(&io_mgr, reader, complete_range, data);
-        ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader, complete_range, data);
+      ValidateSyncRead(&io_mgr, reader, complete_range, data);
 
-        EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
-        io_mgr.UnregisterContext(reader);
-        EXPECT_EQ(reader_mem_tracker.consumption(), 0);
-      }
+      EXPECT_EQ(num_ranges_processed.Load(), ranges.size());
+      io_mgr.UnregisterContext(reader);
+      EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
   }
   EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -571,51 +561,48 @@ TEST_F(DiskIoMgrTest, SingleReaderCancel) {
   int64_t iters = 0;
   for (int num_threads_per_disk = 1; num_threads_per_disk <= 5; ++num_threads_per_disk) {
     for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-      for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
-        pool_.reset(new ObjectPool);
-        LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
-                  << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
-
-        if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-        DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
+      pool_.Clear(); // Destroy scan ranges from previous iterations.
+      LOG(INFO) << "Starting test with num_threads_per_disk=" << num_threads_per_disk
+                << " num_disk=" << num_disks;
 
-        ASSERT_OK(io_mgr.Init(&mem_tracker));
-        MemTracker reader_mem_tracker;
-        DiskIoRequestContext* reader;
-        io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+      if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
+      DiskIoMgr io_mgr(num_disks, num_threads_per_disk, num_threads_per_disk, 1, 1);
 
-        vector<DiskIoMgr::ScanRange*> ranges;
-        for (int i = 0; i < len; ++i) {
-          int disk_id = i % num_disks;
-          ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
-              stat_val.st_mtime));
-        }
-        ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
+      ASSERT_OK(io_mgr.Init(&mem_tracker));
+      MemTracker reader_mem_tracker;
+      DiskIoRequestContext* reader;
+      io_mgr.RegisterContext(&reader, &reader_mem_tracker);
+
+      vector<DiskIoMgr::ScanRange*> ranges;
+      for (int i = 0; i < len; ++i) {
+        int disk_id = i % num_disks;
+        ranges.push_back(InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime));
+      }
+      ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
-        AtomicInt32 num_ranges_processed;
-        int num_succesful_ranges = ranges.size() / 2;
-        // Read half the ranges
-        for (int i = 0; i < num_succesful_ranges; ++i) {
-          ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1,
-              &num_ranges_processed);
-        }
-        EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
+      AtomicInt32 num_ranges_processed;
+      int num_succesful_ranges = ranges.size() / 2;
+      // Read half the ranges
+      for (int i = 0; i < num_succesful_ranges; ++i) {
+        ScanRangeThread(&io_mgr, reader, data, strlen(data), Status::OK(), 1,
+            &num_ranges_processed);
+      }
+      EXPECT_EQ(num_ranges_processed.Load(), num_succesful_ranges);
 
-        // Start up some threads and then cancel
-        thread_group threads;
-        for (int i = 0; i < 3; ++i) {
-          threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
-              strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
-        }
+      // Start up some threads and then cancel
+      thread_group threads;
+      for (int i = 0; i < 3; ++i) {
+        threads.add_thread(new thread(ScanRangeThread, &io_mgr, reader, data,
+            strlen(data), Status::CANCELLED, 0, &num_ranges_processed));
+      }
 
-        io_mgr.CancelContext(reader);
-        sched_yield();
+      io_mgr.CancelContext(reader);
+      sched_yield();
 
-        threads.join_all();
-        EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled());
-        io_mgr.UnregisterContext(reader);
-        EXPECT_EQ(reader_mem_tracker.consumption(), 0);
-      }
+      threads.join_all();
+      EXPECT_TRUE(io_mgr.context_status(reader).IsCancelled());
+      io_mgr.UnregisterContext(reader);
+      EXPECT_EQ(reader_mem_tracker.consumption(), 0);
     }
   }
   EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -632,15 +619,10 @@ TEST_F(DiskIoMgrTest, MemLimits) {
   struct stat stat_val;
   stat(tmp_file, &stat_val);
 
-  const int num_buffers = 25;
-  // Give the reader more buffers than the limit
   const int mem_limit_num_buffers = 2;
-
-  int64_t iters = 0;
+  // Allocate enough ranges so that the total buffer memory exceeds the mem limit.
+  const int num_ranges = 25;
   {
-    pool_.reset(new ObjectPool);
-    if (++iters % 1000 == 0) LOG(ERROR) << "Starting iteration " << iters;
-
     MemTracker root_mem_tracker(mem_limit_num_buffers * MAX_BUFFER_SIZE);
     DiskIoMgr io_mgr(1, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
@@ -650,9 +632,8 @@ TEST_F(DiskIoMgrTest, MemLimits) {
     io_mgr.RegisterContext(&reader, &reader_mem_tracker);
 
     vector<DiskIoMgr::ScanRange*> ranges;
-    for (int i = 0; i < num_buffers; ++i) {
-      ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, 0,
-          stat_val.st_mtime));
+    for (int i = 0; i < num_ranges; ++i) {
+      ranges.push_back(InitRange(tmp_file, 0, len, 0, stat_val.st_mtime));
     }
     ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
@@ -711,12 +692,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
   stat(tmp_file, &stat_val);
 
   const int num_disks = 2;
-  const int num_buffers = 3;
-
-  int64_t iters = 0;
   {
-    pool_.reset(new ObjectPool);
-    if (++iters % 5000 == 0) LOG(ERROR) << "Starting iteration " << iters;
     DiskIoMgr io_mgr(num_disks, 1, 1, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
 
     ASSERT_OK(io_mgr.Init(&mem_tracker));
@@ -725,7 +701,7 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     io_mgr.RegisterContext(&reader, &reader_mem_tracker);
 
     DiskIoMgr::ScanRange* complete_range =
-        InitRange(1, tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true);
+        InitRange(tmp_file, 0, strlen(data), 0, stat_val.st_mtime, NULL, true);
 
     // Issue some reads before the async ones are issued
     ValidateSyncRead(&io_mgr, reader, complete_range, data);
@@ -734,8 +710,8 @@ TEST_F(DiskIoMgrTest, CachedReads) {
     vector<DiskIoMgr::ScanRange*> ranges;
     for (int i = 0; i < len; ++i) {
       int disk_id = i % num_disks;
-      ranges.push_back(InitRange(num_buffers, tmp_file, 0, len, disk_id,
-          stat_val.st_mtime, NULL, true));
+      ranges.push_back(
+          InitRange(tmp_file, 0, len, disk_id, stat_val.st_mtime, NULL, true));
     }
     ASSERT_OK(io_mgr.AddScanRanges(reader, ranges));
 
@@ -797,7 +773,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
         for (int file_index = 0; file_index < num_contexts; ++file_index) {
           io_mgr.RegisterContext(&contexts[file_index], &mem_tracker);
         }
-        pool_.reset(new ObjectPool);
+        pool_.Clear();
         int read_offset = 0;
         int write_offset = 0;
         while (read_offset < file_size) {
@@ -808,7 +784,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
             vector<DiskIoMgr::ScanRange*> ranges;
             int num_scan_ranges = min<int>(num_reads_queued, write_offset - read_offset);
             for (int i = 0; i < num_scan_ranges; ++i) {
-              ranges.push_back(InitRange(1, file_name.c_str(), read_offset, 1,
+              ranges.push_back(InitRange(file_name.c_str(), read_offset, 1,
                   i % num_disks, stat_val.st_mtime));
               threads.add_thread(new thread(ScanRangeThread, &io_mgr,
                   contexts[context_index],
@@ -823,7 +799,7 @@ TEST_F(DiskIoMgrTest, MultipleReaderWriter) {
               DiskIoMgr::WriteRange::WriteDoneCallback callback =
                   bind(mem_fn(&DiskIoMgrTest::WriteCompleteCallback),
                       this, num_write_ranges, _1);
-              DiskIoMgr::WriteRange* new_range = pool_->Add(
+              DiskIoMgr::WriteRange* new_range = pool_.Add(
                   new DiskIoMgr::WriteRange(file_name,
                       write_offset, i % num_disks, callback));
               new_range->SetData(reinterpret_cast<const uint8_t*>
@@ -899,44 +875,41 @@ TEST_F(DiskIoMgrTest, MultipleReader) {
   for (int iteration = 0; iteration < ITERATIONS; ++iteration) {
     for (int threads_per_disk = 1; threads_per_disk <= 5; ++threads_per_disk) {
       for (int num_disks = 1; num_disks <= 5; num_disks += 2) {
-        for (int num_buffers = 1; num_buffers <= 5; ++num_buffers) {
-          pool_.reset(new ObjectPool);
-          LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
-                    << " num_disk=" << num_disks << " num_buffers=" << num_buffers;
-          if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
+        pool_.Clear(); // Destroy scan ranges from previous iterations.
+        LOG(INFO) << "Starting test with num_threads_per_disk=" << threads_per_disk
+                  << " num_disk=" << num_disks;
+        if (++iters % 2500 == 0) LOG(ERROR) << "Starting iteration " << iters;
 
-          DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
-              MAX_BUFFER_SIZE);
-          EXPECT_OK(io_mgr.Init(&mem_tracker));
+        DiskIoMgr io_mgr(num_disks, threads_per_disk, threads_per_disk, MIN_BUFFER_SIZE,
+            MAX_BUFFER_SIZE);
+        EXPECT_OK(io_mgr.Init(&mem_tracker));
 
-          for (int i = 0; i < NUM_READERS; ++i) {
-            io_mgr.RegisterContext(&readers[i], &mem_tracker);
+        for (int i = 0; i < NUM_READERS; ++i) {
+          io_mgr.RegisterContext(&readers[i], &mem_tracker);
 
-            vector<DiskIoMgr::ScanRange*> ranges;
-            for (int j = 0; j < DATA_LEN; ++j) {
-              int disk_id = j % num_disks;
-              ranges.push_back(
-                  InitRange(num_buffers,file_names[i].c_str(), j, 1, disk_id,
-                  mtimes[i]));
-            }
-            ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges));
+          vector<DiskIoMgr::ScanRange*> ranges;
+          for (int j = 0; j < DATA_LEN; ++j) {
+            int disk_id = j % num_disks;
+            ranges.push_back(
+                InitRange(file_names[i].c_str(), j, 1, disk_id, mtimes[i]));
           }
+          ASSERT_OK(io_mgr.AddScanRanges(readers[i], ranges));
+        }
 
-          AtomicInt32 num_ranges_processed;
-          thread_group threads;
-          for (int i = 0; i < NUM_READERS; ++i) {
-            for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
-              threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i],
-                  data[i].c_str(), data[i].size(), Status::OK(), 0,
-                  &num_ranges_processed));
-            }
-          }
-          threads.join_all();
-          EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
-          for (int i = 0; i < NUM_READERS; ++i) {
-            io_mgr.UnregisterContext(readers[i]);
+        AtomicInt32 num_ranges_processed;
+        thread_group threads;
+        for (int i = 0; i < NUM_READERS; ++i) {
+          for (int j = 0; j < NUM_THREADS_PER_READER; ++j) {
+            threads.add_thread(new thread(ScanRangeThread, &io_mgr, readers[i],
+                data[i].c_str(), data[i].size(), Status::OK(), 0,
+                &num_ranges_processed));
           }
         }
+        threads.join_all();
+        EXPECT_EQ(num_ranges_processed.Load(), DATA_LEN * NUM_READERS);
+        for (int i = 0; i < NUM_READERS; ++i) {
+          io_mgr.UnregisterContext(readers[i]);
+        }
       }
     }
   }
@@ -966,7 +939,7 @@ TEST_F(DiskIoMgrTest, Buffers) {
   DiskIoRequestContext* reader;
   io_mgr.RegisterContext(&reader, &reader_mem_tracker);
 
-  DiskIoMgr::ScanRange* dummy_range = InitRange(1, "dummy", 0, 0, 0, 0);
+  DiskIoMgr::ScanRange* dummy_range = InitRange("dummy", 0, 0, 0, 0);
 
   // buffer length should be rounded up to min buffer size
   int64_t buffer_len = 1;
@@ -1043,7 +1016,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   io_mgr->RegisterContext(&reader, &reader_mem_tracker);
 
   // We should not read past the end of file.
-  DiskIoMgr::ScanRange* range = InitRange(1, tmp_file, 0, read_len, 0, stat_val.st_mtime);
+  DiskIoMgr::ScanRange* range = InitRange(tmp_file, 0, read_len, 0, stat_val.st_mtime);
   unique_ptr<DiskIoMgr::BufferDescriptor> buffer;
   ASSERT_OK(io_mgr->Read(reader, range, &buffer));
   ASSERT_TRUE(buffer->eosr());
@@ -1052,7 +1025,7 @@ TEST_F(DiskIoMgrTest, PartialRead) {
   io_mgr->ReturnBuffer(move(buffer));
 
   io_mgr->UnregisterContext(reader);
-  pool_.reset();
+  pool_.Clear();
   io_mgr.reset();
   EXPECT_EQ(reader_mem_tracker.consumption(), 0);
   EXPECT_EQ(mem_tracker.consumption(), 0);
@@ -1078,7 +1051,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   for (int buffer_len : vector<int>({len - 1, len, len + 1})) {
     vector<uint8_t> client_buffer(buffer_len);
     int scan_len = min(len, buffer_len);
-    DiskIoMgr::ScanRange* range = AllocateRange(1);
+    DiskIoMgr::ScanRange* range = AllocateRange();
     range->Reset(NULL, tmp_file, scan_len, 0, 0, true,
         DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), buffer_len));
     ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
@@ -1096,7 +1069,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   }
 
   io_mgr->UnregisterContext(reader);
-  pool_.reset();
+  pool_.Clear();
   io_mgr.reset();
   EXPECT_EQ(mem_tracker.consumption(), 0);
 }
@@ -1116,7 +1089,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
   vector<uint8_t> client_buffer(SCAN_LEN);
   for (int i = 0; i < 1000; ++i) {
     io_mgr->RegisterContext(&reader, reader_mem_tracker);
-    DiskIoMgr::ScanRange* range = AllocateRange(1);
+    DiskIoMgr::ScanRange* range = AllocateRange();
     range->Reset(NULL, tmp_file, SCAN_LEN, 0, 0, true,
         DiskIoMgr::BufferOpts::ReadInto(client_buffer.data(), SCAN_LEN));
     ASSERT_OK(io_mgr->AddScanRange(reader, range, true));
@@ -1134,7 +1107,7 @@ TEST_F(DiskIoMgrTest, ReadIntoClientBufferError) {
     io_mgr->UnregisterContext(reader);
   }
 
-  pool_.reset();
+  pool_.Clear();
   io_mgr.reset();
   EXPECT_EQ(mem_tracker.consumption(), 0);
 }
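
The test refactor above replaces a scoped_ptr<ObjectPool> that was reset
each iteration with a plain member pool that is cleared in place. A minimal
sketch of the idiom with a hypothetical pool type (not Impala's
ObjectPool):

    #include <functional>
    #include <vector>

    // Owns heap objects registered via Add() and deletes them all in
    // Clear(), so one pool can be reused across test iterations.
    class PoolSketch {
     public:
      template <typename T>
      T* Add(T* object) {
        deleters_.push_back([object] { delete object; });
        return object;
      }

      void Clear() {
        for (auto& deleter : deleters_) deleter();
        deleters_.clear();
      }

      ~PoolSketch() { Clear(); }

     private:
      std::vector<std::function<void()>> deleters_;
    };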

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.cc b/be/src/runtime/disk-io-mgr.cc
index e77d9ca..7cc2af7 100644
--- a/be/src/runtime/disk-io-mgr.cc
+++ b/be/src/runtime/disk-io-mgr.cc
@@ -118,7 +118,7 @@ DEFINE_uint64(unused_file_handle_timeout_sec, 21600, "Maximum time, in seconds,
 // current queue size.
 static const int LOW_MEMORY = 64 * 1024 * 1024;
 
-const int DiskIoMgr::DEFAULT_QUEUE_CAPACITY = 2;
+const int DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
 
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
@@ -583,7 +583,7 @@ Status DiskIoMgr::AddScanRanges(DiskIoRequestContext* reader,
     if (range->try_cache_) {
       if (schedule_immediately) {
         bool cached_read_succeeded;
-        RETURN_IF_ERROR(range->ReadFromCache(&cached_read_succeeded));
+        RETURN_IF_ERROR(range->ReadFromCache(reader_lock, &cached_read_succeeded));
         if (cached_read_succeeded) continue;
         // Cached read failed, fall back to AddRequestRange() below.
       } else {
@@ -633,7 +633,7 @@ Status DiskIoMgr::GetNextRange(DiskIoRequestContext* reader, ScanRange** range)
       *range = reader->cached_ranges_.Dequeue();
       DCHECK((*range)->try_cache_);
       bool cached_read_succeeded;
-      RETURN_IF_ERROR((*range)->ReadFromCache(&cached_read_succeeded));
+      RETURN_IF_ERROR((*range)->ReadFromCache(reader_lock, &cached_read_succeeded));
       if (cached_read_succeeded) return Status::OK();
 
       // This range ended up not being cached. Loop again and pick up a new range.
@@ -994,7 +994,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
     ScanRange* scan_range = buffer->scan_range_;
     scan_range->Cancel(reader->status_);
     // Enqueue the buffer to use the scan range's buffer cleanup path.
-    scan_range->EnqueueBuffer(move(buffer));
+    scan_range->EnqueueBuffer(reader_lock, move(buffer));
     return;
   }
 
@@ -1021,7 +1021,7 @@ void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, DiskIoRequestContext*
   bool eosr = buffer->eosr_;
   ScanRange* scan_range = buffer->scan_range_;
   bool is_cached = buffer->is_cached();
-  bool queue_full = scan_range->EnqueueBuffer(move(buffer));
+  bool queue_full = scan_range->EnqueueBuffer(reader_lock, move(buffer));
   if (eosr) {
     // For cached buffers, we can't close the range until the cached buffer is returned.
     // Close() is called from DiskIoMgr::ReturnBuffer().
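
Threading the held 'reader_lock' into EnqueueBuffer() and ReadFromCache()
is a lock-witness idiom: the callee can assert that the right lock is held
instead of relying on a comment. A minimal sketch with stand-in names:

    #include <cassert>
    #include <mutex>

    struct ReaderSketch { std::mutex lock_; };

    // The caller must already hold reader->lock_ and passes the lock by
    // const reference so this can be verified rather than assumed.
    void EnqueueBufferSketch(ReaderSketch* reader,
                             const std::unique_lock<std::mutex>& reader_lock) {
      assert(reader_lock.mutex() == &reader->lock_ && reader_lock.owns_lock());
      // ... touch state protected by reader->lock_ ...
    }

    void Caller(ReaderSketch* reader) {
      std::unique_lock<std::mutex> reader_lock(reader->lock_);
      EnqueueBufferSketch(reader, reader_lock);
    }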

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/50d603d3/be/src/runtime/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/disk-io-mgr.h b/be/src/runtime/disk-io-mgr.h
index 138f973..ed33942 100644
--- a/be/src/runtime/disk-io-mgr.h
+++ b/be/src/runtime/disk-io-mgr.h
@@ -129,16 +129,12 @@ class MemTracker;
 /// To have multiple reading threads, the caller would simply spin up the threads
 /// and each would process the loops above.
 //
-/// To control the number of IO buffers, each scan range has a soft max capacity for
-/// the number of queued buffers. If the number of buffers is at capacity, the IoMgr
-/// will no longer read for that scan range until the caller has processed a buffer.
-/// This capacity does not need to be fixed, and the caller can dynamically adjust
-/// it if necessary.
-//
-/// As an example: If we allowed 5 buffers per range on a 24 core, 72 thread
-/// (we default to allowing 3x threads) machine, we should see at most
-/// 72 * 5 * 8MB = 2.8GB in io buffers memory usage. This should remain roughly constant
-/// regardless of how many concurrent readers are running.
+/// To control the number of IO buffers, each scan range has a limit of two queued
+/// buffers (SCAN_RANGE_READY_BUFFER_LIMIT). If the number of buffers is at capacity,
+/// the IoMgr will no longer read for that scan range until the caller has processed
+/// a buffer. Assuming the client returns each buffer before requesting the next one
+/// from the scan range, then this will consume up to 3 * 8MB = 24MB of I/O buffers per
+/// scan range.
 //
 /// Buffer Management:
 /// Buffers for reads are either a) allocated by the IoMgr and transferred to the caller,
@@ -382,8 +378,7 @@ class DiskIoMgr : public CacheLineAligned {
   /// the IoMgr.
   class ScanRange : public RequestRange {
    public:
-    /// The initial queue capacity for this.  Specify -1 to use IoMgr default.
-    ScanRange(int initial_capacity = -1);
+    ScanRange();
 
     virtual ~ScanRange();
 
@@ -401,7 +396,6 @@ class DiskIoMgr : public CacheLineAligned {
     void* meta_data() const { return meta_data_; }
     bool try_cache() const { return try_cache_; }
     bool expected_local() const { return expected_local_; }
-    int ready_buffers_capacity() const { return ready_buffers_capacity_; }
 
     /// Returns the next buffer for this scan range. buffer is an output parameter.
     /// This function blocks until a buffer is ready or an error occurred. If this is
@@ -431,8 +425,10 @@ class DiskIoMgr : public CacheLineAligned {
     /// Enqueues a buffer for this range. This does not block.
     /// Returns true if this scan range has hit the queue capacity, false otherwise.
     /// The caller passes ownership of buffer to the scan range and it is not
-    /// valid to access buffer after this call.
-    bool EnqueueBuffer(std::unique_ptr<BufferDescriptor> buffer);
+    /// valid to access buffer after this call. The reader lock must be held by the
+    /// caller.
+    bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+        std::unique_ptr<BufferDescriptor> buffer);
 
     /// Cleanup any queued buffers (i.e. due to cancellation). This cannot
     /// be called with any locks taken.
@@ -475,7 +471,9 @@ class DiskIoMgr : public CacheLineAligned {
     /// and *read_succeeded to true.
     /// If the data is not cached, returns ok() and *read_succeeded is set to false.
     /// Returns a non-ok status if it ran into a non-continuable error.
-    Status ReadFromCache(bool* read_succeeded) WARN_UNUSED_RESULT;
+    /// The reader lock must be held by the caller.
+    Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
+        bool* read_succeeded) WARN_UNUSED_RESULT;
 
     /// Pointer to caller specified metadata. This is untouched by the io manager
     /// and the caller can put whatever auxiliary data in here.
@@ -540,7 +538,9 @@ class DiskIoMgr : public CacheLineAligned {
     };
 
     /// Lock protecting fields below.
-    /// This lock should not be taken during Open/Read/Close.
+    /// This lock should not be taken during Open()/Read()/Close().
+    /// If DiskIoRequestContext::lock_ and this lock need to be held simultaneously,
+    /// DiskIoRequestContext::lock_ must be taken first.
     boost::mutex lock_;
 
     /// Number of bytes read so far for this scan range
@@ -566,12 +566,6 @@ class DiskIoMgr : public CacheLineAligned {
     boost::condition_variable buffer_ready_cv_;
     std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
-    /// The soft capacity limit for ready_buffers_. ready_buffers_ can exceed
-    /// the limit temporarily as the capacity is adjusted dynamically.
-    /// In that case, the capcity is only realized when the caller removes buffers
-    /// from ready_buffers_.
-    int ready_buffers_capacity_;
-
     /// Lock that should be taken during hdfs calls. Only one thread (the disk reading
     /// thread) calls into hdfs at a time so this lock does not have performance impact.
     /// This lock only serves to coordinate cleanup. Specifically it serves to ensure
@@ -799,9 +793,24 @@ class DiskIoMgr : public CacheLineAligned {
   /// 'bytes_to_free' is -1.
   void GcIoBuffers(int64_t bytes_to_free = -1);
 
-  /// Default ready buffer queue capacity. This constant doesn't matter too much
-  /// since the system dynamically adjusts.
-  static const int DEFAULT_QUEUE_CAPACITY;
+  /// The maximum number of ready buffers that can be queued in a scan range. Having two
+  /// queued buffers (plus the buffer that is returned to the client) gives good
+  /// performance in most scenarios:
+  /// 1. If the consumer is consuming data faster than we can read from disk, then the
+  ///    queue will be empty most of the time because the buffer will be immediately
+  ///    pulled off the queue as soon as it is added. There will always be an I/O request
+  ///    in the disk queue to maximize I/O throughput, which is the bottleneck in this
+  ///    case.
+  /// 2. If we can read from disk faster than the consumer is consuming data, the queue
+  ///    will fill up and there will always be a buffer available for the consumer to
+  ///    read, so the consumer will not block and we maximize consumer throughput, which
+  ///    is the bottleneck in this case.
+  /// 3. If the consumer is consuming data at approximately the same rate as we are
+  ///    reading from disk, then the steady state is that the consumer is processing one
+  ///    buffer and one buffer is in the disk queue. The additional buffer can absorb
+  ///    bursts where the producer runs faster than the consumer or the consumer runs
+  ///    faster than the producer without blocking either the producer or consumer.
+  static const int SCAN_RANGE_READY_BUFFER_LIMIT = 2;
 
   /// "Disk" queue offsets for remote accesses.  Offset 0 corresponds to
   /// disk ID (i.e. disk_queue_ index) of num_local_disks().
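
The fixed limit also makes the per-range buffer footprint easy to bound.
A small worked check, assuming the 8MB maximum buffer size cited in the
header comment above (two queued buffers plus one held by the client):

    #include <cstdint>
    #include <cstdio>

    int main() {
      const int64_t kQueuedLimit = 2;                    // SCAN_RANGE_READY_BUFFER_LIMIT
      const int64_t kMaxBufferSize = 8LL * 1024 * 1024;  // assumed 8MB max buffer
      const int64_t per_range = (kQueuedLimit + 1) * kMaxBufferSize;
      // Prints 25165824, i.e. the 3 * 8MB = 24MB upper bound per scan range.
      std::printf("%lld bytes\n", static_cast<long long>(per_range));
      return 0;
    }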