You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/04/28 23:41:56 UTC

[05/15] impala git commit: IMPALA-6587: free buffers before ScanRange::Cancel() returns

IMPALA-6587: free buffers before ScanRange::Cancel() returns

ScanRange::Cancel() now waits until an in-flight read finishes so
that the disk I/O buffer being processed by the disk thread is
freed when Cancel() returns.

The fix is to set a 'read_in_flight_' flag on the scan range
while the disk thread is doing the read. Cancel() blocks until
read_in_flight_ == false.

The code is refactored to move more logic into ScanRange and
to avoid holding RequestContext::lock_ for longer than necessary.

Testing:
Added a query test that reproduces the issue.

Added a unit test and a stress option that reproduces the problem in a
targeted way.

Ran disk-io-mgr-stress test for a few hours. Ran it under TSAN and
inspected output to make sure there were no non-benign data races.

Change-Id: I87182b6bd51b5fb0b923e7e4c8d08a44e7617db2
Reviewed-on: http://gerrit.cloudera.org:8080/9680
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Tim Armstrong <ta...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/d7bba821
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/d7bba821
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/d7bba821

Branch: refs/heads/master
Commit: d7bba821923b96272c254840facd3d870b1ebe36
Parents: fb5dc9e
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Feb 28 09:49:25 2018 -0800
Committer: Tim Armstrong <ta...@cloudera.com>
Committed: Sat Apr 28 23:41:39 2018 +0000

----------------------------------------------------------------------
 be/src/common/global-flags.cc                   |   2 +
 be/src/runtime/io/disk-io-mgr-internal.h        |   9 +-
 be/src/runtime/io/disk-io-mgr-stress.cc         |   2 +
 be/src/runtime/io/disk-io-mgr-test.cc           |  49 +++++++
 be/src/runtime/io/disk-io-mgr.cc                | 100 +------------
 be/src/runtime/io/disk-io-mgr.h                 |  19 +--
 be/src/runtime/io/request-context.cc            |  32 ++++-
 be/src/runtime/io/request-context.h             |  16 ++-
 be/src/runtime/io/request-ranges.h              |  79 ++++++----
 be/src/runtime/io/scan-range.cc                 | 143 +++++++++++++++----
 .../queries/QueryTest/scanners.test             |  11 ++
 11 files changed, 288 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/common/global-flags.cc
----------------------------------------------------------------------
diff --git a/be/src/common/global-flags.cc b/be/src/common/global-flags.cc
index ea88b28..d1a54e1 100644
--- a/be/src/common/global-flags.cc
+++ b/be/src/common/global-flags.cc
@@ -148,6 +148,8 @@ DEFINE_bool(thread_creation_fault_injection, false, "A fault injection option th
 DEFINE_int32(stress_catalog_init_delay_ms, 0, "A stress option that injects extra delay"
     " in milliseconds when initializing an impalad's local catalog replica. Delay <= 0"
     " inject no delay.");
+DEFINE_int32(stress_disk_read_delay_ms, 0, "A stress option that injects extra delay"
+    " in milliseconds when the I/O manager is reading from disk.");
 #endif
 
 // Used for testing the path where the Kudu client is stubbed.

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-internal.h b/be/src/runtime/io/disk-io-mgr-internal.h
index 292530f..475456a 100644
--- a/be/src/runtime/io/disk-io-mgr-internal.h
+++ b/be/src/runtime/io/disk-io-mgr-internal.h
@@ -39,6 +39,8 @@
 /// This file contains internal structures shared between submodules of the IoMgr. Users
 /// of the IoMgr do not need to include this file.
 
+DECLARE_uint64(max_cached_file_handles);
+
 // Macros to work around counters sometimes not being provided.
 // TODO: fix things so that counters are always non-NULL.
 #define COUNTER_ADD_IF_NOT_NULL(c, v) \
@@ -56,8 +58,13 @@
 namespace impala {
 namespace io {
 
+// Indicates if file handle caching should be used
+static inline bool is_file_handle_caching_enabled() {
+  return FLAGS_max_cached_file_handles > 0;
+}
+
 /// Per disk state
-struct DiskIoMgr::DiskQueue {
+struct DiskQueue {
   /// Disk id (0-based)
   int disk_id;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr-stress.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-stress.cc b/be/src/runtime/io/disk-io-mgr-stress.cc
index 3fd33de..fd360c4 100644
--- a/be/src/runtime/io/disk-io-mgr-stress.cc
+++ b/be/src/runtime/io/disk-io-mgr-stress.cc
@@ -247,6 +247,8 @@ void DiskIoMgrStress::NewClient(int i) {
   // Clean up leftover state from the previous client (if any).
   client.scan_ranges.clear();
   ExecEnv* exec_env = ExecEnv::GetInstance();
+  if (client.reader != nullptr) io_mgr_->UnregisterContext(client.reader.get());
+
   exec_env->buffer_pool()->DeregisterClient(&buffer_pool_clients_[i]);
   if (client_mem_trackers_[i] != nullptr) client_mem_trackers_[i]->Close();
   client.obj_pool.Clear();

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr-test.cc b/be/src/runtime/io/disk-io-mgr-test.cc
index 95b2d03..3e8a9ae 100644
--- a/be/src/runtime/io/disk-io-mgr-test.cc
+++ b/be/src/runtime/io/disk-io-mgr-test.cc
@@ -33,10 +33,12 @@
 #include "service/fe-support.h"
 #include "testutil/gtest-util.h"
 #include "testutil/rand-util.h"
+#include "testutil/scoped-flag-setter.h"
 #include "util/condition-variable.h"
 #include "util/cpu-info.h"
 #include "util/disk-info.h"
 #include "util/thread.h"
+#include "util/time.h"
 
 #include "common/names.h"
 
@@ -48,6 +50,9 @@ DECLARE_int64(min_buffer_size);
 DECLARE_int32(num_remote_hdfs_io_threads);
 DECLARE_int32(num_s3_io_threads);
 DECLARE_int32(num_adls_io_threads);
+#ifndef NDEBUG
+DECLARE_int32(stress_disk_read_delay_ms);
+#endif
 
 const int MIN_BUFFER_SIZE = 128;
 const int MAX_BUFFER_SIZE = 1024;
@@ -1260,6 +1265,50 @@ TEST_F(DiskIoMgrTest, SkipAllocateBuffers) {
   io_mgr.UnregisterContext(reader.get());
 }
 
+// Regression test for IMPALA-6587 - all buffers should be released after Cancel().
+TEST_F(DiskIoMgrTest, CancelReleasesResources) {
+  InitRootReservation(LARGE_RESERVATION_LIMIT);
+  const char* tmp_file = "/tmp/disk_io_mgr_test.txt";
+  const char* data = "the quick brown fox jumped over the lazy dog";
+  int len = strlen(data);
+  const int64_t MIN_BUFFER_SIZE = 2;
+  const int64_t MAX_BUFFER_SIZE = 1024;
+  CreateTempFile(tmp_file, data);
+
+  // Get mtime for file
+  struct stat stat_val;
+  stat(tmp_file, &stat_val);
+
+  const int NUM_DISK_THREADS = 20;
+  DiskIoMgr io_mgr(
+      1, NUM_DISK_THREADS, NUM_DISK_THREADS, MIN_BUFFER_SIZE, MAX_BUFFER_SIZE);
+#ifndef NDEBUG
+  auto s = ScopedFlagSetter<int32_t>::Make(&FLAGS_stress_disk_read_delay_ms, 5);
+#endif
+
+  ASSERT_OK(io_mgr.Init());
+  unique_ptr<RequestContext> reader = io_mgr.RegisterContext();
+  BufferPool::ClientHandle read_client;
+  RegisterBufferPoolClient(
+      LARGE_RESERVATION_LIMIT, LARGE_INITIAL_RESERVATION, &read_client);
+
+  for (int i = 0; i < 10; ++i) {
+    ScanRange* range = InitRange(&pool_, tmp_file, 0, len, 0, stat_val.st_mtime);
+    bool needs_buffers;
+    ASSERT_OK(io_mgr.StartScanRange(reader.get(), range, &needs_buffers));
+    EXPECT_TRUE(needs_buffers);
+    ASSERT_OK(io_mgr.AllocateBuffersForRange(reader.get(), &read_client, range, MAX_BUFFER_SIZE));
+    // Give disk I/O thread a chance to start read.
+    SleepForMs(1);
+
+    range->Cancel(Status::CANCELLED);
+    // Resources should be released immediately once Cancel() returns.
+    EXPECT_EQ(0, read_client.GetUsedReservation()) << " iter " << i;
+  }
+  buffer_pool()->DeregisterClient(&read_client);
+  io_mgr.UnregisterContext(reader.get());
+}
+
 // Test reading into a client-allocated buffer.
 TEST_F(DiskIoMgrTest, ReadIntoClientBuffer) {
   InitRootReservation(LARGE_RESERVATION_LIMIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.cc b/be/src/runtime/io/disk-io-mgr.cc
index 8933fec..50f6fc4 100644
--- a/be/src/runtime/io/disk-io-mgr.cc
+++ b/be/src/runtime/io/disk-io-mgr.cc
@@ -129,13 +129,6 @@ DEFINE_uint64(num_file_handle_cache_partitions, 16, "Number of partitions used b
 
 AtomicInt32 DiskIoMgr::next_disk_id_;
 
-namespace detail {
-// Indicates if file handle caching should be used
-static inline bool is_file_handle_caching_enabled() {
-  return FLAGS_max_cached_file_handles > 0;
-}
-}
-
 string DiskIoMgr::DebugString() {
   stringstream ss;
   ss << "Disks: " << endl;
@@ -549,7 +542,7 @@ bool DiskIoMgr::GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       disk_queue->request_contexts.pop_front();
       DCHECK(*request_context != nullptr);
       request_disk_state = &((*request_context)->disk_states_[disk_id]);
-      request_disk_state->IncrementDiskThreadAndDequeue();
+      request_disk_state->IncrementDiskThreadAfterDequeue();
     }
 
     // NOTE: no locks were taken in between.  We need to be careful about what state
@@ -643,50 +636,6 @@ void DiskIoMgr::HandleWriteFinished(
   }
 }
 
-void DiskIoMgr::HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-    Status read_status, unique_ptr<BufferDescriptor> buffer) {
-  unique_lock<mutex> reader_lock(reader->lock_);
-
-  RequestContext::PerDiskState* disk_state = &reader->disk_states_[disk_queue->disk_id];
-  DCHECK(reader->Validate()) << endl << reader->DebugString();
-  DCHECK_GT(disk_state->num_threads_in_op(), 0);
-  DCHECK(buffer->buffer_ != nullptr);
-  DCHECK(!buffer->is_cached()) << "HDFS cache reads don't go through this code path.";
-
-  // After calling EnqueueReadyBuffer() below, it is no longer valid to read from buffer.
-  // Store the state we need before calling EnqueueReadyBuffer().
-  bool eosr = buffer->eosr_;
-
-  // TODO: IMPALA-4249: it safe to touch 'scan_range' until DecrementDiskThread() is
-  // called because all clients of DiskIoMgr keep ScanRange objects alive until they
-  // unregister their RequestContext.
-  ScanRange* scan_range = buffer->scan_range_;
-  bool scan_range_done = eosr;
-  if (read_status.ok() && reader->state_ != RequestContext::Cancelled) {
-    DCHECK_EQ(reader->state_, RequestContext::Active);
-    // Read successfully - update the reader's scan ranges.  There are two cases here:
-    //  1. End of scan range or cancelled scan range - don't need to reschedule.
-    //  2. Middle of scan range - need to schedule to read next buffer.
-    bool enqueued = scan_range->EnqueueReadyBuffer(reader_lock, move(buffer));
-    if (!eosr && enqueued) reader->ScheduleScanRange(reader_lock, scan_range);
-  } else {
-    // The scan range will be cancelled, either because we hit an error or because the
-    // request context was cancelled.  The buffer is not needed - we must free it.
-    reader->FreeBuffer(buffer.get());
-    // Propagate 'read_status' to the scan range. If we are here because the context
-    // was cancelled, the scan range is already cancelled so we do not need to re-cancel
-    // it.
-    if (!read_status.ok()) scan_range->CancelFromReader(reader_lock, read_status);
-    scan_range_done = true;
-  }
-  if (scan_range_done) {
-    scan_range->Close();
-    --disk_state->num_remaining_ranges();
-  }
-  DCHECK(reader->Validate()) << endl << reader->DebugString();
-  disk_state->DecrementDiskThread(reader_lock, reader);
-}
-
 void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   // The thread waits until there is work or the entire system is being shut down.
   // If there is work, performs the read or write requested and re-enqueues the
@@ -707,7 +656,9 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
       return;
     }
     if (range->request_type() == RequestType::READ) {
-      ReadRange(disk_queue, worker_context, static_cast<ScanRange*>(range));
+      ScanRange* scan_range = static_cast<ScanRange*>(range);
+      ReadOutcome outcome = scan_range->DoRead(disk_queue->disk_id);
+      worker_context->ReadDone(disk_queue->disk_id, outcome, scan_range);
     } else {
       DCHECK(range->request_type() == RequestType::WRITE);
       Write(worker_context, static_cast<WriteRange*>(range));
@@ -715,49 +666,6 @@ void DiskIoMgr::WorkLoop(DiskQueue* disk_queue) {
   }
 }
 
-void DiskIoMgr::ReadRange(
-    DiskQueue* disk_queue, RequestContext* reader, ScanRange* range) {
-  int64_t bytes_remaining = range->len_ - range->bytes_read_;
-  DCHECK_GT(bytes_remaining, 0);
-  unique_ptr<BufferDescriptor> buffer_desc;
-  if (range->external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
-    buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(this, reader, range,
-        range->client_buffer_.data, range->client_buffer_.len));
-  } else {
-    DCHECK(range->external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
-        << "This code path does not handle other buffer types, i.e. HDFS cache"
-        << static_cast<int>(range->external_buffer_tag_);
-    buffer_desc = range->GetNextUnusedBufferForRange();
-    if (buffer_desc == nullptr) {
-      // No buffer available - the range will be rescheduled when a buffer is added.
-      unique_lock<mutex> reader_lock(reader->lock_);
-      reader->disk_states_[disk_queue->disk_id].DecrementDiskThread(reader_lock, reader);
-      DCHECK(reader->Validate()) << endl << reader->DebugString();
-      return;
-    }
-  }
-
-  // No locks in this section.  Only working on local vars.  We don't want to hold a
-  // lock across the read call.
-  Status read_status = range->Open(detail::is_file_handle_caching_enabled());
-  if (read_status.ok()) {
-    // Update counters.
-    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, 1L);
-    COUNTER_BITOR_IF_NOT_NULL(reader->disks_accessed_bitmap_, 1LL << disk_queue->disk_id);
-
-    read_status = range->Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
-        &buffer_desc->len_, &buffer_desc->eosr_);
-    buffer_desc->scan_range_offset_ = range->bytes_read_ - buffer_desc->len_;
-
-    COUNTER_ADD_IF_NOT_NULL(reader->bytes_read_counter_, buffer_desc->len_);
-    COUNTER_ADD(&total_bytes_read_counter_, buffer_desc->len_);
-    COUNTER_ADD_IF_NOT_NULL(reader->active_read_thread_counter_, -1L);
-  }
-
-  // Finished read, update reader/disk based on the results
-  HandleReadFinished(disk_queue, reader, read_status, move(buffer_desc));
-}
-
 void DiskIoMgr::Write(RequestContext* writer_context, WriteRange* write_range) {
   Status ret_status = Status::OK();
   FILE* file_handle = nullptr;

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/disk-io-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/disk-io-mgr.h b/be/src/runtime/io/disk-io-mgr.h
index 17aa211..b6b4b75 100644
--- a/be/src/runtime/io/disk-io-mgr.h
+++ b/be/src/runtime/io/disk-io-mgr.h
@@ -44,6 +44,8 @@
 namespace impala {
 
 namespace io {
+
+struct DiskQueue;
 /// Manager object that schedules IO for all queries on all disks and remote filesystems
 /// (such as S3). Each query maps to one or more RequestContext objects, each of which
 /// has its own queue of scan ranges and/or write ranges.
@@ -393,7 +395,6 @@ class DiskIoMgr : public CacheLineAligned {
   friend class RequestContext;
   // TODO: remove io:: prefix - it is required for the "using ScanRange" workaround above.
   friend class io::ScanRange;
-  struct DiskQueue;
 
   friend class DiskIoMgrTest_Buffers_Test;
   friend class DiskIoMgrTest_BufferSizeSelection_Test;
@@ -448,8 +449,8 @@ class DiskIoMgr : public CacheLineAligned {
   FileHandleCache file_handle_cache_;
 
   /// Disk worker thread loop. This function retrieves the next range to process on
-  /// the disk queue and invokes ReadRange() or Write() depending on the type of Range().
-  /// There can be multiple threads per disk running this loop.
+  /// the disk queue and invokes ScanRange::DoRead() or Write() depending on the type
+  /// of Range. There can be multiple threads per disk running this loop.
   void WorkLoop(DiskQueue* queue);
 
   /// This is called from the disk thread to get the next range to process. It will
@@ -460,13 +461,6 @@ class DiskIoMgr : public CacheLineAligned {
   bool GetNextRequestRange(DiskQueue* disk_queue, RequestRange** range,
       RequestContext** request_context);
 
-  /// Updates disk queue and reader state after a read is complete. If the read
-  /// was successful, 'read_status' is ok and 'buffer' contains the result of the
-  /// read. If the read failed with an error, 'read_status' contains the error and
-  /// 'buffer' has the buffer that was meant to hold the result of the read.
-  void HandleReadFinished(DiskQueue* disk_queue, RequestContext* reader,
-      Status read_status, std::unique_ptr<BufferDescriptor> buffer);
-
   /// Invokes write_range->callback_  after the range has been written and
   /// updates per-disk state and handle state. The status of the write OK/RUNTIME_ERROR
   /// etc. is passed via write_status and to the callback.
@@ -488,11 +482,6 @@ class DiskIoMgr : public CacheLineAligned {
   /// Does not open or close the file that is written.
   Status WriteRangeHelper(FILE* file_handle, WriteRange* write_range) WARN_UNUSED_RESULT;
 
-  /// Reads the specified scan range and calls HandleReadFinished() when done. If no
-  /// buffer is available to read the range's data into, the read cannot proceed, the
-  /// range becomes blocked and this function returns without doing I/O.
-  void ReadRange(DiskQueue* disk_queue, RequestContext* reader, ScanRange* range);
-
   /// Helper for AllocateBuffersForRange() to compute the buffer sizes for a scan range
   /// with length 'scan_range_len', given that 'max_bytes' of memory should be allocated.
   std::vector<int64_t> ChooseBufferSizes(int64_t scan_range_len, int64_t max_bytes);

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/request-context.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.cc b/be/src/runtime/io/request-context.cc
index dec6aa6..dd8ff24 100644
--- a/be/src/runtime/io/request-context.cc
+++ b/be/src/runtime/io/request-context.cc
@@ -64,6 +64,34 @@ void RequestContext::FreeBuffer(BufferDescriptor* buffer) {
   buffer->buffer_ = nullptr;
 }
 
+void RequestContext::ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range) {
+  // TODO: IMPALA-4249: it is safe to touch 'range' until DecrementDiskThread() is
+  // called because all clients of DiskIoMgr keep ScanRange objects alive until they
+  // unregister their RequestContext.
+  unique_lock<mutex> lock(lock_);
+  RequestContext::PerDiskState* disk_state = &disk_states_[disk_id];
+  DCHECK_GT(disk_state->num_threads_in_op(), 0);
+  if (outcome == ReadOutcome::SUCCESS_EOSR) {
+    // No more reads to do.
+    --disk_state->num_remaining_ranges();
+  } else if (outcome == ReadOutcome::SUCCESS_NO_EOSR) {
+    // Schedule the next read.
+    if (state_ != RequestContext::Cancelled) {
+      ScheduleScanRange(lock, range);
+    }
+  } else if (outcome == ReadOutcome::BLOCKED_ON_BUFFER) {
+    // Do nothing - the caller must add a buffer to the range or cancel it.
+  } else {
+    DCHECK(outcome == ReadOutcome::CANCELLED) << static_cast<int>(outcome);
+    // No more reads - clean up the scan range.
+    --disk_state->num_remaining_ranges();
+    RemoveActiveScanRangeLocked(lock, range);
+  }
+  // Release refcount that was taken in IncrementDiskThreadAfterDequeue().
+  disk_state->DecrementDiskThread(lock, this);
+  DCHECK(Validate()) << endl << DebugString();
+}
+
 // Cancellation of a RequestContext requires coordination from multiple threads that may
 // hold references to the context:
 //  1. Disk threads that are currently processing a range for this context.
@@ -104,7 +132,9 @@ void RequestContext::Cancel() {
 
     // Clear out all request ranges from queues for this reader. Cancel the scan
     // ranges and invoke the write range callbacks to propagate the cancellation.
-    for (ScanRange* range : active_scan_ranges_) range->CancelInternal(Status::CANCELLED);
+    for (ScanRange* range : active_scan_ranges_) {
+      range->CancelInternal(Status::CANCELLED, false);
+    }
     active_scan_ranges_.clear();
     for (PerDiskState& disk_state : disk_states_) {
       RequestRange* range;

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/request-context.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-context.h b/be/src/runtime/io/request-context.h
index b028596..adfe6d5 100644
--- a/be/src/runtime/io/request-context.h
+++ b/be/src/runtime/io/request-context.h
@@ -201,6 +201,11 @@ class RequestContext {
     state.ScheduleContext(lock, this, range->disk_id());
   }
 
+  /// Called from a disk thread when a read completes. Decrements the disk thread count
+  /// and does other bookkeeping and re-schedules 'range' if there are more reads to do.
+  /// Caller must not hold 'lock_'.
+  void ReadDone(int disk_id, ReadOutcome outcome, ScanRange* range);
+
   /// Cancel the context if not already cancelled, wait for all scan ranges to finish
   /// and mark the context as inactive, after which it cannot be used.
   void CancelAndMarkInactive();
@@ -367,13 +372,14 @@ class RequestContext {
     void ScheduleContext(const boost::unique_lock<boost::mutex>& context_lock,
         RequestContext* context, int disk_id);
 
-    /// Increment the count of disk threads that have a reference to this context. These
-    /// threads do not hold any locks while reading from HDFS, so we need to prevent the
-    /// RequestContext from being destroyed underneath them.
+    /// Called when dequeueing this RequestContext from the disk queue to increment the
+    /// count of disk threads with a reference to this context. These threads do not hold
+    /// any locks while reading from HDFS, so we need to prevent the RequestContext from
+    /// being destroyed underneath them.
     ///
     /// The caller does not need to hold 'lock_', so this can execute concurrently with
     /// itself and DecrementDiskThread().
-    void IncrementDiskThreadAndDequeue() {
+    void IncrementDiskThreadAfterDequeue() {
       /// Incrementing 'num_threads_in_op_' first so that there is no window when other
       /// threads see 'is_on_queue_ == num_threads_in_op_ == 0' and think there are no
       /// references left to this context.
@@ -395,7 +401,7 @@ class RequestContext {
       }
       // The state is cancelled, check to see if we're the last thread to touch the
       // context on this disk. We need to load 'is_on_queue_' and 'num_threads_in_op_'
-      // in this order to avoid a race with IncrementDiskThreadAndDequeue().
+      // in this order to avoid a race with IncrementDiskThreadAfterDequeue().
       if (is_on_queue_.Load() == 0 && num_threads_in_op_.Load() == 0 && !done_) {
         context->DecrementDiskRefCount(context_lock);
         done_ = true;

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/request-ranges.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/request-ranges.h b/be/src/runtime/io/request-ranges.h
index 7ec0cb6..3e266d6 100644
--- a/be/src/runtime/io/request-ranges.h
+++ b/be/src/runtime/io/request-ranges.h
@@ -34,8 +34,9 @@
 namespace impala {
 namespace io {
 class DiskIoMgr;
-class RequestContext;
+struct DiskQueue;
 class ExclusiveHdfsFileHandle;
+class RequestContext;
 class ScanRange;
 
 /// Buffer struct that is used by the caller and IoMgr to pass read buffers.
@@ -114,6 +115,20 @@ struct RequestType {
   };
 };
 
+/// ReadOutcome describes the possible outcomes of the DoRead() function.
+enum class ReadOutcome {
+  // The last (eosr) buffer was successfully enqueued.
+  SUCCESS_EOSR,
+  // The buffer was successfully enqueued but we are not at eosr and can schedule
+  // the next read.
+  SUCCESS_NO_EOSR,
+  // The scan range is blocked waiting for the next buffer.
+  BLOCKED_ON_BUFFER,
+  // The scan range is cancelled (either by caller or because of an error). No more
+  // reads will be scheduled.
+  CANCELLED
+};
+
 /// Represents a contiguous sequence of bytes in a single file.
 /// This is the common base class for read and write IO requests - ScanRange and
 /// WriteRange. Each disk thread processes exactly one RequestRange at a time.
@@ -236,12 +251,12 @@ class ScanRange : public RequestRange {
   /// and cannot be accessed.
   void ReturnBuffer(std::unique_ptr<BufferDescriptor> buffer);
 
-  /// Cancel this scan range. This cleans up all queued buffers and wakes up any threads
-  /// blocked on GetNext(). Status is a non-ok status with the reason the range was
-  /// cancelled, e.g. CANCELLED if the range was cancelled because it was not needed, or
-  /// another error if an error was encountered while scanning the range. Status is
-  /// returned to the any callers of GetNext(). If a thread is currently blocked in
-  /// GetNext(), it is woken up.
+  /// Cancel this scan range. This waits for any in-flight read operations to complete,
+  /// cleans up all buffers owned by the scan range (i.e. queued or unused buffers)
+  /// and wakes up any threads blocked on GetNext(). Status is a non-ok status with the
+  /// reason the range was cancelled, e.g. CANCELLED if the range was cancelled because
+  /// it was not needed, or another error if an error was encountered while scanning the
+  /// range. Status is returned to any callers of GetNext().
   void Cancel(const Status& status);
 
   /// return a descriptive string for debug.
@@ -259,10 +274,9 @@ class ScanRange : public RequestRange {
 
   /// Enqueues a ready buffer with valid data for this range. This does not block.
   /// The caller passes ownership of buffer to the scan range and it is not
-  /// valid to access buffer after this call. The reader lock must be held by the
-  /// caller. Returns false if the scan range was cancelled.
-  bool EnqueueReadyBuffer(const boost::unique_lock<boost::mutex>& reader_lock,
-      std::unique_ptr<BufferDescriptor> buffer);
+  /// valid to access buffer after this call. Returns false if the scan range was
+  /// cancelled.
+  bool EnqueueReadyBuffer(std::unique_ptr<BufferDescriptor> buffer);
 
   /// Validates the internal state of this range. lock_ must be taken
   /// before calling this.
@@ -324,10 +338,10 @@ class ScanRange : public RequestRange {
   std::unique_ptr<BufferDescriptor> GetUnusedBuffer(
       const boost::unique_lock<boost::mutex>& scan_range_lock);
 
-  /// Get the next buffer for this scan range for a disk thread to read into. Returns
-  /// the new buffer if successful.  If no buffers are available, marks the range
-  /// as blocked and returns nullptr. Called must not hold 'lock_'.
-  std::unique_ptr<BufferDescriptor> GetNextUnusedBufferForRange();
+  /// Called from a disk I/O thread to read the next buffer of data for this range. The
+  /// returned ReadOutcome describes what the result of the read was. 'disk_id' is the
+  /// ID of the disk queue. Caller must not hold 'lock_'.
+  ReadOutcome DoRead(int disk_id);
 
   /// Cleans up a buffer that was not returned to the client.
   /// Either ReturnBuffer() or CleanUpBuffer() is called for every BufferDescriptor.
@@ -344,15 +358,18 @@ class ScanRange : public RequestRange {
   /// range is cancelled or at eos. The caller must hold 'lock_' via 'scan_range_lock'.
   void CleanUpUnusedBuffers(const boost::unique_lock<boost::mutex>& scan_range_lock);
 
-  /// Same as Cancel() except reader_->lock must be held by the caller.
-  void CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
-      const Status& status);
-
   /// Same as Cancel() except doesn't remove the scan range from
-  /// reader_->active_scan_ranges_. This is invoked by RequestContext::Cancel(),
-  /// which removes the range itself to avoid invalidating its active_scan_ranges_
-  /// iterator.
-  void CancelInternal(const Status& status);
+  /// reader_->active_scan_ranges_ or wait for in-flight reads to finish.
+  /// This is invoked by RequestContext::Cancel(), which removes the range itself
+  /// to avoid invalidating its active_scan_ranges_ iterator. If 'read_error' is
+  /// true, this is being called from a disk thread to propagate a read error, so
+  /// 'read_in_flight_' is set to false and threads in WaitForInFlightRead() are
+  /// woken up.
+  void CancelInternal(const Status& status, bool read_error);
+
+  /// Waits for any in-flight read to complete. Called after CancelInternal() to ensure
+  /// no more reads will occur for the scan range.
+  void WaitForInFlightRead();
 
   /// Marks the scan range as blocked waiting for a buffer. Caller must not hold 'lock_'.
   void SetBlockedOnBuffer();
@@ -451,10 +468,15 @@ class ScanRange : public RequestRange {
   /// Total number of bytes of buffers in 'unused_iomgr_buffers_'.
   int64_t unused_iomgr_buffer_bytes_ = 0;
 
-  /// Number of bytes of buffers returned from GetNextUnusedBufferForRange(). Used to
-  /// infer how many bytes of buffers need to be held onto to read the rest of the scan
-  /// range.
-  int64_t iomgr_buffer_bytes_returned_ = 0;
+  /// Cumulative bytes of I/O mgr buffers taken from 'unused_iomgr_buffers_' by DoRead().
+  /// Used to infer how many bytes of buffers need to be held onto to read the rest of
+  /// the scan range.
+  int64_t iomgr_buffer_cumulative_bytes_used_ = 0;
+
+  /// True if a disk thread is currently doing a read for this scan range. Set to true in
+  /// DoRead() and set to false in EnqueueReadyBuffer() or CancelInternal() when the
+  /// read completes and any buffer used for the read is either enqueued or freed.
+  bool read_in_flight_ = false;
 
   /// If true, the last buffer for this scan range has been queued.
   /// If this is true and 'ready_buffers_' is empty, then no more buffers will be
@@ -472,7 +494,8 @@ class ScanRange : public RequestRange {
   /// 'cancel_status_' is not OK.
   std::deque<std::unique_ptr<BufferDescriptor>> ready_buffers_;
 
-  /// Condition variable for threads in GetNext() that are waiting for the next buffer.
+  /// Condition variable for threads in GetNext() that are waiting for the next buffer
+  /// and threads in WaitForInFlightRead() that are waiting for a read to finish.
   /// Signalled when a buffer is enqueued in 'ready_buffers_' or the scan range is
   /// cancelled.
   ConditionVariable buffer_ready_cv_;

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/be/src/runtime/io/scan-range.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/io/scan-range.cc b/be/src/runtime/io/scan-range.cc
index 4f7c38b..c868c3d 100644
--- a/be/src/runtime/io/scan-range.cc
+++ b/be/src/runtime/io/scan-range.cc
@@ -36,23 +36,33 @@ DEFINE_bool(use_hdfs_pread, false, "Enables using hdfsPread() instead of hdfsRea
 DEFINE_int64(adls_read_chunk_size, 128 * 1024, "The maximum read chunk size to use when "
     "reading from ADLS.");
 
+#ifndef NDEBUG
+DECLARE_int32(stress_disk_read_delay_ms);
+#endif
+
 // Implementation of the ScanRange functionality. Each ScanRange contains a queue
 // of ready buffers. For each ScanRange, there is only a single producer and
 // consumer thread, i.e. only one disk thread will push to a scan range at
 // any time and only one thread will remove from the queue. This is to guarantee
 // that buffers are queued and read in file order.
-
-bool ScanRange::EnqueueReadyBuffer(
-    const unique_lock<mutex>& reader_lock, unique_ptr<BufferDescriptor> buffer) {
-  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
+bool ScanRange::EnqueueReadyBuffer(unique_ptr<BufferDescriptor> buffer) {
   DCHECK(buffer->buffer_ != nullptr) << "Cannot enqueue freed buffer";
   {
     unique_lock<mutex> scan_range_lock(lock_);
     DCHECK(Validate()) << DebugString();
     DCHECK(!eosr_queued_);
+    if (!buffer->is_cached()) {
+      // All non-cached buffers are enqueued by disk threads. Indicate that the read
+      // finished.
+      DCHECK(read_in_flight_);
+      read_in_flight_ = false;
+    }
     if (!cancel_status_.ok()) {
       // This range has been cancelled, no need to enqueue the buffer.
       CleanUpBuffer(scan_range_lock, move(buffer));
+      // One or more threads may be blocked in WaitForInFlightRead() waiting for the read
+      // to complete. Wake up all of them.
+      buffer_ready_cv_.NotifyAll();
       return false;
     }
     // Clean up any surplus buffers. E.g. we may have allocated too many if the file was
@@ -122,7 +132,7 @@ void ScanRange::AddUnusedBuffers(vector<unique_ptr<BufferDescriptor>>&& buffers,
       if (external_buffer_tag_ != ExternalBufferTag::NO_BUFFER
           || !cancel_status_.ok()
           || eosr_queued_
-          || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_bytes_returned_) {
+          || unused_iomgr_buffer_bytes_ >= len_ - iomgr_buffer_cumulative_bytes_used_) {
         CleanUpBuffer(scan_range_lock, move(buffer));
       } else {
         unused_iomgr_buffer_bytes_ += buffer->buffer_len();
@@ -156,15 +166,77 @@ unique_ptr<BufferDescriptor> ScanRange::GetUnusedBuffer(
   return result;
 }
 
-unique_ptr<BufferDescriptor> ScanRange::GetNextUnusedBufferForRange() {
-  unique_lock<mutex> lock(lock_);
-  unique_ptr<BufferDescriptor> buffer_desc = GetUnusedBuffer(lock);
-  if (buffer_desc == nullptr) {
-    blocked_on_buffer_ = true;
-  } else {
-    iomgr_buffer_bytes_returned_ += buffer_desc->buffer_len();
+ReadOutcome ScanRange::DoRead(int disk_id) {
+  int64_t bytes_remaining = len_ - bytes_read_;
+  DCHECK_GT(bytes_remaining, 0);
+
+  unique_ptr<BufferDescriptor> buffer_desc;
+  {
+    unique_lock<mutex> lock(lock_);
+    DCHECK(!read_in_flight_);
+    if (!cancel_status_.ok()) return ReadOutcome::CANCELLED;
+
+    if (external_buffer_tag_ == ScanRange::ExternalBufferTag::CLIENT_BUFFER) {
+      buffer_desc = unique_ptr<BufferDescriptor>(new BufferDescriptor(
+          io_mgr_, reader_, this, client_buffer_.data, client_buffer_.len));
+    } else {
+      DCHECK(external_buffer_tag_ == ScanRange::ExternalBufferTag::NO_BUFFER)
+          << "This code path does not handle other buffer types, i.e. HDFS cache"
+          << static_cast<int>(external_buffer_tag_);
+      buffer_desc = GetUnusedBuffer(lock);
+      if (buffer_desc == nullptr) {
+        // No buffer available - the range will be rescheduled when a buffer is added.
+        blocked_on_buffer_ = true;
+        return ReadOutcome::BLOCKED_ON_BUFFER;
+      }
+      iomgr_buffer_cumulative_bytes_used_ += buffer_desc->buffer_len();
+    }
+    read_in_flight_ = true;
   }
-  return buffer_desc;
+
+  // 'lock_' is not held in this section, which reads scan range state as well as local
+  // vars. We don't want to hold a lock across the read call.
+  Status read_status = Open(is_file_handle_caching_enabled());
+  if (read_status.ok()) {
+    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, 1L);
+    COUNTER_BITOR_IF_NOT_NULL(reader_->disks_accessed_bitmap_, 1LL << disk_id);
+
+    read_status = Read(buffer_desc->buffer_, buffer_desc->buffer_len_,
+        &buffer_desc->len_, &buffer_desc->eosr_);
+    buffer_desc->scan_range_offset_ = bytes_read_ - buffer_desc->len_;
+
+    COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, buffer_desc->len_);
+    COUNTER_ADD(&io_mgr_->total_bytes_read_counter_, buffer_desc->len_);
+    COUNTER_ADD_IF_NOT_NULL(reader_->active_read_thread_counter_, -1L);
+  }
+
+  DCHECK(buffer_desc->buffer_ != nullptr);
+  DCHECK(!buffer_desc->is_cached()) << "HDFS cache reads don't go through this code path.";
+  if (!read_status.ok()) {
+    // Free buffer to release resources before we cancel the range so that all buffers
+    // are freed at cancellation.
+    reader_->FreeBuffer(buffer_desc.get());
+    buffer_desc.reset();
+
+    // Propagate 'read_status' to the scan range. This will also wake up any waiting
+    // threads.
+    CancelInternal(read_status, true);
+    // No more reads for this scan range - we can close it.
+    Close();
+    return ReadOutcome::CANCELLED;
+  }
+
+  // After calling EnqueueReadyBuffer(), it is no longer valid to touch 'buffer_desc'.
+  // Store the state we need before calling EnqueueReadyBuffer().
+  bool eosr = buffer_desc->eosr_;
+  // Read successful - enqueue the buffer and return the appropriate outcome.
+  if (!EnqueueReadyBuffer(move(buffer_desc))) return ReadOutcome::CANCELLED;
+  if (eosr) {
+    // No more reads for this scan range - we can close it.
+    Close();
+    return ReadOutcome::SUCCESS_EOSR;
+  }
+  return ReadOutcome::SUCCESS_NO_EOSR;
 }
 
 void ScanRange::SetBlockedOnBuffer() {
@@ -202,18 +274,14 @@ void ScanRange::CleanUpUnusedBuffers(const unique_lock<mutex>& scan_range_lock)
 void ScanRange::Cancel(const Status& status) {
   // Cancelling a range that was never started, ignore.
   if (io_mgr_ == nullptr) return;
-  CancelInternal(status);
+  CancelInternal(status, false);
+  // Wait until an in-flight read is finished. The read thread will clean up the
+  // buffer it used. Once the range is cancelled, no more reads should be started.
+  WaitForInFlightRead();
   reader_->RemoveActiveScanRange(this);
 }
 
-void ScanRange::CancelFromReader(const boost::unique_lock<boost::mutex>& reader_lock,
-    const Status& status) {
-  DCHECK(reader_lock.mutex() == &reader_->lock_ && reader_lock.owns_lock());
-  CancelInternal(status);
-  reader_->RemoveActiveScanRangeLocked(reader_lock, this);
-}
-
-void ScanRange::CancelInternal(const Status& status) {
+void ScanRange::CancelInternal(const Status& status, bool read_error) {
   DCHECK(io_mgr_ != nullptr);
   DCHECK(!status.ok());
   {
@@ -223,10 +291,10 @@ void ScanRange::CancelInternal(const Status& status) {
     {
       unique_lock<mutex> hdfs_lock(hdfs_lock_);
       DCHECK(Validate()) << DebugString();
-      // If already cancelled, preserve the original reason for cancellation. The first
-      // thread to set 'cancel_status_' does the cleanup below.
-      RETURN_VOID_IF_ERROR(cancel_status_);
-      cancel_status_ = status;
+      // If already cancelled, preserve the original reason for cancellation. Most of the
+      // cleanup is not required if already cancelled, but we still need to set
+      // 'read_in_flight_' to false below if this call is reporting a read error.
+      if (cancel_status_.ok()) cancel_status_ = status;
     }
 
     /// Clean up 'ready_buffers_' while still holding 'lock_' to prevent other threads
@@ -238,6 +306,10 @@ void ScanRange::CancelInternal(const Status& status) {
 
     /// Clean up buffers that we don't need any more because we won't read any more data.
     CleanUpUnusedBuffers(scan_range_lock);
+    if (read_error) {
+      DCHECK(read_in_flight_);
+      read_in_flight_ = false;
+    }
   }
   buffer_ready_cv_.NotifyAll();
 
@@ -246,6 +318,11 @@ void ScanRange::CancelInternal(const Status& status) {
   if (external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER) Close();
 }
 
+void ScanRange::WaitForInFlightRead() {
+  unique_lock<mutex> scan_range_lock(lock_);
+  while (read_in_flight_) buffer_ready_cv_.Wait(scan_range_lock);
+}
+
 string ScanRange::DebugString() const {
   stringstream ss;
   ss << "file=" << file_ << " disk_id=" << disk_id_ << " offset=" << offset_
@@ -303,6 +380,7 @@ ScanRange::~ScanRange() {
   DCHECK(exclusive_hdfs_fh_ == nullptr) << "File was not closed.";
   DCHECK(external_buffer_tag_ != ExternalBufferTag::CACHED_BUFFER)
       << "Cached buffer was not released.";
+  DCHECK(!read_in_flight_);
   DCHECK_EQ(0, ready_buffers_.size());
   DCHECK_EQ(0, num_buffers_in_reader_.Load());
 }
@@ -310,6 +388,7 @@ ScanRange::~ScanRange() {
 void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
     int disk_id, bool expected_local, const BufferOpts& buffer_opts, void* meta_data) {
   DCHECK(ready_buffers_.empty());
+  DCHECK(!read_in_flight_);
   DCHECK(file != nullptr);
   DCHECK_GE(len, 0);
   DCHECK_GE(offset, 0);
@@ -340,13 +419,14 @@ void ScanRange::Reset(hdfsFS fs, const char* file, int64_t len, int64_t offset,
 void ScanRange::InitInternal(DiskIoMgr* io_mgr, RequestContext* reader) {
   DCHECK(exclusive_hdfs_fh_ == nullptr);
   DCHECK(local_file_ == nullptr);
+  DCHECK(!read_in_flight_);
   io_mgr_ = io_mgr;
   reader_ = reader;
   local_file_ = nullptr;
   exclusive_hdfs_fh_ = nullptr;
   bytes_read_ = 0;
   unused_iomgr_buffer_bytes_ = 0;
-  iomgr_buffer_bytes_returned_ = 0;
+  iomgr_buffer_cumulative_bytes_used_ = 0;
   cancel_status_ = Status::OK();
   eosr_queued_ = false;
   blocked_on_buffer_ = false;
@@ -474,6 +554,13 @@ int64_t ScanRange::MaxReadChunkSize() const {
 // TODO: look at linux disk scheduling
 Status ScanRange::Read(
     uint8_t* buffer, int64_t buffer_len, int64_t* bytes_read, bool* eosr) {
+  DCHECK(read_in_flight_);
+  // Delay before acquiring the lock, to allow triggering IMPALA-6587 race.
+#ifndef NDEBUG
+  if (FLAGS_stress_disk_read_delay_ms > 0) {
+    SleepForMs(FLAGS_stress_disk_read_delay_ms);
+  }
+#endif
   unique_lock<mutex> hdfs_lock(hdfs_lock_);
   RETURN_IF_ERROR(cancel_status_);
 
@@ -664,7 +751,7 @@ Status ScanRange::ReadFromCache(
   desc->scan_range_offset_ = 0;
   desc->eosr_ = true;
   bytes_read_ = bytes_read;
-  EnqueueReadyBuffer(reader_lock, move(desc));
+  EnqueueReadyBuffer(move(desc));
   COUNTER_ADD_IF_NOT_NULL(reader_->bytes_read_counter_, bytes_read);
   *read_succeeded = true;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/impala/blob/d7bba821/testdata/workloads/functional-query/queries/QueryTest/scanners.test
----------------------------------------------------------------------
diff --git a/testdata/workloads/functional-query/queries/QueryTest/scanners.test b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
index a000a15..b05786e 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/scanners.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/scanners.test
@@ -117,3 +117,14 @@ select * from emptytable;
 ---- TYPES
 STRING,INT
 ====
+---- QUERY
+# IMPALA-6587: regression test for reservation not being managed correctly. Should be
+# able to execute this query reliably with the minimum reservation, even with tiny
+# scan ranges. This reliably reproduced the issue when run against text/lzo.
+set max_scan_range_length=1;
+select count(*) from alltypessmall
+---- RESULTS
+100
+---- TYPES
+BIGINT
+====