Posted to commits@impala.apache.org by ta...@apache.org on 2017/05/11 19:58:43 UTC

incubator-impala git commit: IMPALA-5169: Add support for async pins in buffer pool

Repository: incubator-impala
Updated Branches:
  refs/heads/master 060b80fd9 -> cb37be893


IMPALA-5169: Add support for async pins in buffer pool

Makes Pin() do async reads behind the scenes, instead of
blocking until the read completes. Blocking instead happens
when the client tries to access the buffer via
PageHandle::GetBuffer() or ExtractBuffer().

This is implemented with a new sub-state of "pinned"
where the page has a buffer and consumes reservation
but the buffer does not contain valid data.
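
A condensed sketch of the resulting calling pattern (the
caller code and ProcessData() are hypothetical; Pin() and
PageHandle::GetBuffer() are the APIs from this patch):

    // Starts the read from scratch disk asynchronously if the
    // page's buffer was evicted; does not block.
    RETURN_IF_ERROR(pool->Pin(client, &page));
    // ... other pins or computation can overlap the I/O here ...
    const BufferPool::BufferHandle* buffer;
    // Blocks until the data is in memory; surfaces any read error.
    RETURN_IF_ERROR(page.GetBuffer(&buffer));
    ProcessData(buffer->data(), buffer->len());  // Hypothetical consumer.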

Motivation:
This unlocks various opportunities to overlap read I/Os
with other work:
* Reads to different disks can execute in parallel.
* I/O and computation can be overlapped.

This initially benefits BufferedTupleStream::PinStream(),
where many pages are pinned at once. With this change the
reads run asynchronously, which can lead to large speedups
when spilling. E.g. if the pages for a hash join's partition
are spread across 10 disks, we could get 10x the read
throughput, plus overlap the I/O with the hash table build.
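
Condensed from the patch (simplified, not verbatim), the
split between PinStream() and NextReadPage() looks like:

    // PinStream(): start async pins for all pages. No blocking,
    // so reads to different disks proceed in parallel.
    for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true));

    // NextReadPage(): block only once a page's data is needed.
    const BufferHandle* read_buffer;
    RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
    read_ptr_ = read_buffer->data();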

In the future we can use this to do read-ahead over unpinned
BufferedTupleStreams or for unpinned Runs in the Sorter, but
this requires changes to the client code to Pin() pages in
advance.

Testing:
* BufferedTupleStreamV2 already exercises this.
* Various BufferPool tests already exercise this.
* Added a basic test to cover edge cases made possible by the
  new state transitions.
* Extended the randomised test to cover this.

Change-Id: Ibdf074c1ac4405d6f08d623ba438a85f7d39fd79
Reviewed-on: http://gerrit.cloudera.org:8080/6612
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb37be89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb37be89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb37be89

Branch: refs/heads/master
Commit: cb37be8935579263122088e2943d014c4a9aba21
Parents: 060b80f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Apr 11 18:08:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 11 19:36:09 2017 +0000

----------------------------------------------------------------------
 be/src/runtime/buffered-tuple-stream-v2.cc      |  62 ++++++--
 be/src/runtime/buffered-tuple-stream-v2.h       |  29 +++-
 .../runtime/bufferpool/buffer-pool-internal.h   |  62 +++++---
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 158 +++++++++++++++----
 be/src/runtime/bufferpool/buffer-pool.cc        | 108 +++++++++----
 be/src/runtime/bufferpool/buffer-pool.h         |  55 ++++---
 be/src/runtime/tmp-file-mgr.cc                  |  74 ++++++---
 be/src/runtime/tmp-file-mgr.h                   |  32 +++-
 8 files changed, 425 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index 083b59e..c36c31f 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -44,6 +44,8 @@
 using namespace impala;
 using namespace strings;
 
+using BufferHandle = BufferPool::BufferHandle;
+
 BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
     const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client,
     int64_t page_len, const set<SlotId>& ext_varlen_slots)
@@ -54,6 +56,7 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
     total_byte_size_(0),
     read_page_rows_returned_(-1),
     read_ptr_(nullptr),
+    read_end_ptr_(nullptr),
     write_ptr_(nullptr),
     write_end_ptr_(nullptr),
     rows_returned_(0),
@@ -109,15 +112,20 @@ void BufferedTupleStreamV2::CheckConsistency() const {
   }
   if (has_write_iterator()) {
     DCHECK(write_page_->is_pinned());
-    DCHECK_GE(write_ptr_, write_page_->data());
-    DCHECK_EQ(write_end_ptr_, write_page_->data() + write_page_->len());
+    DCHECK(write_page_->retrieved_buffer);
+    const BufferHandle* write_buffer;
+    Status status = write_page_->GetBuffer(&write_buffer);
+    DCHECK(status.ok()); // Write buffer should never have been unpinned.
+    DCHECK_GE(write_ptr_, write_buffer->data());
+    DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len());
     DCHECK_GE(write_end_ptr_, write_ptr_);
   }
   if (has_read_iterator()) {
     DCHECK(read_page_->is_pinned());
-    uint8_t* read_end_ptr = read_page_->data() + read_page_->len();
-    DCHECK_GE(read_ptr_, read_page_->data());
-    DCHECK_GE(read_end_ptr, read_ptr_);
+    DCHECK(read_page_->retrieved_buffer);
+    // Can't check the read buffer without affecting behaviour, because a read may be
+    // in flight and checking would require blocking on that read.
+    DCHECK_GE(read_end_ptr_, read_ptr_);
   }
 }
 
@@ -186,9 +194,13 @@ Status BufferedTupleStreamV2::PrepareForReadWrite(
 
 void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) {
   for (Page& page : pages_) {
-    if (batch != nullptr && page.is_pinned()) {
+    if (batch != nullptr && page.retrieved_buffer) {
+      // Subtle: We only need to attach buffers from pages that we may have returned
+      // references to. ExtractBuffer() cannot fail for these pages because the data
+      // is guaranteed to already be in memory.
       BufferPool::BufferHandle buffer;
-      buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+      Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+      DCHECK(status.ok());
       batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
     } else {
       buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
@@ -247,6 +259,7 @@ void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
     DCHECK_EQ(new_pin_count, page->pin_count() - 1);
     buffer_pool_->Unpin(buffer_pool_client_, &page->handle);
     bytes_pinned_ -= page->len();
+    if (page->pin_count() == 0) page->retrieved_buffer = false;
   }
 }
 
@@ -255,16 +268,17 @@ Status BufferedTupleStreamV2::NewWritePage() noexcept {
   DCHECK(!has_write_iterator());
 
   Page new_page;
-  RETURN_IF_ERROR(
-      buffer_pool_->CreatePage(buffer_pool_client_, page_len_, &new_page.handle));
+  const BufferHandle* write_buffer;
+  RETURN_IF_ERROR(buffer_pool_->CreatePage(
+      buffer_pool_client_, page_len_, &new_page.handle, &write_buffer));
   bytes_pinned_ += page_len_;
   total_byte_size_ += page_len_;
 
   pages_.push_back(std::move(new_page));
   write_page_ = &pages_.back();
   DCHECK_EQ(write_page_->num_rows, 0);
-  write_ptr_ = write_page_->data();
-  write_end_ptr_ = write_page_->data() + page_len_;
+  write_ptr_ = write_buffer->data();
+  write_end_ptr_ = write_ptr_ + page_len_;
   return Status::OK();
 }
 
@@ -312,6 +326,8 @@ void BufferedTupleStreamV2::ResetWritePage() {
   // Unpin the write page if we're reading in unpinned mode.
   Page* prev_write_page = write_page_;
   write_page_ = nullptr;
+  write_ptr_ = nullptr;
+  write_end_ptr_ = nullptr;
 
   // May need to decrement pin count now that it's not the write page, depending on
   // the stream's mode.
@@ -347,8 +363,13 @@ Status BufferedTupleStreamV2::NextReadPage() {
   // actually fail once we have variable-length pages.
   RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
 
+  // This waits for the pin to complete if the page was unpinned earlier.
+  const BufferHandle* read_buffer;
+  RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
+
   read_page_rows_returned_ = 0;
-  read_ptr_ = read_page_->data();
+  read_ptr_ = read_buffer->data();
+  read_end_ptr_ = read_ptr_ + read_buffer->len();
 
   CHECK_CONSISTENCY();
   return Status::OK();
@@ -359,6 +380,8 @@ void BufferedTupleStreamV2::ResetReadPage() {
   // Unpin the write page if we're reading in unpinned mode.
   Page* prev_read_page = &*read_page_;
   read_page_ = pages_.end();
+  read_ptr_ = nullptr;
+  read_end_ptr_ = nullptr;
 
   // May need to decrement pin count after destroying read iterator.
   UnpinPageIfNeeded(prev_read_page, pinned_);
@@ -384,10 +407,15 @@ Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) {
   read_page_ = pages_.begin();
   RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
 
+  // This waits for the pin to complete if the page was unpinned earlier.
+  const BufferHandle* read_buffer;
+  RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
+
   DCHECK(has_read_iterator());
   DCHECK(read_page_->is_pinned());
   read_page_rows_returned_ = 0;
-  read_ptr_ = read_page_->data();
+  read_ptr_ = read_buffer->data();
+  read_end_ptr_ = read_ptr_ + read_buffer->len();
   rows_returned_ = 0;
   delete_on_read_ = delete_on_read;
   CHECK_CONSISTENCY();
@@ -411,6 +439,8 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) {
   if (!reservation_granted) return Status::OK();
 
   // At this point success is guaranteed - go through to pin the pages we need to pin.
+  // If the page data was evicted from memory, the read I/O can happen in parallel
+  // because we defer calling GetBuffer() until NextReadPage().
   for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true));
 
   pinned_ = true;
@@ -556,7 +586,7 @@ Status BufferedTupleStreamV2::GetNextInternal(
     batch->MarkNeedsDeepCopy();
   }
   if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
-  DCHECK_LE(read_ptr_, read_page_->data() + read_page_->len());
+  DCHECK_LE(read_ptr_, read_end_ptr_);
   return Status::OK();
 }
 
@@ -567,7 +597,7 @@ void BufferedTupleStreamV2::FixUpStringsForRead(
     if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
 
     StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
-    DCHECK_LE(read_ptr_ + sv->len, read_page_->data() + read_page_->len());
+    DCHECK_LE(read_ptr_ + sv->len, read_end_ptr_);
     sv->ptr = reinterpret_cast<char*>(read_ptr_);
     read_ptr_ += sv->len;
   }
@@ -582,7 +612,7 @@ void BufferedTupleStreamV2::FixUpCollectionsForRead(
     CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
     const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
     int coll_byte_size = cv->num_tuples * item_desc.byte_size();
-    DCHECK_LE(read_ptr_ + coll_byte_size, read_page_->data() + read_page_->len());
+    DCHECK_LE(read_ptr_ + coll_byte_size, read_end_ptr_);
     cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_);
     read_ptr_ += coll_byte_size;
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
index d707604..e5fea47 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -152,7 +152,9 @@ class TupleRow;
 ///     or false respectively) before advancing to the next page.
 ///   2. Pinned: All pages in the stream are pinned so do not need to be pinned or
 ///     unpinned when reading from the stream. If delete on read is true, pages are
-///     deleted after being read.
+///     deleted after being read. If the stream was previously unpinned, the page's data
+///     may not yet be in memory - reading from the stream can block on I/O or fail with
+///     an I/O error.
 /// Write:
 ///   1. Unpinned: Unpin pages as they fill up. This means that only enough reservation
 ///     to pin a single write page is required to write to the stream, regardless of the
@@ -314,10 +316,9 @@ class BufferedTupleStreamV2 {
       bool* got_rows) WARN_UNUSED_RESULT;
 
   /// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
-  /// attaches buffers from any pinned pages to the batch and deletes unpinned
-  /// pages. Otherwise deletes all pages. Does nothing if the stream was already
-  /// closed. The 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching
-  /// buffers.
+  /// attaches buffers from pinned pages that rows returned from GetNext() may reference.
+  /// Otherwise deletes all pages. Does nothing if the stream was already closed. The
+  /// 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching buffers.
   void Close(RowBatch* batch, RowBatch::FlushMode flush);
 
   /// Number of rows in the stream.
@@ -354,18 +355,27 @@ class BufferedTupleStreamV2 {
 
   /// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
   struct Page {
-    Page() : num_rows(0) {}
+    Page() : num_rows(0), retrieved_buffer(true) {}
 
     inline int len() const { return handle.len(); }
-    inline uint8_t* data() const { return handle.data(); }
     inline bool is_pinned() const { return handle.is_pinned(); }
     inline int pin_count() const { return handle.pin_count(); }
+    Status GetBuffer(const BufferPool::BufferHandle** buffer) {
+      RETURN_IF_ERROR(handle.GetBuffer(buffer));
+      retrieved_buffer = true;
+      return Status::OK();
+    }
     std::string DebugString() const;
 
     BufferPool::PageHandle handle;
 
     /// Number of rows written to the page.
     int num_rows;
+
+    /// Whether GetBuffer() has been called on the page since it was last pinned. If
+    /// true, GetBuffer() and ExtractBuffer() cannot fail and GetNext() may have
+    /// returned rows referencing the page's buffer.
+    bool retrieved_buffer;
   };
 
   /// Runtime state instance used to check for cancellation. Not owned.
@@ -415,11 +425,14 @@ class BufferedTupleStreamV2 {
   /// Pointer into read_page_ to the byte after the last row read.
   uint8_t* read_ptr_;
 
+  /// Pointer to one byte past the end of read_page_. Used to detect overruns.
+  const uint8_t* read_end_ptr_;
+
   /// Pointer into write_page_ to the byte after the last row written.
   uint8_t* write_ptr_;
 
   /// Pointer to one byte past the end of write_page_. Cached to speed up computation
-  uint8_t* write_end_ptr_;
+  const uint8_t* write_end_ptr_;
 
   /// Number of rows returned to the caller from GetNext() since the last
   /// PrepareForRead() call.

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 619288b..0c0408b 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -37,15 +37,21 @@
 /// Each Page object is owned by at most one InternalList<Page> at any given point.
 /// Each page is either pinned or unpinned. Unpinned has a number of sub-states, which
 /// is determined by which list in Client/BufferPool contains the page.
-/// * Pinned: Always in this state when 'pin_count' > 0. The page is in
-///     Client::pinned_pages_.
-/// * Unpinned - Dirty: When no write has been started for an unpinned page. The page is
-///     in Client::dirty_unpinned_pages_.
-/// * Unpinned - Write in flight: When the write has been started but not completed for
-///     a dirty unpinned page. The page is in Client::write_in_flight_pages_. For
-///     accounting purposes this is considered a dirty page.
-/// * Unpinned - Clean: When the write has completed but the page was not evicted. The
-///     page is in a clean pages list in a BufferAllocator arena.
+/// * Pinned: Always in this state when 'pin_count' > 0. The page has a buffer and is in
+///     Client::pinned_pages_. 'pin_in_flight' determines which sub-state the page is in:
+///   -> When pin_in_flight=false, the buffer contains the page's data and the client can
+///      read and write to the buffer.
+///   -> When pin_in_flight=true, the page's data is in the process of being read from
+///      scratch disk into the buffer. Clients will block on the read I/O if they attempt
+///      to access the buffer.
+/// * Unpinned - Dirty: When no write to scratch has been started for an unpinned page.
+///     The page is in Client::dirty_unpinned_pages_.
+/// * Unpinned - Write in flight: When the write to scratch has been started but not
+///     completed for a dirty unpinned page. The page is in
+///     Client::write_in_flight_pages_. For accounting purposes this is considered a
+///     dirty page.
+/// * Unpinned - Clean: When the write to scratch has completed but the page was not
+///     evicted. The page is in a clean pages list in a BufferAllocator arena.
 /// * Unpinned - Evicted: After a clean page's buffer has been reclaimed. The page is
 ///     not in any list.
 ///
@@ -91,7 +97,8 @@ namespace impala {
 /// The internal representation of a page, which can be pinned or unpinned. See the
 /// class comment for explanation of the different page states.
 struct BufferPool::Page : public InternalList<Page>::Node {
-  Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {}
+  Page(Client* client, int64_t len)
+    : client(client), len(len), pin_count(0), pin_in_flight(false) {}
 
   std::string DebugString();
 
@@ -108,6 +115,11 @@ struct BufferPool::Page : public InternalList<Page>::Node {
   /// PageHandle, so it cannot be accessed by multiple threads concurrently.
   int pin_count;
 
+  /// True if the read I/O to pin the page was started but not completed. Only accessed
+  /// in contexts that are passed the associated PageHandle, so it cannot be accessed
+  /// by multiple threads concurrently.
+  bool pin_in_flight;
+
   /// Non-null if there is a write in flight, the page is clean, or the page is evicted.
   std::unique_ptr<TmpFileMgr::WriteHandle> write_handle;
 
@@ -211,10 +223,20 @@ class BufferPool::Client {
   void MoveToDirtyUnpinned(Page* page);
 
   /// Move an unpinned page to the pinned state, moving between data structures and
-  /// reading from disk if necessary. Returns once the page's buffer is allocated
-  /// and contains the page's data. Neither the client's lock nor
-  /// handle->page_->buffer_lock should be held by the caller.
-  Status MoveToPinned(ClientHandle* client, PageHandle* handle);
+  /// reading from disk if necessary. Ensures the page has a buffer. If the data is
+  /// already in memory, ensures the data is in the page's buffer. If the data is on
+  /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to
+  /// true. Neither the client's lock nor page->buffer_lock should be held by the caller.
+  Status StartMoveToPinned(ClientHandle* client, Page* page);
+
+  /// Moves a page that has a pin in flight back to the evicted state, undoing
+  /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held
+  /// by the caller.
+  void UndoMoveEvictedToPinned(Page* page);
+
+  /// Finish the work of bringing an evicted page's data back into memory, if
+  /// page->pin_in_flight was set to true by StartMoveToPinned().
+  Status FinishMoveEvictedToPinned(Page* page);
 
   /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
   /// API to deduct from the client's reservation and update internal accounting. Cleans
@@ -285,12 +307,12 @@ class BufferPool::Client {
   /// Called when a write for 'page' completes.
   void WriteCompleteCallback(Page* page, const Status& write_status);
 
-  /// Move an evicted page to the pinned state by allocating a new buffer, reading data
-  /// from disk and moving the page to 'pinned_pages_'. client->impl must be locked by
-  /// the caller via 'client_lock' and handle->page must be unlocked. 'client_lock' is
-  /// released then reacquired.
-  Status MoveEvictedToPinned(boost::unique_lock<boost::mutex>* client_lock,
-      ClientHandle* client, PageHandle* handle);
+  /// Move an evicted page to the pinned state by allocating a new buffer, starting an
+  /// async read from disk and moving the page to 'pinned_pages_'. client->impl must be
+  /// locked by the caller via 'client_lock' and handle->page must be unlocked.
+  /// 'client_lock' is released then reacquired.
+  Status StartMoveEvictedToPinned(
+      boost::unique_lock<boost::mutex>* client_lock, ClientHandle* client, Page* page);
 
   /// The buffer pool that owns the client.
   BufferPool* const pool_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 6b9177e..27a2ea9 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -266,7 +266,7 @@ class BufferPoolTest : public ::testing::Test {
   void WriteOrVerifyData(const T& object, int val, bool write) {
     // Only write sentinel values to start and end of buffer to make writing and
     // verification cheap.
-    MemRange mem = object.mem_range();
+    MemRange mem = GetMemRange(object);
     uint64_t* start_word = reinterpret_cast<uint64_t*>(mem.data());
     uint64_t* end_word =
         reinterpret_cast<uint64_t*>(&mem.data()[mem.len() - sizeof(uint64_t)]);
@@ -279,6 +279,14 @@ class BufferPoolTest : public ::testing::Test {
     }
   }
 
+  MemRange GetMemRange(const BufferHandle& buffer) { return buffer.mem_range(); }
+
+  MemRange GetMemRange(const PageHandle& page) {
+    const BufferHandle* buffer;
+    EXPECT_OK(page.GetBuffer(&buffer));
+    return buffer->mem_range();
+  }
+
   /// Return the total number of bytes allocated from the system currently.
   int64_t SystemBytesAllocated(BufferPool* pool) {
     return pool->allocator()->GetSystemBytesAllocated();
@@ -329,6 +337,11 @@ class BufferPoolTest : public ::testing::Test {
     LOG(INFO) << "Injected fault by truncating file " << path;
   }
 
+  // Return whether a pin is in flight for the page.
+  static bool PinInFlight(PageHandle* page) {
+    return page->page_->pin_in_flight;
+  }
+
   // Return the path of the temporary file backing the page.
   static string TmpFilePath(PageHandle* page) {
     return page->page_->write_handle->TmpFilePath();
@@ -505,11 +518,11 @@ TEST_F(BufferPoolTest, PageCreation) {
     ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].is_pinned());
-    ASSERT_TRUE(handles[i].buffer_handle() != NULL);
-    ASSERT_TRUE(handles[i].data() != NULL);
-    ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
+    const BufferHandle* buffer;
+    ASSERT_OK(handles[i].GetBuffer(&buffer));
+    ASSERT_TRUE(buffer->data() != NULL);
     ASSERT_EQ(handles[i].len(), page_len);
-    ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
+    ASSERT_EQ(buffer->len(), page_len);
     ASSERT_EQ(client.GetUsedReservation(), used_before + page_len);
   }
 
@@ -623,14 +636,15 @@ TEST_F(BufferPoolTest, Pin) {
   BufferPool::PageHandle handle1, handle2;
 
   // Can pin two minimum sized pages.
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+  const BufferHandle* page_buffer;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1, &page_buffer));
   ASSERT_TRUE(handle1.is_open());
   ASSERT_TRUE(handle1.is_pinned());
-  ASSERT_TRUE(handle1.data() != NULL);
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  ASSERT_TRUE(page_buffer->data() != NULL);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2, &page_buffer));
   ASSERT_TRUE(handle2.is_open());
   ASSERT_TRUE(handle2.is_pinned());
-  ASSERT_TRUE(handle2.data() != NULL);
+  ASSERT_TRUE(page_buffer->data() != NULL);
 
   pool.Unpin(&client, &handle2);
   ASSERT_FALSE(handle2.is_pinned());
@@ -646,10 +660,10 @@ TEST_F(BufferPoolTest, Pin) {
 
   // Can pin double-sized page only once.
   BufferPool::PageHandle double_handle;
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle, &page_buffer));
   ASSERT_TRUE(double_handle.is_open());
   ASSERT_TRUE(double_handle.is_pinned());
-  ASSERT_TRUE(double_handle.data() != NULL);
+  ASSERT_TRUE(page_buffer->data() != NULL);
 
   // Destroy the pages - test destroying both pinned and unpinned.
   pool.DestroyPage(&client, &handle1);
@@ -659,6 +673,78 @@ TEST_F(BufferPoolTest, Pin) {
   pool.DeregisterClient(&client);
 }
 
+// Test the various state transitions possible with async Pin() calls.
+TEST_F(BufferPoolTest, AsyncPin) {
+  const int DATA_SEED = 1234;
+  // Set up pool with enough reservation to keep two buffers in memory.
+  const int64_t TOTAL_MEM = 2 * TEST_BUFFER_LEN;
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+  global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+  BufferPool::ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      NULL, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
+
+  PageHandle handle;
+  const BufferHandle* buffer;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle, &buffer));
+  WriteData(*buffer, DATA_SEED);
+  // Pin() on a pinned page just increments the pin count.
+  ASSERT_OK(pool.Pin(&client, &handle));
+  EXPECT_EQ(2, handle.pin_count());
+  EXPECT_FALSE(PinInFlight(&handle));
+
+  pool.Unpin(&client, &handle);
+  pool.Unpin(&client, &handle);
+  ASSERT_FALSE(handle.is_pinned());
+
+  // Calling Pin() then Pin() results in double-pinning.
+  ASSERT_OK(pool.Pin(&client, &handle));
+  ASSERT_OK(pool.Pin(&client, &handle));
+  EXPECT_EQ(2, handle.pin_count());
+  EXPECT_FALSE(PinInFlight(&handle));
+
+  pool.Unpin(&client, &handle);
+  pool.Unpin(&client, &handle);
+  ASSERT_FALSE(handle.is_pinned());
+
+  // Pin() on a page that isn't evicted pins it immediately.
+  ASSERT_OK(pool.Pin(&client, &handle));
+  EXPECT_EQ(1, handle.pin_count());
+  EXPECT_FALSE(PinInFlight(&handle));
+  VerifyData(handle, 1234);
+  pool.Unpin(&client, &handle);
+  ASSERT_FALSE(handle.is_pinned());
+
+  // Force eviction. Pin() on an evicted page starts the write asynchronously.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
+  ASSERT_OK(pool.Pin(&client, &handle));
+  EXPECT_EQ(1, handle.pin_count());
+  EXPECT_TRUE(PinInFlight(&handle));
+  // Block on the pin and verify the buffer.
+  ASSERT_OK(handle.GetBuffer(&buffer));
+  EXPECT_FALSE(PinInFlight(&handle));
+  VerifyData(*buffer, 1234);
+
+  // Test that we can unpin while in flight and the data remains valid.
+  pool.Unpin(&client, &handle);
+  ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
+  ASSERT_OK(pool.Pin(&client, &handle));
+  EXPECT_TRUE(PinInFlight(&handle));
+  pool.Unpin(&client, &handle);
+  ASSERT_OK(pool.Pin(&client, &handle));
+  ASSERT_OK(handle.GetBuffer(&buffer));
+  VerifyData(*buffer, 1234);
+
+  // Evict the page, then destroy while we're pinning it asynchronously.
+  pool.Unpin(&client, &handle);
+  ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
+  ASSERT_OK(pool.Pin(&client, &handle));
+  pool.DestroyPage(&client, &handle);
+
+  pool.DeregisterClient(&client);
+}
+
 /// Creating a page or pinning without sufficient reservation should DCHECK.
 TEST_F(BufferPoolTest, PinWithoutReservation) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
@@ -698,9 +784,10 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
 
   // Test basic buffer extraction.
   for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
-    ASSERT_OK(pool.CreatePage(&client, len, &page));
-    uint8_t* page_data = page.data();
-    pool.ExtractBuffer(&client, &page, &buffer);
+    const BufferHandle* page_buffer;
+    ASSERT_OK(pool.CreatePage(&client, len, &page, &page_buffer));
+    uint8_t* page_data = page_buffer->data();
+    ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer));
     ASSERT_FALSE(page.is_open());
     ASSERT_TRUE(buffer.is_open());
     ASSERT_EQ(len, buffer.len());
@@ -711,11 +798,12 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   }
 
   // Test that ExtractBuffer() accounts correctly for pin count > 1.
-  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
-  uint8_t* page_data = page.data();
+  const BufferHandle* page_buffer;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page, &page_buffer));
+  uint8_t* page_data = page_buffer->data();
   ASSERT_OK(pool.Pin(&client, &page));
   ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation());
-  pool.ExtractBuffer(&client, &page, &buffer);
+  ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer));
   ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation());
   ASSERT_FALSE(page.is_open());
   ASSERT_TRUE(buffer.is_open());
@@ -727,7 +815,7 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   // Test that ExtractBuffer() DCHECKs for unpinned pages.
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
   pool.Unpin(&client, &page);
-  IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");
+  IMPALA_ASSERT_DEBUG_DEATH((void)pool.ExtractBuffer(&client, &page, &buffer), "");
   pool.DestroyPage(&client, &page);
 
   pool.DeregisterClient(&client);
@@ -871,15 +959,17 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   }
 
   // Create a pinned and unpinned page for the first client.
-  BufferPool::PageHandle handle1, handle2;
-  ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1));
+  PageHandle handle1, handle2;
+  const BufferHandle* page_buffer;
+  ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1, &page_buffer));
   const uint8_t TEST_VAL = 123;
-  memset(handle1.data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value.
+  memset(
+      page_buffer->data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value.
   pool.Unpin(&clients[0], &handle1);
   ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle2));
 
   // Allocating a buffer for the second client requires evicting the unpinned page.
-  BufferPool::BufferHandle buffer;
+  BufferHandle buffer;
   ASSERT_OK(pool.AllocateBuffer(&clients[1], TEST_BUFFER_LEN, &buffer));
   ASSERT_TRUE(IsEvicted(&handle1));
 
@@ -887,7 +977,10 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   pool.Unpin(&clients[0], &handle2);
   ASSERT_OK(pool.Pin(&clients[0], &handle1));
   ASSERT_TRUE(IsEvicted(&handle2));
-  for (int i = 0; i < handle1.len(); ++i) EXPECT_EQ(TEST_VAL, handle1.data()[i]) << i;
+  ASSERT_OK(handle1.GetBuffer(&page_buffer));
+  for (int i = 0; i < handle1.len(); ++i) {
+    EXPECT_EQ(TEST_VAL, page_buffer->data()[i]) << i;
+  }
 
   // Clean up everything.
   pool.DestroyPage(&clients[0], &handle1);
@@ -1438,7 +1531,10 @@ TEST_F(BufferPoolTest, ScratchReadError) {
       DCHECK_EQ(error_type, TRUNCATE);
       TruncateBackingFile(tmp_file);
     }
-    Status status = pool.Pin(&client, &page);
+    ASSERT_OK(pool.Pin(&client, &page));
+    // The read is async, so errors don't surface until we block on it in GetBuffer().
+    const BufferHandle* page_buffer;
+    Status status = page.GetBuffer(&page_buffer);
     if (error_type == CORRUPT_DATA && !FLAGS_disk_spill_encryption) {
       // Without encryption we can't detect that the data changed.
       EXPECT_OK(status);
@@ -1662,7 +1758,8 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr
     if ((i % 10000) == 0) LOG(ERROR) << " Iteration " << i << endl;
     // Pick an operation.
     // New page: 15%
-    // Pin a page: 30%
+    // Pin a page and block waiting for the result: 20%
+    // Pin a page and let it continue asynchronously: 10%
     // Unpin a pinned page: 25% (< Pin prob. so that memory consumption increases).
     // Destroy page: 10% (< New page prob. so that number of pages grows over time).
     // Allocate buffer: 10%
@@ -1679,24 +1776,29 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr
       WriteData(new_page, data);
       pages.emplace_back(move(new_page), data);
     } else if (p < 0.45) {
-      // Pin a page.
+      // Pin a page asynchronously.
       if (pages.empty()) continue;
       int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
       PageHandle* page = &pages[rand_pick].first;
       if (!client.IncreaseReservationToFit(page->len())) continue;
       if (!page->is_pinned() || multiple_pins) ASSERT_OK(pool->Pin(&client, page));
-      VerifyData(*page, pages[rand_pick].second);
+      // Block on the pin and verify data for sync pins.
+      if (p < 0.35) VerifyData(*page, pages[rand_pick].second);
     } else if (p < 0.70) {
       // Unpin a pinned page.
       if (pages.empty()) continue;
       int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
       PageHandle* page = &pages[rand_pick].first;
-      if (page->is_pinned()) pool->Unpin(&client, page);
+      if (page->is_pinned()) {
+        VerifyData(*page, pages[rand_pick].second);
+        pool->Unpin(&client, page);
+      }
     } else if (p < 0.80) {
       // Destroy a page.
       if (pages.empty()) continue;
       int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
       auto page_data = move(pages[rand_pick]);
+      if (page_data.first.is_pinned()) VerifyData(page_data.first, page_data.second);
       pages[rand_pick] = move(pages.back());
       pages.pop_back();
       pool->DestroyPage(&client, &page_data.first);

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 1d2f1d3..c1bde5f 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -90,11 +90,18 @@ int64_t BufferPool::PageHandle::len() const {
   return page_->len; // Does not require locking.
 }
 
-const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
+Status BufferPool::PageHandle::GetBuffer(const BufferHandle** buffer) const {
+  DCHECK(is_open());
+  DCHECK(client_->is_registered());
   DCHECK(is_pinned());
-  // The 'buffer' field cannot change while the page is pinned, so it is safe to access
-  // without locking.
-  return &page_->buffer;
+  if (page_->pin_in_flight) {
+    // Finish the work started in Pin().
+    RETURN_IF_ERROR(client_->impl_->FinishMoveEvictedToPinned(page_));
+  }
+  DCHECK(!page_->pin_in_flight);
+  *buffer = &page_->buffer;
+  DCHECK((*buffer)->is_open());
+  return Status::OK();
 }
 
 BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
@@ -123,16 +130,18 @@ void BufferPool::DeregisterClient(ClientHandle* client) {
   client->impl_ = NULL;
 }
 
-Status BufferPool::CreatePage(ClientHandle* client, int64_t len, PageHandle* handle) {
+Status BufferPool::CreatePage(
+    ClientHandle* client, int64_t len, PageHandle* handle, const BufferHandle** buffer) {
   DCHECK(!handle->is_open());
   DCHECK_GE(len, min_buffer_len_);
   DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
 
-  BufferHandle buffer;
+  BufferHandle new_buffer;
   // No changes have been made to state yet, so we can cleanly return on error.
-  RETURN_IF_ERROR(AllocateBuffer(client, len, &buffer));
-  Page* page = client->impl_->CreatePinnedPage(move(buffer));
+  RETURN_IF_ERROR(AllocateBuffer(client, len, &new_buffer));
+  Page* page = client->impl_->CreatePinnedPage(move(new_buffer));
   handle->Open(page, client);
+  if (buffer != nullptr) *buffer = &page->buffer;
   return Status::OK();
 }
 
@@ -140,10 +149,16 @@ void BufferPool::DestroyPage(ClientHandle* client, PageHandle* handle) {
   if (!handle->is_open()) return; // DestroyPage() should be idempotent.
 
   if (handle->is_pinned()) {
+    // Cancel the read I/O - we don't need the data any more.
+    if (handle->page_->pin_in_flight) {
+      handle->page_->write_handle->CancelRead();
+      handle->page_->pin_in_flight = false;
+    }
     // In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work
     // of cleaning up the page, freeing the buffer and updating reservations correctly.
     BufferHandle buffer;
-    ExtractBuffer(client, handle, &buffer);
+    Status status = ExtractBuffer(client, handle, &buffer);
+    DCHECK(status.ok()) << status.msg().msg();
     FreeBuffer(client, &buffer);
   } else {
     // In the unpinned case, no reservations are used so we just clean up the page.
@@ -158,7 +173,7 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) {
 
   Page* page = handle->page_;
   if (page->pin_count == 0) {
-    RETURN_IF_ERROR(client->impl_->MoveToPinned(client, handle));
+    RETURN_IF_ERROR(client->impl_->StartMoveToPinned(client, page));
     COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, -page->len);
   }
   // Update accounting last to avoid complicating the error return path above.
@@ -178,23 +193,34 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
   reservation->ReleaseTo(page->len);
 
   if (--page->pin_count > 0) return;
-  client->impl_->MoveToDirtyUnpinned(page);
+  if (page->pin_in_flight) {
+    // Data is not in memory - move it back to evicted.
+    client->impl_->UndoMoveEvictedToPinned(page);
+  } else {
+    // Data is in memory - move it to dirty unpinned.
+    client->impl_->MoveToDirtyUnpinned(page);
+  }
   COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len());
   COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
 }
 
-void BufferPool::ExtractBuffer(
+Status BufferPool::ExtractBuffer(
     ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
   DCHECK(page_handle->is_pinned());
   DCHECK(!buffer_handle->is_open());
   DCHECK_EQ(page_handle->client_, client);
 
+  // If an async pin is in flight, we need to wait for it.
+  const BufferHandle* dummy;
+  RETURN_IF_ERROR(page_handle->GetBuffer(&dummy));
+
   // Bring the pin count to 1 so that we're not using surplus reservations.
   while (page_handle->pin_count() > 1) Unpin(client, page_handle);
 
   // Destroy the page and extract the buffer.
   client->impl_->DestroyPageInternal(page_handle, buffer_handle);
   DCHECK(buffer_handle->is_open());
+  return Status::OK();
 }
 
 Status BufferPool::AllocateBuffer(
@@ -347,6 +373,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(Page* page) {
   // Only valid to unpin pages if spilling is enabled.
   DCHECK(spilling_enabled());
   DCHECK_EQ(0, page->pin_count);
+
   unique_lock<mutex> lock(lock_);
   DCHECK_CONSISTENCY();
   DCHECK(pinned_pages_.Contains(page));
@@ -357,8 +384,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(Page* page) {
   WriteDirtyPagesAsync();
 }
 
-Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle) {
-  Page* page = handle->page_;
+Status BufferPool::Client::StartMoveToPinned(ClientHandle* client, Page* page) {
   unique_lock<mutex> cl(lock_);
   DCHECK_CONSISTENCY();
   // Propagate any write errors that occurred for this client.
@@ -390,31 +416,55 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
     return file_group_->RestoreData(move(page->write_handle), page->buffer.mem_range());
   }
   // If the page wasn't in the clean pages list, it must have been evicted.
-  return MoveEvictedToPinned(&cl, client, handle);
+  return StartMoveEvictedToPinned(&cl, client, page);
 }
 
-Status BufferPool::Client::MoveEvictedToPinned(
-    unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) {
-  Page* page = handle->page_;
+Status BufferPool::Client::StartMoveEvictedToPinned(
+    unique_lock<mutex>* client_lock, ClientHandle* client, Page* page) {
   DCHECK(!page->buffer.is_open());
 
-  // Don't hold any locks while allocating or reading back the data. It is safe to modify
-  // the page's buffer handle without holding any locks because no concurrent operations
-  // can modify evicted pages.
-  client_lock->unlock();
+  // Safe to modify the page's buffer handle without holding the page lock because no
+  // concurrent operations can modify evicted pages.
   BufferHandle buffer;
   RETURN_IF_ERROR(pool_->allocator_->Allocate(client, page->len, &page->buffer));
   COUNTER_ADD(counters().bytes_read, page->len);
   COUNTER_ADD(counters().read_io_ops, 1);
-  {
-    SCOPED_TIMER(counters().read_wait_time);
-    RETURN_IF_ERROR(
-        file_group_->Read(page->write_handle.get(), page->buffer.mem_range()));
-  }
-  file_group_->DestroyWriteHandle(move(page->write_handle));
-  client_lock->lock();
+  RETURN_IF_ERROR(
+      file_group_->ReadAsync(page->write_handle.get(), page->buffer.mem_range()));
   pinned_pages_.Enqueue(page);
+  page->pin_in_flight = true;
+  DCHECK_CONSISTENCY();
+  return Status::OK();
+}
+
+void BufferPool::Client::UndoMoveEvictedToPinned(Page* page) {
+  // We need to get the page back to the evicted state where:
+  // * There is no in-flight read.
+  // * The page's data is on disk, referenced by 'write_handle'.
+  // * The page has no attached buffer.
+  DCHECK(page->pin_in_flight);
+  page->write_handle->CancelRead();
+  page->pin_in_flight = false;
+
+  unique_lock<mutex> lock(lock_);
   DCHECK_CONSISTENCY();
+  DCHECK(pinned_pages_.Contains(page));
+  pinned_pages_.Remove(page);
+  // Discard the buffer - the pin was still in flight, so no valid reference to the
+  // buffer's contents can have been returned to the client.
+  pool_->allocator_->Free(move(page->buffer));
+}
+
+Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
+  DCHECK(page->pin_in_flight);
+  SCOPED_TIMER(counters().read_wait_time);
+  // Don't hold any locks while reading back the data. It is safe to modify the page's
+  // buffer handle without holding any locks because no concurrent operations can modify
+  // evicted pages.
+  RETURN_IF_ERROR(
+      file_group_->WaitForAsyncRead(page->write_handle.get(), page->buffer.mem_range()));
+  file_group_->DestroyWriteHandle(move(page->write_handle));
+  page->pin_in_flight = false;
   return Status::OK();
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 9a37923..dbf75bc 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -133,7 +133,9 @@ class SystemAllocator;
 /// * Once the operator needs the page's contents again and has sufficient unused
 ///   reservation, it can call Pin(), which brings the page's contents back into memory,
 ///   perhaps in a different buffer. Therefore the operator must fix up any pointers into
-///   the previous buffer.
+///   the previous buffer. Pin() can execute asynchronously - the caller only blocks
+///   waiting for read I/O if it calls GetBuffer() or ExtractBuffer() while the read is
+///   in flight.
 /// * If the operator is done with the page, it can call FreeBuffer() to destroy the
 ///   handle and release resources, or call ExtractBuffer() to extract the buffer.
 ///
@@ -184,15 +186,19 @@ class BufferPool : public CacheLineAligned {
   /// sufficient unused reservation to pin the new page (otherwise it will DCHECK).
   /// CreatePage() only fails when a system error prevents the buffer pool from fulfilling
   /// the reservation.
-  /// On success, the handle is mapped to the new page.
-  Status CreatePage(
-      ClientHandle* client, int64_t len, PageHandle* handle) WARN_UNUSED_RESULT;
+  /// On success, the handle is mapped to the new page and 'buffer', if non-NULL, is set
+  /// to the page's buffer.
+  Status CreatePage(ClientHandle* client, int64_t len, PageHandle* handle,
+      const BufferHandle** buffer = nullptr) WARN_UNUSED_RESULT;
 
   /// Increment the pin count of 'handle'. After Pin() the underlying page will
-  /// be mapped to a buffer, which will be accessible through 'handle'. Uses
-  /// reservation from 'client'. The caller is responsible for ensuring it has enough
-  /// unused reservation before calling Pin() (otherwise it will DCHECK). Pin() only
-  /// fails when a system error prevents the buffer pool from fulfilling the reservation.
+  /// be mapped to a buffer, which will be accessible through 'handle'. If the data
+  /// was evicted from memory, it will be read back into memory asynchronously.
+  /// Attempting to access the buffer with ExtractBuffer() or handle.GetBuffer() will
+  /// block until the data is in memory. The caller is responsible for ensuring it has
+  /// enough unused reservation before calling Pin() (otherwise it will DCHECK). Pin()
+  /// only fails when a system error prevents the buffer pool from fulfilling the
+  /// reservation or if an I/O error is encountered reading back data from disk.
   /// 'handle' must be open.
   Status Pin(ClientHandle* client, PageHandle* handle) WARN_UNUSED_RESULT;
 
@@ -214,9 +220,11 @@ class BufferPool : public CacheLineAligned {
   /// Extracts buffer from a pinned page. After this returns, the page referenced by
   /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from
   /// 'page_handle'. This may decrease reservation usage of 'client' if the page was
-  /// pinned multiple times via 'page_handle'.
-  void ExtractBuffer(
-      ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle);
+  /// pinned multiple times via 'page_handle'. May return an error if 'page_handle' was
+  /// unpinned earlier with no subsequent GetBuffer() call and a read error is
+  /// encountered while bringing the page back into memory.
+  Status ExtractBuffer(
+      ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) WARN_UNUSED_RESULT;
 
   /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
   /// is responsible for ensuring it has enough unused reservation before calling
@@ -400,19 +408,15 @@ class BufferPool::PageHandle {
   bool is_pinned() const { return pin_count() > 0; }
   int pin_count() const;
   int64_t len() const;
-  /// Get a pointer to the start of the page's buffer. Only valid to call if the page
-  /// is pinned.
-  uint8_t* data() const { return buffer_handle()->data(); }
 
-  /// Convenience function to get the memory range for the page's buffer. Only valid to
-  /// call if the page is pinned.
-  MemRange mem_range() const { return buffer_handle()->mem_range(); }
-
-  /// Return a pointer to the page's buffer handle. Only valid to call if the page is
-  /// pinned via this handle. Only const accessors of the returned handle can be used:
-  /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to otherwise modify
-  /// the handle.
-  const BufferHandle* buffer_handle() const;
+  /// Get a reference to the page's buffer handle. Only valid to call if the page is
+  /// pinned. If the page was previously unpinned and the read I/O for the data is still
+  /// in flight, this can block waiting. Returns an error if an error was encountered
+  /// reading the data back, which can only happen if Unpin() was called on the page
+  /// since the last call to GetBuffer(). Only const accessors of the returned handle can
+  /// be used: it is invalid to call FreeBuffer() or TransferBuffer() on it or to
+  /// otherwise modify the handle.
+  Status GetBuffer(const BufferHandle** buffer_handle) const WARN_UNUSED_RESULT;
 
   std::string DebugString() const;
 
@@ -431,9 +435,8 @@ class BufferPool::PageHandle {
   /// The internal page structure. NULL if the handle is not open.
   Page* page_;
 
-  /// The client the page handle belongs to, used to validate that the correct client
-  /// is being used.
-  const ClientHandle* client_;
+  /// The client the page handle belongs to.
+  ClientHandle* client_;
 };
 
 inline BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 293a898..bc09f2e 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -368,36 +368,44 @@ Status TmpFileMgr::FileGroup::Write(
 }
 
 Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
+  RETURN_IF_ERROR(ReadAsync(handle, buffer));
+  return WaitForAsyncRead(handle, buffer);
+}
+
+Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
   DCHECK(handle->write_range_ != nullptr);
   DCHECK(!handle->is_cancelled_);
   DCHECK_EQ(buffer.len(), handle->len());
   Status status;
 
-  // Don't grab 'lock_' in this method - it is not necessary because we don't touch
-  // any members that it protects and could block other threads for the duration of
-  // the synchronous read.
+  // Don't grab 'write_state_lock_' in this method - it is not necessary because we
+  // don't touch any members that it protects, and holding it could block other
+  // threads waiting on the read.
   DCHECK(!handle->write_in_flight_);
+  DCHECK(handle->read_range_ == nullptr);
   DCHECK(handle->write_range_ != nullptr);
-  // Don't grab handle->lock_, it is safe to touch all of handle's state since the
-  // write is not in flight.
-  DiskIoMgr::ScanRange* scan_range = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
-  scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(),
-      handle->write_range_->offset(), handle->write_range_->disk_id(), false,
+  // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
+  // since the write is not in flight.
+  handle->read_range_ = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+  handle->read_range_->Reset(nullptr, handle->write_range_->file(),
+      handle->write_range_->len(), handle->write_range_->offset(),
+      handle->write_range_->disk_id(), false,
       DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
-  DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr;
-  {
-    SCOPED_TIMER(disk_read_timer_);
-    read_counter_->Add(1);
-    bytes_read_counter_->Add(buffer.len());
-    status = io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer);
-    if (!status.ok()) goto exit;
-  }
-
-  if (FLAGS_disk_spill_encryption) {
-    status = handle->CheckHashAndDecrypt(buffer);
-    if (!status.ok()) goto exit;
-  }
+  read_counter_->Add(1);
+  bytes_read_counter_->Add(buffer.len());
+  RETURN_IF_ERROR(io_mgr_->AddScanRange(io_ctx_, handle->read_range_, true));
+  return Status::OK();
+}
 
+Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buffer) {
+  DCHECK(handle->read_range_ != nullptr);
+  // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
+  // since the write is not in flight.
+  SCOPED_TIMER(disk_read_timer_);
+  DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr;
+  Status status = handle->read_range_->GetNext(&io_mgr_buffer);
+  if (!status.ok()) goto exit;
+  DCHECK(io_mgr_buffer != NULL);
   DCHECK(io_mgr_buffer->eosr());
   DCHECK_LE(io_mgr_buffer->len(), buffer.len());
   if (io_mgr_buffer->len() < buffer.len()) {
@@ -409,9 +417,14 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
   }
   DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
 
+  if (FLAGS_disk_spill_encryption) {
+    status = handle->CheckHashAndDecrypt(buffer);
+    if (!status.ok()) goto exit;
+  }
 exit:
   // Always return the buffer before exiting to avoid leaking it.
   if (io_mgr_buffer != nullptr) io_mgr_buffer->Return();
+  handle->read_range_ = nullptr;
   return status;
 }
 
@@ -420,6 +433,7 @@ Status TmpFileMgr::FileGroup::RestoreData(
   DCHECK_EQ(handle->write_range_->data(), buffer.data());
   DCHECK_EQ(handle->len(), buffer.len());
   DCHECK(!handle->write_in_flight_);
+  DCHECK(handle->read_range_ == nullptr);
   // Decrypt after the write is finished, so that we don't accidentally write decrypted
   // data to disk.
   Status status;
@@ -501,6 +515,7 @@ TmpFileMgr::WriteHandle::WriteHandle(
   : cb_(cb),
     encryption_timer_(encryption_timer),
     file_(nullptr),
+    read_range_(nullptr),
     is_cancelled_(false),
     write_in_flight_(false) {}
 
@@ -570,9 +585,20 @@ void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {
 }
 
 void TmpFileMgr::WriteHandle::Cancel() {
-  unique_lock<mutex> lock(write_state_lock_);
-  is_cancelled_ = true;
-  // TODO: in future, if DiskIoMgr supported cancellation, we could cancel it here.
+  CancelRead();
+  {
+    unique_lock<mutex> lock(write_state_lock_);
+    is_cancelled_ = true;
+    // TODO: in future, if DiskIoMgr supported write cancellation, we could cancel it
+    // here.
+  }
+}
+
+void TmpFileMgr::WriteHandle::CancelRead() {
+  if (read_range_ != nullptr) {
+    read_range_->Cancel(Status::CANCELLED);
+    read_range_ = nullptr;
+  }
 }
 
 void TmpFileMgr::WriteHandle::WaitForWrite() {

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index d66c527..c71c370 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -123,9 +123,22 @@ class TmpFileMgr {
 
     /// Synchronously read the data referenced by 'handle' from the temporary file into
     /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
-    /// after a write successfully completes.
+    /// after a write successfully completes. Should not be called while an async read
+    /// is in flight. Equivalent to calling ReadAsync() then WaitForAsyncRead().
     Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
 
+    /// Asynchronously read the data referenced by 'handle' from the temporary file into
+    /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
+    /// after a write successfully completes. The data in 'buffer' is not valid until
+    /// WaitForAsyncRead() returns successfully. Should not be called while an async
+    /// read is already in flight.
+    Status ReadAsync(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
+
+    /// Wait until the read started for 'handle' by ReadAsync() completes. 'buffer'
+    /// should be the same buffer passed into ReadAsync(). Returns an error if the
+    /// read fails. Retrying a failed read by calling ReadAsync() again is allowed.
+    Status WaitForAsyncRead(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
+
     /// Restore the original data in the 'buffer' passed to Write(), decrypting or
     /// decompressing as necessary. Returns an error if restoring the data fails.
     /// The write must not be in-flight - the caller is responsible for waiting for
@@ -215,7 +228,7 @@ class TmpFileMgr {
     /// Amount of scratch space allocated in bytes.
     RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
 
-    /// Time taken for disk reads.
+    /// Time spent waiting for disk reads.
     RuntimeProfile::Counter* const disk_read_timer_;
 
     /// Time spent in disk spill encryption, decryption, and integrity checking.
@@ -263,14 +276,21 @@ class TmpFileMgr {
    public:
     /// The write must be destroyed by passing it to FileGroup - destroying it before
     /// the write completes is an error.
-    ~WriteHandle() { DCHECK(!write_in_flight_); }
+    ~WriteHandle() {
+      DCHECK(!write_in_flight_);
+      DCHECK(read_range_ == nullptr);
+    }
 
-    /// Cancels the write asynchronously. After Cancel() is called, writes are not
+    /// Cancels any in-flight writes or reads. Reads are cancelled synchronously and
+    /// writes are cancelled asynchronously. After Cancel() is called, writes are not
     /// retried. The write callback may be called with a CANCELLED status (unless
     /// it succeeded or encountered a different error first).
     /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it.
     void Cancel();
 
+    /// Cancel any in-flight read synchronously.
+    void CancelRead();
+
     /// Blocks until the write completes either successfully or unsuccessfully.
     /// May return before the write callback has been called.
     /// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it.
@@ -333,6 +353,10 @@ class TmpFileMgr {
     /// on writes; verified on reads. This is calculated _after_ encryption.
     IntegrityHash hash_;
 
+    /// The scan range for the read that is currently in flight. NULL when no read is in
+    /// flight.
+    DiskIoMgr::ScanRange* read_range_;
+
     /// Protects all fields below while 'write_in_flight_' is true. At other times, it is
     /// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,
     /// so no locking is required. This is a terminal lock and should not be held while