You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2017/03/16 03:37:25 UTC

[1/3] incubator-impala git commit: Fix typo in Flatbuffers cmake module

Repository: incubator-impala
Updated Branches:
  refs/heads/master de12d86f2 -> 62894e323


Fix typo in Flatbuffers cmake module

Change-Id: I0786344b5485a92c02a246b543b6acda279e199c
Reviewed-on: http://gerrit.cloudera.org:8080/6398
Reviewed-by: Dimitris Tsirogiannis <dt...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/5a333c47
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/5a333c47
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/5a333c47

Branch: refs/heads/master
Commit: 5a333c47c533d7444adc239696401d542c4064d3
Parents: de12d86
Author: Henry Robinson <he...@cloudera.com>
Authored: Fri Mar 10 00:56:21 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Wed Mar 15 03:57:40 2017 +0000

----------------------------------------------------------------------
 cmake_modules/FindFlatBuffers.cmake | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/5a333c47/cmake_modules/FindFlatBuffers.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindFlatBuffers.cmake b/cmake_modules/FindFlatBuffers.cmake
index d61a640..19ee588 100644
--- a/cmake_modules/FindFlatBuffers.cmake
+++ b/cmake_modules/FindFlatBuffers.cmake
@@ -40,4 +40,4 @@ find_program(FLATBUFFERS_COMPILER flatc
 
 include(FindPackageHandleStandardArgs)
 find_package_handle_standard_args(FLATBUFFERS REQUIRED_VARS
-  FLATBUFFERS_INCLUDE_DIR FLATBUFERS_LIBS FLATBUFFERS_COMPILER)
+  FLATBUFFERS_INCLUDE_DIR FLATBUFFERS_LIBS FLATBUFFERS_COMPILER)


[3/3] incubator-impala git commit: IMPALA-4831: enforce BufferPool reservation invariants

Posted by ta...@apache.org.
IMPALA-4831: enforce BufferPool reservation invariants

Before this patch, ill-behaved code outside BufferPool could
violate BufferPool invariants by calling ReservationTracker methods
such as DecreaseReservation() or ReleaseFrom(), or by hooking
up Clients and ReservationTrackers in the wrong way (e.g. sharing
a ReservationTracker between two Clients).

Now each client creates and owns its ReservationTracker and restricts
which methods can be called from outside BufferPool. This also reduces
the amount of boilerplate code required to set up and tear down a
client.

Change-Id: Ic5b0c335d6e73250f7e5a3b9ce2f999c5119c573
Reviewed-on: http://gerrit.cloudera.org:8080/6313
Reviewed-by: Dan Hecht <dh...@cloudera.com>
Tested-by: Impala Public Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/62894e32
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/62894e32
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/62894e32

Branch: refs/heads/master
Commit: 62894e323a87f8f48ece7235f2ffb0eac922fbf8
Parents: 87e95f8
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Tue Mar 7 18:12:15 2017 -0800
Committer: Impala Public Jenkins <im...@gerrit.cloudera.org>
Committed: Thu Mar 16 02:10:02 2017 +0000

----------------------------------------------------------------------
 .../runtime/bufferpool/buffer-pool-internal.h   |  12 +-
 be/src/runtime/bufferpool/buffer-pool-test.cc   | 148 +++++++------------
 be/src/runtime/bufferpool/buffer-pool.cc        |  71 ++++++---
 be/src/runtime/bufferpool/buffer-pool.h         |  48 ++++--
 be/src/runtime/bufferpool/reservation-tracker.h |   2 +
 be/src/runtime/bufferpool/suballocator-test.cc  |  30 ++--
 be/src/runtime/bufferpool/suballocator.cc       |   2 +-
 7 files changed, 163 insertions(+), 150 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index abc8930..04d76bb 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -76,6 +76,7 @@
 
 #include "runtime/bufferpool/buffer-pool-counters.h"
 #include "runtime/bufferpool/buffer-pool.h"
+#include "runtime/bufferpool/reservation-tracker.h"
 #include "util/condition-variable.h"
 
 namespace impala {
@@ -84,7 +85,8 @@ namespace impala {
 class BufferPool::Client {
  public:
   Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& name,
-      RuntimeProfile* profile);
+      ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+      int64_t reservation_limit, RuntimeProfile* profile);
 
   ~Client() {
     DCHECK_EQ(0, num_pages_);
@@ -93,6 +95,9 @@ class BufferPool::Client {
     DCHECK_EQ(0, in_flight_write_pages_.size());
   }
 
+  /// Release reservation for this client.
+  void Close() { reservation_.Close(); }
+
   /// Add a new pinned page 'page' to the pinned pages list. 'page' must not be in any
   /// other lists. Neither the client's lock nor page->buffer_lock should be held by the
   /// caller.
@@ -144,6 +149,7 @@ class BufferPool::Client {
     DCHECK(client_lock.mutex() == &lock_ && client_lock.owns_lock());
   }
 
+  ReservationTracker* reservation() { return &reservation_; }
   const BufferPoolClientCounters& counters() const { return counters_; }
   bool spilling_enabled() const { return file_group_ != NULL; }
 
@@ -183,6 +189,10 @@ class BufferPool::Client {
   /// A name identifying the client.
   const std::string name_;
 
+  /// The reservation tracker for the client. All pages pinned by the client count as
+  /// usage against 'reservation_'.
+  ReservationTracker reservation_;
+
   /// The RuntimeProfile counters for this client, owned by the client's RuntimeProfile.
   /// All non-NULL.
   BufferPoolClientCounters counters_;

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index b3dbfa9..0e0d384 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -16,6 +16,7 @@
 // under the License.
 
 #include <cstdlib>
+#include <limits>
 #include <string>
 #include <vector>
 #include <boost/bind.hpp>
@@ -123,7 +124,6 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
 
   int clients_per_query = 32;
   BufferPool::ClientHandle* clients[num_queries];
-  ReservationTracker* client_reservations[num_queries];
 
   for (int i = 0; i < num_queries; ++i) {
     int64_t query_id = QueryId(query_id_hi, i);
@@ -140,7 +140,6 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
     EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
 
     clients[i] = new BufferPool::ClientHandle[clients_per_query];
-    client_reservations[i] = new ReservationTracker[clients_per_query];
 
     for (int j = 0; j < clients_per_query; ++j) {
       int64_t initial_client_reservation =
@@ -148,13 +147,10 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
           < initial_query_reservation % clients_per_query;
       // Reservation limit can be anything greater or equal to the initial reservation.
       int64_t client_reservation_limit = initial_client_reservation + rand() % 100000;
-      client_reservations[i][j].InitChildTracker(
-          NULL, query_reservation, NULL, client_reservation_limit);
-      EXPECT_TRUE(
-          client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
       string name = Substitute("Client $0 for query $1", j, query_id);
-      EXPECT_OK(pool->RegisterClient(
-          name, &client_reservations[i][j], NULL, NewProfile(), &clients[i][j]));
+      EXPECT_OK(pool->RegisterClient(name, NULL, query_reservation, NULL,
+          client_reservation_limit, NewProfile(), &clients[i][j]));
+      EXPECT_TRUE(clients[i][j].IncreaseReservationToFit(initial_client_reservation));
     }
 
     for (int j = 0; j < clients_per_query; ++j) {
@@ -167,11 +163,9 @@ void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi
     for (int j = 0; j < clients_per_query; ++j) {
       pool->DeregisterClient(&clients[i][j]);
       ASSERT_FALSE(clients[i][j].is_registered());
-      client_reservations[i][j].Close();
     }
 
     delete[] clients[i];
-    delete[] client_reservations[i];
 
     GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
   }
@@ -234,12 +228,10 @@ TEST_F(BufferPoolTest, PageCreation) {
   int64_t total_mem = 2 * 2 * max_page_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
-  ASSERT_TRUE(client_tracker->IncreaseReservation(total_mem));
   BufferPool::ClientHandle client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+      total_mem, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(total_mem));
 
   vector<BufferPool::PageHandle> handles(num_pages);
 
@@ -247,7 +239,7 @@ TEST_F(BufferPoolTest, PageCreation) {
   for (int i = 0; i < num_pages; ++i) {
     int size_multiple = 1 << i;
     int64_t page_len = TEST_BUFFER_LEN * size_multiple;
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     ASSERT_OK(pool.CreatePage(&client, page_len, &handles[i]));
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].is_pinned());
@@ -256,19 +248,18 @@ TEST_F(BufferPoolTest, PageCreation) {
     ASSERT_EQ(handles[i].buffer_handle()->data(), handles[i].data());
     ASSERT_EQ(handles[i].len(), page_len);
     ASSERT_EQ(handles[i].buffer_handle()->len(), page_len);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + page_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before + page_len);
   }
 
   // Close the handles and check memory consumption.
   for (int i = 0; i < num_pages; ++i) {
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     int page_len = handles[i].len();
     pool.DestroyPage(&client, &handles[i]);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - page_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before - page_len);
   }
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 
   // All the reservations should be released at this point.
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
@@ -282,12 +273,10 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   int64_t total_mem = 2 * 2 * max_buffer_len;
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(NewProfile(), &global_reservations_, NULL, total_mem);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(total_mem));
   BufferPool::ClientHandle client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+      total_mem, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(total_mem));
 
   vector<BufferPool::BufferHandle> handles(num_buffers);
 
@@ -295,24 +284,23 @@ TEST_F(BufferPoolTest, BufferAllocation) {
   for (int i = 0; i < num_buffers; ++i) {
     int size_multiple = 1 << i;
     int64_t buffer_len = TEST_BUFFER_LEN * size_multiple;
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     ASSERT_OK(pool.AllocateBuffer(&client, buffer_len, &handles[i]));
     ASSERT_TRUE(handles[i].is_open());
     ASSERT_TRUE(handles[i].data() != NULL);
     ASSERT_EQ(handles[i].len(), buffer_len);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before + buffer_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before + buffer_len);
   }
 
   // Close the handles and check memory consumption.
   for (int i = 0; i < num_buffers; ++i) {
-    int64_t used_before = client_tracker->GetUsedReservation();
+    int64_t used_before = client.GetUsedReservation();
     int buffer_len = handles[i].len();
     pool.FreeBuffer(&client, &handles[i]);
-    ASSERT_EQ(client_tracker->GetUsedReservation(), used_before - buffer_len);
+    ASSERT_EQ(client.GetUsedReservation(), used_before - buffer_len);
   }
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 
   // All the reservations should be released at this point.
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
@@ -326,15 +314,12 @@ TEST_F(BufferPoolTest, BufferTransfer) {
   int64_t total_mem = num_clients * TEST_BUFFER_LEN;
   global_reservations_.InitRootTracker(NULL, total_mem);
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
-  ReservationTracker client_trackers[num_clients];
   BufferPool::ClientHandle clients[num_clients];
   BufferPool::BufferHandle handles[num_clients];
   for (int i = 0; i < num_clients; ++i) {
-    client_trackers[i].InitChildTracker(
-        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
-    ASSERT_TRUE(client_trackers[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
-    ASSERT_OK(pool.RegisterClient(
-        "test client", &client_trackers[i], NULL, NewProfile(), &clients[i]));
+    ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+        TEST_BUFFER_LEN, NewProfile(), &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservationToFit(TEST_BUFFER_LEN));
   }
 
   // Transfer the page around between the clients repeatedly in a circle.
@@ -347,19 +332,16 @@ TEST_F(BufferPoolTest, BufferTransfer) {
           &clients[next_client], &handles[next_client]));
       // Check that the transfer left things in a consistent state.
       ASSERT_FALSE(handles[client].is_open());
-      ASSERT_EQ(0, client_trackers[client].GetUsedReservation());
+      ASSERT_EQ(0, clients[client].GetUsedReservation());
       ASSERT_TRUE(handles[next_client].is_open());
-      ASSERT_EQ(TEST_BUFFER_LEN, client_trackers[next_client].GetUsedReservation());
+      ASSERT_EQ(TEST_BUFFER_LEN, clients[next_client].GetUsedReservation());
       // The same underlying buffer should be used.
       ASSERT_EQ(data, handles[next_client].data());
     }
   }
 
   pool.FreeBuffer(&clients[0], &handles[0]);
-  for (int i = 0; i < num_clients; ++i) {
-    pool.DeregisterClient(&clients[i]);
-    client_trackers[i].Close();
-  }
+  for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
   ASSERT_EQ(global_reservations_.GetReservation(), 0);
   global_reservations_.Close();
 }
@@ -371,13 +353,10 @@ TEST_F(BufferPoolTest, Pin) {
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, child_reservation);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", client_tracker, NewFileGroup(), NewProfile(), &client));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      NULL, child_reservation, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation));
 
   BufferPool::PageHandle handle1, handle2;
 
@@ -416,7 +395,6 @@ TEST_F(BufferPoolTest, Pin) {
   pool.DestroyPage(&client, &double_handle);
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 }
 
 /// Creating a page or pinning without sufficient reservation should DCHECK.
@@ -424,18 +402,15 @@ TEST_F(BufferPoolTest, PinWithoutReservation) {
   int64_t total_mem = TEST_BUFFER_LEN * 1024;
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
   BufferPool::ClientHandle client;
-  ASSERT_OK(
-      pool.RegisterClient("test client", client_tracker, NULL, NewProfile(), &client));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+      TEST_BUFFER_LEN, NewProfile(), &client));
 
   BufferPool::PageHandle handle;
   IMPALA_ASSERT_DEBUG_DEATH(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle), "");
 
   // Should succeed after increasing reservation.
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(TEST_BUFFER_LEN));
+  ASSERT_TRUE(client.IncreaseReservationToFit(TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
 
   // But we can't pin again.
@@ -443,7 +418,6 @@ TEST_F(BufferPoolTest, PinWithoutReservation) {
 
   pool.DestroyPage(&client, &handle);
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 }
 
 TEST_F(BufferPoolTest, ExtractBuffer) {
@@ -452,13 +426,10 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   int64_t child_reservation = TEST_BUFFER_LEN * 2;
   BufferPool pool(TEST_BUFFER_LEN, total_mem);
   global_reservations_.InitRootTracker(NULL, total_mem);
-  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
-  client_tracker->InitChildTracker(
-      NewProfile(), &global_reservations_, NULL, child_reservation);
-  ASSERT_TRUE(client_tracker->IncreaseReservationToFit(child_reservation));
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", client_tracker, NewFileGroup(), NewProfile(), &client));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      NULL, child_reservation, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservationToFit(child_reservation));
 
   BufferPool::PageHandle page;
   BufferPool::BufferHandle buffer;
@@ -472,24 +443,24 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
     ASSERT_TRUE(buffer.is_open());
     ASSERT_EQ(len, buffer.len());
     ASSERT_EQ(page_data, buffer.data());
-    ASSERT_EQ(len, client_tracker->GetUsedReservation());
+    ASSERT_EQ(len, client.GetUsedReservation());
     pool.FreeBuffer(&client, &buffer);
-    ASSERT_EQ(0, client_tracker->GetUsedReservation());
+    ASSERT_EQ(0, client.GetUsedReservation());
   }
 
   // Test that ExtractBuffer() accounts correctly for pin count > 1.
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
   uint8_t* page_data = page.data();
   ASSERT_OK(pool.Pin(&client, &page));
-  ASSERT_EQ(TEST_BUFFER_LEN * 2, client_tracker->GetUsedReservation());
+  ASSERT_EQ(TEST_BUFFER_LEN * 2, client.GetUsedReservation());
   pool.ExtractBuffer(&client, &page, &buffer);
-  ASSERT_EQ(TEST_BUFFER_LEN, client_tracker->GetUsedReservation());
+  ASSERT_EQ(TEST_BUFFER_LEN, client.GetUsedReservation());
   ASSERT_FALSE(page.is_open());
   ASSERT_TRUE(buffer.is_open());
   ASSERT_EQ(TEST_BUFFER_LEN, buffer.len());
   ASSERT_EQ(page_data, buffer.data());
   pool.FreeBuffer(&client, &buffer);
-  ASSERT_EQ(0, client_tracker->GetUsedReservation());
+  ASSERT_EQ(0, client.GetUsedReservation());
 
   // Test that ExtractBuffer() DCHECKs for unpinned pages.
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &page));
@@ -498,7 +469,6 @@ TEST_F(BufferPoolTest, ExtractBuffer) {
   pool.DestroyPage(&client, &page);
 
   pool.DeregisterClient(&client);
-  client_tracker->Close();
 }
 
 // Test concurrent creation and destruction of pages.
@@ -534,22 +504,18 @@ TEST_F(BufferPoolTest, ConcurrentPageCreation) {
 
 void BufferPoolTest::CreatePageLoop(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     ReservationTracker* parent_tracker, int num_ops) {
-  ReservationTracker client_tracker;
-  client_tracker.InitChildTracker(NewProfile(), parent_tracker, NULL, TEST_BUFFER_LEN);
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool->RegisterClient(
-      "test client", &client_tracker, file_group, NewProfile(), &client));
+  ASSERT_OK(pool->RegisterClient("test client", file_group, parent_tracker, NULL,
+      TEST_BUFFER_LEN, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN));
   for (int i = 0; i < num_ops; ++i) {
     BufferPool::PageHandle handle;
-    ASSERT_TRUE(client_tracker.IncreaseReservation(TEST_BUFFER_LEN));
     ASSERT_OK(pool->CreatePage(&client, TEST_BUFFER_LEN, &handle));
     pool->Unpin(&client, &handle);
     ASSERT_OK(pool->Pin(&client, &handle));
     pool->DestroyPage(&client, &handle);
-    client_tracker.DecreaseReservation(TEST_BUFFER_LEN);
   }
   pool->DeregisterClient(&client);
-  client_tracker.Close();
 }
 
 /// Test that DCHECK fires when trying to unpin a page with spilling disabled.
@@ -559,9 +525,9 @@ TEST_F(BufferPoolTest, SpillingDisabledDcheck) {
   BufferPool::PageHandle handle;
 
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", &global_reservations_, NULL, NewProfile(), &client));
-  ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN));
+  ASSERT_OK(pool.RegisterClient("test client", NULL, &global_reservations_, NULL,
+      numeric_limits<int64_t>::max(), NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle));
 
   ASSERT_OK(pool.Pin(&client, &handle));
@@ -582,9 +548,9 @@ TEST_F(BufferPoolTest, EvictPageSameClient) {
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", &global_reservations_, NewFileGroup(), NewProfile(), &client));
-  ASSERT_TRUE(global_reservations_.IncreaseReservation(TEST_BUFFER_LEN));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      NULL, TEST_BUFFER_LEN, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
 
   // Do not have enough reservations because we pinned the page.
@@ -607,9 +573,9 @@ TEST_F(BufferPoolTest, EvictPageDifferentSizes) {
   BufferPool::PageHandle handle1, handle2;
 
   BufferPool::ClientHandle client;
-  ASSERT_OK(pool.RegisterClient(
-      "test client", &global_reservations_, NewFileGroup(), NewProfile(), &client));
-  ASSERT_TRUE(global_reservations_.IncreaseReservation(2 * TEST_BUFFER_LEN));
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      NULL, TOTAL_BYTES, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(2 * TEST_BUFFER_LEN));
   ASSERT_OK(pool.CreatePage(&client, TEST_BUFFER_LEN, &handle1));
   pool.Unpin(&client, &handle1);
 
@@ -635,14 +601,11 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   global_reservations_.InitRootTracker(NULL, TOTAL_BYTES);
   BufferPool pool(TEST_BUFFER_LEN, TOTAL_BYTES);
 
-  ReservationTracker client_reservations[NUM_CLIENTS];
   BufferPool::ClientHandle clients[NUM_CLIENTS];
   for (int i = 0; i < NUM_CLIENTS; ++i) {
-    client_reservations[i].InitChildTracker(
-        NewProfile(), &global_reservations_, NULL, TEST_BUFFER_LEN);
-    ASSERT_TRUE(client_reservations[i].IncreaseReservation(TEST_BUFFER_LEN));
-    ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i),
-        &client_reservations[i], NewFileGroup(), NewProfile(), &clients[i]));
+    ASSERT_OK(pool.RegisterClient(Substitute("test client $0", i), NewFileGroup(),
+        &global_reservations_, NULL, TEST_BUFFER_LEN, NewProfile(), &clients[i]));
+    ASSERT_TRUE(clients[i].IncreaseReservation(TEST_BUFFER_LEN));
   }
 
   // Create a pinned and unpinned page for the first client.
@@ -668,10 +631,7 @@ TEST_F(BufferPoolTest, EvictPageDifferentClient) {
   pool.DestroyPage(&clients[0], &handle1);
   pool.DestroyPage(&clients[0], &handle2);
   pool.FreeBuffer(&clients[1], &buffer);
-  for (int i = 0; i < NUM_CLIENTS; ++i) {
-    pool.DeregisterClient(&clients[i]);
-    client_reservations[i].Close();
-  }
+  for (BufferPool::ClientHandle& client : clients) pool.DeregisterClient(&client);
 }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 7a026d6..842501f 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -24,7 +24,6 @@
 #include "common/names.h"
 #include "gutil/strings/substitute.h"
 #include "runtime/bufferpool/buffer-allocator.h"
-#include "runtime/bufferpool/reservation-tracker.h"
 #include "util/bit-util.h"
 #include "util/runtime-profile-counters.h"
 #include "util/uid-util.h"
@@ -129,19 +128,19 @@ BufferPool::~BufferPool() {
   DCHECK_EQ(0, clean_pages_.size());
 }
 
-Status BufferPool::RegisterClient(const string& name, ReservationTracker* reservation,
-    TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile, ClientHandle* client) {
+Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group,
+    ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+    int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client) {
   DCHECK(!client->is_registered());
-  DCHECK(reservation != NULL);
-  client->reservation_ = reservation;
-  client->impl_ = new Client(this, file_group, name, profile);
+  DCHECK(parent_reservation != NULL);
+  client->impl_ = new Client(this, file_group, name, parent_reservation, mem_tracker,
+      reservation_limit, profile);
   return Status::OK();
 }
 
 void BufferPool::DeregisterClient(ClientHandle* client) {
   if (!client->is_registered()) return;
-  client->reservation_->Close(); // Will DCHECK if any remaining buffers or pinned pages.
-  client->reservation_ = NULL;
+  client->impl_->Close(); // Will DCHECK if any remaining buffers or pinned pages.
   delete client->impl_; // Will DCHECK if there are any remaining pages.
   client->impl_ = NULL;
 }
@@ -190,7 +189,7 @@ Status BufferPool::Pin(ClientHandle* client, PageHandle* handle) {
   }
   // Update accounting last to avoid complicating the error return path above.
   ++page->pin_count;
-  client->reservation_->AllocateFrom(page->len);
+  client->impl_->reservation()->AllocateFrom(page->len);
   return Status::OK();
 }
 
@@ -201,10 +200,11 @@ void BufferPool::Unpin(ClientHandle* client, PageHandle* handle) {
   // If handle is pinned, we can assume that the page itself is pinned.
   DCHECK(handle->is_pinned());
   Page* page = handle->page_;
-  client->reservation_->ReleaseTo(page->len);
+  ReservationTracker* reservation = client->impl_->reservation();
+  reservation->ReleaseTo(page->len);
 
   if (--page->pin_count > 0) return;
-  client->impl_->MoveToDirtyUnpinned(client->reservation_->GetUnusedReservation(), page);
+  client->impl_->MoveToDirtyUnpinned(reservation->GetUnusedReservation(), page);
   COUNTER_ADD(client->impl_->counters().total_unpinned_bytes, handle->len());
   COUNTER_ADD(client->impl_->counters().peak_unpinned_bytes, handle->len());
 }
@@ -225,8 +225,9 @@ void BufferPool::ExtractBuffer(
 
 Status BufferPool::AllocateBuffer(
     ClientHandle* client, int64_t len, BufferHandle* handle) {
-  RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(client->reservation_, len));
-  client->reservation_->AllocateFrom(len);
+  ReservationTracker* reservation = client->impl_->reservation();
+  RETURN_IF_ERROR(client->impl_->CleanPagesBeforeAllocation(reservation, len));
+  reservation->AllocateFrom(len);
   return AllocateBufferInternal(client, len, handle);
 }
 
@@ -258,7 +259,7 @@ Status BufferPool::AllocateBufferInternal(
 void BufferPool::FreeBuffer(ClientHandle* client, BufferHandle* handle) {
   if (!handle->is_open()) return; // Should be idempotent.
   DCHECK_EQ(client, handle->client_);
-  client->reservation_->ReleaseTo(handle->len_);
+  client->impl_->reservation()->ReleaseTo(handle->len_);
   FreeBufferInternal(handle);
 }
 
@@ -277,8 +278,8 @@ Status BufferPool::TransferBuffer(ClientHandle* src_client, BufferHandle* src,
   DCHECK_NE(src, dst);
   DCHECK_NE(src_client, dst_client);
 
-  dst_client->reservation_->AllocateFrom(src->len());
-  src_client->reservation_->ReleaseTo(src->len());
+  dst_client->impl_->reservation()->AllocateFrom(src->len());
+  src_client->impl_->reservation()->ReleaseTo(src->len());
   *dst = std::move(*src);
   dst->client_ = dst_client;
   return Status::OK();
@@ -348,14 +349,37 @@ Status BufferPool::EvictCleanPages(int64_t bytes_to_evict) {
   return Status::OK();
 }
 
+bool BufferPool::ClientHandle::IncreaseReservation(int64_t bytes) {
+  return impl_->reservation()->IncreaseReservation(bytes);
+}
+
+bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) {
+  return impl_->reservation()->IncreaseReservationToFit(bytes);
+}
+
+int64_t BufferPool::ClientHandle::GetReservation() const {
+  return impl_->reservation()->GetReservation();
+}
+
+int64_t BufferPool::ClientHandle::GetUsedReservation() const {
+  return impl_->reservation()->GetUsedReservation();
+}
+
+int64_t BufferPool::ClientHandle::GetUnusedReservation() const {
+  return impl_->reservation()->GetUnusedReservation();
+}
+
 BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
-    const string& name, RuntimeProfile* profile)
+    const string& name, ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+    int64_t reservation_limit, RuntimeProfile* profile)
   : pool_(pool),
     file_group_(file_group),
     name_(name),
     num_pages_(0),
     dirty_unpinned_bytes_(0),
     in_flight_write_bytes_(0) {
+  reservation_.InitChildTracker(
+      profile, parent_reservation, mem_tracker, reservation_limit);
   counters_.get_buffer_time = ADD_TIMER(profile, "BufferPoolGetBufferTime");
   counters_.read_wait_time = ADD_TIMER(profile, "BufferPoolReadIoWaitTime");
   counters_.read_io_ops = ADD_COUNTER(profile, "BufferPoolReadIoOps", TUnit::UNIT);
@@ -476,8 +500,8 @@ Status BufferPool::Client::MoveEvictedToPinned(
     unique_lock<mutex>* client_lock, ClientHandle* client, PageHandle* handle) {
   Page* page = handle->page_;
   DCHECK(!page->buffer.is_open());
-  RETURN_IF_ERROR(
-      CleanPagesBeforeAllocationLocked(client_lock, client->reservation_, page->len));
+  RETURN_IF_ERROR(CleanPagesBeforeAllocationLocked(
+      client_lock, client->impl_->reservation(), page->len));
 
   // Don't hold any locks while allocating or reading back the data. It is safe to modify
   // the page's buffer handle without holding any locks because no concurrent operations
@@ -620,9 +644,9 @@ string BufferPool::Client::DebugString() {
   lock_guard<mutex> lock(lock_);
   stringstream ss;
   ss << Substitute("<BufferPool::Client> $0 name: $1 write_status: $2 num_pages: $3 "
-                   "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5",
+                   "dirty_unpinned_bytes: $4 in_flight_write_bytes: $5 reservation: {$6}",
       this, name_, write_status_.GetDetail(), num_pages_, dirty_unpinned_bytes_,
-      in_flight_write_bytes_);
+      in_flight_write_bytes_, reservation_.DebugString());
   ss << "\n  " << pinned_pages_.size() << " pinned pages: ";
   pinned_pages_.Iterate(bind<bool>(Page::DebugStringCallback, &ss, _1));
   ss << "\n  " << dirty_unpinned_pages_.size() << " dirty unpinned pages: ";
@@ -634,9 +658,8 @@ string BufferPool::Client::DebugString() {
 
 string BufferPool::ClientHandle::DebugString() const {
   if (is_registered()) {
-    return Substitute("<BufferPool::Client> $0 reservation: {$1} "
-                      "internal state: {$2}",
-        this, reservation_->DebugString(), impl_->DebugString());
+    return Substitute(
+        "<BufferPool::Client> $0 internal state: {$1}", this, impl_->DebugString());
   } else {
     return Substitute("<BufferPool::ClientHandle> $0 UNREGISTERED", this);
   }

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 4e62b3b..e27e645 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -161,17 +161,23 @@ class BufferPool : public CacheLineAligned {
 
   /// Register a client. Returns an error status and does not register the client if the
   /// arguments are invalid. 'name' is an arbitrary name used to identify the client in
-  /// any errors messages or logging. Counters for this client are added to the (non-NULL)
+  /// any error messages or logging. If 'file_group' is non-NULL, it is used to allocate
+  /// scratch space to write unpinned pages to disk. If it is NULL, unpinning of pages is
+  /// not allowed for this client. Counters for this client are added to the (non-NULL)
   /// 'profile'. 'client' is the client to register. 'client' should not already be
-  /// registered. If 'file_group' is non-NULL, it is used to allocate scratch space to
-  /// write unpinned pages to disk. If it is NULL, unpinning of pages is not allowed for
-  /// this client.
-  Status RegisterClient(const std::string& name, ReservationTracker* reservation,
-      TmpFileMgr::FileGroup* file_group, RuntimeProfile* profile,
+  /// registered.
+  ///
+  /// The client's reservation is created as a child of 'parent_reservation' with limit
+  /// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The initial
+  /// reservation is 0 bytes.
+  Status RegisterClient(const std::string& name, TmpFileMgr::FileGroup* file_group,
+      ReservationTracker* parent_reservation, MemTracker* mem_tracker,
+      int64_t reservation_limit, RuntimeProfile* profile,
       ClientHandle* client) WARN_UNUSED_RESULT;
 
   /// Deregister 'client' if it is registered. All pages must be destroyed and buffers
-  /// must be freed for the client before calling this. Idempotent.
+  /// must be freed for the client before calling this. Releases any reservation that
+  /// belongs to the client. Idempotent.
   void DeregisterClient(ClientHandle* client);
 
   /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length
@@ -316,12 +322,27 @@ class BufferPool : public CacheLineAligned {
 /// Client methods or BufferPool methods with the Client as an argument is not supported.
 class BufferPool::ClientHandle {
  public:
-  ClientHandle() : reservation_(NULL) {}
+  ClientHandle() : impl_(NULL) {}
   /// Client must be deregistered.
   ~ClientHandle() { DCHECK(!is_registered()); }
 
-  bool is_registered() const { return reservation_ != NULL; }
-  ReservationTracker* reservation() { return reservation_; }
+  /// Request to increase reservation for this client by 'bytes' by calling
+  /// ReservationTracker::IncreaseReservation(). Returns true if the reservation was
+  /// successfully increased.
+  bool IncreaseReservation(int64_t bytes) WARN_UNUSED_RESULT;
+
+  /// Tries to ensure that 'bytes' of unused reservation is available for this client
+  /// to use by calling ReservationTracker::IncreaseReservationToFit(). Returns true
+  /// if successful, after which 'bytes' can be used.
+  bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
+
+  /// Accessors for this client's reservation corresponding to the identically-named
+  /// methods in ReservationTracker.
+  int64_t GetReservation() const;
+  int64_t GetUsedReservation() const;
+  int64_t GetUnusedReservation() const;
+
+  bool is_registered() const { return impl_ != NULL; }
 
   std::string DebugString() const;
 
@@ -329,11 +350,8 @@ class BufferPool::ClientHandle {
   friend class BufferPool;
   DISALLOW_COPY_AND_ASSIGN(ClientHandle);
 
-  /// The reservation tracker for the client. NULL means the client isn't registered.
-  /// All pages pinned by the client count as usage against 'reservation_'.
-  ReservationTracker* reservation_;
-
-  /// Internal state for the client. Owned by BufferPool.
+  /// Internal state for the client. NULL means the client isn't registered.
+  /// Owned by BufferPool.
   Client* impl_;
 };
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 9edd37f..3d40fcc 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -108,6 +108,8 @@ class ReservationTracker {
   /// If the tracker is initialized, deregister the ReservationTracker from its parent,
   /// relinquishing all this tracker's reservation. All of the reservation must be unused
   /// and all the tracker's children must be closed before calling this method.
+  /// TODO: decide on and implement policy for how far to release the reservation up
+  /// the tree. Currently the reservation is released all the way to the root.
   void Close();
 
   /// Request to increase reservation by 'bytes'. The request is either granted in

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/suballocator-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator-test.cc b/be/src/runtime/bufferpool/suballocator-test.cc
index 01f0ea6..13bc597 100644
--- a/be/src/runtime/bufferpool/suballocator-test.cc
+++ b/be/src/runtime/bufferpool/suballocator-test.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <cstdlib>
+#include <limits>
 #include <random>
 #include <string>
 #include <vector>
@@ -85,11 +86,11 @@ class SuballocatorTest : public ::testing::Test {
   /// Register a client with 'buffer_pool_'. The client is automatically deregistered
   /// and freed at the end of the test.
   void RegisterClient(
-      ReservationTracker* reservation, BufferPool::ClientHandle** client) {
+      ReservationTracker* parent_reservation, BufferPool::ClientHandle** client) {
     clients_.push_back(make_unique<BufferPool::ClientHandle>());
     *client = clients_.back().get();
-    ASSERT_OK(buffer_pool_->RegisterClient(
-        "test client", reservation, NULL, profile(), *client));
+    ASSERT_OK(buffer_pool_->RegisterClient("test client", NULL, parent_reservation, NULL,
+        numeric_limits<int64_t>::max(), profile(), *client));
   }
 
   /// Assert that the memory for all of the suballocations is writable and disjoint by
@@ -104,8 +105,8 @@ class SuballocatorTest : public ::testing::Test {
     allocs->clear();
   }
 
-  static void ExpectReservationUnused(ReservationTracker& reservation) {
-    EXPECT_EQ(reservation.GetUsedReservation(), 0) << reservation.DebugString();
+  static void ExpectReservationUnused(BufferPool::ClientHandle* client) {
+    EXPECT_EQ(client->GetUsedReservation(), 0) << client->DebugString();
   }
 
   RuntimeProfile* profile() { return profile_.get(); }
@@ -165,10 +166,10 @@ TEST_F(SuballocatorTest, SameSizeAllocations) {
   AssertMemoryValid(allocs);
 
   // Check that reservation usage matches the amount allocated.
-  EXPECT_EQ(global_reservation_.GetUsedReservation(), allocated_mem)
+  EXPECT_EQ(client->GetUsedReservation(), allocated_mem)
       << global_reservation_.DebugString();
   FreeAllocations(&allocator, &allocs);
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Check behaviour of zero-length allocation.
@@ -185,7 +186,7 @@ TEST_F(SuballocatorTest, ZeroLengthAllocation) {
   ASSERT_TRUE(alloc != nullptr) << global_reservation_.DebugString();
   EXPECT_EQ(alloc->len(), Suballocator::MIN_ALLOCATION_BYTES);
   allocator.Free(move(alloc));
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Check behaviour of out-of-range allocation.
@@ -203,7 +204,7 @@ TEST_F(SuballocatorTest, OutOfRangeAllocations) {
   // Too-large allocations fail gracefully.
   ASSERT_FALSE(allocator.Allocate(Suballocator::MAX_ALLOCATION_BYTES + 1, &alloc).ok())
       << global_reservation_.DebugString();
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Basic test to make sure that non-power-of-two suballocations are handled as expected
@@ -235,14 +236,13 @@ TEST_F(SuballocatorTest, NonPowerOfTwoAllocations) {
     // Check that it was rounded up to a power-of-two.
     EXPECT_EQ(alloc->len(), max(Suballocator::MIN_ALLOCATION_BYTES,
                                 BitUtil::RoundUpToPowerOfTwo(alloc_size)));
-    EXPECT_EQ(
-        max(TEST_BUFFER_LEN, alloc->len()), global_reservation_.GetUsedReservation())
+    EXPECT_EQ(max(TEST_BUFFER_LEN, alloc->len()), client->GetUsedReservation())
         << global_reservation_.DebugString();
     memset(alloc->data(), 0, alloc->len()); // Check memory is writable.
 
     allocator.Free(move(alloc));
   }
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Test that simulates hash table's patterns of doubling suballocations and validates
@@ -292,12 +292,12 @@ TEST_F(SuballocatorTest, DoublingAllocations) {
     // coalesced two buddies of curr_alloc_size / 2) and one buffer with only
     // 'curr_alloc_size' bytes in use (if an Allocate() call couldn't recycle memory and
     // had to allocate a new buffer).
-    EXPECT_LE(global_reservation_.GetUsedReservation(),
+    EXPECT_LE(client->GetUsedReservation(),
         TEST_BUFFER_LEN + max(TEST_BUFFER_LEN, curr_alloc_size * NUM_ALLOCS));
   }
   // Check that reservation usage behaves as expected.
   FreeAllocations(&allocator, &allocs);
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 /// Do some randomised testing of the allocator. Simulate some interesting patterns with
@@ -352,7 +352,7 @@ TEST_F(SuballocatorTest, RandomAllocations) {
   }
   // Check that memory is released when suballocations are freed.
   FreeAllocations(&allocator, &allocs);
-  ExpectReservationUnused(global_reservation_);
+  ExpectReservationUnused(client);
 }
 
 void SuballocatorTest::AssertMemoryValid(

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/62894e32/be/src/runtime/bufferpool/suballocator.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/suballocator.cc b/be/src/runtime/bufferpool/suballocator.cc
index a4835a4..a6ab1ff 100644
--- a/be/src/runtime/bufferpool/suballocator.cc
+++ b/be/src/runtime/bufferpool/suballocator.cc
@@ -89,7 +89,7 @@ int Suballocator::ComputeListIndex(int64_t bytes) const {
 Status Suballocator::AllocateBuffer(int64_t bytes, unique_ptr<Suballocation>* result) {
   DCHECK_LE(bytes, MAX_ALLOCATION_BYTES);
   const int64_t buffer_len = max(min_buffer_len_, BitUtil::RoundUpToPowerOfTwo(bytes));
-  if (!client_->reservation()->IncreaseReservationToFit(buffer_len)) {
+  if (!client_->IncreaseReservationToFit(buffer_len)) {
     *result = nullptr;
     return Status::OK();
   }



[2/3] incubator-impala git commit: Remove key-normalizer*.h which appear to be dead code at this point.

Posted by ta...@apache.org.
Remove key-normalizer*.h which appear to be dead code at this point.

Testing done: compiled Impala.

Change-Id: If890c2a2589148db38ea333a518dc1368dcd5459
Reviewed-on: http://gerrit.cloudera.org:8080/6377
Tested-by: Impala Public Jenkins
Reviewed-by: Michael Ho <kw...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/87e95f80
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/87e95f80
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/87e95f80

Branch: refs/heads/master
Commit: 87e95f804e2760fea0407e9fc75f3966a6e51ac9
Parents: 5a333c4
Author: Michael Ho <kw...@cloudera.com>
Authored: Mon Mar 13 19:20:27 2017 -0700
Committer: Michael Ho <kw...@cloudera.com>
Committed: Wed Mar 15 05:11:15 2017 +0000

----------------------------------------------------------------------
 be/src/util/key-normalizer.h        | 111 ------------------
 be/src/util/key-normalizer.inline.h | 188 -------------------------------
 2 files changed, 299 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87e95f80/be/src/util/key-normalizer.h
----------------------------------------------------------------------
diff --git a/be/src/util/key-normalizer.h b/be/src/util/key-normalizer.h
deleted file mode 100644
index 3c9c30f..0000000
--- a/be/src/util/key-normalizer.h
+++ /dev/null
@@ -1,111 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_UTIL_KEY_NORMALIZER_H_
-#define IMPALA_UTIL_KEY_NORMALIZER_H_
-
-#include "exprs/expr.h"
-
-namespace impala {
-
-/// Provides support for normalizing Impala expr values into a memcmp-able,
-/// fixed-length format.
-//
-/// To normalize a key, we first write a null byte (0 if nulls_first, 1 otw),
-/// followed by the normalized form of the key. We invert the bytes of the key (excluding
-/// the null byte) if the key should be sorted in descending order. Further, for any
-/// multi-byte data types, we ensure that the most significant byte is first by
-/// converting to big endian.
-//
-/// In addition to inverting descending keys and converting to big endian, here is how
-/// we normalize specific types:
-/// Integers:
-///     Invert the sign bit.
-/// Floats:
-///     Write out the inverted sign bit, followed by the exponent, followed by
-///     the fraction. If the float is negative, though, we need to invert both the exponent
-///     and fraction (since smaller number means greater actual value when negative).
-///     Conveniently, IEEE floating point numbers are already in the correct order.
-/// Timestamps:
-///     32 bits for date: 23 bits for year, 4 bits for month, and 5 bits for day.
-///     64 bits for time of day in nanoseconds.
-///     All numbers assumed unsigned.
-/// Strings:
-///     Write one character at a time with a null byte at the end (inverted if
-///     sort descending). Unlike other data types, we may write partial strings.
-///     NOTE: This assumes strings do not contain null characters.
-/// Booleans/Nulls:
-///     Left as-is.
-//
-/// Finally, we pad any remaining bytes of the key with zeroes.
-class KeyNormalizer {
- public:
-  /// Initializes the normalizer with the key exprs and length alloted to each normalized
-  /// key.
-  KeyNormalizer(const std::vector<ExprContext*>& key_exprs_ctxs, int key_len,
-      const std::vector<bool>& is_asc, const std::vector<bool>& nulls_first)
-      : key_expr_ctxs_(key_expr_ctxs), key_len_(key_len), is_asc_(is_asc),
-        nulls_first_(nulls_first) {
-  }
-
-  /// Normalizes all keys and writes the value into dst.
-  /// Returns true if we went over the max key size while writing the key.
-  /// If the return value is true, then key_idx_over_budget will be set to
-  /// the index of the key expr which went over.
-  /// TODO: Handle non-nullable columns
-  bool NormalizeKey(TupleRow* tuple_row, uint8_t* dst, int* key_idx_over_budget = NULL);
-
- private:
-  /// Returns true if we went over the max key size while writing the null bit.
-  static bool WriteNullBit(uint8_t null_bit, uint8_t* value, uint8_t* dst,
-      int* bytes_left);
-
-  /// Stores the given value in the memory address given by dst, after
-  /// converting to big endian and inverting the value if the sort is descending.
-  /// Copy of 'value' intentional, we don't want to modify original.
-  template <typename ValueType>
-  static void StoreFinalValue(ValueType value, void* dst, bool is_asc);
-
-  template <typename IntType>
-  static void NormalizeInt(void* src, void* dst, bool is_asc);
-
-  /// ResultType should be an integer type of the same size as FloatType, used
-  /// to examine the bytes of the float.
-  template <typename FloatType, typename ResultType>
-  static void NormalizeFloat(void* src, void* dst, bool is_asc);
-
-  static void NormalizeTimestamp(uint8_t* src, uint8_t* dst, bool is_asc);
-
-  /// Normalizes a sort key value and writes it to dst.
-  /// Updates bytes_left and returns true if we went over the max key size.
-  static bool WriteNormalizedKey(const ColumnType& type, bool is_asc,
-      uint8_t* value, uint8_t* dst, int* bytes_left);
-
-  /// Normalizes a column by writing a NULL byte and then the normalized value.
-  /// Updates bytes_left and returns true if we went over the max key size.
-  static bool NormalizeKeyColumn(const ColumnType& type, uint8_t null_bit, bool is_asc,
-      uint8_t* value, uint8_t* dst, int* bytes_left);
-
-  std::vector<ExprContext*> key_expr_ctxs_;
-  int key_len_;
-  std::vector<bool> is_asc_;
-  std::vector<bool> nulls_first_;
-};
-
-}
-
-#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/87e95f80/be/src/util/key-normalizer.inline.h
----------------------------------------------------------------------
diff --git a/be/src/util/key-normalizer.inline.h b/be/src/util/key-normalizer.inline.h
deleted file mode 100644
index 761dd5c..0000000
--- a/be/src/util/key-normalizer.inline.h
+++ /dev/null
@@ -1,188 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef IMPALA_UTIL_KEY_NORMALIZER_INLINE_H_
-#define IMPALA_UTIL_KEY_NORMALIZER_INLINE_H_
-
-#include "util/key-normalizer.h"
-
-#include <boost/date_time/gregorian/gregorian_types.hpp>
-
-#include "runtime/descriptors.h"
-#include "runtime/string-value.h"
-#include "runtime/timestamp-value.h"
-#include "util/bit-util.h"
-
-namespace impala {
-
-inline bool KeyNormalizer::WriteNullBit(uint8_t null_bit, uint8_t* value, uint8_t* dst,
-    int* bytes_left) {
-  // If there's not enough space for the null byte, return.
-  if (*bytes_left < 1) return true;
-  *dst = (value == NULL ? null_bit : !null_bit);
-  --*bytes_left;
-  return false;
-}
-
-template <typename ValueType>
-inline void KeyNormalizer::StoreFinalValue(ValueType value, void* dst, bool is_asc) {
-  if (sizeof(ValueType) > 1) value = BitUtil::ToBigEndian(value);
-  if (!is_asc) value = ~value;
-  memcpy(dst, &value, sizeof(ValueType));
-}
-
-template <typename IntType>
-inline void KeyNormalizer::NormalizeInt(void* src, void* dst, bool is_asc) {
-  const int num_bits = 8 * sizeof(IntType);
-  IntType sign_bit = (1LL << (num_bits - 1));
-
-  IntType value = *(reinterpret_cast<IntType*>(src));
-  value = (sign_bit ^ value);
-  StoreFinalValue<IntType>(value, dst, is_asc);
-}
-
-template <typename FloatType, typename ResultType>
-inline void KeyNormalizer::NormalizeFloat(void* src, void* dst, bool is_asc) {
-  DCHECK_EQ(sizeof(FloatType), sizeof(ResultType));
-
-  const int num_bits = 8 * sizeof(FloatType);
-  const ResultType sign_bit = (1LL << (num_bits - 1));
-
-  ResultType value = *(reinterpret_cast<ResultType*>(src));
-  if (value & sign_bit) {
-    // If the sign is negative, we'll end up inverting the whole thing.
-    value = ~value;
-  } else {
-    // Otherwise, just invert the sign bit.
-    value = (sign_bit ^ value);
-  }
-  StoreFinalValue<ResultType>(value, dst, is_asc);
-}
-
-inline void KeyNormalizer::NormalizeTimestamp(uint8_t* src, uint8_t* dst, bool is_asc) {
-  TimestampValue timestamp = *(reinterpret_cast<TimestampValue*>(src));
-
-  // Need 5 bits for day and 4 bits for month. Rest given to year.
-  boost::gregorian::date::ymd_type ymd = timestamp.date().year_month_day();
-  uint32_t date = ymd.day | (ymd.month << 5) | (ymd.year << 9);
-  StoreFinalValue<uint32_t>(date, dst, is_asc);
-
-  // Write time of day in nanoseconds in the next slot.
-  uint64_t time_ns = timestamp.time_of_day().total_nanoseconds();
-  StoreFinalValue<uint64_t>(time_ns, dst + sizeof(date), is_asc);
-}
-
-inline bool KeyNormalizer::WriteNormalizedKey(const ColumnType& type, bool is_asc,
-    uint8_t* value, uint8_t* dst, int* bytes_left) {
-  // Expend bytes_left or fail if we don't have enough.
-  // Variable-length data types (i.e., strings) account for themselves.
-  int byte_size = type.GetByteSize();
-  if (byte_size != 0) {
-    if (*bytes_left >= byte_size) {
-      *bytes_left -= byte_size;
-    } else {
-      return true;
-    }
-  }
-
-  switch(type.type) {
-    case TYPE_BIGINT:
-      NormalizeInt<int64_t>(value, dst, is_asc);
-      break;
-    case TYPE_INT:
-      NormalizeInt<int32_t>(value, dst, is_asc);
-      break;
-    case TYPE_SMALLINT:
-      NormalizeInt<int16_t>(value, dst, is_asc);
-      break;
-    case TYPE_TINYINT:
-      NormalizeInt<int8_t>(value, dst, is_asc);
-      break;
-
-    case TYPE_DOUBLE:
-      NormalizeFloat<double, uint64_t>(value, dst, is_asc);
-      break;
-    case TYPE_FLOAT:
-      NormalizeFloat<float, uint32_t>(value, dst, is_asc);
-      break;
-
-    case TYPE_TIMESTAMP:
-      NormalizeTimestamp(value, dst, is_asc);
-      break;
-
-    case TYPE_STRING:
-    case TYPE_VARCHAR: {
-      StringValue* string_val = reinterpret_cast<StringValue*>(value);
-
-      // Copy the string over, with an additional NULL at the end.
-      int size = std::min(string_val->len, *bytes_left);
-      for (int i = 0; i < size; ++i) {
-        StoreFinalValue<uint8_t>(string_val->ptr[i], dst + i, is_asc);
-      }
-      *bytes_left -= size;
-
-      if (*bytes_left == 0) return true;
-
-      StoreFinalValue<uint8_t>(0, dst + size, is_asc);
-      --*bytes_left;
-      return false;
-    }
-
-    case TYPE_BOOLEAN:
-      StoreFinalValue<uint8_t>(*reinterpret_cast<uint8_t*>(value), dst, is_asc);
-      break;
-    case TYPE_NULL:
-      StoreFinalValue<uint8_t>(0, dst, is_asc);
-      break;
-    default:
-      DCHECK(false) << "Value type not supported for normalization";
-  }
-
-  return false;
-}
-
-inline bool KeyNormalizer::NormalizeKeyColumn(const ColumnType& type, uint8_t null_bit,
-    bool is_asc, uint8_t* value, uint8_t* dst, int* bytes_left) {
-  bool went_over = WriteNullBit(null_bit, value, dst, bytes_left);
-  if (went_over || value == NULL) return went_over;
-  return WriteNormalizedKey(type, is_asc, value, dst + 1, bytes_left);
-}
-
-inline bool KeyNormalizer::NormalizeKey(TupleRow* row, uint8_t* dst,
-    int* key_idx_over_budget) {
-  int bytes_left = key_len_;
-  for (int i = 0; i < key_expr_ctxs_.size(); ++i) {
-    uint8_t* key = reinterpret_cast<uint8_t*>(key_expr_ctxs_[i]->GetValue(row));
-    int offset = key_len_ - bytes_left;
-    bool went_over = NormalizeKeyColumn(key_expr_ctxs_[i]->root()->type(),
-        !nulls_first_[i], is_asc_[i], key, dst + offset, &bytes_left);
-    if (went_over) {
-      if (key_idx_over_budget != NULL) *key_idx_over_budget = i;
-      return true;
-    }
-  }
-
-  // Zero out any unused bytes of the sort key.
-  int offset = key_len_ - bytes_left;
-  bzero(dst + offset, bytes_left);
-
-  return false;
-}
-
-}
-
-#endif