You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/02/23 22:51:34 UTC

[07/15] impala git commit: IMPALA-4835: Part 1: simplify I/O mgr mem mgmt and cancellation

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index 9807805..737a16c 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -24,9 +24,7 @@
 namespace impala {
 namespace io {
 /// A request context is used to group together I/O requests belonging to a client of the
-/// I/O manager for management and scheduling. For most I/O manager clients it is an
-/// opaque pointer, but some clients may need to include this header, e.g. to make the
-/// unique_ptr<DiskIoRequestContext> destructor work correctly.
+/// I/O manager for management and scheduling.
 ///
 /// Implementation Details
 /// ======================
@@ -83,7 +81,51 @@ namespace io {
 /// behind at most one write range.
 class RequestContext {
  public:
-  ~RequestContext() { DCHECK_EQ(state_, Inactive) << "Must be unregistered."; }
+  ~RequestContext() {
+    DCHECK_EQ(state_, Inactive) << "Must be unregistered. " << DebugString();
+  }
+
+  /// Cancel the context asynchronously. All outstanding requests are cancelled
+  /// asynchronously. This does not need to be called if the context finishes normally.
+  /// Calling GetNext() on any scan ranges belonging to this RequestContext will return
+  /// CANCELLED (or another error, if an error was encountered for that scan range before
+  /// it is cancelled).
+  void Cancel();
+
+  bool IsCancelled() {
+    boost::unique_lock<boost::mutex> lock(lock_);
+    return state_ == Cancelled;
+  }
+
+  int64_t queue_size() const { return num_ready_buffers_.Load(); }
+  int64_t bytes_read_local() const { return bytes_read_local_.Load(); }
+  int64_t bytes_read_short_circuit() const { return bytes_read_short_circuit_.Load(); }
+  int64_t bytes_read_dn_cache() const { return bytes_read_dn_cache_.Load(); }
+  int num_remote_ranges() const { return num_remote_ranges_.Load(); }
+  int64_t unexpected_remote_bytes() const { return unexpected_remote_bytes_.Load(); }
+
+  int cached_file_handles_hit_count() const {
+    return cached_file_handles_hit_count_.Load();
+  }
+
+  int cached_file_handles_miss_count() const {
+    return cached_file_handles_miss_count_.Load();
+  }
+
+  void set_bytes_read_counter(RuntimeProfile::Counter* bytes_read_counter) {
+    bytes_read_counter_ = bytes_read_counter;
+  }
+
+  void set_read_timer(RuntimeProfile::Counter* read_timer) { read_timer_ = read_timer; }
+
+  void set_active_read_thread_counter(
+      RuntimeProfile::Counter* active_read_thread_counter) {
+   active_read_thread_counter_ = active_read_thread_counter;
+  }
+
+  void set_disks_accessed_bitmap(RuntimeProfile::Counter* disks_accessed_bitmap) {
+    disks_accessed_bitmap_ = disks_accessed_bitmap;
+  }
 
  private:
   DISALLOW_COPY_AND_ASSIGN(RequestContext);
@@ -108,11 +150,24 @@ class RequestContext {
 
   RequestContext(DiskIoMgr* parent, int num_disks, MemTracker* tracker);
 
+  /// Allocates a buffer to read into with size between
+  /// max('buffer_size', 'min_buffer_size_') and 'max_buffer_size_'.
+  /// Does not acquire 'lock_'.
+  /// TODO: allocate using the buffer pool client associated with this reader.
+  Status AllocBuffer(ScanRange* range, int64_t buffer_size,
+      std::unique_ptr<BufferDescriptor>* buffer);
+
+  /// Cleans up a buffer. If the buffer was allocated with AllocBuffer(), frees the buffer
+  /// memory and release the consumption to the client MemTracker. Otherwise (e.g. a
+  /// client or HDFS cache buffer), just prepares the descriptor to be destroyed.
+  /// After this is called, buffer->buffer() is NULL. Does not acquire 'lock_'.
+  void FreeBuffer(BufferDescriptor* buffer);
+
   /// Decrements the number of active disks for this reader.  If the disk count
   /// goes to 0, the disk complete condition variable is signaled.
-  /// Reader lock must be taken before this call.
-  void DecrementDiskRefCount() {
-    // boost doesn't let us dcheck that the reader lock is taken
+  /// 'lock_' must be held via 'lock'.
+  void DecrementDiskRefCount(const boost::unique_lock<boost::mutex>& lock) {
+    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
     DCHECK_GT(num_disks_with_ranges_, 0);
     if (--num_disks_with_ranges_ == 0) {
       disks_complete_cond_var_.NotifyAll();
@@ -129,25 +184,25 @@ class RequestContext {
 
   /// Adds range to in_flight_ranges, scheduling this reader on the disk threads
   /// if necessary.
-  /// Reader lock must be taken before this.
-  void ScheduleScanRange(ScanRange* range) {
+  /// 'lock_' must be held via 'lock'
+  void ScheduleScanRange(const boost::unique_lock<boost::mutex>& lock, ScanRange* range) {
+    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
     DCHECK_EQ(state_, Active);
     DCHECK(range != NULL);
     RequestContext::PerDiskState& state = disk_states_[range->disk_id()];
     state.in_flight_ranges()->Enqueue(range);
-    state.ScheduleContext(this, range->disk_id());
+    state.ScheduleContext(lock, this, range->disk_id());
   }
 
-  /// Cancels the context with status code 'status'
-  void Cancel(const Status& status);
-
   /// Cancel the context if not already cancelled, wait for all scan ranges to finish
   /// and mark the context as inactive, after which it cannot be used.
   void CancelAndMarkInactive();
 
   /// Adds request range to disk queue for this request context. Currently,
   /// schedule_immediately must be false is RequestRange is a write range.
-  void AddRequestRange(RequestRange* range, bool schedule_immediately);
+  /// Caller must hold 'lock_' via 'lock'.
+  void AddRequestRange(const boost::unique_lock<boost::mutex>& lock,
+      RequestRange* range, bool schedule_immediately);
 
   /// Validates invariants of reader.  Reader lock must be taken beforehand.
   bool Validate() const;
@@ -159,6 +214,7 @@ class RequestContext {
   DiskIoMgr* const parent_;
 
   /// Memory used for this reader.  This is unowned by this object.
+  /// TODO: replace with bp client
   MemTracker* const mem_tracker_;
 
   /// Total bytes read for this reader
@@ -187,7 +243,7 @@ class RequestContext {
   /// Total number of bytes from remote reads that were expected to be local.
   AtomicInt64 unexpected_remote_bytes_{0};
 
-  /// The number of buffers that have been returned to the reader (via GetNext) that the
+  /// The number of buffers that have been returned to the reader (via GetNext()) that the
   /// reader has not returned. Only included for debugging and diagnostics.
   AtomicInt32 num_buffers_in_reader_{0};
 
@@ -227,9 +283,6 @@ class RequestContext {
   /// Current state of the reader
   State state_ = Active;
 
-  /// Status of this reader.  Set to non-ok if cancelled.
-  Status status_;
-
   /// The number of disks with scan ranges remaining (always equal to the sum of
   /// disks with ranges).
   int num_disks_with_ranges_ = 0;
@@ -237,7 +290,7 @@ class RequestContext {
   /// This is the list of ranges that are expected to be cached on the DN.
   /// When the reader asks for a new range (GetNextScanRange()), we first
   /// return ranges from this list.
-  InternalQueue<ScanRange> cached_ranges_;
+  InternalList<ScanRange> cached_ranges_;
 
   /// A list of ranges that should be returned in subsequent calls to
   /// GetNextRange.
@@ -246,11 +299,11 @@ class RequestContext {
   /// Populating it preemptively means we make worse scheduling decisions.
   /// We currently populate one range per disk.
   /// TODO: think about this some more.
-  InternalQueue<ScanRange> ready_to_start_ranges_;
+  InternalList<ScanRange> ready_to_start_ranges_;
   ConditionVariable ready_to_start_ranges_cv_; // used with lock_
 
   /// Ranges that are blocked due to back pressure on outgoing buffers.
-  InternalQueue<ScanRange> blocked_ranges_;
+  InternalList<ScanRange> blocked_ranges_;
 
   /// Condition variable for UnregisterContext() to wait for all disks to complete
   ConditionVariable disks_complete_cond_var_;
@@ -270,21 +323,9 @@ class RequestContext {
       next_scan_range_to_start_ = range;
     }
 
-    /// We need to have a memory barrier to prevent this load from being reordered
-    /// with num_threads_in_op(), since these variables are set without the reader
-    /// lock taken
-    bool is_on_queue() const {
-      bool b = is_on_queue_;
-      __sync_synchronize();
-      return b;
-    }
+    bool is_on_queue() const { return is_on_queue_.Load() != 0; }
 
-    int num_threads_in_op() const {
-      int v = num_threads_in_op_.Load();
-      // TODO: determine whether this barrier is necessary for any callsites.
-      AtomicUtil::MemoryBarrier();
-      return v;
-    }
+    int num_threads_in_op() const { return num_threads_in_op_.Load(); }
 
     const InternalQueue<ScanRange>* unstarted_scan_ranges() const {
       return &unstarted_scan_ranges_;
@@ -303,26 +344,41 @@ class RequestContext {
     InternalQueue<RequestRange>* in_flight_ranges() { return &in_flight_ranges_; }
 
     /// Schedules the request context on this disk if it's not already on the queue.
-    /// Context lock must be taken before this.
-    void ScheduleContext(RequestContext* context, int disk_id);
-
-    /// Increment the ref count on reader.  We need to track the number of threads per
-    /// reader per disk that are in the unlocked hdfs read code section. This is updated
-    /// by multiple threads without a lock so we need to use an atomic int.
-    void IncrementRequestThreadAndDequeue() {
+    /// context->lock_ must be held by the caller via 'context_lock'.
+    void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
+        RequestContext* context, int disk_id);
+
+    /// Increment the count of disk threads that have a reference to this context. These
+    /// threads do not hold any locks while reading from HDFS, so we need to prevent the
+    /// RequestContext from being destroyed underneath them.
+    ///
+    /// The caller does not need to hold 'lock_', so this can execute concurrently with
+    /// itself and DecrementDiskThread().
+    void IncrementDiskThreadAndDequeue() {
+      /// Incrementing 'num_threads_in_op_' first so that there is no window when other
+      /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
+      /// references left to this context.
       num_threads_in_op_.Add(1);
-      is_on_queue_ = false;
+      is_on_queue_.Store(0);
     }
 
-    void DecrementRequestThread() { num_threads_in_op_.Add(-1); }
-
-    /// Decrement request thread count and do final cleanup if this is the last
-    /// thread. RequestContext lock must be taken before this.
-    void DecrementRequestThreadAndCheckDone(RequestContext* context) {
-      num_threads_in_op_.Add(-1); // Also acts as a barrier.
-      if (!is_on_queue_ && num_threads_in_op_.Load() == 0 && !done_) {
-        // This thread is the last one for this reader on this disk, do final cleanup
-        context->DecrementDiskRefCount();
+    /// Decrement the count of disks threads with a reference to this context. Does final
+    /// cleanup if the context is cancelled and this is the last thread for the disk.
+    /// context->lock_ must be held by the caller via 'context_lock'.
+    void DecrementDiskThread(const boost::unique_lock<boost::mutex>& context_lock,
+        RequestContext* context) {
+      DCHECK(context_lock.mutex() == &context->lock_ && context_lock.owns_lock());
+      num_threads_in_op_.Add(-1);
+
+      if (context->state_ != Cancelled) {
+        DCHECK_EQ(context->state_, Active);
+        return;
+      }
+      // The state is cancelled, check to see if we're the last thread to touch the
+      // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_'
+      // in this order to avoid a race with IncrementDiskThreadAndDequeue().
+      if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) {
+        context->DecrementDiskRefCount(context_lock);
         done_ = true;
       }
     }
@@ -335,7 +391,12 @@ class RequestContext {
     bool done_ = true;
 
     /// For each disk, keeps track if the context is on this disk's queue, indicating
-    /// the disk must do some work for this context. The disk needs to do work in 4 cases:
+    /// the disk must do some work for this context. 1 means that the context is on the
+    /// disk queue, 0 means that it's not on the queue (either because it has on ranges
+    /// active for the disk or because a disk thread dequeued the context and is
+    /// currently processing a request).
+    ///
+    /// The disk needs to do work in 4 cases:
     ///  1) in_flight_ranges is not empty, the disk needs to read for this reader.
     ///  2) next_range_to_start is NULL, the disk needs to prepare a scan range to be
     ///     read next.
@@ -346,7 +407,15 @@ class RequestContext {
     /// useful that can be done. If there's nothing useful, the disk queue will wake up
     /// and then remove the reader from the queue. Doing this causes thrashing of the
     /// threads.
-    bool is_on_queue_ = false;
+    ///
+    /// This variable is important during context cancellation because it indicates
+    /// whether a queue has a reference to the context that must be released before
+    /// the context is considered unregistered. Atomically set to false after
+    /// incrementing 'num_threads_in_op_' when dequeueing so that there is no window
+    /// when other threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there
+    /// are no references left to this context.
+    /// TODO: this could be combined with 'num_threads_in_op_' to be a single refcount.
+    AtomicInt32 is_on_queue_{0};
 
     /// For each disks, the number of request ranges that have not been fully read.
     /// In the non-cancellation path, this will hit 0, and done will be set to true
@@ -376,11 +445,11 @@ class RequestContext {
     /// range to ready_to_start_ranges_.
     ScanRange* next_scan_range_to_start_ = nullptr;
 
-    /// For each disk, the number of threads issuing the underlying read/write on behalf
-    /// of this context. There are a few places where we release the context lock, do some
-    /// work, and then grab the lock again.  Because we don't hold the lock for the
-    /// entire operation, we need this ref count to keep track of which thread should do
-    /// final resource cleanup during cancellation.
+    /// For each disk, the number of disk threads issuing the underlying read/write on
+    /// behalf of this context. There are a few places where we release the context lock,
+    /// do some work, and then grab the lock again.  Because we don't hold the lock for
+    /// the entire operation, we need this ref count to keep track of which thread should
+    /// do final resource cleanup during cancellation.
     /// Only the thread that sees the count at 0 should do the final cleanup.
     AtomicInt32 num_threads_in_op_{0};
 

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 222f847..ab0810a 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -23,6 +23,7 @@
 
 #include <boost/thread/mutex.hpp>
 
+#include "common/atomic.h"
 #include "common/hdfs.h"
 #include "common/status.h"
 #include "util/condition-variable.h"
@@ -55,14 +56,6 @@ class BufferDescriptor {
   /// Returns the offset within the scan range that this buffer starts at
   int64_t scan_range_offset() const { return scan_range_offset_; }
 
-  /// Transfer ownership of buffer memory from 'mem_tracker_' to 'dst' and set
-  /// 'mem_tracker_' to 'dst'. 'mem_tracker_' and 'dst' must be non-NULL. Does not
-  /// check memory limits on 'dst': the caller should check the memory limit if a
-  /// different memory limit may apply to 'dst'. If the buffer was a client-provided
-  /// buffer, transferring is not allowed.
-  /// TODO: IMPALA-3209: revisit this as part of scanner memory usage revamp.
-  void TransferOwnership(MemTracker* dst);
-
  private:
   friend class DiskIoMgr;
   friend class ScanRange;
@@ -71,8 +64,7 @@ class BufferDescriptor {
   /// Create a buffer descriptor for a new reader, range and data buffer. The buffer
   /// memory should already be accounted against 'mem_tracker'.
   BufferDescriptor(DiskIoMgr* io_mgr, RequestContext* reader,
-      ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len,
-      MemTracker* mem_tracker);
+      ScanRange* scan_range, uint8_t* buffer, int64_t buffer_len);
 
   /// Return true if this is a cached buffer owned by HDFS.
   bool is_cached() const;
@@ -86,14 +78,11 @@ class BufferDescriptor {
   /// Reader that this buffer is for.
   RequestContext* const reader_;
 
-  /// The current tracker this buffer is associated with. After initialisation,
-  /// NULL for cached buffers and non-NULL for all other buffers.
-  MemTracker* mem_tracker_;
-
   /// Scan range that this buffer is for. Non-NULL when initialised.
   ScanRange* const scan_range_;
 
-  /// buffer with the read contents
+  /// Buffer for the read contents. Must be set to NULL in RequestContext::FreeBuffer()
+  /// before destruction of the descriptor.
   uint8_t* buffer_;
 
   /// length of buffer_. For buffers from cached reads, the length is 0.
@@ -105,9 +94,6 @@ class BufferDescriptor {
   /// true if the current scan range is complete
   bool eosr_ = false;
 
-  /// Status of the read to this buffer. if status is not ok, 'buffer' is nullptr
-  Status status_;
-
   int64_t scan_range_offset_ = 0;
 };
 
@@ -236,10 +222,17 @@ class ScanRange : public RequestRange {
   /// Only one thread can be in GetNext() at any time.
   Status GetNext(std::unique_ptr<BufferDescriptor>* buffer) WARN_UNUSED_RESULT;
 
-  /// Cancel this scan range. This cleans up all queued buffers and
-  /// wakes up any threads blocked on GetNext().
-  /// Status is the reason the range was cancelled. Must not be ok().
-  /// Status is returned to the user in GetNext().
+  /// Returns the buffer to the scan range. This must be called for every buffer
+  /// returned by GetNext(). After calling this, the buffer descriptor is invalid
+  /// and cannot be accessed.
+  void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Cancel this scan range. This cleans up all queued buffers and wakes up any threads
+  /// blocked on GetNext(). Status is a non-ok status with the reason the range was
+  /// cancelled, e.g. CANCELLED if the range was cancelled because it was not needed, or
+  /// another error if an error was encountered while scanning the range. Status is
+  /// returned to the any callers of GetNext(). If a thread is currently blocked in
+  /// GetNext(), it is woken up.
   void Cancel(const Status& status);
 
   /// return a descriptive string for debug.
@@ -263,10 +256,6 @@ class ScanRange : public RequestRange {
   bool EnqueueBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
       std::unique_ptr<BufferDescriptor> buffer);
 
-  /// Cleanup any queued buffers (i.e. due to cancellation). This cannot
-  /// be called with any locks taken.
-  void CleanupQueuedBuffers();
-
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this.
   bool Validate();
@@ -283,6 +272,10 @@ class ScanRange : public RequestRange {
   /// exclusive use by this scan range. The scan range is the exclusive owner of the
   /// file handle, and the file handle is destroyed in Close().
   /// All local OS files are opened using normal OS file APIs.
+  ///
+  /// If an error is encountered during opening, returns a status describing the error.
+  /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on
+  /// success, returns OK.
   Status Open(bool use_file_handle_cache) WARN_UNUSED_RESULT;
 
   /// Closes the file for this range. This function only modifies state in this range.
@@ -290,6 +283,10 @@ class ScanRange : public RequestRange {
 
   /// Reads from this range into 'buffer', which has length 'buffer_len' bytes. Returns
   /// the number of bytes read. The read position in this scan range is updated.
+  ///
+  /// If an error is encountered during reading, returns a status describing the error.
+  /// If the scan range was cancelled, returns the reason for cancellation. Otherwise, on
+  /// success, returns OK.
   Status Read(uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read,
       bool* eosr) WARN_UNUSED_RESULT;
 
@@ -307,6 +304,23 @@ class ScanRange : public RequestRange {
   Status ReadFromCache(const boost::unique_lock<boost::mutex>& reader_lock,
       bool* read_succeeded) WARN_UNUSED_RESULT;
 
+  /// Cleans up a buffer that was not returned to the client.
+  /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
+  /// This function will acquire 'lock_' and may acquire 'hdfs_lock_'.
+  void CleanUpBuffer(std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Same as CleanUpBuffer() except the caller must already hold 'lock_' via
+  /// 'scan_range_lock'.
+  void CleanUpBufferLocked(const boost::unique_lock<boost::mutex>& scan_range_lock,
+      std::unique_ptr<BufferDescriptor> buffer);
+
+  /// Returns true if no more buffers will be returned to clients in the future,
+  /// either because of hitting eosr or cancellation.
+  bool all_buffers_returned(const boost::unique_lock<boost::mutex>& lock) const {
+    DCHECK(lock.mutex() == &lock_ && lock.owns_lock());
+    return !cancel_status_.ok() || (eosr_queued_ && ready_buffers_.empty());
+  }
+
   /// Pointer to caller specified metadata. This is untouched by the io manager
   /// and the caller can put whatever auxiliary data in here.
   void* meta_data_ = nullptr;
@@ -323,6 +337,9 @@ class ScanRange : public RequestRange {
   /// TODO: we can do more with this
   bool expected_local_ = false;
 
+  /// Last modified time of the file associated with the scan range. Set in Reset().
+  int64_t mtime_;
+
   /// Total number of bytes read remotely. This is necessary to maintain a count of
   /// the number of remote scan ranges. Since IO statistics can be collected multiple
   /// times for a scan range, it is necessary to keep some state about whether this
@@ -378,26 +395,29 @@ class ScanRange : public RequestRange {
   /// Number of bytes read so far for this scan range
   int bytes_read_;
 
-  /// Status for this range. This is non-ok if is_cancelled_ is true.
-  /// Note: an individual range can fail without the RequestContext being
-  /// cancelled. This allows us to skip individual ranges.
-  Status status_;
+  /// The number of buffers that have been returned to a client via GetNext() that have
+  /// not yet been returned with ReturnBuffer().
+  int num_buffers_in_reader_ = 0;
 
   /// If true, the last buffer for this scan range has been queued.
+  /// If this is true and 'ready_buffers_' is empty, then no more buffers will be
+  /// returned to the caller by this scan range.
   bool eosr_queued_ = false;
 
-  /// If true, the last buffer for this scan range has been returned.
-  bool eosr_returned_ = false;
-
   /// If true, this scan range has been removed from the reader's in_flight_ranges
   /// queue because the ready_buffers_ queue is full.
   bool blocked_on_queue_ = false;
 
-  /// IO buffers that are queued for this scan range.
-  /// Condition variable for GetNext
-  ConditionVariable buffer_ready_cv_;
+  /// IO buffers that are queued for this scan range. When Cancel() is called
+  /// this is drained by the cancelling thread. I.e. this is always empty if
+  /// 'cancel_status_' is not OK.
   std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
+  /// Condition variable for threads in GetNext() that are waiting for the next buffer.
+  /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is
+  /// cancelled.
+  ConditionVariable buffer_ready_cv_;
+
   /// Lock that should be taken during hdfs calls. Only one thread (the disk reading
   /// thread) calls into hdfs at a time so this lock does not have performance impact.
   /// This lock only serves to coordinate cleanup. Specifically it serves to ensure
@@ -406,11 +426,16 @@ class ScanRange : public RequestRange {
   /// If this lock and lock_ need to be taken, lock_ must be taken first.
   boost::mutex hdfs_lock_;
 
-  /// If true, this scan range has been cancelled.
-  bool is_cancelled_ = false;
-
-  /// Last modified time of the file associated with the scan range
-  int64_t mtime_;
+  /// If non-OK, this scan range has been cancelled. This status is the reason for
+  /// cancellation - CANCELLED if cancelled without error, or another status if an
+  /// error caused cancellation. Note that a range can be cancelled without cancelling
+  /// the owning context. This means that ranges can be cancelled or hit errors without
+  /// aborting all scan ranges.
+  //
+  /// Writers must hold both 'lock_' and 'hdfs_lock_'. Readers must hold either 'lock_'
+  /// or 'hdfs_lock_'. This prevents the range from being cancelled while any thread
+  /// is inside a critical section.
+  Status cancel_status_;
 };
 
 /// Used to specify data to be written to a file and offset.

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 21daa96..1ffba00 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -45,19 +45,15 @@ DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to u
 bool ScanRange::EnqueueBuffer(
     const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
   DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+  DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
   {
     unique_lock<mutex> scan_range_lock(lock_);
     DCHECK(Validate()) << DebugString();
-    DCHECK(!eosr_returned_);
     DCHECK(!eosr_queued_);
-    if (is_cancelled_) {
-      // Return the buffer, this range has been cancelled
-      if (buffer->buffer_ != nullptr) {
-        io_mgr_->num_buffers_in_readers_.Add(1);
-        reader_->num_buffers_in_reader_.Add(1);
-      }
+    if (!cancel_status_.ok()) {
+      // This range has been cancelled, no need to enqueue the buffer.
       reader_->num_used_buffers_.Add(-1);
-      io_mgr_->ReturnBuffer(move(buffer));
+      CleanUpBufferLocked(scan_range_lock, move(buffer));
       return false;
     }
     reader_->num_ready_buffers_.Add(1);
@@ -67,9 +63,7 @@ bool ScanRange::EnqueueBuffer(
     DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
     blocked_on_queue_ = ready_buffers_.size() == DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT;
   }
-
   buffer_ready_cv_.NotifyOne();
-
   return blocked_on_queue_;
 }
 
@@ -78,56 +72,48 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
   bool eosr;
   {
     unique_lock<mutex> scan_range_lock(lock_);
-    if (eosr_returned_) return Status::OK();
     DCHECK(Validate()) << DebugString();
+    // No more buffers to return - return the cancel status or OK if not cancelled.
+    if (all_buffers_returned(scan_range_lock)) return cancel_status_;
 
-    while (ready_buffers_.empty() && !is_cancelled_) {
+    while (ready_buffers_.empty() && cancel_status_.ok()) {
       buffer_ready_cv_.Wait(scan_range_lock);
     }
-
-    if (is_cancelled_) {
-      DCHECK(!status_.ok());
-      return status_;
-    }
+    /// Propagate cancellation to the client if it happened while we were waiting.
+    RETURN_IF_ERROR(cancel_status_);
 
     // Remove the first ready buffer from the queue and return it
     DCHECK(!ready_buffers_.empty());
     DCHECK_LE(ready_buffers_.size(), DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT);
     *buffer = move(ready_buffers_.front());
     ready_buffers_.pop_front();
-    eosr_returned_ = (*buffer)->eosr();
     eosr = (*buffer)->eosr();
   }
 
   // Update tracking counters. The buffer has now moved from the IoMgr to the
   // caller.
-  io_mgr_->num_buffers_in_readers_.Add(1);
-  reader_->num_buffers_in_reader_.Add(1);
   reader_->num_ready_buffers_.Add(-1);
   reader_->num_used_buffers_.Add(-1);
   if (eosr) reader_->num_finished_ranges_.Add(1);
 
-  Status status = (*buffer)->status_;
-  if (!status.ok()) {
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status;
-  }
-
   unique_lock<mutex> reader_lock(reader_->lock_);
 
   DCHECK(reader_->Validate()) << endl << reader_->DebugString();
   if (reader_->state_ == RequestContext::Cancelled) {
     reader_->blocked_ranges_.Remove(this);
-    Cancel(reader_->status_);
-    io_mgr_->ReturnBuffer(move(*buffer));
-    return status_;
+    Cancel(Status::CANCELLED);
+    CleanUpBuffer(move(*buffer));
+    return Status::CANCELLED;
   }
 
+  // At this point success is guaranteed so increment counters for returned buffers.
+  reader_->num_buffers_in_reader_.Add(1);
   {
     // Check to see if we can re-schedule a blocked range. Note that EnqueueBuffer()
     // may have been called after we released 'lock_' above so we need to re-check
     // whether the queue is full.
     unique_lock<mutex> scan_range_lock(lock_);
+    ++num_buffers_in_reader_;
     if (blocked_on_queue_
         && ready_buffers_.size() < DiskIoMgr::SCAN_RANGE_READY_BUFFER_LIMIT
         && !eosr_queued_) {
@@ -135,51 +121,81 @@ Status ScanRange::GetNext(unique_ptr<BufferDescriptor>* buffer) {
       // This scan range was blocked and is no longer, add it to the reader
       // queue again.
       reader_->blocked_ranges_.Remove(this);
-      reader_->ScheduleScanRange(this);
+      reader_->ScheduleScanRange(reader_lock, this);
     }
   }
   return Status::OK();
 }
 
+void ScanRange::ReturnBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+  reader_->num_buffers_in_reader_.Add(-1);
+  {
+    unique_lock<mutex> scan_range_lock(lock_);
+    --num_buffers_in_reader_;
+    CleanUpBufferLocked(scan_range_lock, move(buffer_desc));
+  }
+}
+
+void ScanRange::CleanUpBuffer(unique_ptr<BufferDescriptor> buffer_desc) {
+  unique_lock<mutex> scan_range_lock(lock_);
+  CleanUpBufferLocked(scan_range_lock, move(buffer_desc));
+}
+
+void ScanRange::CleanUpBufferLocked(
+    const boost::unique_lock<boost::mutex>& scan_range_lock,
+    unique_ptr<BufferDescriptor> buffer_desc) {
+  DCHECK(scan_range_lock.mutex() == &lock_ && scan_range_lock.owns_lock());
+  DCHECK(buffer_desc != nullptr);
+  DCHECK_EQ(this, buffer_desc->scan_range_);
+  buffer_desc->reader_->FreeBuffer(buffer_desc.get());
+
+  if (all_buffers_returned(scan_range_lock) && num_buffers_in_reader_ == 0) {
+    // Close the scan range if there are no more buffers in the reader and no more buffers
+    // will be returned to readers in future. Close() is idempotent so it is ok to call
+    // multiple times during cleanup so long as the range is actually finished.
+    Close();
+  }
+}
+
 void ScanRange::Cancel(const Status& status) {
   // Cancelling a range that was never started, ignore.
   if (io_mgr_ == nullptr) return;
 
   DCHECK(!status.ok());
   {
-    // Grab both locks to make sure that all working threads see is_cancelled_.
+    // Grab both locks to make sure that we don't change 'cancel_status_' while other
+    // threads are in critical sections.
     unique_lock<mutex> scan_range_lock(lock_);
-    unique_lock<mutex> hdfs_lock(hdfs_lock_);
-    DCHECK(Validate()) << DebugString();
-    if (is_cancelled_) return;
-    is_cancelled_ = true;
-    status_ = status;
+    {
+      unique_lock<mutex> hdfs_lock(hdfs_lock_);
+      DCHECK(Validate()) << DebugString();
+      // If already cancelled, preserve the original reason for cancellation. The first
+      // thread to set 'cancel_status_' does the cleanup below.
+      RETURN_VOID_IF_ERROR(cancel_status_);
+      cancel_status_ = status;
+    }
+
+    /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
+    /// from seeing inconsistent state.
+    reader_->num_used_buffers_.Add(-ready_buffers_.size());
+    reader_->num_ready_buffers_.Add(-ready_buffers_.size());
+    while (!ready_buffers_.empty()) {
+      CleanUpBufferLocked(scan_range_lock, move(ready_buffers_.front()));
+      ready_buffers_.pop_front();
+    }
   }
   buffer_ready_cv_.NotifyAll();
-  CleanupQueuedBuffers();
 
   // For cached buffers, we can't close the range until the cached buffer is returned.
-  // Close() is called from DiskIoMgr::ReturnBuffer().
+  // Close() is called from ScanRange::CleanUpBufferLocked().
   if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
 }
 
-void ScanRange::CleanupQueuedBuffers() {
-  DCHECK(is_cancelled_);
-  io_mgr_->num_buffers_in_readers_.Add(ready_buffers_.size());
-  reader_->num_buffers_in_reader_.Add(ready_buffers_.size());
-  reader_->num_used_buffers_.Add(-ready_buffers_.size());
-  reader_->num_ready_buffers_.Add(-ready_buffers_.size());
-
-  while (!ready_buffers_.empty()) {
-    io_mgr_->ReturnBuffer(move(ready_buffers_.front()));
-    ready_buffers_.pop_front();
-  }
-}
-
 string ScanRange::DebugString() const {
   stringstream ss;
   ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
      << " len=" << len_ << " bytes_read=" << bytes_read_
+     << " cancel_status=" << cancel_status_.GetDetail()
      << " buffer_queue=" << ready_buffers_.size()
      << " hdfs_file=" << exclusive_hdfs_fh_;
   return ss.str();
@@ -187,14 +203,12 @@ string ScanRange::DebugString() const {
 
 bool ScanRange::Validate() {
   if (bytes_read_ > len_) {
-    LOG(WARNING) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
+    LOG(ERROR) << "Bytes read tracking is wrong. Shouldn't read past the scan range."
                  << " bytes_read_=" << bytes_read_ << " len_=" << len_;
     return false;
   }
-  if (eosr_returned_ && !eosr_queued_) {
-    LOG(WARNING) << "Returned eosr to reader before finishing reading the scan range"
-                 << " eosr_returned_=" << eosr_returned_
-                 << " eosr_queued_=" << eosr_queued_;
+  if (!cancel_status_.ok() && !ready_buffers_.empty()) {
+    LOG(ERROR) << "Cancelled range should not have queued buffers " << DebugString();
     return false;
   }
   return true;
@@ -203,13 +217,14 @@ bool ScanRange::Validate() {
 ScanRange::ScanRange()
   : RequestRange(RequestType::READ),
     num_remote_bytes_(0),
-    external_buffer_tag_(ExternalBufferTag::NO_BUFFER),
-    mtime_(-1) {}
+    external_buffer_tag_(ExternalBufferTag::NO_BUFFER) {}
 
 ScanRange::~ScanRange() {
   DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
   DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
       << "Cached buffer was not released.";
+  DCHECK_EQ(0, ready_buffers_.size());
+  DCHECK_EQ(0, num_buffers_in_reader_);
 }
 
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
@@ -253,16 +268,15 @@ void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
   local_file_ = nullptr;
   exclusive_hdfs_fh_ = nullptr;
   bytes_read_ = 0;
-  is_cancelled_ = false;
+  cancel_status_ = Status::OK();
   eosr_queued_= false;
-  eosr_returned_= false;
   blocked_on_queue_ = false;
   DCHECK(Validate()) << DebugString();
 }
 
 Status ScanRange::Open(bool use_file_handle_cache) {
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (is_cancelled_) return Status::CANCELLED;
+  RETURN_IF_ERROR(cancel_status_);
 
   if (fs_ != nullptr) {
     if (exclusive_hdfs_fh_ != nullptr) return Status::OK();
@@ -386,7 +400,7 @@ int64_t ScanRange::MaxReadChunkSize() const {
 Status ScanRange::Read(
     uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
-  if (is_cancelled_) return Status::CANCELLED;
+  RETURN_IF_ERROR(cancel_status_);
 
   *eosr = false;
   *bytes_read = 0;
@@ -523,7 +537,7 @@ Status ScanRange::ReadFromCache(
 
   {
     unique_lock<mutex> hdfs_lock(hdfs_lock_);
-    if (is_cancelled_) return Status::CANCELLED;
+    RETURN_IF_ERROR(cancel_status_);
 
     DCHECK(exclusive_hdfs_fh_ != nullptr);
     DCHECK(external_buffer_tag_ == ExternalBufferTag::NO_BUFFER);
@@ -561,7 +575,7 @@ Status ScanRange::ReadFromCache(
   // 'mem_tracker' is nullptr because the memory is owned by the HDFS java client,
   // not the Impala backend.
   unique_ptr<BufferDescriptor> desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
-      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0, nullptr));
+      io_mgr_, reader_, this, reinterpret_cast<uint8_t*>(buffer), 0));
   desc->len_ = bytes_read;
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 10a3424..8d28f8f 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -107,7 +107,6 @@ class MemTracker {
   /// destruction to prevent other threads from getting a reference to the MemTracker
   /// via its parent. Only used to deregister the query-level MemTracker from the
   /// global hierarchy.
-  /// TODO: IMPALA-3200: this is also used by BufferedBlockMgr, which will be deleted.
   void CloseAndUnregisterFromParent();
 
   /// Include counters from a ReservationTracker in logs and other diagnostics.

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/test-env.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/test-env.cc b/be/src/runtime/test-env.cc
index 770eaba..0d6c6f0 100644
--- a/be/src/runtime/test-env.cc
+++ b/be/src/runtime/test-env.cc
@@ -49,7 +49,7 @@ Status TestEnv::Init() {
   exec_env_.reset(new ExecEnv);
   // Populate the ExecEnv state that the backend tests need.
   exec_env_->mem_tracker_.reset(new MemTracker(-1, "Process"));
-  RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init(exec_env_->process_mem_tracker()));
+  RETURN_IF_ERROR(exec_env_->disk_io_mgr()->Init());
   exec_env_->metrics_.reset(new MetricGroup("test-env-metrics"));
   exec_env_->tmp_file_mgr_.reset(new TmpFileMgr);
   if (have_tmp_file_mgr_args_) {

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/tmp-file-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr-test.cc b/be/src/runtime/tmp-file-mgr-test.cc
index 3091c58..9161b63 100644
--- a/be/src/runtime/tmp-file-mgr-test.cc
+++ b/be/src/runtime/tmp-file-mgr-test.cc
@@ -25,6 +25,7 @@
 #include <gtest/gtest.h>
 
 #include "common/init.h"
+#include "runtime/io/request-context.h"
 #include "runtime/test-env.h"
 #include "runtime/tmp-file-mgr-internal.h"
 #include "runtime/tmp-file-mgr.h"
@@ -134,7 +135,7 @@ class TmpFileMgrTest : public ::testing::Test {
 
   /// Helper to cancel the FileGroup RequestContext.
   static void CancelIoContext(TmpFileMgr::FileGroup* group) {
-    group->io_mgr_->CancelContext(group->io_ctx_.get());
+    group->io_ctx_->Cancel();
   }
 
   /// Helper to get the # of bytes allocated by the group. Validates that the sum across

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 3807670..7b00179 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -426,7 +426,7 @@ Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buf
   }
 exit:
   // Always return the buffer before exiting to avoid leaking it.
-  if (io_mgr_buffer != nullptr) io_mgr_->ReturnBuffer(move(io_mgr_buffer));
+  if (io_mgr_buffer != nullptr) handle->read_range_->ReturnBuffer(move(io_mgr_buffer));
   handle->read_range_ = nullptr;
   return status;
 }

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/util/impalad-metrics.cc
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.cc b/be/src/util/impalad-metrics.cc
index 8f5f1be..32320d8 100644
--- a/be/src/util/impalad-metrics.cc
+++ b/be/src/util/impalad-metrics.cc
@@ -46,12 +46,6 @@ const char* ImpaladMetricKeys::HASH_TABLE_TOTAL_BYTES =
     "impala-server.hash-table.total-bytes";
 const char* ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES =
     "impala-server.io-mgr.num-open-files";
-const char* ImpaladMetricKeys::IO_MGR_NUM_BUFFERS =
-    "impala-server.io-mgr.num-buffers";
-const char* ImpaladMetricKeys::IO_MGR_TOTAL_BYTES =
-    "impala-server.io-mgr.total-bytes";
-const char* ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS =
-    "impala-server.io-mgr.num-unused-buffers";
 const char* ImpaladMetricKeys::IO_MGR_BYTES_READ =
     "impala-server.io-mgr.bytes-read";
 const char* ImpaladMetricKeys::IO_MGR_LOCAL_BYTES_READ =
@@ -208,11 +202,8 @@ void ImpaladMetrics::CreateMetrics(MetricGroup* m) {
       ImpaladMetricKeys::NUM_FILES_OPEN_FOR_INSERT, 0);
 
   // Initialize IO mgr metrics
-  IO_MGR_NUM_OPEN_FILES = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
-  IO_MGR_NUM_BUFFERS = m->AddGauge(ImpaladMetricKeys::IO_MGR_NUM_BUFFERS, 0);
-  IO_MGR_TOTAL_BYTES = m->AddGauge(ImpaladMetricKeys::IO_MGR_TOTAL_BYTES, 0);
-  IO_MGR_NUM_UNUSED_BUFFERS = m->AddGauge(
-      ImpaladMetricKeys::IO_MGR_NUM_UNUSED_BUFFERS, 0);
+  IO_MGR_NUM_OPEN_FILES = m->AddGauge(
+      ImpaladMetricKeys::IO_MGR_NUM_OPEN_FILES, 0);
   IO_MGR_NUM_CACHED_FILE_HANDLES = m->AddGauge(
       ImpaladMetricKeys::IO_MGR_NUM_CACHED_FILE_HANDLES, 0);
   IO_MGR_NUM_FILE_HANDLES_OUTSTANDING = m->AddGauge(

http://git-wip-us.apache.org/repos/asf/impala/blob/3b3bf871/be/src/util/impalad-metrics.h
----------------------------------------------------------------------
diff --git a/be/src/util/impalad-metrics.h b/be/src/util/impalad-metrics.h
index f32e3fa..a62c4c6 100644
--- a/be/src/util/impalad-metrics.h
+++ b/be/src/util/impalad-metrics.h
@@ -67,15 +67,6 @@ class ImpaladMetricKeys {
   /// Number of files currently opened by the io mgr
   static const char* IO_MGR_NUM_OPEN_FILES;
 
-  /// Number of IO buffers allocated by the io mgr
-  static const char* IO_MGR_NUM_BUFFERS;
-
-  /// Number of bytes used by IO buffers (used and unused).
-  static const char* IO_MGR_TOTAL_BYTES;
-
-  /// Number of IO buffers that are currently unused (and can be GC'ed)
-  static const char* IO_MGR_NUM_UNUSED_BUFFERS;
-
   /// Total number of bytes read by the io mgr
   static const char* IO_MGR_BYTES_READ;