You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/05/11 19:58:43 UTC
incubator-impala git commit: IMPALA-5169: Add support for async pins in buffer pool
Repository: incubator-impala
Updated Branches:
refs/heads/master 060b80fd9 -> cb37be893
IMPALA-5169: Add support for async pins in buffer pool
Makes Pin() do async reads behind-the-scenes, instead of
blocking until the read completes. The blocking is done
instead when the client tries to access the buffer via
PageHandle::GetBuffer() or ExtractBuffer().
This is implemented with a new sub-state of "pinned"
where the page has a buffer and consumes reservation
but the buffer does not contain valid data.
Motivation:
This unlocks various opportunities to overlap read I/Os
with other work:
* Reads to different disks can execute in parallel
* I/O and computation can be overlapped.
This initially benefits BufferedTupleStream::PinStream(),
where many pages are pinned at once. With this change the
reads run asynchronously. This can potentially lead
to large speedups when spilling. E.g. if the pages for a Hash
Join's partition are spread across 10 disks, we could get 10x
the read throughput, plus overlap the I/O with hash table build.
In future we can use this to do read-ahead over unpinned
BufferedTupleStreams or for unpinned Runs in Sorter, but
this requires changes to the client code to Pin() pages
in advance.
Testing:
* BufferedTupleStreamV2 already exercises this.
* Various BufferPool tests already exercise this.
* Added a basic test to cover edge cases made possible by the
new state transitions.
* Extended the randomised test to cover this.
Change-Id: Ibdf074c1ac4405d6f08d623ba438a85f7d39fd79
Reviewed-on: http://gerrit.cloudera.org:8080/6612
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Impala Public Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/cb37be89
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/cb37be89
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/cb37be89
Branch: refs/heads/master
Commit: cb37be8935579263122088e2943d014c4a9aba21
Parents: 060b80f
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Apr 11 18:08:39 2017 -0700
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu May 11 19:36:09 2017 +0000
----------------------------------------------------------------------
be/src/runtime/buffered-tuple-stream-v2.cc | 62 ++++++--
be/src/runtime/buffered-tuple-stream-v2.h | 29 +++-
.../runtime/bufferpool/buffer-pool-internal.h | 62 +++++---
be/src/runtime/bufferpool/buffer-pool-test.cc | 158 +++++++++++++++----
be/src/runtime/bufferpool/buffer-pool.cc | 108 +++++++++----
be/src/runtime/bufferpool/buffer-pool.h | 55 ++++---
be/src/runtime/tmp-file-mgr.cc | 74 ++++++---
be/src/runtime/tmp-file-mgr.h | 32 +++-
8 files changed, 425 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/buffered-tuple-stream-v2.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.cc b/be/src/runtime/buffered-tuple-stream-v2.cc
index 083b59e..c36c31f 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.cc
+++ b/be/src/runtime/buffered-tuple-stream-v2.cc
@@ -44,6 +44,8 @@
using namespace impala;
using namespace strings;
+using BufferHandle = BufferPool::BufferHandle;
+
BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
const RowDescriptor& row_desc, BufferPool::ClientHandle* buffer_pool_client,
int64_t page_len, const set<SlotId>& ext_varlen_slots)
@@ -54,6 +56,7 @@ BufferedTupleStreamV2::BufferedTupleStreamV2(RuntimeState* state,
total_byte_size_(0),
read_page_rows_returned_(-1),
read_ptr_(nullptr),
+ read_end_ptr_(nullptr),
write_ptr_(nullptr),
write_end_ptr_(nullptr),
rows_returned_(0),
@@ -109,15 +112,20 @@ void BufferedTupleStreamV2::CheckConsistency() const {
}
if (has_write_iterator()) {
DCHECK(write_page_->is_pinned());
- DCHECK_GE(write_ptr_, write_page_->data());
- DCHECK_EQ(write_end_ptr_, write_page_->data() + write_page_->len());
+ DCHECK(write_page_->retrieved_buffer);
+ const BufferHandle* write_buffer;
+ Status status = write_page_->GetBuffer(&write_buffer);
+ DCHECK(status.ok()); // Write buffer should never have been unpinned.
+ DCHECK_GE(write_ptr_, write_buffer->data());
+ DCHECK_EQ(write_end_ptr_, write_buffer->data() + write_page_->len());
DCHECK_GE(write_end_ptr_, write_ptr_);
}
if (has_read_iterator()) {
DCHECK(read_page_->is_pinned());
- uint8_t* read_end_ptr = read_page_->data() + read_page_->len();
- DCHECK_GE(read_ptr_, read_page_->data());
- DCHECK_GE(read_end_ptr, read_ptr_);
+ DCHECK(read_page_->retrieved_buffer);
+ // Can't check read buffer without affecting behaviour, because a read may be in
+ // flight and this would require blocking on that read.
+ DCHECK_GE(read_end_ptr_, read_ptr_);
}
}
@@ -186,9 +194,13 @@ Status BufferedTupleStreamV2::PrepareForReadWrite(
void BufferedTupleStreamV2::Close(RowBatch* batch, RowBatch::FlushMode flush) {
for (Page& page : pages_) {
- if (batch != nullptr && page.is_pinned()) {
+ if (batch != nullptr && page.retrieved_buffer) {
+ // Subtle: We only need to attach buffers from pages that we may have returned
+ // references to. ExtractBuffer() cannot fail for these pages because the data
+ // is guaranteed to already be in memory.
BufferPool::BufferHandle buffer;
- buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+ Status status = buffer_pool_->ExtractBuffer(buffer_pool_client_, &page.handle, &buffer);
+ DCHECK(status.ok());
batch->AddBuffer(buffer_pool_client_, move(buffer), flush);
} else {
buffer_pool_->DestroyPage(buffer_pool_client_, &page.handle);
@@ -247,6 +259,7 @@ void BufferedTupleStreamV2::UnpinPageIfNeeded(Page* page, bool stream_pinned) {
DCHECK_EQ(new_pin_count, page->pin_count() - 1);
buffer_pool_->Unpin(buffer_pool_client_, &page->handle);
bytes_pinned_ -= page->len();
+ if (page->pin_count() == 0) page->retrieved_buffer = false;
}
}
@@ -255,16 +268,17 @@ Status BufferedTupleStreamV2::NewWritePage() noexcept {
DCHECK(!has_write_iterator());
Page new_page;
- RETURN_IF_ERROR(
- buffer_pool_->CreatePage(buffer_pool_client_, page_len_, &new_page.handle));
+ const BufferHandle* write_buffer;
+ RETURN_IF_ERROR(buffer_pool_->CreatePage(
+ buffer_pool_client_, page_len_, &new_page.handle, &write_buffer));
bytes_pinned_ += page_len_;
total_byte_size_ += page_len_;
pages_.push_back(std::move(new_page));
write_page_ = &pages_.back();
DCHECK_EQ(write_page_->num_rows, 0);
- write_ptr_ = write_page_->data();
- write_end_ptr_ = write_page_->data() + page_len_;
+ write_ptr_ = write_buffer->data();
+ write_end_ptr_ = write_ptr_ + page_len_;
return Status::OK();
}
@@ -312,6 +326,8 @@ void BufferedTupleStreamV2::ResetWritePage() {
// Unpin the write page if we're reading in unpinned mode.
Page* prev_write_page = write_page_;
write_page_ = nullptr;
+ write_ptr_ = nullptr;
+ write_end_ptr_ = nullptr;
// May need to decrement pin count now that it's not the write page, depending on
// the stream's mode.
@@ -347,8 +363,13 @@ Status BufferedTupleStreamV2::NextReadPage() {
// actually fail once we have variable-length pages.
RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
+ // This waits for the pin to complete if the page was unpinned earlier.
+ const BufferHandle* read_buffer;
+ RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
+
read_page_rows_returned_ = 0;
- read_ptr_ = read_page_->data();
+ read_ptr_ = read_buffer->data();
+ read_end_ptr_ = read_ptr_ + read_buffer->len();
CHECK_CONSISTENCY();
return Status::OK();
@@ -359,6 +380,8 @@ void BufferedTupleStreamV2::ResetReadPage() {
// Unpin the write page if we're reading in unpinned mode.
Page* prev_read_page = &*read_page_;
read_page_ = pages_.end();
+ read_ptr_ = nullptr;
+ read_end_ptr_ = nullptr;
// May need to decrement pin count after destroying read iterator.
UnpinPageIfNeeded(prev_read_page, pinned_);
@@ -384,10 +407,15 @@ Status BufferedTupleStreamV2::PrepareForReadInternal(bool delete_on_read) {
read_page_ = pages_.begin();
RETURN_IF_ERROR(PinPageIfNeeded(&*read_page_, pinned_));
+ // This waits for the pin to complete if the page was unpinned earlier.
+ const BufferHandle* read_buffer;
+ RETURN_IF_ERROR(read_page_->GetBuffer(&read_buffer));
+
DCHECK(has_read_iterator());
DCHECK(read_page_->is_pinned());
read_page_rows_returned_ = 0;
- read_ptr_ = read_page_->data();
+ read_ptr_ = read_buffer->data();
+ read_end_ptr_ = read_ptr_ + read_buffer->len();
rows_returned_ = 0;
delete_on_read_ = delete_on_read;
CHECK_CONSISTENCY();
@@ -411,6 +439,8 @@ Status BufferedTupleStreamV2::PinStream(bool* pinned) {
if (!reservation_granted) return Status::OK();
// At this point success is guaranteed - go through to pin the pages we need to pin.
+ // If the page data was evicted from memory, the read I/O can happen in parallel
+ // because we defer calling GetBuffer() until NextReadPage().
for (Page& page : pages_) RETURN_IF_ERROR(PinPageIfNeeded(&page, true));
pinned_ = true;
@@ -556,7 +586,7 @@ Status BufferedTupleStreamV2::GetNextInternal(
batch->MarkNeedsDeepCopy();
}
if (FILL_FLAT_ROWS) DCHECK_EQ(flat_rows->size(), rows_to_fill);
- DCHECK_LE(read_ptr_, read_page_->data() + read_page_->len());
+ DCHECK_LE(read_ptr_, read_end_ptr_);
return Status::OK();
}
@@ -567,7 +597,7 @@ void BufferedTupleStreamV2::FixUpStringsForRead(
if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
- DCHECK_LE(read_ptr_ + sv->len, read_page_->data() + read_page_->len());
+ DCHECK_LE(read_ptr_ + sv->len, read_end_ptr_);
sv->ptr = reinterpret_cast<char*>(read_ptr_);
read_ptr_ += sv->len;
}
@@ -582,7 +612,7 @@ void BufferedTupleStreamV2::FixUpCollectionsForRead(
CollectionValue* cv = tuple->GetCollectionSlot(slot_desc->tuple_offset());
const TupleDescriptor& item_desc = *slot_desc->collection_item_descriptor();
int coll_byte_size = cv->num_tuples * item_desc.byte_size();
- DCHECK_LE(read_ptr_ + coll_byte_size, read_page_->data() + read_page_->len());
+ DCHECK_LE(read_ptr_ + coll_byte_size, read_end_ptr_);
cv->ptr = reinterpret_cast<uint8_t*>(read_ptr_);
read_ptr_ += coll_byte_size;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/buffered-tuple-stream-v2.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/buffered-tuple-stream-v2.h b/be/src/runtime/buffered-tuple-stream-v2.h
index d707604..e5fea47 100644
--- a/be/src/runtime/buffered-tuple-stream-v2.h
+++ b/be/src/runtime/buffered-tuple-stream-v2.h
@@ -152,7 +152,9 @@ class TupleRow;
/// or false respectively) before advancing to the next page.
/// 2. Pinned: All pages in the stream are pinned so do not need to be pinned or
/// unpinned when reading from the stream. If delete on read is true, pages are
-/// deleted after being read.
+/// deleted after being read. If the stream was previously unpinned, the page's data
+/// may not yet be in memory - reading from the stream can block on I/O or fail with
+/// an I/O error.
/// Write:
/// 1. Unpinned: Unpin pages as they fill up. This means that only a enough reservation
/// to pin a single write page is required to write to the stream, regardless of the
@@ -314,10 +316,9 @@ class BufferedTupleStreamV2 {
bool* got_rows) WARN_UNUSED_RESULT;
/// Must be called once at the end to cleanup all resources. If 'batch' is non-NULL,
- /// attaches buffers from any pinned pages to the batch and deletes unpinned
- /// pages. Otherwise deletes all pages. Does nothing if the stream was already
- /// closed. The 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching
- /// buffers.
+ /// attaches buffers from pinned pages that rows returned from GetNext() may reference.
+ /// Otherwise deletes all pages. Does nothing if the stream was already closed. The
+ /// 'flush' mode is forwarded to RowBatch::AddBuffer() when attaching buffers.
void Close(RowBatch* batch, RowBatch::FlushMode flush);
/// Number of rows in the stream.
@@ -354,18 +355,27 @@ class BufferedTupleStreamV2 {
/// Wrapper around BufferPool::PageHandle that tracks additional info about the page.
struct Page {
- Page() : num_rows(0) {}
+ Page() : num_rows(0), retrieved_buffer(true) {}
inline int len() const { return handle.len(); }
- inline uint8_t* data() const { return handle.data(); }
inline bool is_pinned() const { return handle.is_pinned(); }
inline int pin_count() const { return handle.pin_count(); }
+ Status GetBuffer(const BufferPool::BufferHandle** buffer) {
+ RETURN_IF_ERROR(handle.GetBuffer(buffer));
+ retrieved_buffer = true;
+ return Status::OK();
+ }
std::string DebugString() const;
BufferPool::PageHandle handle;
/// Number of rows written to the page.
int num_rows;
+
+ /// Whether we called GetBuffer() on the page since it was last pinned. This means
+ /// that GetBuffer() and ExtractBuffer() cannot fail and that GetNext() may have
+ /// returned rows referencing the page's buffer.
+ bool retrieved_buffer;
};
/// Runtime state instance used to check for cancellation. Not owned.
@@ -415,11 +425,14 @@ class BufferedTupleStreamV2 {
/// Pointer into read_page_ to the byte after the last row read.
uint8_t* read_ptr_;
+ /// Pointer to one byte past the end of read_page_. Used to detect overruns.
+ const uint8_t* read_end_ptr_;
+
/// Pointer into write_page_ to the byte after the last row written.
uint8_t* write_ptr_;
/// Pointer to one byte past the end of write_page_. Cached to speed up computation
- uint8_t* write_end_ptr_;
+ const uint8_t* write_end_ptr_;
/// Number of rows returned to the caller from GetNext() since the last
/// PrepareForRead() call.
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 619288b..0c0408b 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -37,15 +37,21 @@
/// Each Page object is owned by at most one InternalList<Page> at any given point.
/// Each page is either pinned or unpinned. Unpinned has a number of sub-states, which
/// is determined by which list in Client/BufferPool contains the page.
-/// * Pinned: Always in this state when 'pin_count' > 0. The page is in
-/// Client::pinned_pages_.
-/// * Unpinned - Dirty: When no write has been started for an unpinned page. The page is
-/// in Client::dirty_unpinned_pages_.
-/// * Unpinned - Write in flight: When the write has been started but not completed for
-/// a dirty unpinned page. The page is in Client::write_in_flight_pages_. For
-/// accounting purposes this is considered a dirty page.
-/// * Unpinned - Clean: When the write has completed but the page was not evicted. The
-/// page is in a clean pages list in a BufferAllocator arena.
+/// * Pinned: Always in this state when 'pin_count' > 0. The page has a buffer and is in
+/// Client::pinned_pages_. 'pin_in_flight' determines which sub-state the page is in:
+/// -> When pin_in_flight=false, the buffer contains the page's data and the client can
+/// read and write to the buffer.
+/// -> When pin_in_flight=true, the page's data is in the process of being read from
+/// scratch disk into the buffer. Clients will block on the read I/O if they attempt
+/// to access the buffer.
+/// * Unpinned - Dirty: When no write to scratch has been started for an unpinned page.
+/// The page is in Client::dirty_unpinned_pages_.
+/// * Unpinned - Write in flight: When the write to scratch has been started but not
+/// completed for a dirty unpinned page. The page is in
+/// Client::write_in_flight_pages_. For accounting purposes this is considered a
+/// dirty page.
+/// * Unpinned - Clean: When the write to scratch has completed but the page was not
+/// evicted. The page is in a clean pages list in a BufferAllocator arena.
/// * Unpinned - Evicted: After a clean page's buffer has been reclaimed. The page is
/// not in any list.
///
@@ -91,7 +97,8 @@ namespace impala {
/// The internal representation of a page, which can be pinned or unpinned. See the
/// class comment for explanation of the different page states.
struct BufferPool::Page : public InternalList<Page>::Node {
- Page(Client* client, int64_t len) : client(client), len(len), pin_count(0) {}
+ Page(Client* client, int64_t len)
+ : client(client), len(len), pin_count(0), pin_in_flight(false) {}
std::string DebugString();
@@ -108,6 +115,11 @@ struct BufferPool::Page : public InternalList<Page>::Node {
/// PageHandle, so it cannot be accessed by multiple threads concurrently.
int pin_count;
+ /// True if the read I/O to pin the page was started but not completed. Only accessed
+ /// in contexts that are passed the associated PageHandle, so it cannot be accessed
+ /// by multiple threads concurrently.
+ bool pin_in_flight;
+
/// Non-null if there is a write in flight, the page is clean, or the page is evicted.
std::unique_ptr<TmpFileMgr::WriteHandle> write_handle;
@@ -211,10 +223,20 @@ class BufferPool::Client {
void MoveToDirtyUnpinned(Page* page);
/// Move an unpinned page to the pinned state, moving between data structures and
- /// reading from disk if necessary. Returns once the page's buffer is allocated
- /// and contains the page's data. Neither the client's lock nor
- /// handle->page_->buffer_lock should be held by the caller.
- Status MoveToPinned(ClientHandle* client, PageHandle* handle);
+ /// reading from disk if necessary. Ensures the page has a buffer. If the data is
+ /// already in memory, ensures the data is in the page's buffer. If the data is on
+ /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to
+ /// true. Neither the client's lock nor page->buffer_lock should be held by the caller.
+ Status StartMoveToPinned(ClientHandle* client, Page* page);
+
+ /// Moves a page that has a pin in flight back to the evicted state, undoing
+ /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held
+ /// by the caller.
+ void UndoMoveEvictedToPinned(Page* page);
+
+ /// Finish the work of bringing the data of an evicted page to memory if
+ /// page->pin_in_flight was set to true by StartMoveToPinned().
+ Status FinishMoveEvictedToPinned(Page* page);
/// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
/// API to deduct from the client's reservation and update internal accounting. Cleans
@@ -285,12 +307,12 @@ class BufferPool::Client {
/// Called when a write for 'page' completes.
void WriteCompleteCallback(Page* page, const Status& write_status);
- /// Move an evicted page to the pinned state by allocating a new buffer, reading data
- /// from disk and moving the page to 'pinned_pages_'. client->impl must be locked by
- /// the caller via 'client_lock' and handle->page must be unlocked. 'client_lock' is
- /// released then reacquired.
- Status MoveEvictedToPinned(boost::unique_lock<boost::mutex>* client_lock,
- ClientHandle* client, PageHandle* handle);
+ /// Move an evicted page to the pinned state by allocating a new buffer, starting an
+ /// async read from disk and moving the page to 'pinned_pages_'. client->impl must be
+ /// locked by the caller via 'client_lock' and handle->page must be unlocked.
+ /// 'client_lock' is released then reacquired.
+ Status StartMoveEvictedToPinned(
+ boost::unique_lock<boost::mutex>* client_lock, ClientHandle* client, Page* page);
/// The buffer pool that owns the client.
BufferPool* const pool_;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 6b9177e..27a2ea9 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -266,7 +266,7 @@ class BufferPoolTest : public ::testing::Test {
void WriteOrVerifyData(const T& object, int val, bool write) {
// Only write sentinel values to start and end of buffer to make writing and
// verification cheap.
- MemRange mem = object.mem_range();
+ MemRange mem = GetMemRange(object);
uint64_t* start_word = reinterpret_cast<uint64_t*>(mem.data());
uint64_t* end_word =
reinterpret_cast<uint64_t*>(&mem.data()[mem.len() - sizeof(uint64_t)]);
@@ -279,6 +279,14 @@ class BufferPoolTest : public ::testing::Test {
}
}
+ MemRange GetMemRange(const BufferHandle& buffer) { return buffer.mem_range(); }
+
+ MemRange GetMemRange(const PageHandle& page) {
+ const BufferHandle* buffer;
+ EXPECT_OK(page.GetBuffer(&buffer));
+ return buffer->mem_range();
+ }
+
/// Return the total number of bytes allocated from the system currently.
int64_t SystemBytesAllocated(BufferPool* pool) {
return pool->allocator()->GetSystemBytesAllocated();
@@ -329,6 +337,11 @@ class BufferPoolTest : public ::testing::Test {
LOG(INFO) << "Injected fault by truncating file " << path;
}
+ // Return whether a pin is in flight for the page.
+ static bool PinInFlight(PageHandle* page) {
+ return page->page_->pin_in_flight;
+ }
+
// Return the path of the temporary file backing the page.
static string TmpFilePath(PageHandle* page) {
return page->page_->write_handle->TmpFilePath();
@@ -505,11 +518,11 @@ TEST_F(BufferPoolTest, PageCreation) {
ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
ASSERT_TRUE(handles[i].is_open());
ASSERT_TRUE(handles[i].is_pinned());
- ASSERT_TRUE(handles[i].buffer_handle() != NULL);
- ASSERT_TRUE(handles[i].data() != NULL);
- ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
+ const BufferHandle* buffer;
+ ASSERT_OK(handles[i].GetBuffer(&buffer));
+ ASSERT_TRUE(buffer->data() != NULL);
ASSERT_EQ(handles[i].len(), page_len);
- ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
+ ASSERT_EQ(buffer->len(), page_len);
ASSERT_EQ(client.GetUsedReservation(), used_before + page_len);
}
@@ -623,14 +636,15 @@ TEST_F(BufferPoolTest, Pin) {
BufferPool::PageHandle handle1, handle2;
// Can pin two minimum sized pages.
- ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+ const BufferHandle* page_buffer;
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1, &page_buffer));
ASSERT_TRUE(handle1.is_open());
ASSERT_TRUE(handle1.is_pinned());
- ASSERT_TRUE(handle1.data() != NULL);
- ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+ ASSERT_TRUE(page_buffer->data() != NULL);
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2, &page_buffer));
ASSERT_TRUE(handle2.is_open());
ASSERT_TRUE(handle2.is_pinned());
- ASSERT_TRUE(handle2.data() != NULL);
+ ASSERT_TRUE(page_buffer->data() != NULL);
pool.Unpin(&client, &handle2);
ASSERT_FALSE(handle2.is_pinned());
@@ -646,10 +660,10 @@ TEST_F(BufferPoolTest, Pin) {
// Can pin double-sized page only once.
BufferPool::PageHandle double_handle;
- ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle));
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle, &page_buffer));
ASSERT_TRUE(double_handle.is_open());
ASSERT_TRUE(double_handle.is_pinned());
- ASSERT_TRUE(double_handle.data() != NULL);
+ ASSERT_TRUE(page_buffer->data() != NULL);
// Destroy the pages - test destroying both pinned and unpinned.
pool.DestroyPage(&client, &handle1);
@@ -659,6 +673,78 @@ TEST_F(BufferPoolTest, Pin) {
pool.DeregisterClient(&client);
}
+// Test the various state transitions possible with async Pin() calls.
+TEST_F(BufferPoolTest, AsyncPin) {
+ const int DATA_SEED = 1234;
+ // Set up pool with enough reservation to keep two buffers in memory.
+ const int64_t TOTAL_MEM = 2 * TEST_BUFFER_LEN;
+ BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+ global_reservations_.InitRootTracker(NULL, TOTAL_MEM);
+ BufferPool::ClientHandle client;
+ ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+ NULL, TOTAL_MEM, NewProfile(), &client));
+ ASSERT_TRUE(client.IncreaseReservationToFit(TOTAL_MEM));
+
+ PageHandle handle;
+ const BufferHandle* buffer;
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle, &buffer));
+ WriteData(*buffer, DATA_SEED);
+ // Pin() on a pinned page just increments the pin count.
+ ASSERT_OK(pool.Pin(&client, &handle));
+ EXPECT_EQ(2, handle.pin_count());
+ EXPECT_FALSE(PinInFlight(&handle));
+
+ pool.Unpin(&client, &handle);
+ pool.Unpin(&client, &handle);
+ ASSERT_FALSE(handle.is_pinned());
+
+ // Calling Pin() then Pin() results in double-pinning.
+ ASSERT_OK(pool.Pin(&client, &handle));
+ ASSERT_OK(pool.Pin(&client, &handle));
+ EXPECT_EQ(2, handle.pin_count());
+ EXPECT_FALSE(PinInFlight(&handle));
+
+ pool.Unpin(&client, &handle);
+ pool.Unpin(&client, &handle);
+ ASSERT_FALSE(handle.is_pinned());
+
+ // Pin() on a page that isn't evicted pins it immediately.
+ ASSERT_OK(pool.Pin(&client, &handle));
+ EXPECT_EQ(1, handle.pin_count());
+ EXPECT_FALSE(PinInFlight(&handle));
+ VerifyData(handle, 1234);
+ pool.Unpin(&client, &handle);
+ ASSERT_FALSE(handle.is_pinned());
+
+ // Force eviction. Pin() on an evicted page starts the read asynchronously.
+ ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
+ ASSERT_OK(pool.Pin(&client, &handle));
+ EXPECT_EQ(1, handle.pin_count());
+ EXPECT_TRUE(PinInFlight(&handle));
+ // Block on the pin and verify the buffer.
+ ASSERT_OK(handle.GetBuffer(&buffer));
+ EXPECT_FALSE(PinInFlight(&handle));
+ VerifyData(*buffer, 1234);
+
+ // Test that we can unpin while in flight and the data remains valid.
+ pool.Unpin(&client, &handle);
+ ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
+ ASSERT_OK(pool.Pin(&client, &handle));
+ EXPECT_TRUE(PinInFlight(&handle));
+ pool.Unpin(&client, &handle);
+ ASSERT_OK(pool.Pin(&client, &handle));
+ ASSERT_OK(handle.GetBuffer(&buffer));
+ VerifyData(*buffer, 1234);
+
+ // Evict the page, then destroy while we're pinning it asynchronously.
+ pool.Unpin(&client, &handle);
+ ASSERT_OK(AllocateAndFree(&pool, &client, TOTAL_MEM));
+ ASSERT_OK(pool.Pin(&client, &handle));
+ pool.DestroyPage(&client, &handle);
+
+ pool.DeregisterClient(&client);
+}
+
/// Creating a page or pinning without sufficient reservation should DCHECK.
TEST_F(BufferPoolTest, PinWithoutReservation) {
int64_t total_mem = TEST_BUFFER_LEN * 1024;
@@ -698,9 +784,10 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
// Test basic buffer extraction.
for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
- ASSERT_OK(pool.CreatePage(&client, len, &page));
- uint8_t* page_data = page.data();
- pool.ExtractBuffer(&client, &page, &buffer);
+ const BufferHandle* page_buffer;
+ ASSERT_OK(pool.CreatePage(&client, len, &page, &page_buffer));
+ uint8_t* page_data = page_buffer->data();
+ ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer));
ASSERT_FALSE(page.is_open());
ASSERT_TRUE(buffer.is_open());
ASSERT_EQ(len, buffer.len());
@@ -711,11 +798,12 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
}
// Test that ExtractBuffer() accounts correctly for pin count > 1.
- ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
- uint8_t* page_data = page.data();
+ const BufferHandle* page_buffer;
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page, &page_buffer));
+ uint8_t* page_data = page_buffer->data();
ASSERT_OK(pool.Pin(&client, &page));
ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation());
- pool.ExtractBuffer(&client, &page, &buffer);
+ ASSERT_OK(pool.ExtractBuffer(&client, &page, &buffer));
ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation());
ASSERT_FALSE(page.is_open());
ASSERT_TRUE(buffer.is_open());
@@ -727,7 +815,7 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
// Test that ExtractBuffer() DCHECKs for unpinned pages.
ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
pool.Unpin(&client, &page);
- IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");
+ IMPALA_ASSERT_DEBUG_DEATH((void)pool.ExtractBuffer(&client, &page, &buffer), "");
pool.DestroyPage(&client, &page);
pool.DeregisterClient(&client);
@@ -871,15 +959,17 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
}
// Create a pinned and unpinned page for the first client.
- BufferPool::PageHandle handle1, handle2;
- ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1));
+ PageHandle handle1, handle2;
+ const BufferHandle* page_buffer;
+ ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle1, &page_buffer));
const uint8_t TEST_VAL = 123;
- memset(handle1.data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value.
+ memset(
+ page_buffer->data(), TEST_VAL, handle1.len()); // Fill page with an arbitrary value.
pool.Unpin(&clients[0], &handle1);
ASSERT_OK(pool.CreatePage(&clients[0], TEST_BUFFER_LEN, &handle2));
// Allocating a buffer for the second client requires evicting the unpinned page.
- BufferPool::BufferHandle buffer;
+ BufferHandle buffer;
ASSERT_OK(pool.AllocateBuffer(&clients[1], TEST_BUFFER_LEN, &buffer));
ASSERT_TRUE(IsEvicted(&handle1));
@@ -887,7 +977,10 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
pool.Unpin(&clients[0], &handle2);
ASSERT_OK(pool.Pin(&clients[0], &handle1));
ASSERT_TRUE(IsEvicted(&handle2));
- for (int i = 0; i < handle1.len(); ++i) EXPECT_EQ(TEST_VAL, handle1.data()[i]) << i;
+ ASSERT_OK(handle1.GetBuffer(&page_buffer));
+ for (int i = 0; i < handle1.len(); ++i) {
+ EXPECT_EQ(TEST_VAL, page_buffer->data()[i]) << i;
+ }
// Clean up everything.
pool.DestroyPage(&clients[0], &handle1);
@@ -1438,7 +1531,10 @@ TEST_F(BufferPoolTest, ScratchReadError) {
DCHECK_EQ(error_type, TRUNCATE);
TruncateBackingFile(tmp_file);
}
- Status status = pool.Pin(&client, &page);
+ ASSERT_OK(pool.Pin(&client, &page));
+ // The read is async, so won't bubble up until we block on it with GetBuffer().
+ const BufferHandle* page_buffer;
+ Status status = page.GetBuffer(&page_buffer);
if (error_type == CORRUPT_DATA && !FLAGS_disk_spill_encryption) {
// Without encryption we can't detect that the data changed.
EXPECT_OK(status);
@@ -1662,7 +1758,8 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr
if ((i % 10000) == 0) LOG(ERROR) << " Iteration " << i << endl;
// Pick an operation.
// New page: 15%
- // Pin a page: 30%
+ // Pin a page and block waiting for the result: 20%
+ // Pin a page and let it continue asynchronously: 10%
// Unpin a pinned page: 25% (< Pin prob. so that memory consumption increases).
// Destroy page: 10% (< New page prob. so that number of pages grows over time).
// Allocate buffer: 10%
@@ -1679,24 +1776,29 @@ void BufferPoolTest::TestRandomInternalImpl(BufferPool* pool, FileGroup* file_gr
WriteData(new_page, data);
pages.emplace_back(move(new_page), data);
} else if (p < 0.45) {
- // Pin a page.
+ // Pin a page asynchronously.
if (pages.empty()) continue;
int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
PageHandle* page = &pages[rand_pick].first;
if (!client.IncreaseReservationToFit(page->len())) continue;
if (!page->is_pinned() || multiple_pins) ASSERT_OK(pool->Pin(&client, page));
- VerifyData(*page, pages[rand_pick].second);
+ // Block on the pin and verify data for sync pins.
+ if (p < 0.35) VerifyData(*page, pages[rand_pick].second);
} else if (p < 0.70) {
// Unpin a pinned page.
if (pages.empty()) continue;
int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
PageHandle* page = &pages[rand_pick].first;
- if (page->is_pinned()) pool->Unpin(&client, page);
+ if (page->is_pinned()) {
+ VerifyData(*page, pages[rand_pick].second);
+ pool->Unpin(&client, page);
+ }
} else if (p < 0.80) {
// Destroy a page.
if (pages.empty()) continue;
int rand_pick = uniform_int_distribution<int>(0, pages.size() - 1)(*rng);
auto page_data = move(pages[rand_pick]);
+ if (page_data.first.is_pinned()) VerifyData(page_data.first, page_data.second);
pages[rand_pick] = move(pages.back());
pages.pop_back();
pool->DestroyPage(&client, &page_data.first);
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 1d2f1d3..c1bde5f 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -90,11 +90,18 @@ int64_t BufferPool::PageHandle::len() const {
return page_->len; // Does not require locking.
}
-const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
+Status BufferPool::PageHandle::GetBuffer(const BufferHandle** buffer) const {
+ DCHECK(is_open());
+ DCHECK(client_->is_registered());
DCHECK(is_pinned());
- // The 'buffer' field cannot change while the page is pinned, so it is safe to access
- // without locking.
- return &page_->buffer;
+ if (page_->pin_in_flight) {
+ // Finish the async read started by Pin(): blocks until it completes and returns any read error without setting '*buffer'.
+ RETURN_IF_ERROR(client_->impl_->FinishMoveEvictedToPinned(page_));
+ }
+ DCHECK(!page_->pin_in_flight);
+ *buffer = &page_->buffer; // No read is in flight any more, so the buffer holds the page's data.
+ DCHECK((*buffer)->is_open());
+ return Status::OK();
}
BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
@@ -123,16 +130,18 @@ void BufferPool::DeregisterClient(ClientHandle* client) {
client->impl_ = NULL;
}
-Status BufferPool::CreatePage(ClientHandle* client, int64_t len, PageHandle* handle) {
+Status BufferPool::CreatePage(
+ ClientHandle* client, int64_t len, PageHandle* handle, const BufferHandle** buffer) {
DCHECK(!handle->is_open());
DCHECK_GE(len, min_buffer_len_);
DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
- BufferHandle buffer;
+ BufferHandle new_buffer; // Renamed because 'buffer' is now the optional out-parameter.
// No changes have been made to state yet, so we can cleanly return on error.
- RETURN_IF_ERROR(AllocateBuffer(client, len, &buffer));
- Page* page = client->impl_->CreatePinnedPage(move(buffer));
+ RETURN_IF_ERROR(AllocateBuffer(client, len, &new_buffer));
+ Page* page = client->impl_->CreatePinnedPage(move(new_buffer));
handle->Open(page, client);
+ if (buffer != nullptr) *buffer = &page->buffer; // Buffer was allocated here, not read back from disk, so there is no read to wait for.
return Status::OK();
}
@@ -140,10 +149,16 @@ void BufferPool::DestroyPage(ClientHandle* client, PageHandle* handle) {
if (!handle->is_open()) return; // DestroyPage() should be idempotent.
if (handle->is_pinned()) {
+ // Cancel the read I/O - we don't need the data any more.
+ if (handle->page_->pin_in_flight) {
+ handle->page_->write_handle->CancelRead();
+ handle->page_->pin_in_flight = false;
+ }
// In the pinned case, delegate to ExtractBuffer() and FreeBuffer() to do the work
// of cleaning up the page, freeing the buffer and updating reservations correctly.
BufferHandle buffer;
- ExtractBuffer(client, handle, &buffer);
+ Status status = ExtractBuffer(client, handle, &buffer);
+ DCHECK(status.ok()) << status.msg().msg();
FreeBuffer(client, &buffer);
} else {
// In the unpinned case, no reservations are used so we just clean up the page.
@@ -158,7 +173,7 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) {
Page* page = handle->page_;
if (page->pin_count == 0) {
- RETURN_IF_ERROR(client->impl_->MoveToPinned(client, handle));
+ RETURN_IF_ERROR(client->impl_->StartMoveToPinned(client, page));
COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, -page->len);
}
// Update accounting last to avoid complicating the error return path above.
@@ -178,23 +193,34 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
reservation->ReleaseTo(page->len);
if (--page->pin_count > 0) return;
- client->impl_->MoveToDirtyUnpinned(page);
+ if (page->pin_in_flight) {
+ // Data is not in memory - move it back to evicted.
+ client->impl_->UndoMoveEvictedToPinned(page);
+ } else {
+ // Data is in memory - move it to dirty unpinned.
+ client->impl_->MoveToDirtyUnpinned(page);
+ }
COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len());
COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
}
-void BufferPool::ExtractBuffer(
+Status BufferPool::ExtractBuffer(
ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
DCHECK(page_handle->is_pinned());
DCHECK(!buffer_handle->is_open());
DCHECK_EQ(page_handle->client_, client);
+ // If an async pin is in flight, wait for it. A read error is returned here, before the pin count or the page is modified.
+ const BufferHandle* dummy; // Only needed to wait for the read; the buffer is extracted below.
+ RETURN_IF_ERROR(page_handle->GetBuffer(&dummy));
+
// Bring the pin count to 1 so that we're not using surplus reservations.
while (page_handle->pin_count() > 1) Unpin(client, page_handle);
// Destroy the page and extract the buffer.
client->impl_->DestroyPageInternal(page_handle, buffer_handle);
DCHECK(buffer_handle->is_open());
+ return Status::OK();
}
Status BufferPool::AllocateBuffer(
@@ -347,6 +373,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(Page* page) {
// Only valid to unpin pages if spilling is enabled.
DCHECK(spilling_enabled());
DCHECK_EQ(0, page->pin_count);
+
unique_lock<mutex> lock(lock_);
DCHECK_CONSISTENCY();
DCHECK(pinned_pages_.Contains(page));
@@ -357,8 +384,7 @@ void BufferPool::Client::MoveToDirtyUnpinned(Page* page) {
WriteDirtyPagesAsync();
}
-Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle) {
- Page* page = handle->page_;
+Status BufferPool::Client::StartMoveToPinned(ClientHandle* client, Page* page) {
unique_lock<mutex> cl(lock_);
DCHECK_CONSISTENCY();
// Propagate any write errors that occurred for this client.
@@ -390,31 +416,55 @@ Status BufferPool::Client::MoveToPinned(ClientHandle* client, PageHandle* handle
return file_group_->RestoreData(move(page->write_handle), page->buffer.mem_range());
}
// If the page wasn't in the clean pages list, it must have been evicted.
- return MoveEvictedToPinned(&cl, client, handle);
+ return StartMoveEvictedToPinned(&cl, client, page);
}
-Status BufferPool::Client::MoveEvictedToPinned(
- unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) {
- Page* page = handle->page_;
+Status BufferPool::Client::StartMoveEvictedToPinned(
+ unique_lock<mutex>* client_lock, ClientHandle* client, Page* page) {
DCHECK(!page->buffer.is_open());
- // Don't hold any locks while allocating or reading back the data. It is safe to modify
- // the page's buffer handle without holding any locks because no concurrent operations
- // can modify evicted pages.
- client_lock->unlock();
+ // Safe to modify the page's buffer handle without holding the page lock because no
+ // concurrent operations can modify evicted pages. The caller's client lock stays held
+ // for the whole function: the read is only issued here and GetBuffer() waits for it.
- BufferHandle buffer;
RETURN_IF_ERROR(pool_->allocator_->Allocate(client, page->len, &page->buffer));
COUNTER_ADD(counters().bytes_read, page->len);
COUNTER_ADD(counters().read_io_ops, 1);
- {
- SCOPED_TIMER(counters().read_wait_time);
- RETURN_IF_ERROR(
- file_group_->Read(page->write_handle.get(), page->buffer.mem_range()));
- }
- file_group_->DestroyWriteHandle(move(page->write_handle));
- client_lock->lock();
+ Status status =
+ file_group_->ReadAsync(page->write_handle.get(), page->buffer.mem_range());
+ if (!status.ok()) {
+ // The read was not issued: free the buffer so the page is left in the evicted state
+ // (no attached buffer, data referenced by 'write_handle'), matching what
+ // UndoMoveEvictedToPinned() restores, so a retried Pin() does not hit the DCHECK above.
+ pool_->allocator_->Free(move(page->buffer));
+ return status;
+ }
pinned_pages_.Enqueue(page);
+ page->pin_in_flight = true;
+ DCHECK_CONSISTENCY();
+ return Status::OK();
+}
+
+void BufferPool::Client::UndoMoveEvictedToPinned(Page* page) {
+ // We need to get the page back to the evicted state where:
+ // * There is no in-flight read.
+ // * The page's data is on disk, referenced by 'write_handle'.
+ // * The page has no attached buffer.
+ DCHECK(page->pin_in_flight);
+ page->write_handle->CancelRead();
+ page->pin_in_flight = false;
+
+ unique_lock<mutex> lock(lock_);
DCHECK_CONSISTENCY();
+ DCHECK(pinned_pages_.Contains(page));
+ pinned_pages_.Remove(page);
+ // Discard the buffer. Its contents were never valid, and no reference to it can have
+ // been handed out because GetBuffer() only returns the buffer once the read completes.
+ pool_->allocator_->Free(move(page->buffer));
+}
+
+Status BufferPool::Client::FinishMoveEvictedToPinned(Page* page) {
+ DCHECK(page->pin_in_flight);
+ SCOPED_TIMER(counters().read_wait_time);
+ // No client lock is taken while waiting for the read; 'write_handle' and 'pin_in_flight'
+ // are modified without locking. NOTE(review): this assumes no concurrent operation touches a pinned page whose read is in flight - confirm.
+ // On error 'pin_in_flight' stays set but WaitForAsyncRead() has already cleared the read range, so a second GetBuffer() would trip its DCHECK; Unpin()/DestroyPage() go through CancelRead(), which tolerates that.
+ RETURN_IF_ERROR(
+ file_group_->WaitForAsyncRead(page->write_handle.get(), page->buffer.mem_range()));
+ file_group_->DestroyWriteHandle(move(page->write_handle));
+ page->pin_in_flight = false;
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 9a37923..dbf75bc 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -133,7 +133,9 @@ class SystemAllocator;
/// * Once the operator needs the page's contents again and has sufficient unused
/// reservation, it can call Pin(), which brings the page's contents back into memory,
/// perhaps in a different buffer. Therefore the operator must fix up any pointers into
-/// the previous buffer.
+/// the previous buffer. Pin() can execute asynchronously - the caller only blocks
+/// waiting for read I/O if it calls GetBuffer() or ExtractBuffer() while the read is
+/// in flight.
/// * If the operator is done with the page, it can call FreeBuffer() to destroy the
/// handle and release resources, or call ExtractBuffer() to extract the buffer.
///
@@ -184,15 +186,19 @@ class BufferPool : public CacheLineAligned {
/// sufficient unused reservation to pin the new page (otherwise it will DCHECK).
/// CreatePage() only fails when a system error prevents the buffer pool from fulfilling
/// the reservation.
- /// On success, the handle is mapped to the new page.
- Status CreatePage(
- ClientHandle* client, int64_t len, PageHandle* handle) WARN_UNUSED_RESULT;
+ /// On success, the handle is mapped to the new page and 'buffer', if non-NULL, is set
+ /// to the page's buffer.
+ Status CreatePage(ClientHandle* client, int64_t len, PageHandle* handle,
+ const BufferHandle** buffer = nullptr) WARN_UNUSED_RESULT;
/// Increment the pin count of 'handle'. After Pin() the underlying page will
- /// be mapped to a buffer, which will be accessible through 'handle'. Uses
- /// reservation from 'client'. The caller is responsible for ensuring it has enough
- /// unused reservation before calling Pin() (otherwise it will DCHECK). Pin() only
- /// fails when a system error prevents the buffer pool from fulfilling the reservation.
+ /// be mapped to a buffer, which will be accessible through 'handle'. Uses reservation from 'client'. If the data
+ /// was evicted from memory, it will be read back into memory asynchronously.
+ /// Attempting to access the buffer with ExtractBuffer() or handle.GetBuffer() will
+ /// block until the data is in memory. The caller is responsible for ensuring it has
+ /// enough unused reservation before calling Pin() (otherwise it will DCHECK). Pin()
+ /// only fails when a system error prevents the buffer pool from fulfilling the
+ /// reservation, when an earlier write error for this client is propagated, or when restoring the data or starting the read fails. Errors from the asynchronous read itself are returned by GetBuffer() or ExtractBuffer().
/// 'handle' must be open.
Status Pin(ClientHandle* client, PageHandle* handle) WARN_UNUSED_RESULT;
@@ -214,9 +220,11 @@ class BufferPool : public CacheLineAligned {
/// Extracts buffer from a pinned page. After this returns, the page referenced by
/// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from
/// 'page_handle'. This may decrease reservation usage of 'client' if the page was
- /// pinned multiple times via 'page_handle'.
- void ExtractBuffer(
- ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle);
+ /// pinned multiple times via 'page_handle'. May return an error if 'page_handle' was
+ /// unpinned earlier with no subsequent GetBuffer() call and a read error is
+ /// encountered while bringing the page back into memory. In that case the page is not destroyed and 'buffer_handle' is left unopened.
+ Status ExtractBuffer(
+ ClientHandle* client, PageHandle* page_handle, BufferHandle* buffer_handle) WARN_UNUSED_RESULT;
/// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
/// is responsible for ensuring it has enough unused reservation before calling
@@ -400,19 +408,15 @@ class BufferPool::PageHandle {
bool is_pinned() const { return pin_count() > 0; }
int pin_count() const;
int64_t len() const;
- /// Get a pointer to the start of the page's buffer. Only valid to call if the page
- /// is pinned.
- uint8_t* data() const { return buffer_handle()->data(); }
- /// Convenience function to get the memory range for the page's buffer. Only valid to
- /// call if the page is pinned.
- MemRange mem_range() const { return buffer_handle()->mem_range(); }
-
- /// Return a pointer to the page's buffer handle. Only valid to call if the page is
- /// pinned via this handle. Only const accessors of the returned handle can be used:
- /// it is invalid to call FreeBuffer() or TransferBuffer() on it or to otherwise modify
- /// the handle.
- const BufferHandle* buffer_handle() const;
+ /// Get a pointer to the page's buffer handle via 'buffer_handle'. Only valid to call if the page is
+ /// pinned. If the page was previously unpinned and the read I/O for the data is still
+ /// in flight, this can block waiting. Returns an error if an error was encountered
+ /// reading the data back, which can only happen if Unpin() was called on the page
+ /// since the last call to GetBuffer(). Only const accessors of the returned handle can
+ /// be used: it is invalid to call FreeBuffer() or TransferBuffer() on it or to
+ /// otherwise modify the handle. NOTE(review): calling GetBuffer() again after it returned a read error is not supported; Unpin() or destroy the page instead.
+ Status GetBuffer(const BufferHandle** buffer_handle) const WARN_UNUSED_RESULT;
std::string DebugString() const;
@@ -431,9 +435,8 @@ class BufferPool::PageHandle {
/// The internal page structure. NULL if the handle is not open.
Page* page_;
- /// The client the page handle belongs to, used to validate that the correct client
- /// is being used.
- const ClientHandle* client_;
+ /// The client the page handle belongs to. Used to validate that the correct client is being used, and by GetBuffer() to finish an in-flight read.
+ ClientHandle* client_;
};
inline BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/tmp-file-mgr.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.cc b/be/src/runtime/tmp-file-mgr.cc
index 293a898..bc09f2e 100644
--- a/be/src/runtime/tmp-file-mgr.cc
+++ b/be/src/runtime/tmp-file-mgr.cc
@@ -368,36 +368,44 @@ Status TmpFileMgr::FileGroup::Write(
}
Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
+ RETURN_IF_ERROR(ReadAsync(handle, buffer)); // Issue the read...
+ return WaitForAsyncRead(handle, buffer); // ...then block until it completes.
+}
+
+Status TmpFileMgr::FileGroup::ReadAsync(WriteHandle* handle, MemRange buffer) {
DCHECK(handle->write_range_ != nullptr);
DCHECK(!handle->is_cancelled_);
DCHECK_EQ(buffer.len(), handle->len());
Status status;
- // Don't grab 'lock_' in this method - it is not necessary because we don't touch
- // any members that it protects and could block other threads for the duration of
- // the synchronous read.
+ // Don't grab 'write_state_lock_' in this method - it is not necessary because we
+ // don't touch any members that it protects.
DCHECK(!handle->write_in_flight_);
+ DCHECK(handle->read_range_ == nullptr);
DCHECK(handle->write_range_ != nullptr);
- // Don't grab handle->lock_, it is safe to touch all of handle's state since the
- // write is not in flight.
- DiskIoMgr::ScanRange* scan_range = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
- scan_range->Reset(nullptr, handle->write_range_->file(), handle->write_range_->len(),
- handle->write_range_->offset(), handle->write_range_->disk_id(), false,
+ // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
+ // since the write is not in flight.
+ handle->read_range_ = scan_range_pool_.Add(new DiskIoMgr::ScanRange);
+ handle->read_range_->Reset(nullptr, handle->write_range_->file(),
+ handle->write_range_->len(), handle->write_range_->offset(),
+ handle->write_range_->disk_id(), false,
DiskIoMgr::BufferOpts::ReadInto(buffer.data(), buffer.len()));
- DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr;
- {
- SCOPED_TIMER(disk_read_timer_);
- read_counter_->Add(1);
- bytes_read_counter_->Add(buffer.len());
- status = io_mgr_->Read(io_ctx_, scan_range, &io_mgr_buffer);
- if (!status.ok()) goto exit;
- }
-
- if (FLAGS_disk_spill_encryption) {
- status = handle->CheckHashAndDecrypt(buffer);
- if (!status.ok()) goto exit;
- }
+ read_counter_->Add(1);
+ bytes_read_counter_->Add(buffer.len());
+ // The read is only issued here; WaitForAsyncRead() blocks on it and does the hash
+ // check and decryption when disk spill encryption is enabled.
+ status = io_mgr_->AddScanRange(io_ctx_, handle->read_range_, true);
+ // If the read could not be issued, don't leave 'read_range_' set: the handle would
+ // appear to have a read in flight, tripping the 'read_range_ == nullptr' DCHECKs in a
+ // retried ReadAsync() and in ~WriteHandle().
+ if (!status.ok()) handle->read_range_ = nullptr;
+ return status;
+}
+Status TmpFileMgr::FileGroup::WaitForAsyncRead(WriteHandle* handle, MemRange buffer) {
+ DCHECK(handle->read_range_ != nullptr);
+ // Don't grab handle->write_state_lock_, it is safe to touch all of handle's state
+ // since the write is not in flight.
+ SCOPED_TIMER(disk_read_timer_);
+ DiskIoMgr::BufferDescriptor* io_mgr_buffer = nullptr;
+ Status status = handle->read_range_->GetNext(&io_mgr_buffer);
+ if (!status.ok()) goto exit;
+ DCHECK(io_mgr_buffer != NULL);
DCHECK(io_mgr_buffer->eosr());
DCHECK_LE(io_mgr_buffer->len(), buffer.len());
if (io_mgr_buffer->len() < buffer.len()) {
@@ -409,9 +417,14 @@ Status TmpFileMgr::FileGroup::Read(WriteHandle* handle, MemRange buffer) {
}
DCHECK_EQ(io_mgr_buffer->buffer(), buffer.data());
+ if (FLAGS_disk_spill_encryption) {
+ status = handle->CheckHashAndDecrypt(buffer);
+ if (!status.ok()) goto exit;
+ }
exit:
// Always return the buffer before exiting to avoid leaking it.
if (io_mgr_buffer != nullptr) io_mgr_buffer->Return();
+ handle->read_range_ = nullptr;
return status;
}
@@ -420,6 +433,7 @@ Status TmpFileMgr::FileGroup::RestoreData(
DCHECK_EQ(handle->write_range_->data(), buffer.data());
DCHECK_EQ(handle->len(), buffer.len());
DCHECK(!handle->write_in_flight_);
+ DCHECK(handle->read_range_ == nullptr);
// Decrypt after the write is finished, so that we don't accidentally write decrypted
// data to disk.
Status status;
@@ -501,6 +515,7 @@ TmpFileMgr::WriteHandle::WriteHandle(
: cb_(cb),
encryption_timer_(encryption_timer),
file_(nullptr),
+ read_range_(nullptr),
is_cancelled_(false),
write_in_flight_(false) {}
@@ -570,9 +585,20 @@ void TmpFileMgr::WriteHandle::WriteComplete(const Status& write_status) {
}
void TmpFileMgr::WriteHandle::Cancel() {
- unique_lock<mutex> lock(write_state_lock_);
- is_cancelled_ = true;
- // TODO: in future, if DiskIoMgr supported cancellation, we could cancel it here.
+ CancelRead(); // Synchronous; called outside 'write_state_lock_' ('read_range_' is declared above the fields that lock protects).
+ {
+ unique_lock<mutex> lock(write_state_lock_);
+ is_cancelled_ = true;
+ // TODO: in future, if DiskIoMgr supported write cancellation, we could cancel the
+ // in-flight write here.
+ }
+}
+
+void TmpFileMgr::WriteHandle::CancelRead() {
+ if (read_range_ != nullptr) { // No-op when no read is in flight, so repeated calls are harmless.
+ read_range_->Cancel(Status::CANCELLED);
+ read_range_ = nullptr; // Marks the handle as having no read in flight.
+ }
+}
void TmpFileMgr::WriteHandle::WaitForWrite() {
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/cb37be89/be/src/runtime/tmp-file-mgr.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/tmp-file-mgr.h b/be/src/runtime/tmp-file-mgr.h
index d66c527..c71c370 100644
--- a/be/src/runtime/tmp-file-mgr.h
+++ b/be/src/runtime/tmp-file-mgr.h
@@ -123,9 +123,22 @@ class TmpFileMgr {
/// Synchronously read the data referenced by 'handle' from the temporary file into
/// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
- /// after a write successfully completes.
+ /// after a write successfully completes. Should not be called while an async read
+ /// is in flight. Equivalent to calling ReadAsync() then WaitForAsyncRead().
Status Read(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
+ /// Asynchronously read the data referenced by 'handle' from the temporary file into
+ /// 'buffer'. buffer.len() must be the same as handle->len(). Can only be called
+ /// after a write successfully completes. WaitForAsyncRead() must be called before the
+ /// data in the buffer is valid. Should not be called while an async read
+ /// is already in flight.
+ Status ReadAsync(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
+
+ /// Wait until the read started for 'handle' by ReadAsync() completes. 'buffer'
+ /// should be the same buffer passed into ReadAsync(). Returns an error if the
+ /// read fails. Whether or not it succeeds, 'handle' has no read in flight afterwards, so a failed read may be retried by calling ReadAsync() again.
+ Status WaitForAsyncRead(WriteHandle* handle, MemRange buffer) WARN_UNUSED_RESULT;
+
/// Restore the original data in the 'buffer' passed to Write(), decrypting or
/// decompressing as necessary. Returns an error if restoring the data fails.
/// The write must not be in-flight - the caller is responsible for waiting for
@@ -215,7 +228,7 @@ class TmpFileMgr {
/// Amount of scratch space allocated in bytes.
RuntimeProfile::Counter* const scratch_space_bytes_used_counter_;
- /// Time taken for disk reads.
+ /// Time spent waiting for disk reads.
RuntimeProfile::Counter* const disk_read_timer_;
/// Time spent in disk spill encryption, decryption, and integrity checking.
@@ -263,14 +276,21 @@ class TmpFileMgr {
public:
/// The write must be destroyed by passing it to FileGroup - destroying it before
/// the write completes is an error.
- ~WriteHandle() { DCHECK(!write_in_flight_); }
+ ~WriteHandle() {
+ DCHECK(!write_in_flight_);
+ DCHECK(read_range_ == nullptr); // Any read must have been waited for or cancelled first.
+ }
- /// Cancels the write asynchronously. After Cancel() is called, writes are not
+ /// Cancels any in-flight writes or reads. Reads are cancelled synchronously and
+ /// writes are cancelled asynchronously. After Cancel() is called, writes are not
/// retried. The write callback may be called with a CANCELLED status (unless
/// it succeeded or encountered a different error first).
/// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it.
void Cancel();
+ /// Cancel any in-flight read synchronously. A no-op if no read is in flight.
+ void CancelRead();
+
/// Blocks until the write completes either successfully or unsuccessfully.
/// May return before the write callback has been called.
/// TODO: IMPALA-3200: make this private once BufferedBlockMgr doesn't need it.
@@ -333,6 +353,10 @@ class TmpFileMgr {
/// on writes; verified on reads. This is calculated _after_ encryption.
IntegrityHash hash_;
+ /// The scan range for the read that is currently in flight. NULL when no read is in
+ /// flight. Declared above the fields guarded by the lock below, so it is not protected by that lock.
+ DiskIoMgr::ScanRange* read_range_;
+
/// Protects all fields below while 'write_in_flight_' is true. At other times, it is
/// invalid to call WriteRange/FileGroup methods concurrently from multiple threads,
/// so no locking is required. This is a terminal lock and should not be held while