You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 22:51:40 UTC

[13/15] impala git commit: IMPALA-4835: Part 2: Allocate scan range buffers upfront

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index 031b976..b124702 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -36,30 +36,6 @@ BufferDescriptor::BufferDescriptor(DiskIoMgr* io_mgr,
   DCHECK_GE(buffer_len, 0);
 }
 
-Status RequestContext::AllocBuffer(ScanRange* range, int64_t buffer_size,
-    unique_ptr<BufferDescriptor>* buffer_desc) {
-  DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
-      << static_cast<int>(range->external_buffer_tag_);
-  DCHECK_LE(buffer_size, parent_->max_buffer_size_);
-  DCHECK_GT(buffer_size, 0);
-  buffer_size = BitUtil::RoundUpToPowerOfTwo(
-      max(parent_->min_buffer_size_, min(parent_->max_buffer_size_, buffer_size)));
-
-  DCHECK(mem_tracker_ != nullptr);
-  if (!mem_tracker_->TryConsume(buffer_size)) {
-    return mem_tracker_->MemLimitExceeded(nullptr, "disk I/O buffer", buffer_size);
-  }
-
-  uint8_t* buffer = reinterpret_cast<uint8_t*>(malloc(buffer_size));
-  if (buffer == nullptr) {
-    mem_tracker_->Release(buffer_size);
-    return Status(TErrorCode::INTERNAL_ERROR,
-        Substitute("Could not malloc buffer of $0 bytes"));
-  }
-  buffer_desc->reset(new BufferDescriptor(parent_, this, range, buffer, buffer_size));
-  return Status::OK();
-}
-
 void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
   DCHECK(buffer->buffer_ != nullptr);
   if (!buffer->is_cached() && !buffer->is_client_buffer()) {
@@ -108,40 +84,26 @@ void RequestContext::Cancel() {
     // The reader will be put into a cancelled state until call cleanup is complete.
     state_ = RequestContext::Cancelled;
 
-    // Cancel all scan ranges for this reader. Each range could be one one of
-    // four queues.
-    for (int i = 0; i < disk_states_.size(); ++i) {
-      PerDiskState& state = disk_states_[i];
-      RequestRange* range = nullptr;
-      while ((range = state.in_flight_ranges()->Dequeue()) != nullptr) {
-        if (range->request_type() == RequestType::READ) {
-          static_cast<ScanRange*>(range)->Cancel(Status::CANCELLED);
-        } else {
-          DCHECK(range->request_type() == RequestType::WRITE);
+    // Clear out all request ranges from queues for this reader. Cancel the scan
+    // ranges and invoke the write range callbacks to propagate the cancellation.
+    for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED);
+    active_scan_ranges_.clear();
+    for (PerDiskState& disk_state : disk_states_) {
+      RequestRange* range;
+      while ((range = disk_state.in_flight_ranges()->Dequeue()) != nullptr) {
+        if (range->request_type() == RequestType::WRITE) {
           write_callbacks.push_back(static_cast<WriteRange*>(range)->callback_);
         }
       }
-
-      ScanRange* scan_range;
-      while ((scan_range = state.unstarted_scan_ranges()->Dequeue()) != nullptr) {
-        scan_range->Cancel(Status::CANCELLED);
-      }
+      while (disk_state.unstarted_scan_ranges()->Dequeue() != nullptr);
       WriteRange* write_range;
-      while ((write_range = state.unstarted_write_ranges()->Dequeue()) != nullptr) {
+      while ((write_range = disk_state.unstarted_write_ranges()->Dequeue()) != nullptr) {
         write_callbacks.push_back(write_range->callback_);
       }
     }
-
-    ScanRange* range = nullptr;
-    while ((range = ready_to_start_ranges_.Dequeue()) != nullptr) {
-      range->Cancel(Status::CANCELLED);
-    }
-    while ((range = blocked_ranges_.Dequeue()) != nullptr) {
-      range->Cancel(Status::CANCELLED);
-    }
-    while ((range = cached_ranges_.Dequeue()) != nullptr) {
-      range->Cancel(Status::CANCELLED);
-    }
+    // Clear out the lists of scan ranges.
+    while (ready_to_start_ranges_.Dequeue() != nullptr);
+    while (cached_ranges_.Dequeue() != nullptr);
 
     // Ensure that the reader is scheduled on all disks (it may already be scheduled on
     // some). The disk threads will notice that the context is cancelled and do any
@@ -170,9 +132,8 @@ void RequestContext::CancelAndMarkInactive() {
   // Wait until the ranges finish up.
   while (num_disks_with_ranges_ > 0) disks_complete_cond_var_.Wait(l);
 
-  // Validate that no buffers were leaked from this context.
-  DCHECK_EQ(num_buffers_in_reader_.Load(), 0) << endl << DebugString();
-  DCHECK_EQ(num_used_buffers_.Load(), 0) << endl << DebugString();
+  // Validate that no ranges are active.
+  DCHECK_EQ(0, active_scan_ranges_.size()) << endl << DebugString();
 
   // Validate that no threads are active and the context is not queued.
   for (const PerDiskState& disk_state : disk_states_) {
@@ -185,42 +146,58 @@ void RequestContext::CancelAndMarkInactive() {
   state_ = Inactive;
 }
 
-void RequestContext::AddRequestRange(const unique_lock<mutex>& lock,
-    RequestRange* range, bool schedule_immediately) {
+void RequestContext::AddRangeToDisk(const unique_lock<mutex>& lock,
+    RequestRange* range, ScheduleMode schedule_mode) {
   DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
-  PerDiskState& state = disk_states_[range->disk_id()];
-  if (state.done()) {
-    DCHECK_EQ(state.num_remaining_ranges(), 0);
-    state.set_done(false);
+  DCHECK_EQ(state_, Active) << DebugString();
+  PerDiskState* disk_state = &disk_states_[range->disk_id()];
+  if (disk_state->done()) {
+    DCHECK_EQ(disk_state->num_remaining_ranges(), 0);
+    disk_state->set_done(false);
     ++num_disks_with_ranges_;
   }
-
-  bool schedule_context;
   if (range->request_type() == RequestType::READ) {
     ScanRange* scan_range = static_cast<ScanRange*>(range);
-    if (schedule_immediately) {
+    if (schedule_mode == ScheduleMode::IMMEDIATELY) {
       ScheduleScanRange(lock, scan_range);
-    } else {
-      state.unstarted_scan_ranges()->Enqueue(scan_range);
+    } else if (schedule_mode == ScheduleMode::UPON_GETNEXT) {
+      disk_state->unstarted_scan_ranges()->Enqueue(scan_range);
       num_unstarted_scan_ranges_.Add(1);
+      // If there's no 'next_scan_range_to_start', schedule this RequestContext so that
+      // one of the 'unstarted_scan_ranges' will become the 'next_scan_range_to_start'.
+      if (disk_state->next_scan_range_to_start() == nullptr) {
+        disk_state->ScheduleContext(lock, this, range->disk_id());
+      }
     }
-    // If next_scan_range_to_start is NULL, schedule this RequestContext so that it will
-    // be set. If it's not NULL, this context will be scheduled when GetNextRange() is
-    // invoked.
-    schedule_context = state.next_scan_range_to_start() == NULL;
   } else {
     DCHECK(range->request_type() == RequestType::WRITE);
-    DCHECK(!schedule_immediately);
+    DCHECK(schedule_mode == ScheduleMode::IMMEDIATELY) << static_cast<int>(schedule_mode);
     WriteRange* write_range = static_cast<WriteRange*>(range);
-    state.unstarted_write_ranges()->Enqueue(write_range);
+    disk_state->unstarted_write_ranges()->Enqueue(write_range);
 
-    // ScheduleContext() has no effect if the context is already scheduled,
-    // so this is safe.
-    schedule_context = true;
+    // Ensure that the context is scheduled so that the write range gets picked up.
+    // ScheduleContext() has no effect if already scheduled, so this is safe to do always.
+    disk_state->ScheduleContext(lock, this, range->disk_id());
   }
+  ++disk_state->num_remaining_ranges();
+}
+
+void RequestContext::AddActiveScanRangeLocked(
+    const unique_lock<mutex>& lock, ScanRange* range) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  DCHECK(state_ == Active);
+  active_scan_ranges_.insert(range);
+}
+
+void RequestContext::RemoveActiveScanRange(ScanRange* range) {
+  unique_lock<mutex> lock(lock_);
+  RemoveActiveScanRangeLocked(lock, range);
+}
 
-  if (schedule_context) state.ScheduleContext(lock, this, range->disk_id());
-  ++state.num_remaining_ranges();
+void RequestContext::RemoveActiveScanRangeLocked(
+    const unique_lock<mutex>& lock, ScanRange* range) {
+  DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+  active_scan_ranges_.erase(range);
 }
 
 RequestContext::RequestContext(
@@ -235,12 +212,9 @@ string RequestContext::DebugString() const {
   if (state_ == RequestContext::Cancelled) ss << "Cancelled";
   if (state_ == RequestContext::Active) ss << "Active";
   if (state_ != RequestContext::Inactive) {
-    ss << " #ready_buffers=" << num_ready_buffers_.Load()
-       << " #used_buffers=" << num_used_buffers_.Load()
-       << " #num_buffers_in_reader=" << num_buffers_in_reader_.Load()
-       << " #finished_scan_ranges=" << num_finished_ranges_.Load()
-       << " #disk_with_ranges=" << num_disks_with_ranges_
-       << " #disks=" << num_disks_with_ranges_;
+    ss << " #disk_with_ranges=" << num_disks_with_ranges_
+       << " #disks=" << num_disks_with_ranges_
+       << " #active scan ranges=" << active_scan_ranges_.size();
     for (int i = 0; i < disk_states_.size(); ++i) {
       ss << endl << "   " << i << ": "
          << "is_on_queue=" << disk_states_[i].is_on_queue()
@@ -263,16 +237,6 @@ bool RequestContext::Validate() const {
     return false;
   }
 
-  if (num_used_buffers_.Load() < 0) {
-    LOG(WARNING) << "num_used_buffers_ < 0: #used=" << num_used_buffers_.Load();
-    return false;
-  }
-
-  if (num_ready_buffers_.Load() < 0) {
-    LOG(WARNING) << "num_ready_buffers_ < 0: #used=" << num_ready_buffers_.Load();
-    return false;
-  }
-
   int total_unstarted_ranges = 0;
   for (int i = 0; i < disk_states_.size(); ++i) {
     const PerDiskState& state = disk_states_[i];
@@ -350,8 +314,8 @@ bool RequestContext::Validate() const {
       LOG(WARNING) << "Reader cancelled but has ready to start ranges.";
       return false;
     }
-    if (!blocked_ranges_.empty()) {
-      LOG(WARNING) << "Reader cancelled but has blocked ranges.";
+    if (!active_scan_ranges_.empty()) {
+      LOG(WARNING) << "Reader cancelled but has active ranges.";
       return false;
     }
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 737a16c..3aea2bc 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -23,6 +23,11 @@
 
 namespace impala {
 namespace io {
+
+// Mode argument for AddRangeToDisk().
+enum class ScheduleMode {
+  IMMEDIATELY, UPON_GETNEXT, BY_CALLER
+};
 /// A request context is used to group together I/O requests belonging to a client of the
 /// I/O manager for management and scheduling.
 ///
@@ -32,53 +37,59 @@ namespace io {
 /// maintains state across all disks as well as per disk state.
 /// The unit for an IO request is a RequestRange, which may be a ScanRange or a
 /// WriteRange.
-/// A scan range for the reader is on one of five states:
-/// 1) PerDiskState's unstarted_ranges: This range has only been queued
+/// A scan range for the reader is in one of six states:
+/// 1) PerDiskState's 'unstarted_scan_ranges_': This range has only been queued
 ///    and nothing has been read from it.
-/// 2) RequestContext's ready_to_start_ranges_: This range is about to be started.
-///    As soon as the reader picks it up, it will move to the in_flight_ranges
+/// 2) RequestContext's 'ready_to_start_ranges_': This range is about to be started.
+///    As soon as the reader picks it up, it will move to the 'in_flight_ranges_'
 ///    queue.
-/// 3) PerDiskState's in_flight_ranges: This range is being processed and will
+/// 3) PerDiskState's 'in_flight_ranges_': This range is being processed and will
 ///    be read from the next time a disk thread picks it up in GetNextRequestRange()
-/// 4) ScanRange's outgoing ready buffers is full. We can't read for this range
-///    anymore. We need the caller to pull a buffer off which will put this in
-///    the in_flight_ranges queue. These ranges are in the RequestContext's
-///    blocked_ranges_ queue.
-/// 5) ScanRange is cached and in the cached_ranges_ queue.
-//
-/// If the scan range is read and does not get blocked on the outgoing queue, the
+/// 4) The ScanRange is blocked waiting for buffers because it does not have any unused
+///    buffers to read data into. It is unblocked when a client adds new buffers via
+///    AllocateBuffersForRange() or returns existing buffers via ReturnBuffer().
+///    ScanRanges in this state are identified by 'blocked_on_buffer_' == true.
+/// 5) ScanRange is cached and in the 'cached_ranges_' queue.
+/// 6) Inactive - either all the data for the range was returned or the range was
+///    cancelled. I.e. ScanRange::eosr_ is true or ScanRange::cancel_status_ != OK.
+///
+/// If the scan range is read and does not get blocked waiting for buffers, the
 /// transitions are: 1 -> 2 -> 3.
 /// If the scan range does get blocked, the transitions are
 /// 1 -> 2 -> 3 -> (4 -> 3)*
-//
-/// In the case of a cached scan range, the range is immediately put in cached_ranges_.
+///
+/// In the case of a cached scan range, the range is immediately put in 'cached_ranges_'.
 /// When the caller asks for the next range to process, we first pull ranges from
-/// the cache_ranges_ queue. If the range was cached, the range is removed and
+/// the 'cached_ranges_' queue. If the range was cached, the range is removed and
 /// done (ranges are either entirely cached or not at all). If the cached read attempt
 /// fails, we put the range in state 1.
-//
-/// A write range for a context may be in one of two lists:
-/// 1) unstarted_write_ranges_ : Ranges that have been queued but not processed.
-/// 2) in_flight_ranges_: The write range is ready to be processed by the next disk thread
-///    that picks it up in GetNextRequestRange().
-//
+///
+/// All scan ranges in states 1-5 are tracked in 'active_scan_ranges_' so that they can be
+/// cancelled when the RequestContext is cancelled. Scan ranges are removed from
+/// 'active_scan_ranges_' during their transition to state 6.
+///
+/// A write range for a context may be in one of two queues:
+/// 1) 'unstarted_write_ranges_': Ranges that have been queued but not processed.
+/// 2) 'in_flight_ranges_': The write range is ready to be processed by the next disk
+///    thread that picks it up in GetNextRequestRange().
+///
 /// AddWriteRange() adds WriteRanges for a disk.
 /// It is the responsibility of the client to pin the data to be written via a WriteRange
 /// in memory. After a WriteRange has been written, a callback is invoked to inform the
 /// client that the write has completed.
-//
+///
 /// An important assumption is that write does not exceed the maximum read size and that
 /// the entire range is written when the write request is handled. (In other words, writes
 /// are not broken up.)
-//
+///
 /// When a RequestContext is processed by a disk thread in GetNextRequestRange(),
 /// a write range is always removed from the list of unstarted write ranges and appended
 /// to the in_flight_ranges_ queue. This is done to alternate reads and writes - a read
-/// that is scheduled (by calling GetNextRange()) is always followed by a write (if one
-/// exists).  And since at most one WriteRange can be present in in_flight_ranges_ at any
-/// time (once a write range is returned from GetNetxRequestRange() it is completed an
-/// not re-enqueued), a scan range scheduled via a call to GetNextRange() can be queued up
-/// behind at most one write range.
+/// that is scheduled (by calling GetNextUnstartedRange()) is always followed by a write
+/// (if one exists). And since at most one WriteRange can be present in in_flight_ranges_
+/// at any time (once a write range is returned from GetNextRequestRange() it is completed
+/// and not re-enqueued), a scan range scheduled via a call to GetNextUnstartedRange() can
+/// be queued up behind at most one write range.
 class RequestContext {
  public:
   ~RequestContext() {
@@ -97,7 +108,6 @@ class RequestContext {
     return state_ == Cancelled;
   }
 
-  int64_t queue_size() const { return num_ready_buffers_.Load(); }
   int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
   int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
   int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
@@ -150,13 +160,6 @@ class RequestContext {
 
   RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
 
-  /// Allocates a buffer to read into with size between
-  /// max('buffer_size', 'min_buffer_size_') and 'max_buffer_size_'.
-  /// Does not acquire 'lock_'.
-  /// TODO: allocate using the buffer pool client associated with this reader.
-  Status AllocBuffer(ScanRange* range, int64_t buffer_size,
-      std::unique_ptr<BufferDescriptor>* buffer);
-
   /// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer
   /// memory and release the consumption to the client MemTracker. Otherwise (e.g. a
   /// client or HDFS cache buffer), just prepares the descriptor to be destroyed.
@@ -184,11 +187,11 @@ class RequestContext {
 
   /// Adds range to in_flight_ranges, scheduling this reader on the disk threads
   /// if necessary.
-  /// 'lock_' must be held via 'lock'
+  /// 'lock_' must be held via 'lock'. Only valid to call if this context is active.
   void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) {
     DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
     DCHECK_EQ(state_, Active);
-    DCHECK(range != NULL);
+    DCHECK(range != nullptr);
     RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
     state.in_flight_ranges()->Enqueue(range);
     state.ScheduleContext(lock, this, range->disk_id());
@@ -198,11 +201,34 @@ class RequestContext {
   /// and mark the context as inactive, after which it cannot be used.
   void CancelAndMarkInactive();
 
-  /// Adds request range to disk queue for this request context. Currently,
-  /// schedule_immediately must be false is RequestRange is a write range.
-  /// Caller must hold 'lock_' via 'lock'.
-  void AddRequestRange(const boost::unique_lock<boost::mutex>& lock,
-      RequestRange* range, bool schedule_immediately);
+  /// Adds a request range to the appropriate disk state. 'schedule_mode' controls which
+  /// queue the range is placed in. This RequestContext is scheduled on the disk state
+  /// if required by 'schedule_mode'.
+  ///
+  /// Write ranges must always have 'schedule_mode' IMMEDIATELY and are added to the
+  /// 'unstarted_write_ranges_' queue, from which they will be asynchronously moved to the
+  /// 'in_flight_ranges_' queue.
+  ///
+  /// Scan ranges can have different 'schedule_mode' values. If IMMEDIATELY, the range is
+  /// immediately added to the 'in_flight_ranges_' queue where it will be processed
+  /// asynchronously by disk threads. If UPON_GETNEXT, the range is added to the
+  /// 'unstarted_scan_ranges_' queue, from which it can be returned to a client by
+  /// DiskIoMgr::GetNextUnstartedRange(). If BY_CALLER, the scan range is not added to
+  /// any queues. The range will be scheduled later as a separate step, e.g. when it is
+  /// unblocked by adding buffers to it. Caller must hold 'lock_' via 'lock'.
+  void AddRangeToDisk(const boost::unique_lock<boost::mutex>& lock, RequestRange* range,
+      ScheduleMode schedule_mode);
+
+  /// Adds an active range to 'active_scan_ranges_'
+  void AddActiveScanRangeLocked(
+      const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
+
+  /// Removes the range from 'active_scan_ranges_'. Called by ScanRange after eos or
+  /// cancellation. If calling the Locked version, the caller must hold
+  /// 'lock_'. Otherwise the function will acquire 'lock_'.
+  void RemoveActiveScanRange(ScanRange* range);
+  void RemoveActiveScanRangeLocked(
+      const boost::unique_lock<boost::mutex>& lock, ScanRange* range);
 
   /// Validates invariants of reader.  Reader lock must be taken beforehand.
   bool Validate() const;
@@ -243,13 +269,6 @@ class RequestContext {
   /// Total number of bytes from remote reads that were expected to be local.
   AtomicInt64 unexpected_remote_bytes_{0};
 
-  /// The number of buffers that have been returned to the reader (via GetNext()) that the
-  /// reader has not returned. Only included for debugging and diagnostics.
-  AtomicInt32 num_buffers_in_reader_{0};
-
-  /// The number of scan ranges that have been completed for this reader.
-  AtomicInt32 num_finished_ranges_{0};
-
   /// The number of scan ranges that required a remote read, updated at the end of each
   /// range scan. Only used for diagnostics.
   AtomicInt32 num_remote_ranges_{0};
@@ -264,17 +283,6 @@ class RequestContext {
   /// Total number of file handle opens where the file handle was not in the cache
   AtomicInt32 cached_file_handles_miss_count_{0};
 
-  /// The number of buffers that are being used for this reader. This is the sum
-  /// of all buffers in ScanRange queues and buffers currently being read into (i.e. about
-  /// to be queued). This includes both IOMgr-allocated buffers and client-provided
-  /// buffers.
-  AtomicInt32 num_used_buffers_{0};
-
-  /// The total number of ready buffers across all ranges.  Ready buffers are buffers
-  /// that have been read from disk but not retrieved by the caller.
-  /// This is the sum of all queued buffers in all ranges for this reader context.
-  AtomicInt32 num_ready_buffers_{0};
-
   /// All fields below are accessed by multiple threads and the lock needs to be
   /// taken before accessing them. Must be acquired before ScanRange::lock_ if both
   /// are held simultaneously.
@@ -283,6 +291,17 @@ class RequestContext {
   /// Current state of the reader
   State state_ = Active;
 
+  /// Scan ranges that have been added to the IO mgr for this context. Ranges can only
+  /// be added when 'state_' is Active. When this context is cancelled, Cancel() is
+  /// called for all the active ranges. If a client attempts to add a range while
+  /// 'state_' is Cancelled, the range is not added to this list and Status::CANCELLED
+  /// is returned to the client. This ensures that all active ranges are cancelled as a
+  /// result of RequestContext cancellation.
+  /// Ranges can be cancelled or hit eos non-atomically with their removal from this set,
+  /// so eos or cancelled ranges may be temporarily present here. Cancelling these ranges
+  /// a second time or cancelling after eos is safe and has no effect.
+  boost::unordered_set<ScanRange*> active_scan_ranges_;
+
   /// The number of disks with scan ranges remaining (always equal to the sum of
   /// disks with ranges).
   int num_disks_with_ranges_ = 0;
@@ -293,18 +312,15 @@ class RequestContext {
   InternalList<ScanRange> cached_ranges_;
 
   /// A list of ranges that should be returned in subsequent calls to
-  /// GetNextRange.
+  /// GetNextUnstartedRange().
   /// There is a trade-off with when to populate this list.  Populating it on
-  /// demand means consumers need to wait (happens in DiskIoMgr::GetNextRange()).
+  /// demand means consumers need to wait (happens in DiskIoMgr::GetNextUnstartedRange()).
   /// Populating it preemptively means we make worse scheduling decisions.
   /// We currently populate one range per disk.
   /// TODO: think about this some more.
   InternalList<ScanRange> ready_to_start_ranges_;
   ConditionVariable ready_to_start_ranges_cv_; // used with lock_
 
-  /// Ranges that are blocked due to back pressure on outgoing buffers.
-  InternalList<ScanRange> blocked_ranges_;
-
   /// Condition variable for UnregisterContext() to wait for all disks to complete
   ConditionVariable disks_complete_cond_var_;
 
@@ -429,7 +445,7 @@ class RequestContext {
 
     /// Queue of pending IO requests for this disk in the order that they will be
     /// processed. A ScanRange is added to this queue when it is returned in
-    /// GetNextRange(), or when it is added with schedule_immediately = true.
+    /// GetNextUnstartedRange(), or when it is added with schedule_mode == IMMEDIATELY.
     /// A WriteRange is added to this queue from unstarted_write_ranges_ for each
     /// invocation of GetNextRequestRange() in WorkLoop().
     /// The size of this queue is always less than or equal to num_remaining_ranges.
@@ -458,7 +474,8 @@ class RequestContext {
     /// unstarted_read_ranges_ and unstarted_write_ranges_ to alternate between reads
     /// and writes. (Otherwise, since next_scan_range_to_start is set
     /// in GetNextRequestRange() whenever it is null, repeated calls to
-    /// GetNextRequestRange() and GetNextRange() may result in only reads being processed)
+    /// GetNextRequestRange() and GetNextUnstartedRange() may result in only reads being
+    /// processed)
     InternalQueue<WriteRange> unstarted_write_ranges_;
   };
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index ab0810a..041cb9d 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -26,8 +26,10 @@
 #include "common/atomic.h"
 #include "common/hdfs.h"
 #include "common/status.h"
+#include "runtime/bufferpool/buffer-pool.h"
 #include "util/condition-variable.h"
 #include "util/internal-queue.h"
+#include "util/mem-range.h"
 
 namespace impala {
 class MemTracker;
@@ -202,11 +204,11 @@ class ScanRange : public RequestRange {
 
   /// Resets this scan range object with the scan range description. The scan range
   /// is for bytes [offset, offset + len) in 'file' on 'fs' (which is nullptr for the
-  /// local filesystem). The scan range must fall within the file bounds (offset >= 0
-  /// and offset + len <= file_length). 'disk_id' is the disk queue to add the range
-  /// to. If 'expected_local' is true, a warning is generated if the read did not
-  /// come from a local disk. 'buffer_opts' specifies buffer management options -
-  /// see the DiskIoMgr class comment and the BufferOpts comments for details.
+  /// local filesystem). The scan range must be non-empty and fall within the file bounds
+  /// (len > 0 and offset >= 0 and offset + len <= file_length). 'disk_id' is the disk
+  /// queue to add the range to. If 'expected_local' is true, a warning is generated if
+  /// the read did not come from a local disk. 'buffer_opts' specifies buffer management
+  /// options - see the DiskIoMgr class comment and the BufferOpts comments for details.
   /// 'meta_data' is an arbitrary client-provided pointer for any auxiliary data.
   void Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset, int disk_id,
       bool expected_local, const BufferOpts& buffer_opts, void* meta_data = nullptr);
@@ -248,12 +250,11 @@ class ScanRange : public RequestRange {
   /// Initialize internal fields
   void InitInternal(DiskIoMgr* io_mgr, RequestContext* reader);
 
-  /// Enqueues a buffer for this range. This does not block.
-  /// Returns true if this scan range has hit the queue capacity, false otherwise.
+  /// Enqueues a ready buffer with valid data for this range. This does not block.
   /// The caller passes ownership of buffer to the scan range and it is not
   /// valid to access buffer after this call. The reader lock must be held by the
-  /// caller.
-  bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
+  /// caller. Returns false if the scan range was cancelled.
+  bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
       std::unique_ptr<BufferDescriptor> buffer);
 
   /// Validates the internal state of this range. lock_ must be taken
@@ -304,16 +305,51 @@ class ScanRange : public RequestRange {
   Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
       bool* read_succeeded) WARN_UNUSED_RESULT;
 
+  /// Add buffers for the range to read data into and schedule the range if blocked.
+  /// If 'returned' is true, the buffers were returned from GetNext() and are being
+  /// recycled via ReturnBuffer(). Otherwise they are newly allocated buffers to be added.
+  void AddUnusedBuffers(
+      std::vector<std::unique_ptr<BufferDescriptor>>&& buffers, bool returned);
+
+  /// Remove a buffer from 'unused_iomgr_buffers_' and update
+  /// 'unused_iomgr_buffer_bytes_'. If 'unused_iomgr_buffers_' is empty, return NULL.
+  /// 'lock_' must be held by the caller via 'scan_range_lock'.
+  std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
+      const boost::unique_lock<boost::mutex>& scan_range_lock);
+
+  /// Get the next buffer for this scan range for a disk thread to read into. Returns
+  /// the new buffer if successful.  If no buffers are available, marks the range
+  /// as blocked and returns nullptr. Caller must not hold 'lock_'.
+  std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange();
+
   /// Cleans up a buffer that was not returned to the client.
   /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
-  /// This function will acquire 'lock_' and may acquire 'hdfs_lock_'.
-  void CleanUpBuffer(std::unique_ptr<BufferDescriptor> buffer);
-
-  /// Same as CleanUpBuffer() except the caller must already hold 'lock_' via
-  /// 'scan_range_lock'.
-  void CleanUpBufferLocked(const boost::unique_lock<boost::mutex>& scan_range_lock,
+  /// The caller must hold 'lock_' via 'scan_range_lock'.
+  /// This function may acquire 'hdfs_lock_'
+  void CleanUpBuffer(const boost::unique_lock<boost::mutex>& scan_range_lock,
       std::unique_ptr<BufferDescriptor> buffer);
 
+  /// Same as CleanUpBuffer() except cleans up multiple buffers and caller must not
+  /// hold 'lock_'.
+  void CleanUpBuffers(std::vector<std::unique_ptr<BufferDescriptor>>&& buffers);
+
+  /// Clean up all buffers in 'unused_iomgr_buffers_'. Only valid to call when the scan
+  /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'.
+  void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock);
+
+  /// Same as Cancel() except reader_->lock must be held by the caller.
+  void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
+      const Status& status);
+
+  /// Same as Cancel() except doesn't remove the scan range from
+  /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(),
+  /// which removes the range itself to avoid invalidating its active_scan_ranges_
+  /// iterator.
+  void CancelInternal(const Status& status);
+
+  /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
+  void SetBlockedOnBuffer();
+
   /// Returns true if no more buffers will be returned to clients in the future,
   /// either because of hitting eosr or cancellation.
   bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const {
@@ -386,6 +422,10 @@ class ScanRange : public RequestRange {
     struct hadoopRzBuffer* cached_buffer_ = nullptr;
   };
 
+  /// The number of buffers that have been returned to a client via GetNext() that have
+  /// not yet been returned with ReturnBuffer().
+  AtomicInt32 num_buffers_in_reader_{0};
+
   /// Lock protecting fields below.
   /// This lock should not be taken during Open()/Read()/Close().
   /// If RequestContext::lock_ and this lock need to be held simultaneously,
@@ -395,18 +435,30 @@ class ScanRange : public RequestRange {
   /// Number of bytes read so far for this scan range
   int bytes_read_;
 
-  /// The number of buffers that have been returned to a client via GetNext() that have
-  /// not yet been returned with ReturnBuffer().
-  int num_buffers_in_reader_ = 0;
+  /// Buffers to read into, used if the 'external_buffer_tag_' is NO_BUFFER. These are
+  /// initially populated when the client calls AllocateBuffersForRange() and
+  /// are used to read scanned data into. Buffers are taken from this vector for
+  /// every read and added back, if needed, when the client calls ReturnBuffer().
+  std::vector<std::unique_ptr<BufferDescriptor>> unused_iomgr_buffers_;
+
+  /// Total number of bytes of buffers in 'unused_iomgr_buffers_'.
+  int64_t unused_iomgr_buffer_bytes_ = 0;
+
+  /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to
+  /// infer how many bytes of buffers need to be held onto to read the rest of the scan
+  /// range.
+  int64_t iomgr_buffer_bytes_returned_ = 0;
 
   /// If true, the last buffer for this scan range has been queued.
   /// If this is true and 'ready_buffers_' is empty, then no more buffers will be
   /// returned to the caller by this scan range.
   bool eosr_queued_ = false;
 
-  /// If true, this scan range has been removed from the reader's in_flight_ranges
-  /// queue because the ready_buffers_ queue is full.
-  bool blocked_on_queue_ = false;
+  /// If true, this scan range is not scheduled because a buffer is not available for
+  /// the next I/O in the range. This can happen when the scan range is initially created
+  /// or if the buffers added to the range have all been filled with data and not yet
+  /// returned.
+  bool blocked_on_buffer_ = false;
 
   /// IO buffers that are queued for this scan range. When Cancel() is called
   /// this is drained by the cancelling thread. I.e. this is always empty if

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 1ffba00..0663a2b 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -42,7 +42,7 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u
 // any time and only one thread will remove from the queue. This is to guarantee
 // that buffers are queued and read in file order.
 
-bool ScanRange::EnqueueBuffer(
+bool ScanRange::EnqueueReadyBuffer(
     const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
   DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
   DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
@@ -52,19 +52,17 @@ bool ScanRange::EnqueueBuffer(
     DCHECK(!eosr_queued_);
     if (!cancel_status_.ok()) {
       // This range has been cancelled, no need to enqueue the buffer.
-      reader_->num_used_buffers_.Add(-1);
-      CleanUpBufferLocked(scan_range_lock, move(buffer));
+      CleanUpBuffer(scan_range_lock, move(buffer));
       return false;
     }
-    reader_->num_ready_buffers_.Add(1);
+    // Clean up any surplus buffers. E.g. we may have allocated too many if the file was
+    // shorter than expected.
+    if (buffer->eosr()) CleanUpUnusedBuffers(scan_range_lock);
     eosr_queued_ = buffer->eosr();
     ready_buffers_.emplace_back(move(buffer));
-
-    DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
-    blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
   }
   buffer_ready_cv_.NotifyOne();
-  return blocked_on_queue_;
+  return true;
 }
 
 Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
@@ -84,64 +82,97 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
 
     // Remove the first ready buffer from the queue and return it
     DCHECK(!ready_buffers_.empty());
-    DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
     *buffer = move(ready_buffers_.front());
     ready_buffers_.pop_front();
     eosr = (*buffer)->eosr();
+    DCHECK(!eosr || unused_iomgr_buffers_.empty()) << DebugString();
   }
 
-  // Update tracking counters. The buffer has now moved from the IoMgr to the
-  // caller.
-  reader_->num_ready_buffers_.Add(-1);
-  reader_->num_used_buffers_.Add(-1);
-  if (eosr) reader_->num_finished_ranges_.Add(1);
-
-  unique_lock<mutex> reader_lock(reader_->lock_);
+  // Update tracking counters. The buffer has now moved from the IoMgr to the caller.
+  if (eosr) reader_->RemoveActiveScanRange(this);
+  num_buffers_in_reader_.Add(1);
+  return Status::OK();
+}
 
-  DCHECK(reader_->Validate()) << endl << reader_->DebugString();
-  if (reader_->state_ == RequestContext::Cancelled) {
-    reader_->blocked_ranges_.Remove(this);
-    Cancel(Status::CANCELLED);
-    CleanUpBuffer(move(*buffer));
-    return Status::CANCELLED;
-  }
+void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+  vector<unique_ptr<BufferDescriptor>> buffers;
+  buffers.emplace_back(move(buffer_desc));
+  AddUnusedBuffers(move(buffers), true);
+}
 
-  // At this point success is guaranteed so increment counters for returned buffers.
-  reader_->num_buffers_in_reader_.Add(1);
+void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
+      bool returned) {
+  DCHECK_GT(buffers.size(), 0);
+  /// Keep track of whether the range was unblocked in this function. If so, we need
+  /// to schedule it so it resumes progress.
+  bool unblocked = false;
   {
-    // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
-    // may have been called after we released 'lock_' above so we need to re-check
-    // whether the queue is full.
     unique_lock<mutex> scan_range_lock(lock_);
-    ++num_buffers_in_reader_;
-    if (blocked_on_queue_
-        && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT
-        && !eosr_queued_) {
-      blocked_on_queue_ = false;
-      // This scan range was blocked and is no longer, add it to the reader
-      // queue again.
-      reader_->blocked_ranges_.Remove(this);
+    if (returned) {
+      // Buffers were in reader but now aren't.
+      num_buffers_in_reader_.Add(-buffers.size());
+    }
+
+    for (unique_ptr<BufferDescriptor>& buffer : buffers) {
+      // We should not hold onto the buffers in the following cases:
+      // 1. the scan range is using external buffers, e.g. cached buffers.
+      // 2. the scan range is cancelled
+      // 3. the scan range already hit eosr
+      // 4. we already have enough buffers to read the remainder of the scan range.
+      if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
+          || !cancel_status_.ok()
+          || eosr_queued_
+          || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) {
+        CleanUpBuffer(scan_range_lock, move(buffer));
+      } else {
+        unused_iomgr_buffer_bytes_ += buffer->buffer_len();
+        unused_iomgr_buffers_.emplace_back(move(buffer));
+        if (blocked_on_buffer_) {
+          blocked_on_buffer_ = false;
+          unblocked = true;
+        }
+      }
+    }
+  }
+  // Must drop the ScanRange lock before acquiring the RequestContext lock.
+  if (unblocked) {
+    unique_lock<mutex> reader_lock(reader_->lock_);
+    // Reader may have been cancelled after we dropped 'scan_range_lock' above.
+    if (reader_->state_ == RequestContext::Cancelled) {
+      DCHECK(!cancel_status_.ok());
+    } else {
       reader_->ScheduleScanRange(reader_lock, this);
     }
   }
-  return Status::OK();
 }
 
-void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
-  reader_->num_buffers_in_reader_.Add(-1);
-  {
-    unique_lock<mutex> scan_range_lock(lock_);
-    --num_buffers_in_reader_;
-    CleanUpBufferLocked(scan_range_lock, move(buffer_desc));
+unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
+    const unique_lock<mutex>& scan_range_lock) {
+  DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
+  if (unused_iomgr_buffers_.empty()) return nullptr;
+  unique_ptr<BufferDescriptor> result = move(unused_iomgr_buffers_.back());
+  unused_iomgr_buffers_.pop_back();
+  unused_iomgr_buffer_bytes_ -= result->buffer_len();
+  return result;
+}
+
+unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() {
+  unique_lock<mutex> lock(lock_);
+  unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock);
+  if (buffer_desc == nullptr) {
+    blocked_on_buffer_ = true;
+  } else {
+    iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len();
   }
+  return buffer_desc;
 }
 
-void ScanRange::CleanUpBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
-  unique_lock<mutex> scan_range_lock(lock_);
-  CleanUpBufferLocked(scan_range_lock, move(buffer_desc));
+void ScanRange::SetBlockedOnBuffer() {
+  unique_lock<mutex> lock(lock_);
+  blocked_on_buffer_ = true;
 }
 
-void ScanRange::CleanUpBufferLocked(
+void ScanRange::CleanUpBuffer(
     const boost::unique_lock<boost::mutex>& scan_range_lock,
     unique_ptr<BufferDescriptor> buffer_desc) {
   DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
@@ -149,7 +180,7 @@ void ScanRange::CleanUpBufferLocked(
   DCHECK_EQ(this, buffer_desc->scan_range_);
   buffer_desc->reader_->FreeBuffer(buffer_desc.get());
 
-  if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_ == 0) {
+  if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_.Load() == 0) {
     // Close the scan range if there are no more buffers in the reader and no more buffers
     // will be returned to readers in future. Close() is idempotent so it is ok to call
     // multiple times during cleanup so long as the range is actually finished.
@@ -157,10 +188,33 @@ void ScanRange::CleanUpBufferLocked(
   }
 }
 
+void ScanRange::CleanUpBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers) {
+  unique_lock<mutex> lock(lock_);
+  for (unique_ptr<BufferDescriptor>& buffer : buffers) CleanUpBuffer(lock, move(buffer));
+}
+
+void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock) {
+  while (!unused_iomgr_buffers_.empty()) {
+    CleanUpBuffer(scan_range_lock, GetUnusedBuffer(scan_range_lock));
+  }
+}
+
 void ScanRange::Cancel(const Status& status) {
   // Cancelling a range that was never started, ignore.
   if (io_mgr_ == nullptr) return;
+  CancelInternal(status);
+  reader_->RemoveActiveScanRange(this);
+}
+
+void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
+    const Status& status) {
+  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+  CancelInternal(status);
+  reader_->RemoveActiveScanRangeLocked(reader_lock, this);
+}
 
+void ScanRange::CancelInternal(const Status& status) {
+  DCHECK(io_mgr_ != nullptr);
   DCHECK(!status.ok());
   {
     // Grab both locks to make sure that we don't change 'cancel_status_' while other
@@ -177,12 +231,13 @@ void ScanRange::Cancel(const Status& status) {
 
     /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
     /// from seeing inconsistent state.
-    reader_->num_used_buffers_.Add(-ready_buffers_.size());
-    reader_->num_ready_buffers_.Add(-ready_buffers_.size());
     while (!ready_buffers_.empty()) {
-      CleanUpBufferLocked(scan_range_lock, move(ready_buffers_.front()));
+      CleanUpBuffer(scan_range_lock, move(ready_buffers_.front()));
       ready_buffers_.pop_front();
     }
+
+    /// Clean up buffers that we don't need any more because we won't read any more data.
+    CleanUpUnusedBuffers(scan_range_lock);
   }
   buffer_ready_cv_.NotifyAll();
 
@@ -197,6 +252,10 @@ string ScanRange::DebugString() const {
      << " len=" << len_ << " bytes_read=" << bytes_read_
      << " cancel_status=" << cancel_status_.GetDetail()
      << " buffer_queue=" << ready_buffers_.size()
+     << " num_buffers_in_readers=" << num_buffers_in_reader_.Load()
+     << " unused_iomgr_buffers=" << unused_iomgr_buffers_.size()
+     << " unused_iomgr_buffer_bytes=" << unused_iomgr_buffer_bytes_
+     << " blocked_on_buffer=" << blocked_on_buffer_
      << " hdfs_file=" << exclusive_hdfs_fh_;
   return ss.str();
 }
@@ -211,6 +270,27 @@ bool ScanRange::Validate() {
     LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString();
     return false;
   }
+  int64_t unused_iomgr_buffer_bytes = 0;
+  for (auto& buffer : unused_iomgr_buffers_)
+    unused_iomgr_buffer_bytes += buffer->buffer_len();
+  if (unused_iomgr_buffer_bytes != unused_iomgr_buffer_bytes_) {
+    LOG(ERROR) << "unused_iomgr_buffer_bytes_ incorrect actual: "
+               << unused_iomgr_buffer_bytes_
+               << " vs. expected: " << unused_iomgr_buffer_bytes;
+    return false;
+  }
+  bool is_finished = !cancel_status_.ok() || eosr_queued_;
+  if (is_finished && !unused_iomgr_buffers_.empty()) {
+    LOG(ERROR) << "Held onto too many buffers " << unused_iomgr_buffers_.size()
+               << " bytes: " << unused_iomgr_buffer_bytes_
+               << " cancel_status: " << cancel_status_.GetDetail()
+               << " eosr_queued: " << eosr_queued_;
+    return false;
+  }
+  if (!is_finished && blocked_on_buffer_ && !unused_iomgr_buffers_.empty()) {
+    LOG(ERROR) << "Blocked despite having buffers: " << DebugString();
+    return false;
+  }
   return true;
 }
 
@@ -224,7 +304,7 @@ ScanRange::~ScanRange() {
   DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
       << "Cached buffer was not released.";
   DCHECK_EQ(0, ready_buffers_.size());
-  DCHECK_EQ(0, num_buffers_in_reader_);
+  DCHECK_EQ(0, num_buffers_in_reader_.Load());
 }
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
@@ -268,9 +348,11 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
   local_file_ = nullptr;
   exclusive_hdfs_fh_ = nullptr;
   bytes_read_ = 0;
+  unused_iomgr_buffer_bytes_ = 0;
+  iomgr_buffer_bytes_returned_ = 0;
   cancel_status_ = Status::OK();
-  eosr_queued_= false;
-  blocked_on_queue_ = false;
+  eosr_queued_ = false;
+  blocked_on_buffer_ = false;
   DCHECK(Validate()) << DebugString();
 }
 
@@ -316,9 +398,7 @@ Status ScanRange::Open(bool use_file_handle_cache) {
           "for file: $1: $2", offset_, file_, GetStrErrMsg()));
     }
   }
-  if (ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
-  }
+  ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(1L);
   return Status::OK();
 }
 
@@ -370,9 +450,7 @@ void ScanRange::Close() {
     local_file_ = nullptr;
     closed_file = true;
   }
-  if (closed_file && ImpaladMetrics::IO_MGR_NUM_OPEN_FILES != nullptr) {
-    ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
-  }
+  if (closed_file) ImpaladMetrics::IO_MGR_NUM_OPEN_FILES->Increment(-1L);
 }
 
 int64_t ScanRange::MaxReadChunkSize() const {
@@ -580,12 +658,9 @@ Status ScanRange::ReadFromCache(
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;
   bytes_read_ = bytes_read;
-  EnqueueBuffer(reader_lock, move(desc));
-  if (reader_->bytes_read_counter_ != nullptr) {
-    COUNTER_ADD(reader_->bytes_read_counter_, bytes_read);
-  }
+  EnqueueReadyBuffer(reader_lock, move(desc));
+  COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
   *read_succeeded = true;
-  reader_->num_used_buffers_.Add(1);
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 7b00179..3a69c33 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -396,7 +396,10 @@ Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
       BufferOpts::ReadInto(buffer.data(), buffer.len()));
   read_counter_->Add(1);
   bytes_read_counter_->Add(buffer.len());
-  RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_.get(), handle->read_range_, true));
+  bool needs_buffers;
+  RETURN_IF_ERROR(io_mgr_->StartScanRange(
+      io_ctx_.get(), handle->read_range_, &needs_buffers));
+  DCHECK(!needs_buffers) << "Already provided a buffer";
   return Status::OK();
 }
 
@@ -522,11 +525,20 @@ TmpFileMgr::WriteHandle::WriteHandle(
     is_cancelled_(false),
     write_in_flight_(false) {}
 
+TmpFileMgr::WriteHandle::~WriteHandle() {
+  DCHECK(!write_in_flight_);
+  DCHECK(read_range_ == nullptr);
+}
+
 string TmpFileMgr::WriteHandle::TmpFilePath() const {
   if (file_ == nullptr) return "";
   return file_->path();
 }
 
+int64_t TmpFileMgr::WriteHandle::len() const {
+  return write_range_->len();
+}
+
 Status TmpFileMgr::WriteHandle::Write(DiskIoMgr* io_mgr, RequestContext* io_ctx,
     File* file, int64_t offset, MemRange buffer,
     WriteRange::WriteDoneCallback callback) {

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index 95072ae..55901f4 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -28,7 +28,6 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gen-cpp/Types_types.h" // for TUniqueId
-#include "runtime/io/request-ranges.h"
 #include "util/collection-metrics.h"
 #include "util/condition-variable.h"
 #include "util/mem-range.h"
@@ -37,6 +36,12 @@
 #include "util/spinlock.h"
 
 namespace impala {
+namespace io {
+  class DiskIoMgr;
+  class RequestContext;
+  class ScanRange;
+  class WriteRange;
+}
 
 /// TmpFileMgr provides an abstraction for management of temporary (a.k.a. scratch) files
 /// on the filesystem and I/O to and from them. TmpFileMgr manages multiple scratch
@@ -84,6 +89,7 @@ class TmpFileMgr {
   /// Needs to be public for TmpFileMgrTest.
   typedef int DeviceId;
 
+  /// Same typedef as io::WriteRange::WriteDoneCallback.
   typedef std::function<void(const Status&)> WriteDoneCallback;
 
   /// Represents a group of temporary files - one per disk with a scratch directory. The
@@ -277,10 +283,7 @@ class TmpFileMgr {
    public:
     /// The write must be destroyed by passing it to FileGroup - destroying it before
     /// the write completes is an error.
-    ~WriteHandle() {
-      DCHECK(!write_in_flight_);
-      DCHECK(read_range_ == nullptr);
-    }
+    ~WriteHandle();
 
     /// Cancel any in-flight read synchronously.
     void CancelRead();
@@ -290,7 +293,7 @@ class TmpFileMgr {
     std::string TmpFilePath() const;
 
     /// The length of the write range in bytes.
-    int64_t len() const { return write_range_->len(); }
+    int64_t len() const;
 
     std::string DebugString();
 
@@ -305,7 +308,7 @@ class TmpFileMgr {
     /// failure and 'is_cancelled_' is set to true on failure.
     Status Write(io::DiskIoMgr* io_mgr, io::RequestContext* io_ctx, File* file,
         int64_t offset, MemRange buffer,
-        io::WriteRange::WriteDoneCallback callback) WARN_UNUSED_RESULT;
+        WriteDoneCallback callback) WARN_UNUSED_RESULT;
 
     /// Retry the write after the initial write failed with an error, instead writing to
     /// 'offset' of 'file'. 'write_in_flight_' must be true before calling.

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util-test.cc b/be/src/util/bit-util-test.cc
index 5f8d443..6f70727 100644
--- a/be/src/util/bit-util-test.cc
+++ b/be/src/util/bit-util-test.cc
@@ -258,6 +258,17 @@ TEST(BitUtil, Log2) {
   EXPECT_EQ(BitUtil::Log2CeilingNonZero64(ULLONG_MAX), 64);
 }
 
+TEST(BitUtil, RoundToPowerOfTwo) {
+  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(9));
+  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(15));
+  EXPECT_EQ(16, BitUtil::RoundUpToPowerOfTwo(16));
+  EXPECT_EQ(32, BitUtil::RoundUpToPowerOfTwo(17));
+  EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(9));
+  EXPECT_EQ(8, BitUtil::RoundDownToPowerOfTwo(15));
+  EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(16));
+  EXPECT_EQ(16, BitUtil::RoundDownToPowerOfTwo(17));
+}
+
 TEST(BitUtil, RoundUpToPowerOf2) {
   EXPECT_EQ(BitUtil::RoundUpToPowerOf2(7, 8), 8);
   EXPECT_EQ(BitUtil::RoundUpToPowerOf2(8, 8), 8);

http://git-wip-us.apache.org/repos/asf/impala/blob/0b6fab73/be/src/util/bit-util.h
----------------------------------------------------------------------
diff --git a/be/src/util/bit-util.h b/be/src/util/bit-util.h
index 5c9a29b..8a65509 100644
--- a/be/src/util/bit-util.h
+++ b/be/src/util/bit-util.h
@@ -98,6 +98,12 @@ class BitUtil {
     return v;
   }
 
+  /// Returns the largest power of two <= v.
+  static inline int64_t RoundDownToPowerOfTwo(int64_t v) {
+    int64_t v_rounded_up = RoundUpToPowerOfTwo(v);
+    return v_rounded_up == v ? v : v_rounded_up / 2;
+  }
+
   /// Returns 'value' rounded up to the nearest multiple of 'factor' when factor is
   /// a power of two
   static inline int64_t RoundUpToPowerOf2(int64_t value, int64_t factor) {
@@ -105,7 +111,7 @@ class BitUtil {
     return (value + (factor - 1)) & ~(factor - 1);
   }
 
-  static inline int RoundDownToPowerOf2(int value, int factor) {
+  static inline int64_t RoundDownToPowerOf2(int64_t value, int64_t factor) {
     DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
     return value & ~(factor - 1);
   }