You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/29 00:32:38 UTC
[6/6] incubator-impala git commit: IMPALA-3201: in-memory buffer pool
implementation
IMPALA-3201: in-memory buffer pool implementation
This patch implements basic in-memory buffer management, with
reservations managed by ReservationTrackers.
Locks are fine-grained so that the buffer pool can scale to many
concurrent queries.
Includes basic tests for buffer pool setup, allocation and reservations.
Change-Id: I4bda61c31cc02d26bc83c3d458c835b0984b86a0
Reviewed-on: http://gerrit.cloudera.org:8080/4070
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/241c7e01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/241c7e01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/241c7e01
Branch: refs/heads/master
Commit: 241c7e01978f180012453b0a4ff6d061ca6d5093
Parents: 9cee2b5
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Aug 19 17:41:45 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Sep 28 23:38:20 2016 +0000
----------------------------------------------------------------------
be/src/bufferpool/CMakeLists.txt | 1 +
be/src/bufferpool/buffer-allocator.cc | 39 +++
be/src/bufferpool/buffer-allocator.h | 48 ++++
be/src/bufferpool/buffer-pool-test.cc | 381 +++++++++++++++++++++++++++--
be/src/bufferpool/buffer-pool.cc | 361 ++++++++++++++++++++++++++-
be/src/bufferpool/buffer-pool.h | 159 +++++++++---
be/src/util/internal-queue.h | 11 +
common/thrift/generate_error_codes.py | 4 +-
8 files changed, 934 insertions(+), 70 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt
index 69c4e4a..2f056e0 100644
--- a/be/src/bufferpool/CMakeLists.txt
+++ b/be/src/bufferpool/CMakeLists.txt
@@ -22,6 +22,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
add_library(BufferPool
+ buffer-allocator.cc
buffer-pool.cc
reservation-tracker.cc
)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.cc b/be/src/bufferpool/buffer-allocator.cc
new file mode 100644
index 0000000..27bd788
--- /dev/null
+++ b/be/src/bufferpool/buffer-allocator.cc
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bufferpool/buffer-allocator.h"
+
+#include "util/bit-util.h"
+
+namespace impala {
+
+BufferAllocator::BufferAllocator(int64_t min_buffer_len)
+ : min_buffer_len_(min_buffer_len) {}
+
+/// Allocates 'len' bytes with malloc() and stores the result in '*buffer'. 'len' must be
+/// at least the minimum buffer length and a power of two (both enforced by DCHECKs).
+/// Returns BUFFER_ALLOCATION_FAILED, leaving '*buffer' NULL, if malloc() fails.
+Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) {
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  // malloc() returns void*, so static_cast is the appropriate (and sufficient) cast.
+  *buffer = static_cast<uint8_t*>(malloc(len));
+  if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
+  return Status::OK();
+}
+
+void BufferAllocator::Free(uint8_t* buffer, int64_t len) {
+ free(buffer);
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.h b/be/src/bufferpool/buffer-allocator.h
new file mode 100644
index 0000000..c3e0c70
--- /dev/null
+++ b/be/src/bufferpool/buffer-allocator.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_BUFFER_ALLOCATOR_H
+#define IMPALA_BUFFER_ALLOCATOR_H
+
+#include "common/status.h"
+
+namespace impala {
+
+/// The underlying memory allocator for the buffer pool. All buffers are allocated through
+/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that
+/// are power-of-two multiples of the minimum buffer length.
+///
+/// TODO:
+/// * Allocate memory with mmap() instead of malloc().
+/// * Implement free lists in the allocator or external to the allocator.
+class BufferAllocator {
+ public:
+ BufferAllocator(int64_t min_buffer_len);
+
+ /// Allocate memory for a buffer of 'len' bytes. 'len' must be a power-of-two multiple
+ /// of the minimum buffer length.
+ Status Allocate(int64_t len, uint8_t** buffer);
+
+ /// Free the memory for a previously-allocated buffer.
+ void Free(uint8_t* buffer, int64_t len);
+
+ private:
+ const int64_t min_buffer_len_;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc
index cdb163d..d45f017 100644
--- a/be/src/bufferpool/buffer-pool-test.cc
+++ b/be/src/bufferpool/buffer-pool-test.cc
@@ -28,6 +28,7 @@
#include "bufferpool/reservation-tracker.h"
#include "common/init.h"
#include "common/object-pool.h"
+#include "testutil/death-test-util.h"
#include "testutil/test-macros.h"
#include "common/names.h"
@@ -51,13 +52,16 @@ class BufferPoolTest : public ::testing::Test {
}
/// The minimum buffer size used in most tests.
- const static int64_t TEST_PAGE_LEN = 1024;
+ const static int64_t TEST_BUFFER_LEN = 1024;
/// Test helper to simulate registering then deregistering a number of queries with
/// the given initial reservation and reservation limit.
void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
int64_t initial_query_reservation, int64_t query_reservation_limit);
+ /// Create and destroy a page multiple times.
+ void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops);
+
protected:
static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
@@ -85,7 +89,7 @@ class BufferPoolTest : public ::testing::Test {
SpinLock query_reservations_lock_;
};
-const int64_t BufferPoolTest::TEST_PAGE_LEN;
+const int64_t BufferPoolTest::TEST_BUFFER_LEN;
void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) {
@@ -156,7 +160,7 @@ TEST_F(BufferPoolTest, BasicRegistration) {
int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
global_reservations_.InitRootTracker(NewProfile(), total_mem);
- BufferPool pool(TEST_PAGE_LEN, total_mem);
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
RegisterQueriesAndClients(
&pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);
@@ -179,7 +183,7 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
global_reservations_.InitRootTracker(NewProfile(), total_mem);
- BufferPool pool(TEST_PAGE_LEN, total_mem);
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
// Launch threads, each with a different set of query IDs.
thread_group workers;
@@ -195,32 +199,355 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
global_reservations_.Close();
}
-/// Test that reservation setup fails if the initial buffers cannot be fulfilled.
-TEST_F(BufferPoolTest, QueryReservationsUnfulfilled) {
- Status status;
- int num_queries = 128;
- int64_t reservation_per_query = 128;
- // Won't be able to fulfill initial reservation for last query.
- int64_t total_mem = num_queries * reservation_per_query - 1;
- global_reservations_.InitRootTracker(NewProfile(), total_mem);
+/// Test basic page handle creation.
+TEST_F(BufferPoolTest, PageCreation) {
+ // Allocate many pages, each a power-of-two multiple of the minimum page length.
+ int num_pages = 16;
+ int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
+ int64_t total_mem = 2 * 2 * max_page_len;
+ global_reservations_.InitRootTracker(NULL, total_mem);
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
+ ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+ client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+ ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
+ BufferPool::Client client;
+ ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+ vector<BufferPool::PageHandle> handles(num_pages);
+
+ // Create pages of various valid sizes.
+ for (int i = 0; i < num_pages; ++i) {
+ int size_multiple = 1 << i;
+ int64_t page_len = TEST_BUFFER_LEN * size_multiple;
+ int64_t used_before = client_tracker->GetUsedReservation();
+ ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
+ ASSERT_TRUE(handles[i].is_open());
+ ASSERT_TRUE(handles[i].is_pinned());
+ ASSERT_TRUE(handles[i].buffer_handle() != NULL);
+ ASSERT_TRUE(handles[i].data() != NULL);
+ ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
+ ASSERT_EQ(handles[i].len(), page_len);
+ ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
+ DCHECK_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
+ }
- for (int i = 0; i < num_queries; ++i) {
- ReservationTracker* query_tracker = GetQueryReservationTracker(i);
- query_tracker->InitChildTracker(
- NewProfile(), &global_reservations_, NULL, 2 * reservation_per_query);
- bool got_initial_reservation =
- query_tracker->IncreaseReservationToFit(reservation_per_query);
- if (i < num_queries - 1) {
- ASSERT_TRUE(got_initial_reservation);
- } else {
- ASSERT_FALSE(got_initial_reservation);
-
- // Getting the initial reservation should succeed after freeing up buffers from
- // other query.
- GetQueryReservationTracker(i - 1)->Close();
- ASSERT_TRUE(query_tracker->IncreaseReservationToFit(reservation_per_query));
+ // Close the handles and check memory consumption.
+ for (int i = 0; i < num_pages; ++i) {
+ int64_t used_before = client_tracker->GetUsedReservation();
+ int page_len = handles[i].len();
+ pool.DestroyPage(&client, &handles[i]);
+ DCHECK_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
+ }
+
+ pool.DeregisterClient(&client);
+ client_tracker->Close();
+
+ // All the reservations should be released at this point.
+ DCHECK_EQ(global_reservations_.GetReservation(), 0);
+ global_reservations_.Close();
+}
+
+/// Test basic buffer allocation and freeing, checking that the client's used reservation
+/// tracks the length of each buffer that is allocated or freed.
+TEST_F(BufferPoolTest, BufferAllocation) {
+  // Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
+  int num_buffers = 16;
+  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
+  int64_t total_mem = 2 * 2 * max_buffer_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  vector<BufferPool::BufferHandle> handles(num_buffers);
+
+  // Create buffers of various valid sizes.
+  for (int i = 0; i < num_buffers; ++i) {
+    int size_multiple = 1 << i;
+    int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
+    int64_t used_before = client_tracker->GetUsedReservation();
+    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].data() != NULL);
+    ASSERT_EQ(handles[i].len(), buffer_len);
+    // Use a gtest assertion rather than DCHECK so that a mismatch is reported as a test
+    // failure instead of aborting the process.
+    ASSERT_EQ(used_before + buffer_len, client_tracker->GetUsedReservation());
+  }
+
+  // Close the handles and check memory consumption.
+  for (int i = 0; i < num_buffers; ++i) {
+    int64_t used_before = client_tracker->GetUsedReservation();
+    // Keep the full 64-bit length; len() is compared against int64_t values above.
+    int64_t buffer_len = handles[i].len();
+    pool.FreeBuffer(&client, &handles[i]);
+    ASSERT_EQ(used_before - buffer_len, client_tracker->GetUsedReservation());
+  }
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+
+  // All the reservations should be released at this point.
+  ASSERT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+/// Test transfer of buffer handles between clients.
+TEST_F(BufferPoolTest, BufferTransfer) {
+ // Each client needs to have enough reservation for a buffer.
+ const int num_clients = 5;
+ int64_t total_mem = num_clients * TEST_BUFFER_LEN;
+ global_reservations_.InitRootTracker(NULL, total_mem);
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
+ ReservationTracker client_trackers[num_clients];
+ BufferPool::Client clients[num_clients];
+ BufferPool::BufferHandle handles[num_clients];
+ for (int i = 0; i < num_clients; ++i) {
+ client_trackers[i].InitChildTracker(
+ NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+ ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
+ ASSERT_OK(pool.RegisterClient("test client", &client_trackers[i], &clients[i]));
+ }
+
+ // Transfer the buffer around between the clients repeatedly in a circle.
+ ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0]));
+ uint8_t* data = handles[0].data();
+ for (int iter = 0; iter < 10; ++iter) {
+ for (int client = 0; client < num_clients; ++client) {
+ int next_client = (client + 1) % num_clients;
+ ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client],
+ &clients[next_client], &handles[next_client]));
+ // Check that the transfer left things in a consistent state.
+ ASSERT_FALSE(handles[client].is_open());
+ ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
+ ASSERT_TRUE(handles[next_client].is_open());
+ ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation());
+ // The same underlying buffer should be used.
+ ASSERT_EQ(data, handles[next_client].data());
}
}
+
+ pool.FreeBuffer(&clients[0], &handles[0]);
+ for (int i = 0; i < num_clients; ++i) {
+ pool.DeregisterClient(&clients[i]);
+ client_trackers[i].Close();
+ }
+ DCHECK_EQ(global_reservations_.GetReservation(), 0);
+ global_reservations_.Close();
+}
+
+/// Test basic pinning and unpinning.
+TEST_F(BufferPoolTest, Pin) {
+ int64_t total_mem = TEST_BUFFER_LEN * 1024;
+ // Set up client with enough reservation to pin twice.
+ int64_t child_reservation = TEST_BUFFER_LEN * 2;
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
+ global_reservations_.InitRootTracker(NULL, total_mem);
+ ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+ client_tracker->InitChildTracker(
+ NewProfile(), &global_reservations_, NULL, child_reservation);
+ ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+ BufferPool::Client client;
+ ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+ BufferPool::PageHandle handle1, handle2;
+
+ // Can pin two minimum sized pages.
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+ ASSERT_TRUE(handle1.is_open());
+ ASSERT_TRUE(handle1.is_pinned());
+ ASSERT_TRUE(handle1.data() != NULL);
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+ ASSERT_TRUE(handle2.is_open());
+ ASSERT_TRUE(handle2.is_pinned());
+ ASSERT_TRUE(handle2.data() != NULL);
+
+ pool.Unpin(&client, &handle2);
+ ASSERT_FALSE(handle2.is_pinned());
+
+ // Can pin minimum-sized page twice.
+ ASSERT_OK(pool.Pin(&client, &handle1));
+ ASSERT_TRUE(handle1.is_pinned());
+ // Have to unpin twice.
+ pool.Unpin(&client, &handle1);
+ ASSERT_TRUE(handle1.is_pinned());
+ pool.Unpin(&client, &handle1);
+ ASSERT_FALSE(handle1.is_pinned());
+
+ // Can pin double-sized page only once.
+ BufferPool::PageHandle double_handle;
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle));
+ ASSERT_TRUE(double_handle.is_open());
+ ASSERT_TRUE(double_handle.is_pinned());
+ ASSERT_TRUE(double_handle.data() != NULL);
+
+ // Destroy the pages - test destroying both pinned and unpinned.
+ pool.DestroyPage(&client, &handle1);
+ pool.DestroyPage(&client, &handle2);
+ pool.DestroyPage(&client, &double_handle);
+
+ pool.DeregisterClient(&client);
+ client_tracker->Close();
+}
+
+/// Creating a page or pinning without sufficient reservation should DCHECK.
+TEST_F(BufferPoolTest, PinWithoutReservation) {
+ int64_t total_mem = TEST_BUFFER_LEN * 1024;
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
+ global_reservations_.InitRootTracker(NULL, total_mem);
+ ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+ client_tracker->InitChildTracker(
+ NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+ BufferPool::Client client;
+ ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+ BufferPool::PageHandle handle;
+ IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), "");
+
+ // Should succeed after increasing reservation.
+ ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
+
+ // But we can't pin again.
+ IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), "");
+
+ pool.DestroyPage(&client, &handle);
+ pool.DeregisterClient(&client);
+ client_tracker->Close();
+}
+
+TEST_F(BufferPoolTest, ExtractBuffer) {
+ int64_t total_mem = TEST_BUFFER_LEN * 1024;
+ // Set up client with enough reservation for two buffers/pins.
+ int64_t child_reservation = TEST_BUFFER_LEN * 2;
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
+ global_reservations_.InitRootTracker(NULL, total_mem);
+ ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+ client_tracker->InitChildTracker(
+ NewProfile(), &global_reservations_, NULL, child_reservation);
+ ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+ BufferPool::Client client;
+ ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+ BufferPool::PageHandle page;
+ BufferPool::BufferHandle buffer;
+
+ // Test basic buffer extraction.
+ for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
+ ASSERT_OK(pool.CreatePage(&client, len, &page));
+ uint8_t* page_data = page.data();
+ pool.ExtractBuffer(&client, &page, &buffer);
+ ASSERT_FALSE(page.is_open());
+ ASSERT_TRUE(buffer.is_open());
+ ASSERT_EQ(len, buffer.len());
+ ASSERT_EQ(page_data, buffer.data());
+ ASSERT_EQ(len, client_tracker->GetUsedReservation());
+ pool.FreeBuffer(&client, &buffer);
+ ASSERT_EQ(0, client_tracker->GetUsedReservation());
+ }
+
+ // Test that ExtractBuffer() accounts correctly for pin count > 1.
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+ uint8_t* page_data = page.data();
+ ASSERT_OK(pool.Pin(&client, &page));
+ ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+ pool.ExtractBuffer(&client, &page, &buffer);
+ ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
+ ASSERT_FALSE(page.is_open());
+ ASSERT_TRUE(buffer.is_open());
+ ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
+ ASSERT_EQ(page_data, buffer.data());
+ pool.FreeBuffer(&client, &buffer);
+ ASSERT_EQ(0, client_tracker->GetUsedReservation());
+
+ // Test that ExtractBuffer() DCHECKs for unpinned pages.
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+ pool.Unpin(&client, &page);
+ IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");
+ pool.DestroyPage(&client, &page);
+
+ pool.DeregisterClient(&client);
+ client_tracker->Close();
+}
+
+// Test concurrent creation and destruction of pages.
+TEST_F(BufferPoolTest, ConcurrentPageCreation) {
+ int ops_per_thread = 1024;
+ int num_threads = 64;
+ // Need enough buffers for all initial reservations.
+ int total_mem = num_threads * TEST_BUFFER_LEN;
+ global_reservations_.InitRootTracker(NULL, total_mem);
+
+ BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+ // Launch threads, each with a different set of query IDs.
+ thread_group workers;
+ for (int i = 0; i < num_threads; ++i) {
+ workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool,
+ &global_reservations_, ops_per_thread)));
+ }
+
+ // Build debug string to test concurrent iteration over pages_ list.
+ for (int i = 0; i < 64; ++i) {
+ LOG(INFO) << pool.DebugString();
+ }
+ workers.join_all();
+
+ // All the reservations should be released at this point.
+ DCHECK_EQ(global_reservations_.GetChildReservations(), 0);
+ global_reservations_.Close();
+}
+
+void BufferPoolTest::CreatePageLoop(
+ BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) {
+ ReservationTracker client_tracker;
+ client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN);
+ BufferPool::Client client;
+ ASSERT_OK(pool->RegisterClient("test client", &client_tracker, &client));
+ for (int i = 0; i < num_ops; ++i) {
+ BufferPool::PageHandle handle;
+ ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
+ ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
+ pool->Unpin(&client, &handle);
+ ASSERT_OK(pool->Pin(&client, &handle));
+ pool->DestroyPage(&client, &handle);
+ client_tracker.DecreaseReservation(TEST_BUFFER_LEN);
+ }
+ pool->DeregisterClient(&client);
+ client_tracker.Close();
+}
+
+/// Test error path where pool is unable to fulfill a reservation because it cannot evict
+/// unpinned pages.
+TEST_F(BufferPoolTest, CapacityExhausted) {
+ global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
+ // TODO: once we enable spilling, set up buffer pool so that spilling is disabled.
+ // Set up pool with one more buffer than reservations (so that we hit the reservation
+ // limit instead of the buffer limit).
+ BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2);
+
+ BufferPool::PageHandle handle1, handle2, handle3;
+
+ BufferPool::Client client;
+ ASSERT_OK(pool.RegisterClient("test client", &global_reservations_, &client));
+ ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+
+ // Do not have enough reservations because we pinned the page.
+ IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), "");
+
+ // Even with reservations, we can only create one more unpinned page because we don't
+ // support eviction yet.
+ pool.Unpin(&client, &handle1);
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+ pool.Unpin(&client, &handle2);
+ ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok());
+
+ // After destroying a page, we should have a free buffer that we can use.
+ pool.DestroyPage(&client, &handle1);
+ ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3));
+
+ pool.DestroyPage(&client, &handle2);
+ pool.DestroyPage(&client, &handle3);
+ pool.DeregisterClient(&client);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc
index 21893fc..c89b8f6 100644
--- a/be/src/bufferpool/buffer-pool.cc
+++ b/be/src/bufferpool/buffer-pool.cc
@@ -31,13 +31,151 @@ using strings::Substitute;
namespace impala {
+/// The internal representation of a page, which can be pinned or unpinned. If the
+/// page is pinned, a buffer is associated with the page.
+///
+/// Code manipulating the page is responsible for acquiring 'lock' when reading or
+/// modifying the page.
+struct BufferPool::Page : public BufferPool::PageList::Node {
+ Page(int64_t len) : len(len), pin_count(0), dirty(false) {}
+
+ /// Increment the pin count. Caller must hold 'lock'.
+ void IncrementPinCount(PageHandle* handle) {
+ lock.DCheckLocked();
+ ++pin_count;
+ // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we
+ // have to assume they are dirty.
+ dirty = true;
+ }
+
+ /// Decrement the pin count. Caller must hold 'lock'.
+ void DecrementPinCount(PageHandle* handle) {
+ lock.DCheckLocked();
+ DCHECK(pin_count > 0);
+ --pin_count;
+ }
+
+ string DebugString() {
+ return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this,
+ len, pin_count, buffer.DebugString(), dirty);
+ }
+
+ // Helper for BufferPool::DebugString().
+ static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
+ lock_guard<SpinLock> pl(page->lock);
+ (*ss) << page->DebugString() << "\n";
+ return true;
+ }
+
+ /// The length of the page in bytes.
+ const int64_t len;
+
+ /// Lock to protect the below members of Page. The lock must be held when modifying any
+ /// of the below members and when reading any of the below members of an unpinned page.
+ SpinLock lock;
+
+ /// The pin count of the page.
+ int pin_count;
+
+ /// Buffer with the page's contents. Always open if pinned. Closed if page is unpinned
+ /// and was evicted from memory.
+ BufferHandle buffer;
+
+ /// True if the buffer's contents need to be saved before evicting it from memory.
+ bool dirty;
+};
+
+BufferPool::BufferHandle::BufferHandle() {
+ Reset();
+}
+
+/// Move constructor: takes over the buffer referenced by 'src' and leaves 'src' closed.
+BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) {
+  // Put this handle into the closed state first: the move assignment operator DCHECKs
+  // !is_open() on this handle, which would otherwise read uninitialized members.
+  Reset();
+  *this = std::move(src);
+}
+
+BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) {
+ DCHECK(!is_open());
+ // Copy over all members then close src.
+ client_ = src.client_;
+ data_ = src.data_;
+ len_ = src.len_;
+ src.Reset();
+ return *this;
+}
+
+void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) {
+ client_ = client;
+ data_ = data;
+ len_ = len;
+}
+
+void BufferPool::BufferHandle::Reset() {
+ client_ = NULL;
+ data_ = NULL;
+ len_ = -1;
+}
+
+BufferPool::PageHandle::PageHandle() {
+ Reset();
+}
+
+/// Move constructor: takes over the page referenced by 'src' and leaves 'src' closed.
+BufferPool::PageHandle::PageHandle(PageHandle&& src) {
+  // Put this handle into the closed state first: the move assignment operator DCHECKs
+  // !is_open() on this handle, which would otherwise read uninitialized members.
+  Reset();
+  *this = std::move(src);
+}
+
+BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
+ DCHECK(!is_open());
+ // Copy over all members then close src.
+ page_ = src.page_;
+ client_ = src.client_;
+ src.Reset();
+ return *this;
+}
+
+void BufferPool::PageHandle::Open(Page* page, Client* client) {
+ DCHECK(!is_open());
+ page->lock.DCheckLocked();
+ page_ = page;
+ client_ = client;
+}
+
+void BufferPool::PageHandle::Reset() {
+ page_ = NULL;
+ client_ = NULL;
+}
+
+int BufferPool::PageHandle::pin_count() const {
+ DCHECK(is_open());
+ // The pin count can only be modified via this PageHandle, which must not be
+ // concurrently accessed by multiple threads, so it is safe to access without locking.
+ return page_->pin_count;
+}
+
+int64_t BufferPool::PageHandle::len() const {
+ DCHECK(is_open());
+ // The length of the page cannot change, so it is safe to access without locking.
+ return page_->len;
+}
+
+const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
+ DCHECK(is_pinned());
+ // The 'buffer' field cannot change while the page is pinned, so it is safe to access
+ // without locking.
+ return &page_->buffer;
+}
+
BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
- : min_buffer_len_(min_buffer_len), buffer_bytes_limit_(buffer_bytes_limit) {
+ : allocator_(new BufferAllocator(min_buffer_len)),
+ min_buffer_len_(min_buffer_len),
+ buffer_bytes_limit_(buffer_bytes_limit),
+ buffer_bytes_remaining_(buffer_bytes_limit) {
DCHECK_GT(min_buffer_len, 0);
DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
}
-BufferPool::~BufferPool() {}
+BufferPool::~BufferPool() {
+ DCHECK(pages_.empty());
+}
Status BufferPool::RegisterClient(
const string& name, ReservationTracker* reservation, Client* client) {
@@ -55,15 +193,228 @@ void BufferPool::DeregisterClient(Client* client) {
client->reservation_ = NULL;
}
+Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) {
+  DCHECK(!handle->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  // Get the backing memory first. Nothing else has been modified at this point, so a
+  // failure can simply be propagated to the caller.
+  BufferHandle new_buffer;
+  RETURN_IF_ERROR(AllocateBufferInternal(client, len, &new_buffer));
+
+  Page* const new_page = new Page(len);
+  {
+    // Attach the buffer, open the handle and take the first pin under the page lock.
+    lock_guard<SpinLock> page_guard(new_page->lock);
+    new_page->buffer = std::move(new_buffer);
+    handle->Open(new_page, client);
+    new_page->IncrementPinCount(handle);
+  }
+
+  // Publish the page on the global list only once it is fully initialized. The page
+  // lock has been dropped by now, which the lock ordering requires before enqueueing.
+  pages_.Enqueue(new_page);
+
+  // Charge the client's reservation for the newly pinned page.
+  client->reservation_->AllocateFrom(len);
+  return Status::OK();
+}
+
+void BufferPool::DestroyPage(Client* client, PageHandle* handle) {
+  // Destroying a closed handle is a no-op, which keeps DestroyPage() idempotent.
+  if (!handle->is_open()) return;
+
+  if (handle->is_pinned()) {
+    // Pinned page: ExtractBuffer() cleans up the page and FreeBuffer() frees the
+    // buffer, so there is nothing left to do here afterwards.
+    BufferHandle extracted;
+    ExtractBuffer(client, handle, &extracted);
+    FreeBuffer(client, &extracted);
+    return;
+  }
+
+  // Unpinned page: no reservation is consumed, so only the buffer (if the page still
+  // has one) needs to be freed before the page itself is cleaned up.
+  Page* const victim = handle->page_;
+  {
+    lock_guard<SpinLock> page_guard(victim->lock);
+    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+    if (victim->buffer.is_open()) FreeBufferInternal(&victim->buffer);
+  }
+  CleanUpPage(handle);
+}
+
+void BufferPool::CleanUpPage(PageHandle* handle) {
+  Page* const doomed = handle->page_;
+  // Unlink the page from 'pages_' before deleting it. Threads that reach pages through
+  // the list hold 'pages_.lock_', so Remove() does not return until they are done and
+  // no other thread can still hold a reference when the page is deleted.
+  pages_.Remove(doomed);
+  delete doomed;
+  // Finally close the caller's handle.
+  handle->Reset();
+}
+
+Status BufferPool::Pin(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK(handle->is_open());
+  DCHECK_EQ(handle->client_, client);
+
+  Page* const target = handle->page_;
+  {
+    // All changes to the page's state happen under its lock.
+    lock_guard<SpinLock> page_guard(target->lock);
+    if (!target->buffer.is_open()) {
+      // The page has no buffer, so allocate one. Nothing has been modified yet, so an
+      // error can be returned directly.
+      RETURN_IF_ERROR(AllocateBufferInternal(client, target->len, &target->buffer));
+    }
+    target->IncrementPinCount(handle);
+
+    // TODO: will need to initiate/wait for read if the page is not in-memory.
+  }
+
+  // Charge the client's reservation for the additional pin.
+  client->reservation_->AllocateFrom(target->len);
+  return Status::OK();
+}
+
+void BufferPool::Unpin(Client* client, PageHandle* handle) {
+  DCHECK(handle->is_open());
+  // Take the page lock, then let UnpinLocked() do the actual work.
+  Page* const page = handle->page_;
+  lock_guard<SpinLock> page_guard(page->lock);
+  UnpinLocked(client, handle);
+}
+
+void BufferPool::UnpinLocked(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK_EQ(handle->client_, client);
+  // A pinned handle implies that the page itself is pinned.
+  DCHECK(handle->is_pinned());
+  Page* const pinned = handle->page_;
+  pinned->lock.DCheckLocked();
+
+  // Drop one pin and return the page's length to the client's reservation.
+  pinned->DecrementPinCount(handle);
+  client->reservation_->ReleaseTo(pinned->len);
+
+  // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true.
+}
+
+void BufferPool::ExtractBuffer(
+    Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
+  DCHECK(page_handle->is_pinned());
+  DCHECK_EQ(page_handle->client_, client);
+
+  Page* const source = page_handle->page_;
+  {
+    // All changes to the page's state happen under its lock.
+    lock_guard<SpinLock> page_guard(source->lock);
+    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+
+    // Release surplus pins until exactly one is left, so that no extra reservation
+    // stays consumed. The reservation for the last pin is not released here.
+    while (source->pin_count > 1) {
+      UnpinLocked(client, page_handle);
+    }
+    *buffer_handle = std::move(source->buffer);
+  }
+  // The page lock must not be held while the page is removed and deleted.
+  CleanUpPage(page_handle);
+}
+
+Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) {
+  // Consume 'len' bytes of the client's reservation before asking for the buffer.
+  client->reservation_->AllocateFrom(len);
+  Status status = AllocateBufferInternal(client, len, handle);
+  // On failure no buffer was handed out, so give the reservation back. Otherwise the
+  // client would stay charged for 'len' bytes with no open handle to free them through.
+  if (!status.ok()) client->reservation_->ReleaseTo(len);
+  return status;
+}
+
+Status BufferPool::AllocateBufferInternal(
+    Client* client, int64_t len, BufferHandle* buffer) {
+  DCHECK(!buffer->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  if (!TryDecreaseBufferBytesRemaining(len)) {
+    // No headroom is left in 'buffer_bytes_remaining_'. Making room would require
+    // evicting another page, which is not implemented.
+    return Status(TErrorCode::NOT_IMPLEMENTED_ERROR,
+        Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is "
+                   "not implemented yet!", buffer_bytes_limit_));
+  }
+
+  // 'len' bytes of headroom have been claimed, so a new buffer can be allocated.
+  uint8_t* data;
+  Status alloc_status = allocator_->Allocate(len, &data);
+  if (!alloc_status.ok()) {
+    // Return the claimed headroom, since no buffer was produced.
+    buffer_bytes_remaining_.Add(len);
+    return alloc_status;
+  }
+  DCHECK(data != NULL);
+  buffer->Open(client, data, len);
+  return Status::OK();
+}
+
+void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) {
+  // Freeing a closed handle is a no-op, which keeps FreeBuffer() idempotent.
+  if (!handle->is_open()) return;
+  DCHECK_EQ(client, handle->client_);
+  // Return the buffer's length to the client's reservation, then free the memory.
+  const int64_t freed_len = handle->len_;
+  client->reservation_->ReleaseTo(freed_len);
+  FreeBufferInternal(handle);
+}
+
+void BufferPool::FreeBufferInternal(BufferHandle* handle) {
+  DCHECK(handle->is_open());
+  uint8_t* const data = handle->data();
+  const int64_t len = handle->len();
+  // Hand the memory back to the allocator and make the bytes available again.
+  allocator_->Free(data, len);
+  buffer_bytes_remaining_.Add(len);
+  // Close the handle. No reservation is touched here.
+  handle->Reset();
+}
+
+Status BufferPool::TransferBuffer(
+    Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) {
+  DCHECK(src->is_open());
+  DCHECK(!dst->is_open());
+  DCHECK_EQ(src_client, src->client_);
+  DCHECK_NE(src, dst);
+  DCHECK_NE(src_client, dst_client);
+
+  // Move the reservation usage across: charge the destination, then credit the source.
+  const int64_t transferred_len = src->len();
+  dst_client->reservation_->AllocateFrom(transferred_len);
+  src_client->reservation_->ReleaseTo(transferred_len);
+  // Move the handle and record the destination client on it.
+  *dst = std::move(*src);
+  dst->client_ = dst_client;
+  return Status::OK();
+}
+
+bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) {
+  // TODO: we may want to change this policy so that we don't always use up to the limit
+  // for buffers, since this may starve other operators using non-buffer-pool memory.
+  for (;;) {
+    const int64_t current = buffer_bytes_remaining_.Load();
+    // Give up if taking 'len' bytes would make the counter go negative.
+    if (current < len) return false;
+    // Retry if the counter no longer holds 'current' when the swap is attempted.
+    if (buffer_bytes_remaining_.CompareAndSwap(current, current - len)) return true;
+  }
+}
+
string BufferPool::Client::DebugString() const {
- return Substitute("<BufferPool::Client> $0 name: $1 reservation: $2", this, name_,
- reservation_->DebugString());
+ // Only dereference 'reservation_' when the client is registered.
+ if (is_registered()) {
+ return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_,
+ reservation_->DebugString());
+ } else {
+ return Substitute("<BufferPool::Client> $0 UNREGISTERED", this);
+ }
+}
+
+string BufferPool::PageHandle::DebugString() const {
+  // A closed handle has no page to lock or describe.
+  if (!is_open()) return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
+
+  // Hold the page lock while the page's state is being formatted.
+  lock_guard<SpinLock> page_guard(page_->lock);
+  return Substitute("<BufferPool::PageHandle> $0 client: {$1} page: {$2}", this,
+      client_->DebugString(), page_->DebugString());
+}
+
+string BufferPool::BufferHandle::DebugString() const {
+  // A closed handle has no client, data or length worth printing.
+  if (!is_open()) return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
+
+  return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this,
+      client_->DebugString(), data_, len_);
}
string BufferPool::DebugString() {
stringstream ss;
ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
- << "buffer_bytes_limit: " << buffer_bytes_limit_ << "\n";
+ << " buffer_bytes_limit: " << buffer_bytes_limit_
+ << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n";
+ // Visit every page in 'pages_'; the callback is handed 'ss' along with each page.
+ pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
return ss.str();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h
index e566543..44b5574 100644
--- a/be/src/bufferpool/buffer-pool.h
+++ b/be/src/bufferpool/buffer-pool.h
@@ -18,10 +18,12 @@
#ifndef IMPALA_BUFFER_POOL_H
#define IMPALA_BUFFER_POOL_H
+#include <stdint.h>
+#include <boost/scoped_ptr.hpp>
#include <boost/thread/locks.hpp>
#include <string>
-#include <stdint.h>
+#include "bufferpool/buffer-allocator.h"
#include "common/atomic.h"
#include "common/status.h"
#include "gutil/macros.h"
@@ -30,6 +32,7 @@
namespace impala {
+class BufferAllocator;
class ReservationTracker;
/// A buffer pool that manages memory buffers for all queries in an Impala daemon.
@@ -69,7 +72,7 @@ class ReservationTracker;
/// Buffer/Page Sizes
/// =================
/// The buffer pool has a minimum buffer size, which must be a power-of-two. Page and
-/// buffer sizes must be an exact multiple of the minimum buffer size.
+/// buffer sizes must be an exact power-of-two multiple of the minimum buffer size.
///
/// Reservations
/// ============
@@ -140,11 +143,18 @@ class ReservationTracker;
/// +========================+
/// | IMPLEMENTATION DETAILS |
/// +========================+
-/// ... TODO ...
+///
+/// Lock Ordering
+/// =============
+/// The lock ordering is:
+/// * pages_::lock_ -> Page::lock
+///
+/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held
+/// until done with the page to ensure the page isn't concurrently deleted.
class BufferPool {
public:
- class Client;
class BufferHandle;
+ class Client;
class PageHandle;
/// Constructs a new buffer pool.
@@ -198,9 +208,10 @@ class BufferPool {
/// Extracts buffer from a pinned page. After this returns, the page referenced by
/// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from
- /// 'page_handle'. This may decrease reservation usage if the page was pinned multiple
- /// times via 'page_handle'.
- void ExtractBuffer(PageHandle* page_handle, BufferHandle* buffer_handle);
+ /// 'page_handle'. This may decrease reservation usage of 'client' if the page was
+ /// pinned multiple times via 'page_handle'.
+ void ExtractBuffer(
+ Client* client, PageHandle* page_handle, BufferHandle* buffer_handle);
/// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
/// is responsible for ensuring it has enough unused reservation before calling
@@ -214,9 +225,9 @@ class BufferPool {
/// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the
/// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and
- /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must
- /// be closed
- /// before calling. After a successful call, 'src' is closed and 'dst' is open.
+ /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be
+ /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different.
+ /// After a successful call, 'src' is closed and 'dst' is open.
Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client,
BufferHandle* dst);
@@ -228,13 +239,49 @@ class BufferPool {
private:
DISALLOW_COPY_AND_ASSIGN(BufferPool);
+ struct Page;
+
+ /// Same as Unpin(), except the lock for the page referenced by 'handle' must be held
+ /// by the caller.
+ void UnpinLocked(Client* client, PageHandle* handle);
+
+ /// Perform the cleanup of the page object and handle when the page is destroyed.
+ /// Reset 'handle', free the Page object and remove the 'pages_' entry.
+ /// The 'handle->page_' lock should *not* be held by the caller.
+ void CleanUpPage(PageHandle* handle);
- /// The minimum length of a buffer in bytes. All buffers and pages are a multiple of
- /// this length. This is always a power of two.
+ /// Allocate a buffer of length 'len'. Assumes that the client's reservation has already
+ /// been consumed for the buffer. Returns an error if the pool is unable to fulfill the
+ /// reservation.
+ Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* buffer);
+
+ /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates
+ /// internal state but does not release to any reservation.
+ void FreeBufferInternal(BufferHandle* buffer);
+
+ /// Check if we can allocate another buffer of size 'len' bytes without
+ /// 'buffer_bytes_remaining_' going negative.
+ /// Returns true and decreases 'buffer_bytes_remaining_' by 'len' if successful.
+ bool TryDecreaseBufferBytesRemaining(int64_t len);
+
+ /// Allocator for allocating and freeing all buffer memory.
+ boost::scoped_ptr<BufferAllocator> allocator_;
+
+ /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two
+ /// multiple of this length. This is always a power of two.
const int64_t min_buffer_len_;
/// The maximum physical memory in bytes that can be used for buffers.
const int64_t buffer_bytes_limit_;
+
+ /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for
+ /// allocating new buffers. Must be updated atomically before a new buffer is
+ /// allocated or after an existing buffer is freed.
+ AtomicInt64 buffer_bytes_remaining_;
+
+ /// List containing all pages. Protected by the list's internal lock.
+ typedef InternalQueue<Page> PageList;
+ PageList pages_;
};
/// External representation of a client of the BufferPool. Clients are used for
@@ -266,6 +313,54 @@ class BufferPool::Client {
ReservationTracker* reservation_;
};
+/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
+/// be used by a single thread at a time: concurrently calling BufferHandle methods or
+/// BufferPool methods with the BufferHandle as an argument is not supported.
+class BufferPool::BufferHandle {
+ public:
+ BufferHandle();
+ /// A handle must be closed before it is destroyed.
+ ~BufferHandle() { DCHECK(!is_open()); }
+
+ /// Allow move construction of handles, to support std::move().
+ BufferHandle(BufferHandle&& src);
+
+ /// Allow move assignment of handles, to support STL classes like std::vector.
+ /// Destination must be uninitialized.
+ BufferHandle& operator=(BufferHandle&& src);
+
+ /// Returns true if the handle is open, i.e. 'data_' is non-NULL.
+ bool is_open() const { return data_ != NULL; }
+ /// Get the length of the buffer in bytes. Only valid to call on an open handle.
+ int64_t len() const {
+ DCHECK(is_open());
+ return len_;
+ }
+ /// Get a pointer to the start of the buffer.
+ uint8_t* data() const {
+ DCHECK(is_open());
+ return data_;
+ }
+
+ /// Returns a description of the handle for debugging: its client, data pointer and
+ /// length if open, or a marker that it is closed.
+ std::string DebugString() const;
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(BufferHandle);
+ friend class BufferPool;
+
+ /// Internal helper to set the handle to an opened state.
+ void Open(const Client* client, uint8_t* data, int64_t len);
+
+ /// Internal helper to reset the handle to an unopened state.
+ void Reset();
+
+ /// The client the buffer handle belongs to, used to validate that the correct client
+ /// is provided in BufferPool method calls.
+ const Client* client_;
+
+ /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
+ uint8_t* data_;
+
+ /// Length of the buffer in bytes. Reset() sets this to -1.
+ int64_t len_;
+};
/// The handle for a page used by clients of the BufferPool. Each PageHandle should
/// only be used by a single thread at a time: concurrently calling PageHandle methods
@@ -282,12 +377,13 @@ class BufferPool::PageHandle {
// Destination must be closed.
PageHandle& operator=(PageHandle&& src);
- bool is_open() const;
- bool is_pinned() const;
+ bool is_open() const { return page_ != NULL; }
+ bool is_pinned() const { return pin_count() > 0; }
+ int pin_count() const;
int64_t len() const;
/// Get a pointer to the start of the page's buffer. Only valid to call if the page
/// is pinned via this handle.
- uint8_t* data() const;
+ uint8_t* data() const { return buffer_handle()->data(); }
/// Return a pointer to the page's buffer handle. Only valid to call if the page is
/// pinned via this handle. Only const accessors of the returned handle can be used:
@@ -299,32 +395,21 @@ class BufferPool::PageHandle {
private:
DISALLOW_COPY_AND_ASSIGN(PageHandle);
-};
-
-/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
-/// be used by a single thread at a time: concurrently calling BufferHandle methods or
-/// BufferPool methods with the BufferHandle as an argument is not supported.
-class BufferPool::BufferHandle {
- public:
- BufferHandle();
- ~BufferHandle() { DCHECK(!is_open()); }
+ friend class BufferPool;
+ friend class Page;
- /// Allow move construction of handles, to support std::move().
- BufferHandle(BufferHandle&& src);
+ /// Internal helper to open the handle for the given page.
+ void Open(Page* page, Client* client);
- /// Allow move assignment of handles, to support STL classes like std::vector.
- /// Destination must be uninitialized.
- BufferHandle& operator=(BufferHandle&& src);
+ /// Internal helper to reset the handle to an unopened state.
+ void Reset();
- bool is_open() const;
- int64_t len() const;
- /// Get a pointer to the start of the buffer.
- uint8_t* data() const;
+ /// The internal page structure. NULL if the handle is not open.
+ Page* page_;
- std::string DebugString() const;
-
- private:
- DISALLOW_COPY_AND_ASSIGN(BufferHandle);
+ /// The client the page handle belongs to, used to validate that the correct client
+ /// is being used.
+ const Client* client_;
};
}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/util/internal-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h
index 08365d7..37a9a0c 100644
--- a/be/src/util/internal-queue.h
+++ b/be/src/util/internal-queue.h
@@ -19,6 +19,7 @@
#ifndef IMPALA_UTIL_INTERNAL_QUEUE_H
#define IMPALA_UTIL_INTERNAL_QUEUE_H
+#include <boost/function.hpp>
#include <boost/thread/locks.hpp>
#include "util/spinlock.h"
@@ -231,6 +232,16 @@ class InternalQueue {
return true;
}
+ // Iterate over elements of queue, calling 'fn' for each element. If 'fn' returns
+ // false, terminate iteration. It is invalid to call other InternalQueue methods
+ // from 'fn'.
+  void Iterate(boost::function<bool(T*)> fn) {
+    // The queue lock is held for the whole walk, including while 'fn' runs.
+    boost::lock_guard<SpinLock> lock(lock_);
+    Node* current = head_;
+    while (current != NULL) {
+      // A false return from 'fn' ends the iteration early.
+      if (!fn(reinterpret_cast<T*>(current))) return;
+      current = current->next;
+    }
+  }
+
/// Prints the queue ptrs to a string.
std::string DebugString() {
std::stringstream ss;
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 3d48005..2554a18 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -283,7 +283,9 @@ error_codes = (
"supported length of 2147483647 bytes."),
("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query "
- "while spilling data to disk.")
+ "while spilling data to disk."),
+
+ ("BUFFER_ALLOCATION_FAILED", 92, "Unexpected error allocating $0 byte buffer."),
)
import sys