You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/29 00:32:38 UTC

[6/6] incubator-impala git commit: IMPALA-3201: in-memory buffer pool implementation

IMPALA-3201: in-memory buffer pool implementation

This patch implements basic in-memory buffer management, with
reservations managed by ReservationTrackers.

Locks are fine-grained so that the buffer pool can scale to many
concurrent queries.

Includes basic tests for buffer pool setup, allocation and reservations.

Change-Id: I4bda61c31cc02d26bc83c3d458c835b0984b86a0
Reviewed-on: http://gerrit.cloudera.org:8080/4070
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/241c7e01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/241c7e01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/241c7e01

Branch: refs/heads/master
Commit: 241c7e01978f180012453b0a4ff6d061ca6d5093
Parents: 9cee2b5
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Aug 19 17:41:45 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Wed Sep 28 23:38:20 2016 +0000

----------------------------------------------------------------------
 be/src/bufferpool/CMakeLists.txt      |   1 +
 be/src/bufferpool/buffer-allocator.cc |  39 +++
 be/src/bufferpool/buffer-allocator.h  |  48 ++++
 be/src/bufferpool/buffer-pool-test.cc | 381 +++++++++++++++++++++++++++--
 be/src/bufferpool/buffer-pool.cc      | 361 ++++++++++++++++++++++++++-
 be/src/bufferpool/buffer-pool.h       | 159 +++++++++---
 be/src/util/internal-queue.h          |  11 +
 common/thrift/generate_error_codes.py |   4 +-
 8 files changed, 934 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt
index 69c4e4a..2f056e0 100644
--- a/be/src/bufferpool/CMakeLists.txt
+++ b/be/src/bufferpool/CMakeLists.txt
@@ -22,6 +22,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
 set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
 
 add_library(BufferPool
+  buffer-allocator.cc
   buffer-pool.cc
   reservation-tracker.cc
 )

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.cc b/be/src/bufferpool/buffer-allocator.cc
new file mode 100644
index 0000000..27bd788
--- /dev/null
+++ b/be/src/bufferpool/buffer-allocator.cc
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bufferpool/buffer-allocator.h"
+
+#include "util/bit-util.h"
+
+namespace impala {
+
+BufferAllocator::BufferAllocator(int64_t min_buffer_len)
+  : min_buffer_len_(min_buffer_len) {}
+
+Status BufferAllocator::Allocate(int64_t len, uint8_t** buffer) {
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+  // A failed malloc() is reported to the caller as an error Status, not a crash.
+  *buffer = static_cast<uint8_t*>(malloc(len));
+  if (*buffer == NULL) return Status(TErrorCode::BUFFER_ALLOCATION_FAILED, len);
+  return Status::OK();
+}
+
+void BufferAllocator::Free(uint8_t* buffer, int64_t len) {
+  free(buffer);
+}
+} // namespace impala

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-allocator.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-allocator.h b/be/src/bufferpool/buffer-allocator.h
new file mode 100644
index 0000000..c3e0c70
--- /dev/null
+++ b/be/src/bufferpool/buffer-allocator.h
@@ -0,0 +1,48 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_BUFFER_ALLOCATOR_H
+#define IMPALA_BUFFER_ALLOCATOR_H
+
+#include "common/status.h"
+
+namespace impala {
+
+/// The underlying memory allocator for the buffer pool. All buffers are allocated through
+/// the BufferPool's BufferAllocator. The allocator only handles allocating buffers that
+/// are power-of-two multiples of the minimum buffer length.
+///
+/// TODO:
+/// * Allocate memory with mmap() instead of malloc().
+/// * Implement free lists in the allocator or external to the allocator.
+class BufferAllocator {
+ public:
+  explicit BufferAllocator(int64_t min_buffer_len);
+
+  /// Allocate a buffer of 'len' bytes into '*buffer'; returns an error status if the
+  /// allocation fails. 'len' must be a power-of-two multiple of the min buffer length.
+  Status Allocate(int64_t len, uint8_t** buffer);
+
+  /// Free the memory for a buffer previously returned by Allocate().
+  void Free(uint8_t* buffer, int64_t len);
+
+ private:
+  const int64_t min_buffer_len_;
+};
+} // namespace impala
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc
index cdb163d..d45f017 100644
--- a/be/src/bufferpool/buffer-pool-test.cc
+++ b/be/src/bufferpool/buffer-pool-test.cc
@@ -28,6 +28,7 @@
 #include "bufferpool/reservation-tracker.h"
 #include "common/init.h"
 #include "common/object-pool.h"
+#include "testutil/death-test-util.h"
 #include "testutil/test-macros.h"
 
 #include "common/names.h"
@@ -51,13 +52,16 @@ class BufferPoolTest : public ::testing::Test {
   }
 
   /// The minimum buffer size used in most tests.
-  const static int64_t TEST_PAGE_LEN = 1024;
+  const static int64_t TEST_BUFFER_LEN = 1024;
 
   /// Test helper to simulate registering then deregistering a number of queries with
   /// the given initial reservation and reservation limit.
   void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
       int64_t initial_query_reservation, int64_t query_reservation_limit);
 
+  /// Create and destroy a page multiple times.
+  void CreatePageLoop(BufferPool* pool, ReservationTracker* parent_tracker, int num_ops);
+
  protected:
   static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
 
@@ -85,7 +89,7 @@ class BufferPoolTest : public ::testing::Test {
   SpinLock query_reservations_lock_;
 };
 
-const int64_t BufferPoolTest::TEST_PAGE_LEN;
+const int64_t BufferPoolTest::TEST_BUFFER_LEN;
 
 void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
     int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) {
@@ -156,7 +160,7 @@ TEST_F(BufferPoolTest, BasicRegistration) {
   int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_PAGE_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
 
   RegisterQueriesAndClients(
       &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);
@@ -179,7 +183,7 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
   int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
   global_reservations_.InitRootTracker(NewProfile(), total_mem);
 
-  BufferPool pool(TEST_PAGE_LEN, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
 
   // Launch threads, each with a different set of query IDs.
   thread_group workers;
@@ -195,32 +199,355 @@ TEST_F(BufferPoolTest, ConcurrentRegistration) {
   global_reservations_.Close();
 }
 
-/// Test that reservation setup fails if the initial buffers cannot be fulfilled.
-TEST_F(BufferPoolTest, QueryReservationsUnfulfilled) {
-  Status status;
-  int num_queries = 128;
-  int64_t reservation_per_query = 128;
-  // Won't be able to fulfill initial reservation for last query.
-  int64_t total_mem = num_queries * reservation_per_query - 1;
-  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+/// Test basic page handle creation.
+TEST_F(BufferPoolTest, PageCreation) {
+  // Allocate many pages, each a power-of-two multiple of the minimum page length.
+  int num_pages = 16;
+  int64_t max_page_len = TEST_BUFFER_LEN << (num_pages - 1);
+  int64_t total_mem = 2 * 2 * max_page_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+  ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  vector<BufferPool::PageHandle> handles(num_pages);
+
+  // Create pages of various valid sizes.
+  for (int i = 0; i < num_pages; ++i) {
+    int size_multiple = 1 << i;
+    int64_t page_len = TEST_BUFFER_LEN * size_multiple;
+    int64_t used_before = client_tracker->GetUsedReservation();
+    ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].is_pinned());
+    ASSERT_TRUE(handles[i].buffer_handle() != NULL);
+    ASSERT_TRUE(handles[i].data() != NULL);
+    ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
+    ASSERT_EQ(handles[i].len(), page_len);
+    ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
+  }
 
-  for (int i = 0; i < num_queries; ++i) {
-    ReservationTracker* query_tracker = GetQueryReservationTracker(i);
-    query_tracker->InitChildTracker(
-        NewProfile(), &global_reservations_, NULL, 2 * reservation_per_query);
-    bool got_initial_reservation =
-        query_tracker->IncreaseReservationToFit(reservation_per_query);
-    if (i < num_queries - 1) {
-      ASSERT_TRUE(got_initial_reservation);
-    } else {
-      ASSERT_FALSE(got_initial_reservation);
-
-      // Getting the initial reservation should succeed after freeing up buffers from
-      // other query.
-      GetQueryReservationTracker(i - 1)->Close();
-      ASSERT_TRUE(query_tracker->IncreaseReservationToFit(reservation_per_query));
+  // Destroy the pages and check that their reservation is released.
+  for (int i = 0; i < num_pages; ++i) {
+    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t page_len = handles[i].len();
+    pool.DestroyPage(&client, &handles[i]);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
+  }
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+
+  // All the reservations should be released at this point.
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
+TEST_F(BufferPoolTest, BufferAllocation) {
+  // Allocate many buffers, each a power-of-two multiple of the minimum buffer length.
+  int num_buffers = 16;
+  int64_t max_buffer_len = TEST_BUFFER_LEN << (num_buffers - 1);
+  int64_t total_mem = 2 * 2 * max_buffer_len;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  vector<BufferPool::BufferHandle> handles(num_buffers);
+
+  // Create buffers of various valid sizes.
+  for (int i = 0; i < num_buffers; ++i) {
+    int size_multiple = 1 << i;
+    int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
+    int64_t used_before = client_tracker->GetUsedReservation();
+    ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
+    ASSERT_TRUE(handles[i].is_open());
+    ASSERT_TRUE(handles[i].data() != NULL);
+    ASSERT_EQ(handles[i].len(), buffer_len);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len);
+  }
+
+  // Free the buffers and check that their reservation is released.
+  for (int i = 0; i < num_buffers; ++i) {
+    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t buffer_len = handles[i].len();
+    pool.FreeBuffer(&client, &handles[i]);
+    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len);
+  }
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+
+  // All the reservations should be released at this point.
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
+/// Test transfer of buffer handles between clients.
+TEST_F(BufferPoolTest, BufferTransfer) {
+  // Each client needs to have enough reservation for a buffer.
+  const int num_clients = 5;
+  int64_t total_mem = num_clients * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  ReservationTracker client_trackers[num_clients];
+  BufferPool::Client clients[num_clients];
+  BufferPool::BufferHandle handles[num_clients];
+  for (int i = 0; i < num_clients; ++i) {
+    client_trackers[i].InitChildTracker(
+        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+    ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
+    ASSERT_OK(pool.RegisterClient("test client", &client_trackers[i], &clients[i]));
+  }
+
+  // Transfer the buffer around between the clients repeatedly in a circle.
+  ASSERT_OK(pool.AllocateBuffer(&clients[0], TEST_BUFFER_LEN, &handles[0]));
+  uint8_t* data = handles[0].data();
+  for (int iter = 0; iter < 10; ++iter) {
+    for (int client = 0; client < num_clients; ++client) {
+      int next_client = (client + 1) % num_clients;
+      ASSERT_OK(pool.TransferBuffer(&clients[client], &handles[client],
+          &clients[next_client], &handles[next_client]));
+      // Check that the transfer left things in a consistent state.
+      ASSERT_FALSE(handles[client].is_open());
+      ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
+      ASSERT_TRUE(handles[next_client].is_open());
+      ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation());
+      // The same underlying buffer should be used.
+      ASSERT_EQ(data, handles[next_client].data());
     }
   }
+
+  pool.FreeBuffer(&clients[0], &handles[0]);
+  for (int i = 0; i < num_clients; ++i) {
+    pool.DeregisterClient(&clients[i]);
+    client_trackers[i].Close();
+  }
+  ASSERT_EQ(global_reservations_.GetReservation(), 0);
+  global_reservations_.Close();
+}
+
+/// Test basic pinning and unpinning.
+TEST_F(BufferPoolTest, Pin) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  // Set up client with enough reservation to pin twice.
+  int64_t child_reservation = TEST_BUFFER_LEN * 2;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, child_reservation);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  BufferPool::PageHandle handle1, handle2;
+
+  // Can pin two minimum sized pages.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+  ASSERT_TRUE(handle1.is_open());
+  ASSERT_TRUE(handle1.is_pinned());
+  ASSERT_TRUE(handle1.data() != NULL);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  ASSERT_TRUE(handle2.is_open());
+  ASSERT_TRUE(handle2.is_pinned());
+  ASSERT_TRUE(handle2.data() != NULL);
+
+  pool.Unpin(&client, &handle2);
+  ASSERT_FALSE(handle2.is_pinned());
+
+  // Can pin minimum-sized page twice.
+  ASSERT_OK(pool.Pin(&client, &handle1));
+  ASSERT_TRUE(handle1.is_pinned());
+  // Have to unpin twice.
+  pool.Unpin(&client, &handle1);
+  ASSERT_TRUE(handle1.is_pinned());
+  pool.Unpin(&client, &handle1);
+  ASSERT_FALSE(handle1.is_pinned());
+
+  // A pinned double-sized page uses the client's entire (2 * TEST_BUFFER_LEN) reservation.
+  BufferPool::PageHandle double_handle;
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN * 2, &double_handle));
+  ASSERT_TRUE(double_handle.is_open());
+  ASSERT_TRUE(double_handle.is_pinned());
+  ASSERT_TRUE(double_handle.data() != NULL);
+
+  // Destroy the pages - test destroying both pinned and unpinned.
+  pool.DestroyPage(&client, &handle1);
+  pool.DestroyPage(&client, &handle2);
+  pool.DestroyPage(&client, &double_handle);
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+/// Creating a page or pinning without sufficient reservation should DCHECK.
+TEST_F(BufferPoolTest, PinWithoutReservation) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  BufferPool::PageHandle handle;
+  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), "");
+
+  // Should succeed after increasing reservation.
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
+
+  // Pinning the page a second time would exceed the reservation, so it should DCHECK.
+  IMPALA_ASSERT_DEBUG_DEATH(pool.Pin(&client, &handle), "");
+
+  pool.DestroyPage(&client, &handle);
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+TEST_F(BufferPoolTest, ExtractBuffer) {
+  int64_t total_mem = TEST_BUFFER_LEN * 1024;
+  // Set up client with enough reservation for two buffers/pins.
+  int64_t child_reservation = TEST_BUFFER_LEN * 2;
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+  global_reservations_.InitRootTracker(NULL, total_mem);
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(
+      NewProfile(), &global_reservations_, NULL, child_reservation);
+  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", client_tracker, &client));
+
+  BufferPool::PageHandle page;
+  BufferPool::BufferHandle buffer;
+
+  // Test basic buffer extraction from single- and double-length pages.
+  for (int len = TEST_BUFFER_LEN; len <= 2 * TEST_BUFFER_LEN; len *= 2) {
+    ASSERT_OK(pool.CreatePage(&client, len, &page));
+    uint8_t* page_data = page.data();
+    pool.ExtractBuffer(&client, &page, &buffer);
+    ASSERT_FALSE(page.is_open());
+    ASSERT_TRUE(buffer.is_open());
+    ASSERT_EQ(len, buffer.len());
+    ASSERT_EQ(page_data, buffer.data());
+    ASSERT_EQ(len, client_tracker->GetUsedReservation());
+    pool.FreeBuffer(&client, &buffer);
+    ASSERT_EQ(0, client_tracker->GetUsedReservation());
+  }
+
+  // Test that ExtractBuffer() accounts correctly for pin count > 1.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+  uint8_t* page_data = page.data();
+  ASSERT_OK(pool.Pin(&client, &page));
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+  pool.ExtractBuffer(&client, &page, &buffer);
+  ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
+  ASSERT_FALSE(page.is_open());
+  ASSERT_TRUE(buffer.is_open());
+  ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
+  ASSERT_EQ(page_data, buffer.data());
+  pool.FreeBuffer(&client, &buffer);
+  ASSERT_EQ(0, client_tracker->GetUsedReservation());
+
+  // Test that ExtractBuffer() DCHECKs for unpinned pages.
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
+  pool.Unpin(&client, &page);
+  IMPALA_ASSERT_DEBUG_DEATH(pool.ExtractBuffer(&client, &page, &buffer), "");
+  pool.DestroyPage(&client, &page);
+
+  pool.DeregisterClient(&client);
+  client_tracker->Close();
+}
+
+// Test concurrent creation and destruction of pages.
+TEST_F(BufferPoolTest, ConcurrentPageCreation) {
+  int ops_per_thread = 1024;
+  int num_threads = 64;
+  // Need enough buffers for all initial reservations.
+  int64_t total_mem = num_threads * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NULL, total_mem);
+
+  BufferPool pool(TEST_BUFFER_LEN, total_mem);
+
+  // Launch threads, each registering its own client and creating/destroying pages.
+  thread_group workers;
+  for (int i = 0; i < num_threads; ++i) {
+    workers.add_thread(new thread(bind(&BufferPoolTest::CreatePageLoop, this, &pool,
+        &global_reservations_, ops_per_thread)));
+  }
+
+  // Build debug string to test concurrent iteration over pages_ list.
+  for (int i = 0; i < 64; ++i) {
+    LOG(INFO) << pool.DebugString();
+  }
+  workers.join_all();
+
+  // All the reservations should be released at this point.
+  ASSERT_EQ(global_reservations_.GetChildReservations(), 0);
+  global_reservations_.Close();
+}
+
+void BufferPoolTest::CreatePageLoop(
+    BufferPool* pool, ReservationTracker* parent_tracker, int num_ops) {
+  ReservationTracker client_tracker;
+  client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN);
+  BufferPool::Client client;
+  ASSERT_OK(pool->RegisterClient("test client", &client_tracker, &client));
+  for (int i = 0; i < num_ops; ++i) {
+    BufferPool::PageHandle handle;
+    ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
+    ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
+    pool->Unpin(&client, &handle);
+    ASSERT_OK(pool->Pin(&client, &handle));
+    pool->DestroyPage(&client, &handle);
+    client_tracker.DecreaseReservation(TEST_BUFFER_LEN);
+  }
+  pool->DeregisterClient(&client);
+  client_tracker.Close();
+}
+
+/// Test error path where pool is unable to fulfill a reservation because it cannot evict
+/// unpinned pages.
+TEST_F(BufferPoolTest, CapacityExhausted) {
+  global_reservations_.InitRootTracker(NULL, TEST_BUFFER_LEN);
+  // TODO: once we enable spilling, set up buffer pool so that spilling is disabled.
+  // Set up pool with one more buffer than reservations (so that we hit the reservation
+  // limit instead of the buffer limit).
+  BufferPool pool(TEST_BUFFER_LEN, TEST_BUFFER_LEN * 2);
+
+  BufferPool::PageHandle handle1, handle2, handle3;
+
+  BufferPool::Client client;
+  ASSERT_OK(pool.RegisterClient("test client", &global_reservations_, &client));
+  ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
+
+  // Not enough unused reservation: the pinned page already consumes all of it.
+  IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2), "");
+
+  // Even with reservations, we can only create one more unpinned page because we don't
+  // support eviction yet.
+  pool.Unpin(&client, &handle1);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle2));
+  pool.Unpin(&client, &handle2);
+  ASSERT_FALSE(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3).ok());
+
+  // After destroying a page, we should have a free buffer that we can use.
+  pool.DestroyPage(&client, &handle1);
+  ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle3));
+
+  pool.DestroyPage(&client, &handle2);
+  pool.DestroyPage(&client, &handle3);
+  pool.DeregisterClient(&client);
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc
index 21893fc..c89b8f6 100644
--- a/be/src/bufferpool/buffer-pool.cc
+++ b/be/src/bufferpool/buffer-pool.cc
@@ -31,13 +31,151 @@ using strings::Substitute;
 
 namespace impala {
 
+/// The internal representation of a page, which can be pinned or unpinned. If the
+/// page is pinned, a buffer is associated with the page.
+///
+/// Code manipulating the page is responsible for acquiring 'lock' when reading or
+/// modifying the page.
+struct BufferPool::Page : public BufferPool::PageList::Node {
+  Page(int64_t len) : len(len), pin_count(0), dirty(false) {}
+
+  /// Increment the pin count. Caller must hold 'lock'.
+  void IncrementPinCount(PageHandle* handle) {
+    lock.DCheckLocked();
+    ++pin_count;
+    // Pinned page buffers may be modified by anyone with a pointer to the buffer, so we
+    // have to assume they are dirty.
+    dirty = true;
+  }
+
+  /// Decrement the pin count. Caller must hold 'lock'.
+  void DecrementPinCount(PageHandle* handle) {
+    lock.DCheckLocked();
+    DCHECK_GT(pin_count, 0);
+    --pin_count;
+  }
+
+  string DebugString() {
+    return Substitute("<BufferPool::Page> $0 len: $1 pin_count: $2 buf: $3 dirty: $4", this,
+        len, pin_count, buffer.DebugString(), dirty);
+  }
+
+  /// Helper for BufferPool::DebugString(). Locks 'page' while formatting it.
+  static bool DebugStringCallback(stringstream* ss, BufferPool::Page* page) {
+    lock_guard<SpinLock> pl(page->lock);
+    (*ss) << page->DebugString() << "\n";
+    return true;
+  }
+
+  /// The length of the page in bytes.
+  const int64_t len;
+
+  /// Lock to protect the below members of Page. The lock must be held when modifying any
+  /// of the below members and when reading any of the below members of an unpinned page.
+  SpinLock lock;
+
+  /// The pin count of the page.
+  int pin_count;
+
+  /// Buffer with the page's contents. Always open if pinned. Closed if page is unpinned
+  /// and was evicted from memory.
+  BufferHandle buffer;
+
+  /// True if the buffer's contents need to be saved before evicting it from memory.
+  bool dirty;
+};
+
+BufferPool::BufferHandle::BufferHandle() {
+  Reset();
+}
+
+BufferPool::BufferHandle::BufferHandle(BufferHandle&& src) : BufferHandle() {
+  *this = std::move(src); // Default ctor ran first, so this handle is closed.
+}
+
+BufferPool::BufferHandle& BufferPool::BufferHandle::operator=(BufferHandle&& src) {
+  DCHECK(!is_open());
+  // Copy over all members then close src.
+  client_ = src.client_;
+  data_ = src.data_;
+  len_ = src.len_;
+  src.Reset();
+  return *this;
+}
+
+void BufferPool::BufferHandle::Open(const Client* client, uint8_t* data, int64_t len) {
+  client_ = client;
+  data_ = data;
+  len_ = len;
+}
+
+void BufferPool::BufferHandle::Reset() {
+  client_ = NULL;
+  data_ = NULL;
+  len_ = -1;
+}
+
+BufferPool::PageHandle::PageHandle() {
+  Reset();
+}
+
+BufferPool::PageHandle::PageHandle(PageHandle&& src) : PageHandle() {
+  *this = std::move(src); // Default ctor ran first, so this handle is closed.
+}
+
+BufferPool::PageHandle& BufferPool::PageHandle::operator=(PageHandle&& src) {
+  DCHECK(!is_open());
+  // Copy over all members then close src.
+  page_ = src.page_;
+  client_ = src.client_;
+  src.Reset();
+  return *this;
+}
+
+void BufferPool::PageHandle::Open(Page* page, Client* client) {
+  DCHECK(!is_open());
+  page->lock.DCheckLocked();
+  page_ = page;
+  client_ = client;
+}
+
+void BufferPool::PageHandle::Reset() {
+  page_ = NULL;
+  client_ = NULL;
+}
+
+int BufferPool::PageHandle::pin_count() const {
+  DCHECK(is_open());
+  // The pin count can only be modified via this PageHandle, which must not be
+  // concurrently accessed by multiple threads, so it is safe to access without locking.
+  return page_->pin_count;
+}
+
+int64_t BufferPool::PageHandle::len() const {
+  DCHECK(is_open());
+  // The length of the page cannot change, so it is safe to access without locking.
+  return page_->len;
+}
+
+const BufferPool::BufferHandle* BufferPool::PageHandle::buffer_handle() const {
+  DCHECK(is_pinned());
+  // The 'buffer' field cannot change while the page is pinned, so it is safe to access
+  // without locking.
+  return &page_->buffer;
+}
+
 BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
-  : min_buffer_len_(min_buffer_len), buffer_bytes_limit_(buffer_bytes_limit) {
+  : allocator_(new BufferAllocator(min_buffer_len)),
+    min_buffer_len_(min_buffer_len),
+    buffer_bytes_limit_(buffer_bytes_limit),
+    buffer_bytes_remaining_(buffer_bytes_limit) {
   DCHECK_GT(min_buffer_len, 0);
   DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
 }
 
-BufferPool::~BufferPool() {}
+BufferPool::~BufferPool() {
+  DCHECK(pages_.empty());
+}
 
 Status BufferPool::RegisterClient(
     const string& name, ReservationTracker* reservation, Client* client) {
@@ -55,15 +193,228 @@ void BufferPool::DeregisterClient(Client* client) {
   client->reservation_ = NULL;
 }
 
+// Creates a page of 'len' bytes that is pinned once via 'handle'. The page's length is
+// charged to 'client's reservation only after its buffer has been obtained.
+Status BufferPool::CreatePage(Client* client, int64_t len, PageHandle* handle) {
+  DCHECK(!handle->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  // Get the memory first. Nothing has been modified yet, so a failure can be returned
+  // without any cleanup.
+  BufferHandle page_buffer;
+  RETURN_IF_ERROR(AllocateBufferInternal(client, len, &page_buffer));
+
+  Page* new_page = new Page(len);
+  {
+    lock_guard<SpinLock> page_lock(new_page->lock);
+    new_page->buffer = std::move(page_buffer);
+    handle->Open(new_page, client);
+    new_page->IncrementPinCount(handle);
+  }
+
+  // Publish the page in the global list only once it is fully initialized, and only
+  // after its lock is dropped: the list lock must be acquired before any page lock.
+  pages_.Enqueue(new_page);
+
+  client->reservation_->AllocateFrom(len);
+  return Status::OK();
+}
+
+// Destroys the page referenced by 'handle' and closes the handle. Calling this with a
+// closed handle does nothing, so repeated calls are harmless.
+void BufferPool::DestroyPage(Client* client, PageHandle* handle) {
+  if (!handle->is_open()) return;
+
+  if (handle->is_pinned()) {
+    // Pinned: extracting the buffer tears the page down, and freeing the extracted
+    // buffer returns both the memory and the client's reservation.
+    BufferHandle extracted;
+    ExtractBuffer(client, handle, &extracted);
+    FreeBuffer(client, &extracted);
+    return;
+  }
+
+  Page* unpinned_page = handle->page_;
+  {
+    lock_guard<SpinLock> page_lock(unpinned_page->lock);
+    // Unpinned: no reservation is consumed, so only the buffer (if any) is released.
+    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+    if (unpinned_page->buffer.is_open()) FreeBufferInternal(&unpinned_page->buffer);
+  }
+  CleanUpPage(handle);
+}
+
+void BufferPool::CleanUpPage(PageHandle* handle) {
+  Page* doomed_page = handle->page_;
+  // Unlink the page before freeing it. Threads that reach pages through 'pages_' hold
+  // the list's lock while doing so, and Remove() does not return until they are done.
+  // After that no other thread can still reference 'doomed_page'.
+  pages_.Remove(doomed_page);
+  delete doomed_page;
+  handle->Reset();
+}
+
+Status BufferPool::Pin(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK(handle->is_open());
+  DCHECK_EQ(handle->client_, client);
+
+  Page* target = handle->page_;
+  {
+    lock_guard<SpinLock> page_lock(target->lock);
+    // A page without a buffer needs one before it can be pinned. Nothing has been
+    // modified yet, so an allocation failure can simply be propagated.
+    if (!target->buffer.is_open()) {
+      RETURN_IF_ERROR(AllocateBufferInternal(client, target->len, &target->buffer));
+    }
+    target->IncrementPinCount(handle);
+
+    // TODO: will need to initiate/wait for read if the page is not in-memory.
+  }
+
+  // Every pin taken through the handle is charged to the client's reservation.
+  client->reservation_->AllocateFrom(target->len);
+  return Status::OK();
+}
+
+void BufferPool::Unpin(Client* client, PageHandle* handle) {
+  DCHECK(handle->is_open());
+  // Take the page lock for the duration of the unpin, as UnpinLocked() requires.
+  Page* page = handle->page_;
+  lock_guard<SpinLock> page_lock(page->lock);
+  UnpinLocked(client, handle);
+}
+
+void BufferPool::UnpinLocked(Client* client, PageHandle* handle) {
+  DCHECK(client->is_registered());
+  DCHECK_EQ(handle->client_, client);
+  // A pin held through the handle implies the page itself is pinned.
+  DCHECK(handle->is_pinned());
+  Page* pinned_page = handle->page_;
+  pinned_page->lock.DCheckLocked();
+
+  // Drop one pin and hand the matching reservation back to the client.
+  pinned_page->DecrementPinCount(handle);
+  client->reservation_->ReleaseTo(pinned_page->len);
+
+  // TODO: can evict now. Only need to preserve contents if 'page->dirty' is true.
+}
+
+void BufferPool::ExtractBuffer(
+    Client* client, PageHandle* page_handle, BufferHandle* buffer_handle) {
+  DCHECK(page_handle->is_pinned());
+  DCHECK_EQ(page_handle->client_, client);
+
+  Page* source_page = page_handle->page_;
+  {
+    lock_guard<SpinLock> page_lock(source_page->lock);
+    // TODO: wait for in-flight writes for 'page' so we can safely free 'page'.
+
+    // Release surplus pins so exactly one pin's worth of reservation stays consumed.
+    // That remaining reservation is now accounted to the extracted buffer.
+    while (source_page->pin_count > 1) {
+      UnpinLocked(client, page_handle);
+    }
+    *buffer_handle = std::move(source_page->buffer);
+  }
+  CleanUpPage(page_handle);
+}
+
+// Allocates a buffer of 'len' bytes for 'client' and charges it to the client's
+// reservation. If the allocation fails, the reservation consumed up front is released
+// again, so a failed call leaves the client's reservation usage unchanged.
+Status BufferPool::AllocateBuffer(Client* client, int64_t len, BufferHandle* handle) {
+  client->reservation_->AllocateFrom(len);
+  Status status = AllocateBufferInternal(client, len, handle);
+  // Undo the reservation accounting on failure; otherwise it would leak.
+  if (!status.ok()) client->reservation_->ReleaseTo(len);
+  return status;
+}
+
+Status BufferPool::AllocateBufferInternal(
+    Client* client, int64_t len, BufferHandle* buffer) {
+  DCHECK(!buffer->is_open());
+  DCHECK_GE(len, min_buffer_len_);
+  DCHECK_EQ(len, BitUtil::RoundUpToPowerOfTwo(len));
+
+  // With no headroom left under the limit, another page would have to be evicted,
+  // which is not supported yet.
+  if (!TryDecreaseBufferBytesRemaining(len)) {
+    return Status(TErrorCode::NOT_IMPLEMENTED_ERROR,
+        Substitute("Buffer bytes limit $0 of buffer pool is exhausted and page eviction is "
+                   "not implemented yet!", buffer_bytes_limit_));
+  }
+
+  uint8_t* data = NULL;
+  Status alloc_status = allocator_->Allocate(len, &data);
+  if (!alloc_status.ok()) {
+    // Give back the bytes claimed above so the pool-wide accounting stays balanced.
+    buffer_bytes_remaining_.Add(len);
+    return alloc_status;
+  }
+  DCHECK(data != NULL);
+  buffer->Open(client, data, len);
+  return Status::OK();
+}
+
+void BufferPool::FreeBuffer(Client* client, BufferHandle* handle) {
+  // Freeing a closed handle is a no-op, which makes repeated calls harmless.
+  if (!handle->is_open()) return;
+  DCHECK_EQ(client, handle->client_);
+  const int64_t freed_len = handle->len_;
+  client->reservation_->ReleaseTo(freed_len);
+  FreeBufferInternal(handle);
+}
+
+void BufferPool::FreeBufferInternal(BufferHandle* handle) {
+  DCHECK(handle->is_open());
+  const int64_t buffer_len = handle->len();
+  allocator_->Free(handle->data(), buffer_len);
+  // Return the bytes to the pool-wide budget, then close the handle.
+  buffer_bytes_remaining_.Add(buffer_len);
+  handle->Reset();
+}
+
+Status BufferPool::TransferBuffer(
+    Client* src_client, BufferHandle* src, Client* dst_client, BufferHandle* dst) {
+  DCHECK(src->is_open());
+  DCHECK(!dst->is_open());
+  DCHECK_EQ(src_client, src->client_);
+  DCHECK_NE(src, dst);
+  DCHECK_NE(src_client, dst_client);
+
+  // First move the reservation charge from the source client to the destination...
+  const int64_t transferred_len = src->len();
+  dst_client->reservation_->AllocateFrom(transferred_len);
+  src_client->reservation_->ReleaseTo(transferred_len);
+  // ...then hand over the buffer itself and tag it with its new owner.
+  *dst = std::move(*src);
+  dst->client_ = dst_client;
+  return Status::OK();
+}
+
+bool BufferPool::TryDecreaseBufferBytesRemaining(int64_t len) {
+  // TODO: we may want to change this policy so that we don't always use up to the limit
+  // for buffers, since this may starve other operators using non-buffer-pool memory.
+  int64_t current;
+  do {
+    current = buffer_bytes_remaining_.Load();
+    // Not enough headroom: fail without touching the counter.
+    if (current < len) return false;
+    // Retry if another thread changed the counter between the load and the swap.
+  } while (!buffer_bytes_remaining_.CompareAndSwap(current, current - len));
+  return true;
+}
+
 string BufferPool::Client::DebugString() const {
-  return Substitute("<BufferPool::Client> $0 name: $1 reservation: $2", this, name_,
-      reservation_->DebugString());
+  // Unregistered clients are reported without name or reservation details.
+  if (!is_registered()) {
+    return Substitute("<BufferPool::Client> $0 UNREGISTERED", this);
+  }
+  return Substitute("<BufferPool::Client> $0 name: $1 reservation: {$2}", this, name_,
+      reservation_->DebugString());
+}
+
+string BufferPool::PageHandle::DebugString() const {
+  if (!is_open()) return Substitute("<BufferPool::PageHandle> $0 CLOSED", this);
+  // Hold the page lock while the page's state is formatted.
+  lock_guard<SpinLock> page_lock(page_->lock);
+  return Substitute("<BufferPool::PageHandle> $0 client: {$1} page: {$2}",
+      this, client_->DebugString(), page_->DebugString());
+}
+
+string BufferPool::BufferHandle::DebugString() const {
+  if (!is_open()) return Substitute("<BufferPool::BufferHandle> $0 CLOSED", this);
+  // An open handle also reports its owning client, data pointer and length.
+  return Substitute("<BufferPool::BufferHandle> $0 client: {$1} data: $2 len: $3", this,
+      client_->DebugString(), data_, len_);
 }
 
 string BufferPool::DebugString() {
   stringstream ss;
   ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
-     << "buffer_bytes_limit: " << buffer_bytes_limit_ << "\n";
+     << " buffer_bytes_limit: " << buffer_bytes_limit_
+     << " buffer_bytes_remaining: " << buffer_bytes_remaining_.Load() << "\n";
+  // Append page descriptions via Page::DebugStringCallback. Iterate() holds the
+  // 'pages_' list lock throughout, so no page can be deleted while it is printed.
+  pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
   return ss.str();
 }
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h
index e566543..44b5574 100644
--- a/be/src/bufferpool/buffer-pool.h
+++ b/be/src/bufferpool/buffer-pool.h
@@ -18,10 +18,12 @@
 #ifndef IMPALA_BUFFER_POOL_H
 #define IMPALA_BUFFER_POOL_H
 
+#include <stdint.h>
+#include <boost/scoped_ptr.hpp>
 #include <boost/thread/locks.hpp>
 #include <string>
-#include <stdint.h>
 
+#include "bufferpool/buffer-allocator.h"
 #include "common/atomic.h"
 #include "common/status.h"
 #include "gutil/macros.h"
@@ -30,6 +32,7 @@
 
 namespace impala {
 
+class BufferAllocator;
 class ReservationTracker;
 
 /// A buffer pool that manages memory buffers for all queries in an Impala daemon.
@@ -69,7 +72,7 @@ class ReservationTracker;
 /// Buffer/Page Sizes
 /// =================
 /// The buffer pool has a minimum buffer size, which must be a power-of-two. Page and
-/// buffer sizes must be an exact multiple of the minimum buffer size.
+/// buffer sizes must be an exact power-of-two multiple of the minimum buffer size.
 ///
 /// Reservations
 /// ============
@@ -140,11 +143,18 @@ class ReservationTracker;
 /// +========================+
 /// | IMPLEMENTATION DETAILS |
 /// +========================+
-/// ... TODO ...
+///
+/// Lock Ordering
+/// =============
+/// The lock ordering is:
+/// * pages_::lock_ -> Page::lock
+///
+/// If a reference to a page is acquired via the pages_ list, pages_::lock_ must be held
+/// until done with the page to ensure the page isn't concurrently deleted.
 class BufferPool {
  public:
-  class Client;
   class BufferHandle;
+  class Client;
   class PageHandle;
 
   /// Constructs a new buffer pool.
@@ -198,9 +208,10 @@ class BufferPool {
 
   /// Extracts buffer from a pinned page. After this returns, the page referenced by
   /// 'page_handle' will be destroyed and 'buffer_handle' will reference the buffer from
-  /// 'page_handle'. This may decrease reservation usage if the page was pinned multiple
-  /// times via 'page_handle'.
-  void ExtractBuffer(PageHandle* page_handle, BufferHandle* buffer_handle);
+  /// 'page_handle'. This may decrease reservation usage of 'client' if the page was
+  /// pinned multiple times via 'page_handle'.
+  void ExtractBuffer(
+      Client* client, PageHandle* page_handle, BufferHandle* buffer_handle);
 
   /// Allocates a new buffer of 'len' bytes. Uses reservation from 'client'. The caller
   /// is responsible for ensuring it has enough unused reservation before calling
@@ -214,9 +225,9 @@ class BufferPool {
 
   /// Transfer ownership of buffer from 'src_client' to 'dst_client' and move the
   /// handle from 'src' to 'dst'. Increases reservation usage in 'dst_client' and
-  /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must
-  /// be closed
-  /// before calling. After a successful call, 'src' is closed and 'dst' is open.
+  /// decreases reservation usage in 'src_client'. 'src' must be open and 'dst' must be
+  /// closed before calling. 'src'/'dst' and 'src_client'/'dst_client' must be different.
+  /// After a successful call, 'src' is closed and 'dst' is open.
   Status TransferBuffer(Client* src_client, BufferHandle* src, Client* dst_client,
       BufferHandle* dst);
 
@@ -228,13 +239,49 @@ class BufferPool {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
+  struct Page;
+
+  /// Same as Unpin(), except the lock for the page referenced by 'handle' must be held
+  /// by the caller.
+  void UnpinLocked(Client* client, PageHandle* handle);
+
+  /// Perform the cleanup of the page object and handle when the page is destroyed.
+  /// Removes the 'pages_' entry, frees the Page object and resets 'handle'.
+  /// The 'handle->page_' lock should *not* be held by the caller.
+  void CleanUpPage(PageHandle* handle);
 
-  /// The minimum length of a buffer in bytes. All buffers and pages are a multiple of
-  /// this length. This is always a power of two.
+  /// Allocate a buffer of length 'len' and open 'buffer' on it. Does not touch the
+  /// client's reservation; the caller is responsible for reservation accounting.
+  /// Returns an error if the pool is unable to allocate the buffer.
+  Status AllocateBufferInternal(Client* client, int64_t len, BufferHandle* buffer);
+
+  /// Frees 'buffer', which must be open before calling. Closes 'buffer' and updates
+  /// internal state but does not release any reservation.
+  void FreeBufferInternal(BufferHandle* buffer);
+
+  /// Checks whether another buffer of 'len' bytes can be allocated without
+  /// 'buffer_bytes_remaining_' going negative. If so, decreases
+  /// 'buffer_bytes_remaining_' by 'len' and returns true; otherwise returns false.
+  bool TryDecreaseBufferBytesRemaining(int64_t len);
+
+  /// Allocator for allocating and freeing all buffer memory.
+  boost::scoped_ptr<BufferAllocator> allocator_;
+
+  /// The minimum length of a buffer in bytes. All buffers and pages are a power-of-two
+  /// multiple of this length. This is always a power of two.
   const int64_t min_buffer_len_;
 
   /// The maximum physical memory in bytes that can be used for buffers.
   const int64_t buffer_bytes_limit_;
+
+  /// The remaining number of bytes of 'buffer_bytes_limit_' that can be used for
+  /// allocating new buffers. Must be updated atomically before a new buffer is
+  /// allocated or after an existing buffer is freed.
+  AtomicInt64 buffer_bytes_remaining_;
+
+  /// List containing all pages. Protected by the list's internal lock.
+  typedef InternalQueue<Page> PageList;
+  PageList pages_;
 };
 
 /// External representation of a client of the BufferPool. Clients are used for
@@ -266,6 +313,54 @@ class BufferPool::Client {
   ReservationTracker* reservation_;
 };
 
+/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
+/// be used by a single thread at a time: concurrently calling BufferHandle methods or
+/// BufferPool methods with the BufferHandle as an argument is not supported.
+/// A handle must be closed (e.g. by freeing or transferring its buffer) before it is
+/// destroyed; the destructor DCHECKs this.
+class BufferPool::BufferHandle {
+ public:
+  BufferHandle();
+  ~BufferHandle() { DCHECK(!is_open()); }
+
+  /// Allow move construction of handles, to support std::move().
+  BufferHandle(BufferHandle&& src);
+
+  /// Allow move assignment of handles, to support STL classes like std::vector.
+  /// Destination must be uninitialized.
+  BufferHandle& operator=(BufferHandle&& src);
+
+  /// True if the handle currently references a buffer, i.e. 'data_' is non-NULL.
+  bool is_open() const { return data_ != NULL; }
+  /// Length of the buffer in bytes. Only valid to call on an open handle.
+  int64_t len() const {
+    DCHECK(is_open());
+    return len_;
+  }
+  /// Get a pointer to the start of the buffer. Only valid to call on an open handle.
+  uint8_t* data() const {
+    DCHECK(is_open());
+    return data_;
+  }
+
+  /// Returns a human-readable description of the handle; for an open handle this
+  /// includes its client, data pointer and length.
+  std::string DebugString() const;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(BufferHandle);
+  /// BufferPool opens, resets and re-assigns the owning client of handles directly.
+  friend class BufferPool;
+
+  /// Internal helper to set the handle to an opened state.
+  void Open(const Client* client, uint8_t* data, int64_t len);
+
+  /// Internal helper to reset the handle to an unopened state.
+  void Reset();
+
+  /// The client the buffer handle belongs to, used to validate that the correct client
+  /// is provided in BufferPool method calls.
+  const Client* client_;
+
+  /// Pointer to the start of the buffer. Non-NULL if open, NULL if closed.
+  uint8_t* data_;
+
+  /// Length of the buffer in bytes.
+  int64_t len_;
+};
 
 /// The handle for a page used by clients of the BufferPool. Each PageHandle should
 /// only be used by a single thread at a time: concurrently calling PageHandle methods
@@ -282,12 +377,13 @@ class BufferPool::PageHandle {
   // Destination must be closed.
   PageHandle& operator=(PageHandle&& src);
 
-  bool is_open() const;
-  bool is_pinned() const;
+  bool is_open() const { return page_ != NULL; }
+  bool is_pinned() const { return pin_count() > 0; }
+  int pin_count() const;
   int64_t len() const;
   /// Get a pointer to the start of the page's buffer. Only valid to call if the page
   /// is pinned via this handle.
-  uint8_t* data() const;
+  uint8_t* data() const { return buffer_handle()->data(); }
 
   /// Return a pointer to the page's buffer handle. Only valid to call if the page is
   /// pinned via this handle. Only const accessors of the returned handle can be used:
@@ -299,32 +395,21 @@ class BufferPool::PageHandle {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(PageHandle);
-};
-
-/// A handle to a buffer allocated from the buffer pool. Each BufferHandle should only
-/// be used by a single thread at a time: concurrently calling BufferHandle methods or
-/// BufferPool methods with the BufferHandle as an argument is not supported.
-class BufferPool::BufferHandle {
- public:
-  BufferHandle();
-  ~BufferHandle() { DCHECK(!is_open()); }
+  friend class BufferPool;
+  friend class Page;
 
-  /// Allow move construction of handles, to support std::move().
-  BufferHandle(BufferHandle&& src);
+  /// Internal helper to open the handle for the given page.
+  void Open(Page* page, Client* client);
 
-  /// Allow move assignment of handles, to support STL classes like std::vector.
-  /// Destination must be uninitialized.
-  BufferHandle& operator=(BufferHandle&& src);
+  /// Internal helper to reset the handle to an unopened state.
+  void Reset();
 
-  bool is_open() const;
-  int64_t len() const;
-  /// Get a pointer to the start of the buffer.
-  uint8_t* data() const;
+  /// The internal page structure. NULL if the handle is not open.
+  Page* page_;
 
-  std::string DebugString() const;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(BufferHandle);
+  /// The client the page handle belongs to, used to validate that the correct client
+  /// is being used.
+  const Client* client_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/be/src/util/internal-queue.h
----------------------------------------------------------------------
diff --git a/be/src/util/internal-queue.h b/be/src/util/internal-queue.h
index 08365d7..37a9a0c 100644
--- a/be/src/util/internal-queue.h
+++ b/be/src/util/internal-queue.h
@@ -19,6 +19,7 @@
 #ifndef IMPALA_UTIL_INTERNAL_QUEUE_H
 #define IMPALA_UTIL_INTERNAL_QUEUE_H
 
+#include <boost/function.hpp>
 #include <boost/thread/locks.hpp>
 
 #include "util/spinlock.h"
@@ -231,6 +232,16 @@ class InternalQueue {
     return true;
   }
 
+  /// Iterate over elements of the queue starting at the head, calling 'fn' for each
+  /// element. If 'fn' returns false, iteration terminates early. 'lock_' is held for
+  /// the whole iteration, so it is invalid to call other InternalQueue methods from
+  /// 'fn'. 'fn' is taken by const reference to avoid copying the functor.
+  void Iterate(const boost::function<bool(T*)>& fn) {
+    boost::lock_guard<SpinLock> lock(lock_);
+    for (Node* current = head_; current != NULL; current = current->next) {
+      if (!fn(reinterpret_cast<T*>(current))) return;
+    }
+  }
+
   /// Prints the queue ptrs to a string.
   std::string DebugString() {
     std::stringstream ss;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/241c7e01/common/thrift/generate_error_codes.py
----------------------------------------------------------------------
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 3d48005..2554a18 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -283,7 +283,9 @@ error_codes = (
    "supported length of 2147483647 bytes."),
 
   ("SCRATCH_LIMIT_EXCEEDED", 91, "Scratch space limit of $0 bytes exceeded for query "
-   "while spilling data to disk.")
+   "while spilling data to disk."),
+
+  ("BUFFER_ALLOCATION_FAILED", 92, "Unexpected error allocating $0 byte buffer."),
 )
 
 import sys