You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/11 04:21:41 UTC

incubator-impala git commit: IMPALA-3201: reservation implementation for new buffer pool

Repository: incubator-impala
Updated Branches:
  refs/heads/master 218019e59 -> f9ac593ff


IMPALA-3201: reservation implementation for new buffer pool

This patch implements the reservation bookkeeping logic for the new
buffer pool. The reservations are always guaranteed and any buffer/page
allocations require a reservation. Reservations are tracked via a
hierarchy of ReservationTrackers.

Eventually ReservationTrackers should become the main mechanism for
memory accounting, once all runtime memory is handled by the buffer
pool. In the meantime, query/process limits are enforced and memory
is reported through the preexisting MemTracker hierarchy.

Reservations are accounted as consumption against a MemTracker because
the memory is committed and cannot be used for other purposes. The
MemTracker then uses the counters from the ReservationTracker to log
information about buffer-pool memory down to the operator level.

Testing:
Includes basic tests for buffer pool client registration and various
reservation operations.

Change-Id: I35cc89e863efb4cc506657bfdaaaf633a10bbab6
Reviewed-on: http://gerrit.cloudera.org:8080/3993
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins


Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f9ac593f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f9ac593f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f9ac593f

Branch: refs/heads/master
Commit: f9ac593ff9a9ea3a5b329eed618500c45a1e7400
Parents: 218019e
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jun 22 18:24:04 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sun Sep 11 00:36:12 2016 +0000

----------------------------------------------------------------------
 be/CMakeLists.txt                               |   5 +
 be/src/bufferpool/CMakeLists.txt                |  31 ++
 be/src/bufferpool/buffer-pool-test.cc           | 231 +++++++++++
 be/src/bufferpool/buffer-pool.cc                |  69 ++++
 be/src/bufferpool/buffer-pool.h                 |  34 +-
 .../bufferpool/reservation-tracker-counters.h   |  41 ++
 be/src/bufferpool/reservation-tracker-test.cc   | 385 +++++++++++++++++++
 be/src/bufferpool/reservation-tracker.cc        | 308 +++++++++++++++
 be/src/bufferpool/reservation-tracker.h         | 248 ++++++++++++
 be/src/runtime/mem-tracker.cc                   |  61 ++-
 be/src/runtime/mem-tracker.h                    |  12 +
 be/src/util/dummy-runtime-profile.h             |  39 ++
 be/src/util/runtime-profile-counters.h          |   1 -
 13 files changed, 1439 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 6aff0b3..2a46239 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -162,6 +162,7 @@ if (DOXYGEN_FOUND)
   # Possible to not input the subdirs one by one?
   set(CMAKE_DOXYGEN_INPUT
     ${CMAKE_SOURCE_DIR}/be/src
+    ${CMAKE_SOURCE_DIR}/be/src/bufferpool/
     ${CMAKE_SOURCE_DIR}/be/src/catalog/
     ${CMAKE_SOURCE_DIR}/be/src/common/
     ${CMAKE_SOURCE_DIR}/be/src/exec/
@@ -258,6 +259,7 @@ if (NOT APPLE)
 endif()
 set (IMPALA_LINK_LIBS
   ${WL_START_GROUP}
+  BufferPool
   Catalog
   CodeGen
   Common
@@ -283,6 +285,7 @@ set (IMPALA_LINK_LIBS
 # libraries when dynamic linking is enabled.
 if (BUILD_SHARED_LIBS)
   set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS}
+    BufferPool
     Runtime
     Exec
     CodeGen
@@ -414,6 +417,7 @@ endfunction(COMPILE_TO_IR)
 add_subdirectory(src/gutil)
 
 # compile these subdirs using their own CMakeLists.txt
+add_subdirectory(src/bufferpool)
 add_subdirectory(src/catalog)
 add_subdirectory(src/codegen)
 add_subdirectory(src/common)
@@ -441,6 +445,7 @@ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-variable")
 add_subdirectory(generated-sources/gen-cpp)
 
 link_directories(
+  ${CMAKE_CURRENT_SOURCE_DIR}/build/bufferpool
   ${CMAKE_CURRENT_SOURCE_DIR}/build/catalog
   ${CMAKE_CURRENT_SOURCE_DIR}/build/common
   ${CMAKE_CURRENT_SOURCE_DIR}/build/exec

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt
new file mode 100644
index 0000000..0930f73
--- /dev/null
+++ b/be/src/bufferpool/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# The BufferPool library and any executables built here (e.g. tests) share one directory.
+set(BUFFERPOOL_OUTPUT_DIR "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
+set(LIBRARY_OUTPUT_PATH "${BUFFERPOOL_OUTPUT_DIR}")
+set(EXECUTABLE_OUTPUT_PATH "${BUFFERPOOL_OUTPUT_DIR}")
+
+# Buffer pool and reservation bookkeeping sources.
+add_library(BufferPool buffer-pool.cc reservation-tracker.cc)
+
+# Build-ordering dependency only: presumably ensures generated thrift code exists before
+# these sources are compiled.
+add_dependencies(BufferPool thrift-deps)
+
+ADD_BE_TEST(buffer-pool-test)
+ADD_BE_TEST(reservation-tracker-test)

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc
new file mode 100644
index 0000000..cdb163d
--- /dev/null
+++ b/be/src/bufferpool/buffer-pool-test.cc
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/unordered_map.hpp>
+#include <cstdlib>
+#include <string>
+#include <vector>
+
+#include "bufferpool/buffer-pool.h"
+#include "bufferpool/reservation-tracker.h"
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "testutil/test-macros.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+class BufferPoolTest : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+  /// Closes every per-query tracker, then the root tracker, then clears 'obj_pool_'.
+  virtual void TearDown() {
+    for (auto entry : query_reservations_) {
+      ReservationTracker* tracker = entry.second;
+      tracker->Close();
+    }
+
+    global_reservations_.Close();
+    obj_pool_.Clear();
+  }
+
+  /// The minimum buffer length in bytes used in most tests.
+  const static int64_t TEST_PAGE_LEN = 1024;
+
+  /// Registers 'num_queries' queries (with IDs derived from 'query_id_hi'), each with
+  /// 32 clients, then deregisters all clients and closes all query trackers.
+  void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
+      int64_t initial_query_reservation, int64_t query_reservation_limit);
+
+ protected:
+  static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
+
+  /// Returns the tracker for 'query_id', creating it on first use. Thread-safe.
+  ReservationTracker* GetQueryReservationTracker(int64_t query_id) {
+    lock_guard<SpinLock> l(query_reservations_lock_);
+    ReservationTracker* tracker = query_reservations_[query_id];
+    if (tracker != NULL) return tracker;
+    tracker = obj_pool_.Add(new ReservationTracker());
+    query_reservations_[query_id] = tracker;
+    return tracker;
+  }
+
+  RuntimeProfile* NewProfile() {
+    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+  }
+  /// Holds the trackers and profiles created by the helpers above; cleared in TearDown().
+  ObjectPool obj_pool_;
+  /// Root of the reservation tracker hierarchy that all query trackers are children of.
+  ReservationTracker global_reservations_;
+
+  /// Map from query_id to the reservation tracker for that query. Reads and
+  /// modifications of the map are protected by query_reservations_lock_.
+  unordered_map<int64_t, ReservationTracker*> query_reservations_;
+  SpinLock query_reservations_lock_;
+};
+
+const int64_t BufferPoolTest::TEST_PAGE_LEN;
+
+void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
+    int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) {
+  const int clients_per_query = 32;
+
+  // Use std::vector: variable-length arrays are a non-standard C++ extension.
+  std::vector<BufferPool::Client*> clients(num_queries);
+  std::vector<ReservationTracker*> client_reservations(num_queries);
+
+  for (int i = 0; i < num_queries; ++i) {
+    int64_t query_id = QueryId(query_id_hi, i);
+
+    // Initialize a tracker for a new query.
+    ReservationTracker* query_reservation = GetQueryReservationTracker(query_id);
+    query_reservation->InitChildTracker(
+        NULL, &global_reservations_, NULL, query_reservation_limit);
+
+    // Test that closing then reopening child tracker works.
+    query_reservation->Close();
+    query_reservation->InitChildTracker(
+        NULL, &global_reservations_, NULL, query_reservation_limit);
+    EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
+
+    clients[i] = new BufferPool::Client[clients_per_query];
+    client_reservations[i] = new ReservationTracker[clients_per_query];
+
+    for (int j = 0; j < clients_per_query; ++j) {
+      // Even split; the first (total % clients_per_query) clients get 1 extra byte.
+      int64_t initial_client_reservation = initial_query_reservation / clients_per_query
+          + (j < initial_query_reservation % clients_per_query);
+      // Reservation limit can be anything greater or equal to the initial reservation.
+      int64_t client_reservation_limit = initial_client_reservation + rand() % 100000;
+      client_reservations[i][j].InitChildTracker(
+          NULL, query_reservation, NULL, client_reservation_limit);
+      EXPECT_TRUE(
+          client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
+      string name = Substitute("Client $0 for query $1", j, query_id);
+      EXPECT_OK(pool->RegisterClient(name, &client_reservations[i][j], &clients[i][j]));
+    }
+
+    for (int j = 0; j < clients_per_query; ++j) {
+      ASSERT_TRUE(clients[i][j].is_registered());
+    }
+  }
+
+  // Deregister clients then query.
+  for (int i = 0; i < num_queries; ++i) {
+    for (int j = 0; j < clients_per_query; ++j) {
+      pool->DeregisterClient(&clients[i][j]);
+      ASSERT_FALSE(clients[i][j].is_registered());
+      client_reservations[i][j].Close();
+    }
+
+    delete[] clients[i];
+    delete[] client_reservations[i];
+
+    GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
+  }
+}
+
+/// Test that queries and clients can be registered and deregistered with the reservation
+/// trackers and the buffer pool.
+TEST_F(BufferPoolTest, BasicRegistration) {
+  int num_concurrent_queries = 1024;
+  int64_t sum_initial_reservations = 4;
+  int64_t reservation_limit = 1024;
+  // Need enough buffers for all initial reservations.
+  int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
+  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+  BufferPool pool(TEST_PAGE_LEN, total_mem);
+
+  RegisterQueriesAndClients(
+      &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);
+  // Everything was released; ASSERT (unlike DCHECK) also checks in release builds.
+  ASSERT_EQ(0, global_reservations_.GetUsedReservation());
+  ASSERT_EQ(0, global_reservations_.GetChildReservations());
+  ASSERT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+/// Test that queries and clients can be registered and deregistered by concurrent
+/// threads.
+TEST_F(BufferPoolTest, ConcurrentRegistration) {
+  int queries_per_thread = 64;
+  int num_threads = 64;
+  int num_concurrent_queries = queries_per_thread * num_threads;
+  int64_t sum_initial_reservations = 4;
+  int64_t reservation_limit = 1024;
+  // Need enough buffers for all initial reservations.
+  int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
+  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+  BufferPool pool(TEST_PAGE_LEN, total_mem);
+
+  // Launch threads, each with a different set of query IDs.
+  thread_group workers;
+  for (int i = 0; i < num_threads; ++i) {
+    workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this,
+        &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit)));
+  }
+  workers.join_all();
+  // All reservations must be released now; ASSERT (unlike DCHECK) runs in release too.
+  ASSERT_EQ(0, global_reservations_.GetUsedReservation());
+  ASSERT_EQ(0, global_reservations_.GetChildReservations());
+  ASSERT_EQ(0, global_reservations_.GetReservation());
+  global_reservations_.Close();
+}
+
+/// Test that reservation setup fails if the initial buffers cannot be fulfilled.
+TEST_F(BufferPoolTest, QueryReservationsUnfulfilled) {
+  // Every query asks for 'reservation_per_query' bytes from the shared root tracker.
+  int num_queries = 128;
+  int64_t reservation_per_query = 128;
+  // Won't be able to fulfill initial reservation for last query.
+  int64_t total_mem = num_queries * reservation_per_query - 1;
+  global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+  for (int i = 0; i < num_queries; ++i) {
+    ReservationTracker* query_tracker = GetQueryReservationTracker(i);
+    query_tracker->InitChildTracker(
+        NewProfile(), &global_reservations_, NULL, 2 * reservation_per_query);
+    bool got_initial_reservation =
+        query_tracker->IncreaseReservationToFit(reservation_per_query);
+    if (i < num_queries - 1) {
+      ASSERT_TRUE(got_initial_reservation);
+    } else {
+      ASSERT_FALSE(got_initial_reservation);
+
+      // Getting the initial reservation should succeed after freeing up buffers from
+      // another query.
+      GetQueryReservationTracker(i - 1)->Close();
+      ASSERT_TRUE(query_tracker->IncreaseReservationToFit(reservation_per_query));
+    }
+  }
+}
+}
+
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);  // Removes gtest's own flags from argv.
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  return RUN_ALL_TESTS();  // 0 if all tests passed, non-zero otherwise.
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc
new file mode 100644
index 0000000..21893fc
--- /dev/null
+++ b/be/src/bufferpool/buffer-pool.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bufferpool/buffer-pool.h"
+
+#include <boost/bind.hpp>
+#include <limits>
+#include <sstream>
+
+#include "bufferpool/reservation-tracker.h"
+#include "common/names.h"
+#include "gutil/strings/substitute.h"
+#include "util/bit-util.h"
+#include "util/uid-util.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
+  : min_buffer_len_(min_buffer_len), buffer_bytes_limit_(buffer_bytes_limit) {
+  DCHECK_GT(min_buffer_len, 0);  // 'min_buffer_len' must be a positive power of two.
+  DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len));
+}
+// No explicit cleanup is performed.
+BufferPool::~BufferPool() {}
+
+Status BufferPool::RegisterClient(
+    const string& name, ReservationTracker* reservation, Client* client) {
+  DCHECK(!client->is_registered());
+  DCHECK(reservation != NULL);  // NOTE(review): debug-only check.
+  client->reservation_ = reservation;  // A non-NULL reservation_ marks it registered.
+  client->name_ = name;
+  return Status::OK();  // NOTE(review): header says invalid args yield an error.
+}
+
+void BufferPool::DeregisterClient(Client* client) {
+  if (!client->is_registered()) return;  // Idempotent: no-op if not registered.
+  client->reservation_->Close();  // Also closes the client's reservation tracker.
+  client->name_.clear();
+  client->reservation_ = NULL;
+}
+// Safe to call on an unregistered client, whose 'reservation_' is NULL.
+string BufferPool::Client::DebugString() const {
+  return Substitute("<BufferPool::Client> $0 name: $1 reservation: $2", this, name_,
+      reservation_ == NULL ? "<unregistered>" : reservation_->DebugString());
+}
+
+string BufferPool::DebugString() {
+  stringstream ss;
+  ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
+     << " buffer_bytes_limit: " << buffer_bytes_limit_ << "\n";  // Space separates fields.
+  return ss.str();
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h
index 183e538..e566543 100644
--- a/be/src/bufferpool/buffer-pool.h
+++ b/be/src/bufferpool/buffer-pool.h
@@ -22,7 +22,6 @@
 #include <string>
 #include <stdint.h>
 
-#include "bufferpool/buffer-allocator.h"
 #include "common/atomic.h"
 #include "common/status.h"
 #include "gutil/macros.h"
@@ -125,7 +124,7 @@ class ReservationTracker;
 ///   calling Unpin() on the page. The page may then be written to disk and its buffer
 ///   repurposed internally by BufferPool.
 /// * Once the operator needs the page's contents again and has sufficient unused
-///   reservations, it can call Pin(), which brings the page's contents back into memory,
+///   reservation, it can call Pin(), which brings the page's contents back into memory,
 ///   perhaps in a different buffer. Therefore the operator must fix up any pointers into
 ///   the previous buffer.
 /// * If the operator is done with the page, it can call FreeBuffer() to destroy the
@@ -157,10 +156,10 @@ class BufferPool {
   ~BufferPool();
 
   /// Register a client. Returns an error status and does not register the client if the
-  /// arguments are invalid. 'name' is an arbitrary used to identify the client in any
-  /// errors messages or logging. 'client' is the client to register. 'client' should not
-  /// already be registered.
-  Status RegisterClient(const std::string& name, ReservationTracker* reservations,
+  /// arguments are invalid. 'name' is an arbitrary name used to identify the client in
+  /// any error messages or logging. 'client' is the client to register. 'client' should
+  /// not already be registered.
+  Status RegisterClient(const std::string& name, ReservationTracker* reservation,
       Client* client);
 
   /// Deregister 'client' if it is registered. Idempotent.
@@ -168,7 +167,7 @@ class BufferPool {
 
   /// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length
   /// supported by BufferPool (see BufferPool class comment). The client must have
-  /// sufficient unused reservations to pin the new page (otherwise it will DCHECK).
+  /// sufficient unused reservation to pin the new page (otherwise it will DCHECK).
   /// CreatePage() only fails when a system error prevents the buffer pool from fulfilling
   /// the reservation.
   /// On success, the handle is mapped to the new page.
@@ -229,6 +228,13 @@ class BufferPool {
 
  private:
   DISALLOW_COPY_AND_ASSIGN(BufferPool);
+
+  /// The minimum length of a buffer in bytes. All buffers and pages are a multiple of
+  /// this length. This is always a power of two.
+  const int64_t min_buffer_len_;
+
+  /// The maximum physical memory in bytes that can be used for buffers.
+  const int64_t buffer_bytes_limit_;
 };
 
 /// External representation of a client of the BufferPool. Clients are used for
@@ -239,17 +245,25 @@ class BufferPool {
 /// Client methods or BufferPool methods with the Client as an argument is not supported.
 class BufferPool::Client {
  public:
-  Client() {}
+  Client() : reservation_(NULL) {}  // Starts out unregistered.
   /// Client must be deregistered.
   ~Client() { DCHECK(!is_registered()); }
 
-  bool is_registered() const;
-  ReservationTracker* reservations();
+  bool is_registered() const { return reservation_ != NULL; }
+  ReservationTracker* reservation() { return reservation_; }
 
   std::string DebugString() const;
 
  private:
+  friend class BufferPool;  // BufferPool sets 'name_' and 'reservation_' directly.
   DISALLOW_COPY_AND_ASSIGN(Client);
+
+  /// A name identifying the client in error messages and logs. Empty if unregistered.
+  std::string name_;
+
+  /// The reservation tracker for the client. NULL means the client isn't registered.
+  /// All pages pinned by the client count as usage against 'reservation_'.
+  ReservationTracker* reservation_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker-counters.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-counters.h b/be/src/bufferpool/reservation-tracker-counters.h
new file mode 100644
index 0000000..0952a2f
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker-counters.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RESERVATION_TRACKER_COUNTERS_H
+#define IMPALA_RESERVATION_TRACKER_COUNTERS_H
+
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+/// A set of counters for each ReservationTracker, used for reporting purposes.
+///
+/// If the ReservationTracker is linked to a profile these have the same lifetime as that
+/// profile, otherwise they have the same lifetime as the ReservationTracker itself.
+struct ReservationTrackerCounters {
+  /// The tracker's peak reservation in bytes.
+  RuntimeProfile::HighWaterMarkCounter* peak_reservation;
+
+  /// The tracker's peak used reservation (i.e. usage) in bytes.
+  RuntimeProfile::HighWaterMarkCounter* peak_used_reservation;
+
+  /// The hard limit on the tracker's reservation, in bytes.
+  RuntimeProfile::Counter* reservation_limit;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-test.cc b/be/src/bufferpool/reservation-tracker-test.cc
new file mode 100644
index 0000000..e43efb8
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker-test.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "bufferpool/reservation-tracker.h"
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/test-macros.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+class ReservationTrackerTest : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+  /// Closes 'root_' (tests may already have closed it) and clears 'obj_pool_'.
+  virtual void TearDown() {
+    root_.Close();
+    obj_pool_.Clear();
+  }
+
+  /// The minimum allocation size in bytes used in most tests.
+  const static int64_t MIN_BUFFER_LEN = 1024;
+
+ protected:
+  RuntimeProfile* NewProfile() {
+    return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+  }
+  /// Holds profiles from NewProfile() and other objects that tests allocate.
+  ObjectPool obj_pool_;
+  /// Root of the reservation tracker hierarchy; tests initialize it via InitRootTracker().
+  ReservationTracker root_;
+
+  scoped_ptr<RuntimeProfile> profile_;
+};
+
+const int64_t ReservationTrackerTest::MIN_BUFFER_LEN;
+
+TEST_F(ReservationTrackerTest, BasicSingleTracker) {
+  const int64_t limit = 16;
+  root_.InitRootTracker(NULL, limit);
+  ASSERT_EQ(0, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+
+  // Fail to increase reservation beyond the root tracker's limit.
+  ASSERT_FALSE(root_.IncreaseReservation(limit + 1));
+  ASSERT_EQ(0, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetUnusedReservation());
+
+  // Successfully increase reservation to just under the limit.
+  ASSERT_TRUE(root_.IncreaseReservation(limit - 1));
+  ASSERT_EQ(limit - 1, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
+
+  // Allocating changes usage (and unused reservation) but not the reservation itself.
+  root_.AllocateFrom(2);
+  ASSERT_EQ(limit - 1, root_.GetReservation());
+  ASSERT_EQ(2, root_.GetUsedReservation());
+  ASSERT_EQ(limit - 3, root_.GetUnusedReservation());
+  root_.ReleaseTo(1);  // Release the usage in two steps.
+  ASSERT_EQ(1, root_.GetUsedReservation());
+  root_.ReleaseTo(1);
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(limit - 1, root_.GetReservation());
+  ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
+}
+
+TEST_F(ReservationTrackerTest, BasicTwoLevel) {
+  const int64_t limit = 16;
+  root_.InitRootTracker(NULL, limit);
+
+  const int64_t root_reservation = limit / 2;
+  // Get half of the limit as an initial reservation.
+  ASSERT_TRUE(root_.IncreaseReservation(root_reservation));
+  ASSERT_EQ(root_reservation, root_.GetReservation());
+  ASSERT_EQ(root_reservation, root_.GetUnusedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetChildReservations());
+
+  ReservationTracker child;
+  child.InitChildTracker(NULL, &root_, NULL, numeric_limits<int64_t>::max());
+
+  const int64_t child_reservation = root_reservation + 1;
+  // Ask for more than the root's unused reservation, so the root must grow its own.
+  ASSERT_TRUE(child.IncreaseReservation(child_reservation));
+
+  ASSERT_EQ(child_reservation, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUnusedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(child_reservation, root_.GetChildReservations());
+
+  ASSERT_EQ(child_reservation, child.GetReservation());
+  ASSERT_EQ(child_reservation, child.GetUnusedReservation());
+  ASSERT_EQ(0, child.GetUsedReservation());
+  ASSERT_EQ(0, child.GetChildReservations());
+
+  // Child usage is counted on the child only, not on the root.
+  child.AllocateFrom(1);
+  ASSERT_EQ(child_reservation, child.GetReservation());
+  ASSERT_EQ(1, child.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+
+  // Growing the root's reservation leaves the child's reservation unchanged.
+  ASSERT_TRUE(root_.IncreaseReservation(1));
+  ASSERT_EQ(child_reservation + 1, root_.GetReservation());
+  ASSERT_EQ(1, root_.GetUnusedReservation());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(child_reservation, root_.GetChildReservations());
+  ASSERT_EQ(child_reservation, child.GetReservation());
+
+  // Root usage draws on the root's own unused reservation.
+  root_.AllocateFrom(1);
+  ASSERT_EQ(child_reservation + 1, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetUnusedReservation());
+  ASSERT_EQ(1, root_.GetUsedReservation());
+
+  // Release allocations.
+  root_.ReleaseTo(1);
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  child.ReleaseTo(1);
+  ASSERT_EQ(0, child.GetUsedReservation());
+
+  // Decreasing the child's reservation also shrinks the root's reservation.
+  child.DecreaseReservation(1);
+  ASSERT_EQ(child_reservation, root_.GetReservation());
+  ASSERT_EQ(child_reservation - 1, child.GetReservation());
+  ASSERT_EQ(child_reservation - 1, root_.GetChildReservations());
+
+  // Closing the child releases its remaining reservation from the root as well.
+  child.Close();
+  ASSERT_EQ(1, root_.GetReservation());
+  ASSERT_EQ(0, root_.GetChildReservations());
+}
+
+TEST_F(ReservationTrackerTest, CloseIdempotency) {
+  // Check we can close before initializing the tracker.
+  root_.Close();
+
+  const int64_t limit = 16;
+  root_.InitRootTracker(NULL, limit);
+
+  // Check we can close twice.
+  root_.Close();
+  root_.Close();
+}
+
+// Test that the tracker's reservation limit is enforced.
+TEST_F(ReservationTrackerTest, ReservationLimit) {
+  // Setup trackers so that there is a spare buffer that the client isn't entitled to:
+  // the root has three buffers' worth of memory, but the client is limited to two.
+  int64_t total_mem = MIN_BUFFER_LEN * 3;
+  int64_t client_limit = MIN_BUFFER_LEN * 2;
+  root_.InitRootTracker(NULL, total_mem);
+
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  client_tracker->InitChildTracker(NULL, &root_, NULL, client_limit);
+
+  // We can only increase reservation up to the client's limit.
+  ASSERT_FALSE(client_tracker->IncreaseReservation(client_limit + 1));
+  ASSERT_TRUE(client_tracker->IncreaseReservation(client_limit));
+  ASSERT_FALSE(client_tracker->IncreaseReservation(1));
+
+  client_tracker->Close();
+}
+
+// Test that parent's reservation limit is enforced.
+TEST_F(ReservationTrackerTest, ParentReservationLimit) {
+  // Setup reservations so that there is a spare buffer outside the parent's limit:
+  // the root has four buffers' worth of memory, but the query is limited to three.
+  int64_t total_mem = MIN_BUFFER_LEN * 4;
+  int64_t parent_limit = MIN_BUFFER_LEN * 3;
+  int64_t other_client_reservation = MIN_BUFFER_LEN;
+  root_.InitRootTracker(NULL, total_mem);
+
+  // The child reservation limit is higher than the parent reservation limit, so the
+  // parent limit is the effective limit.
+  ReservationTracker* query_tracker = obj_pool_.Add(new ReservationTracker());
+  ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+  query_tracker->InitChildTracker(NULL, &root_, NULL, parent_limit);
+  client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem * 10);
+
+  ReservationTracker* other_client_tracker = obj_pool_.Add(new ReservationTracker());
+  other_client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem);
+  ASSERT_TRUE(other_client_tracker->IncreaseReservationToFit(other_client_reservation));
+  ASSERT_EQ(root_.GetUsedReservation(), 0);
+  ASSERT_EQ(root_.GetChildReservations(), other_client_reservation);
+  ASSERT_EQ(query_tracker->GetUsedReservation(), 0);
+  ASSERT_EQ(query_tracker->GetUnusedReservation(), 0);
+
+  // Can only increase reservation up to parent limit, excluding other reservations.
+  int64_t effective_limit = parent_limit - other_client_reservation;
+  ASSERT_FALSE(client_tracker->IncreaseReservation(effective_limit + MIN_BUFFER_LEN));
+  ASSERT_TRUE(client_tracker->IncreaseReservation(effective_limit));
+  ASSERT_FALSE(client_tracker->IncreaseReservation(MIN_BUFFER_LEN));
+
+  // Check that tracker hierarchy reports correct usage.
+  ASSERT_EQ(root_.GetUsedReservation(), 0);
+  ASSERT_EQ(root_.GetChildReservations(), parent_limit);
+  ASSERT_EQ(query_tracker->GetUsedReservation(), 0);
+  ASSERT_EQ(query_tracker->GetUnusedReservation(), 0);
+
+  client_tracker->Close();
+  other_client_tracker->Close();
+  query_tracker->Close();
+}
+
+/// Test integration of ReservationTracker with MemTracker.
+TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) {
+  // Setup a 2-level hierarchy of trackers. The child ReservationTracker is linked to
+  // the child MemTracker. We add various limits at different places to enable testing
+  // of different code paths.
+  root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100);
+  MemTracker root_mem_tracker;
+  MemTracker child_mem_tracker1(-1, -1, "Child 1", &root_mem_tracker);
+  MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, -1, "Child 2", &root_mem_tracker);
+  ReservationTracker child_reservations1, child_reservations2;
+  child_reservations1.InitChildTracker(
+      NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN);
+  child_reservations2.InitChildTracker(
+      NewProfile(), &root_, &child_mem_tracker2, 75 * MIN_BUFFER_LEN);
+
+  // Check that a single buffer reservation is accounted correctly.
+  ASSERT_TRUE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that a buffer reservation from the other child is accounted correctly.
+  ASSERT_TRUE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that hitting the MemTracker limit leaves things in a consistent state.
+  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 50));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that hitting the ReservationTracker's local limit leaves things in a
+  // consistent state.
+  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 75));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that hitting the ReservationTracker's parent's limit leaves both the
+  // ReservationTrackers and the (unlimited) MemTrackers in a consistent state.
+  ASSERT_FALSE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN * 100));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that released memory is decremented from all trackers correctly.
+  child_reservations1.DecreaseReservation(MIN_BUFFER_LEN);
+  child_reservations2.DecreaseReservation(MIN_BUFFER_LEN);
+  ASSERT_EQ(0, child_reservations1.GetReservation());
+  ASSERT_EQ(0, child_mem_tracker1.consumption());
+  ASSERT_EQ(0, child_reservations2.GetReservation());
+  ASSERT_EQ(0, child_mem_tracker2.consumption());
+  ASSERT_EQ(0, root_mem_tracker.consumption());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetChildReservations());
+
+  child_reservations1.Close();
+  child_reservations2.Close();
+  child_mem_tracker1.UnregisterFromParent();
+  child_mem_tracker2.UnregisterFromParent();
+}
+
+/// Test a HIERARCHY_DEPTH-level chain of ReservationTrackers, each (except the root)
+/// linked to a MemTracker in a parallel chain. Checks that limits are enforced and that
+/// reservation/consumption is reported consistently at every ancestor.
+TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
+  const int HIERARCHY_DEPTH = 5;
+  // Setup a multi-level hierarchy of trackers and ensure that consumption is reported
+  // correctly.
+  ReservationTracker reservations[HIERARCHY_DEPTH];
+  scoped_ptr<MemTracker> mem_trackers[HIERARCHY_DEPTH];
+
+  // We can only handle MemTracker limits at the topmost linked ReservationTracker,
+  // so avoid adding limits at lower level.
+  // Level 0 gets a loose limit (LIMIT * 10), level 1 (the topmost link) gets the
+  // binding limit LIMIT, and deeper levels are unlimited (-1).
+  const int LIMIT = HIERARCHY_DEPTH;
+  vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1});
+
+  // Root trackers aren't linked.
+  mem_trackers[0].reset(new MemTracker(mem_limits[0]));
+  reservations[0].InitRootTracker(NewProfile(), 500);
+  // Build level i as a child of level i - 1 in both hierarchies.
+  for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
+    mem_trackers[i].reset(new MemTracker(
+        mem_limits[i], -1, Substitute("Child $0", i), mem_trackers[i - 1].get()));
+    reservations[i].InitChildTracker(
+        NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
+  }
+
+  // Amounts just below, at and just above the binding MemTracker limit.
+  vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
+
+  // Test that all limits and consumption correctly reported when consuming
+  // from a non-root ReservationTracker that is connected to a MemTracker.
+  for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
+    // NOTE(review): lowest_limit() presumably returns the smallest limit among this
+    // MemTracker and its ancestors (-1 if none) - confirm in mem-tracker.h.
+    int64_t lowest_limit = mem_trackers[level]->lowest_limit();
+    for (int amount : interesting_amounts) {
+      bool increased = reservations[level].IncreaseReservation(amount);
+      if (lowest_limit == -1 || amount <= lowest_limit) {
+        // The increase should go through.
+        ASSERT_TRUE(increased) << reservations[level].DebugString();
+        ASSERT_EQ(amount, reservations[level].GetReservation());
+        ASSERT_EQ(amount, mem_trackers[level]->consumption());
+        // Every ancestor must see the increase both as a child reservation and as
+        // MemTracker consumption.
+        for (int ancestor = 0; ancestor < level; ++ancestor) {
+          ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
+          ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+        }
+
+        LOG(INFO) << "\n" << mem_trackers[0]->LogUsage();
+        reservations[level].DecreaseReservation(amount);
+      } else {
+        ASSERT_FALSE(increased);
+      }
+      // We should be back in the original state.
+      for (int i = 0; i < HIERARCHY_DEPTH; ++i) {
+        ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
+                                                       << reservations[i].DebugString();
+        ASSERT_EQ(0, reservations[i].GetChildReservations());
+        ASSERT_EQ(0, mem_trackers[i]->consumption());
+      }
+    }
+  }
+
+  // "Pull down" a reservation from the top of the hierarchy level-by-level to the
+  // leaves, checking that everything is consistent at each step.
+  // Each iteration reserves LIMIT at one level (pulling it through every ancestor)
+  // and then releases it again.
+  for (int level = 0; level < HIERARCHY_DEPTH; ++level) {
+    const int amount = LIMIT;
+    ASSERT_TRUE(reservations[level].IncreaseReservation(amount));
+    ASSERT_EQ(amount, reservations[level].GetReservation());
+    ASSERT_EQ(0, reservations[level].GetUsedReservation());
+    // The root ReservationTracker is unlinked, so mem_trackers[0] only sees
+    // consumption coming from levels >= 1.
+    if (level != 0) ASSERT_EQ(amount, mem_trackers[level]->consumption());
+    for (int ancestor = 0; ancestor < level; ++ancestor) {
+      ASSERT_EQ(0, reservations[ancestor].GetUsedReservation());
+      ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
+      ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+    }
+    reservations[level].DecreaseReservation(amount);
+  }
+
+  // Tear down leaves first: a ReservationTracker's children must be closed before it.
+  for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {
+    reservations[i].Close();
+    if (i != 0) mem_trackers[i]->UnregisterFromParent();
+  }
+}
+}
+
+// Test entry point. gtest parses (and removes) its own command-line flags first; the
+// remaining arguments are then handed to Impala's common runtime initialization in
+// backend-test mode (TestInfo::BE_TEST) before all registered tests are run.
+int main(int argc, char** argv) {
+  ::testing::InitGoogleTest(&argc, argv);
+  impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+  return RUN_ALL_TESTS();
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker.cc b/be/src/bufferpool/reservation-tracker.cc
new file mode 100644
index 0000000..1c5a14e
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bufferpool/reservation-tracker.h"
+
+#include <algorithm>
+
+#include "common/object-pool.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/mem-tracker.h"
+#include "util/dummy-runtime-profile.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+// Leaves the tracker uninitialized; InitRootTracker() or InitChildTracker() must be
+// called before use. All scalar state is given a defined value so that no member is
+// indeterminate before initialization (e.g. if an accessor is misused in a release
+// build, where the DCHECK(initialized_) checks are compiled out).
+ReservationTracker::ReservationTracker()
+  : initialized_(false),
+    parent_(NULL),
+    mem_tracker_(NULL),
+    reservation_limit_(0),
+    reservation_(0),
+    child_reservations_(0),
+    used_reservation_(0) {}
+
+// The tracker must have been closed (or never initialized) before destruction.
+ReservationTracker::~ReservationTracker() {
+  DCHECK(!initialized_);
+}
+
+// Initializes this tracker as the root of a hierarchy: no parent, no linked MemTracker,
+// a zero initial reservation and a maximum reservation of 'reservation_limit' bytes,
+// which must be non-negative (as for InitChildTracker()).
+void ReservationTracker::InitRootTracker(
+    RuntimeProfile* profile, int64_t reservation_limit) {
+  DCHECK_GE(reservation_limit, 0);
+
+  lock_guard<SpinLock> l(lock_);
+  DCHECK(!initialized_);
+  parent_ = NULL;
+  mem_tracker_ = NULL;
+  reservation_limit_ = reservation_limit;
+  reservation_ = 0;
+  used_reservation_ = 0;
+  child_reservations_ = 0;
+  initialized_ = true;
+
+  InitCounters(profile, reservation_limit_);
+  COUNTER_SET(counters_.peak_reservation, reservation_);
+
+  CheckConsistency();
+}
+
+// Initializes this tracker as a child of 'parent', optionally linked to 'mem_tracker'.
+// The linked MemTracker hierarchy must mirror the ReservationTracker hierarchy; this is
+// verified with DCHECKs below.
+void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
+    ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) {
+  DCHECK(parent != NULL);
+  DCHECK_GE(reservation_limit, 0);
+
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(!initialized_);
+  parent_ = parent;
+  mem_tracker_ = mem_tracker;
+
+  reservation_limit_ = reservation_limit;
+  reservation_ = 0;
+  used_reservation_ = 0;
+  child_reservations_ = 0;
+  initialized_ = true;
+
+  if (mem_tracker_ != NULL) {
+    MemTracker* parent_mem_tracker = GetParentMemTracker();
+    if (parent_mem_tracker == NULL) {
+      // This is the topmost link, so no ancestor may be linked either: a linked
+      // grandparent with an unlinked parent would leave a gap in the links.
+      for (ReservationTracker* ancestor = parent_; ancestor != NULL;
+           ancestor = ancestor->parent_) {
+        DCHECK(ancestor->mem_tracker_ == NULL);
+      }
+    } else {
+      // The MemTracker parent link must match our own parent link, and since limits
+      // are only enforced at the topmost link this tracker may not add a tighter one.
+      DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
+      DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit());
+    }
+  }
+
+  InitCounters(profile, reservation_limit_);
+
+  CheckConsistency();
+}
+
+// Registers this tracker's counters in 'profile' (or in a private dummy profile if
+// 'profile' is NULL) and sets the reservation-limit counter to 'reservation_limit'.
+void ReservationTracker::InitCounters(
+    RuntimeProfile* profile, int64_t reservation_limit) {
+  bool profile_provided = profile != NULL;
+  if (profile == NULL) {
+    dummy_profile_.reset(new DummyProfile);
+    profile = dummy_profile_->profile();
+  }
+
+  // Check that another tracker's counters aren't already registered in the profile.
+  // Probe the name of a counter that is actually registered below, otherwise the
+  // check can never fire.
+  DCHECK(profile->GetCounter("BufferPoolReservationLimit") == NULL);
+  counters_.reservation_limit =
+      ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES);
+  counters_.peak_reservation =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakReservation", TUnit::BYTES);
+  counters_.peak_used_reservation =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakUsedReservation", TUnit::BYTES);
+
+  COUNTER_SET(counters_.reservation_limit, reservation_limit);
+
+  // Only hand the counters to the linked MemTracker when they live in a
+  // caller-provided profile rather than in the dummy profile owned by this tracker.
+  if (mem_tracker_ != NULL && profile_provided) {
+    mem_tracker_->EnableReservationReporting(counters_);
+  }
+}
+
+// Returns this tracker's whole reservation to its parent and marks it uninitialized.
+// Closing a tracker that is not initialized (never initialized or already closed)
+// does nothing.
+void ReservationTracker::Close() {
+  lock_guard<SpinLock> guard(lock_);
+  if (initialized_) {
+    CheckConsistency();
+    // Everything must have been released by users and children beforehand.
+    DCHECK_EQ(used_reservation_, 0);
+    DCHECK_EQ(child_reservations_, 0);
+    // The root has no parent to give its reservation back to.
+    if (parent_ != NULL) DecreaseReservationInternalLocked(reservation_, false);
+    mem_tracker_ = NULL;
+    parent_ = NULL;
+    initialized_ = false;
+  }
+}
+
+// Asks for exactly 'bytes' more reservation for this tracker itself.
+bool ReservationTracker::IncreaseReservation(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  const bool use_existing_reservation = false;
+  const bool is_child_reservation = false;
+  return IncreaseReservationInternalLocked(
+      bytes, use_existing_reservation, is_child_reservation);
+}
+
+// Grows the reservation only as much as needed for 'bytes' of unused reservation.
+bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  const bool use_existing_reservation = true;
+  const bool is_child_reservation = false;
+  return IncreaseReservationInternalLocked(
+      bytes, use_existing_reservation, is_child_reservation);
+}
+
+// Tries to grow the reservation (see header). On failure no ReservationTracker or
+// MemTracker state is modified. 'lock_' must be held by the caller; the parent's lock
+// is acquired while asking it for more reservation (bottom-up lock order).
+bool ReservationTracker::IncreaseReservationInternalLocked(
+    int64_t bytes, bool use_existing_reservation, bool is_child_reservation) {
+  DCHECK(initialized_);
+  int64_t reservation_increase =
+      use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) : bytes;
+  DCHECK_GE(reservation_increase, 0);
+
+  bool granted;
+  // Check if the increase is allowed, starting at the bottom of hierarchy.
+  if (reservation_ + reservation_increase > reservation_limit_) {
+    granted = false;
+  } else if (reservation_increase == 0) {
+    granted = true;
+  } else {
+    // Only the topmost MemTracker link can fail to update its MemTracker (lower links
+    // have no limits to enforce). Charge it *before* asking the ancestors, so that a
+    // MemTracker failure never requires rolling back ancestor state. Undoing an
+    // ancestor increase with DecreaseReservationInternal() is not exact: a decrease
+    // returns the full amount at every ancestor, even one that satisfied the request
+    // from its own unused reservation, so a failed request would shrink it.
+    bool is_topmost_link = mem_tracker_ != NULL && GetParentMemTracker() == NULL;
+    if (is_topmost_link && !TryUpdateMemTracker(reservation_increase)) {
+      granted = false;
+    } else {
+      if (parent_ == NULL) {
+        granted = true;
+      } else {
+        lock_guard<SpinLock> l(parent_->lock_);
+        granted =
+            parent_->IncreaseReservationInternalLocked(reservation_increase, true, true);
+      }
+      if (!granted) {
+        // An ancestor refused and, by the same reasoning, left its own state
+        // untouched. Undo the consumption charged above; consume and release are
+        // exact inverses on the MemTracker.
+        if (is_topmost_link) mem_tracker_->Release(reservation_increase);
+      } else if (!is_topmost_link) {
+        // Unlinked trackers and lower links never fail here: lower links only
+        // propagate consumption up to (not including) the parent's MemTracker.
+        bool updated = TryUpdateMemTracker(reservation_increase);
+        DCHECK(updated);
+      }
+    }
+  }
+
+  if (granted) {
+    // The reservation was granted and state updated in all ancestors: we can modify
+    // this tracker's state now.
+    UpdateReservation(reservation_increase);
+    if (is_child_reservation) child_reservations_ += bytes;
+  }
+
+  CheckConsistency();
+  return granted;
+}
+
+// Charges 'reservation_increase' to the linked MemTracker, if any. Only the topmost
+// link (whose parent tracker has no MemTracker) checks limits and can return false.
+bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) {
+  if (mem_tracker_ == NULL) return true;
+
+  MemTracker* parent_mem_tracker = GetParentMemTracker();
+  if (parent_mem_tracker != NULL) {
+    // Lower link: no limit to enforce, and the parent's MemTracker and its ancestors
+    // already account for this reservation, so only charge up to that point.
+    mem_tracker_->ConsumeLocal(reservation_increase, parent_mem_tracker);
+    return true;
+  }
+  // Topmost link: TryConsume() enforces any MemTracker limit.
+  return mem_tracker_->TryConsume(reservation_increase);
+}
+
+// Releases 'bytes' of this tracker's own (non-child) reservation.
+void ReservationTracker::DecreaseReservation(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  DecreaseReservationInternalLocked(bytes, /* is_child_reservation= */ false);
+}
+
+// Locking wrapper around DecreaseReservationInternalLocked().
+void ReservationTracker::DecreaseReservationInternal(
+    int64_t bytes, bool is_child_reservation) {
+  lock_guard<SpinLock> guard(lock_);
+  DecreaseReservationInternalLocked(bytes, is_child_reservation);
+}
+
+// Shrinks this tracker's reservation by 'bytes' (and its child reservations too, if
+// 'is_child_reservation'), uncharges the linked MemTracker and releases the same
+// amount all the way up the tree. 'lock_' must be held by the caller.
+void ReservationTracker::DecreaseReservationInternalLocked(
+    int64_t bytes, bool is_child_reservation) {
+  DCHECK(initialized_);
+  // A negative amount would grow the reservation while bypassing every limit check.
+  DCHECK_GE(bytes, 0);
+  DCHECK_GE(reservation_, bytes);
+  DCHECK(!is_child_reservation || child_reservations_ >= bytes);
+  if (bytes == 0) return;
+  if (is_child_reservation) child_reservations_ -= bytes;
+  UpdateReservation(-bytes);
+  // Undo the MemTracker consumption in the same way it was charged: the topmost link
+  // releases through all ancestors, lower links only up to the parent's MemTracker.
+  if (mem_tracker_ != NULL) {
+    MemTracker* parent_mem_tracker = GetParentMemTracker();
+    if (parent_mem_tracker == NULL) {
+      mem_tracker_->Release(bytes);
+    } else {
+      mem_tracker_->ReleaseLocal(bytes, parent_mem_tracker);
+    }
+  }
+  // The reservation should be returned up the tree.
+  if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true);
+  CheckConsistency();
+}
+
+// Marks 'bytes' of the unused reservation as used by this tracker.
+void ReservationTracker::AllocateFrom(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  // Only unused reservation (neither used nor handed to children) may be allocated.
+  DCHECK_GE(bytes, 0);
+  DCHECK_LE(bytes, unused_reservation());
+  const int64_t used_delta = bytes;
+  UpdateUsedReservation(used_delta);
+  CheckConsistency();
+}
+
+// Returns 'bytes' of previously allocated memory to the unused reservation.
+void ReservationTracker::ReleaseTo(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  // Cannot release more than is currently in use.
+  DCHECK_GE(bytes, 0);
+  DCHECK_LE(bytes, used_reservation_);
+  const int64_t used_delta = -bytes;
+  UpdateUsedReservation(used_delta);
+  CheckConsistency();
+}
+
+// Accessors: each reads one quantity under 'lock_' so the value is a consistent
+// snapshot of this tracker.
+int64_t ReservationTracker::GetReservation() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t result = reservation_;
+  return result;
+}
+
+int64_t ReservationTracker::GetUsedReservation() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t result = used_reservation_;
+  return result;
+}
+
+int64_t ReservationTracker::GetUnusedReservation() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t result = unused_reservation();
+  return result;
+}
+
+int64_t ReservationTracker::GetChildReservations() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t result = child_reservations_;
+  return result;
+}
+
+// DCHECKs the tracker's invariants and that the profile counters mirror the current
+// state. 'lock_' must be held by the caller. Compiles to nothing in release builds.
+void ReservationTracker::CheckConsistency() const {
+  // Check internal invariants.
+  DCHECK_GE(reservation_, 0);
+  DCHECK_LE(reservation_, reservation_limit_);
+  DCHECK_GE(child_reservations_, 0);
+  DCHECK_GE(used_reservation_, 0);
+  // Equivalent to unused_reservation() >= 0.
+  DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
+
+  // The high-water-mark counters track the current values; their peak value must be
+  // at least the current value.
+  DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
+  DCHECK_LE(reservation_, counters_.peak_reservation->value());
+  DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
+  DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
+  DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
+}
+
+// Applies 'delta' to 'used_reservation_' and mirrors it in the peak-used counter.
+void ReservationTracker::UpdateUsedReservation(int64_t delta) {
+  const int64_t new_used_reservation = used_reservation_ + delta;
+  used_reservation_ = new_used_reservation;
+  COUNTER_SET(counters_.peak_used_reservation, new_used_reservation);
+  CheckConsistency();
+}
+
+// Applies 'delta' to 'reservation_' and mirrors it in the peak-reservation counter.
+void ReservationTracker::UpdateReservation(int64_t delta) {
+  const int64_t new_reservation = reservation_ + delta;
+  reservation_ = new_reservation;
+  COUNTER_SET(counters_.peak_reservation, new_reservation);
+  CheckConsistency();
+}
+
+// Describes this tracker followed by the chain of its ancestors.
+string ReservationTracker::DebugString() {
+  lock_guard<SpinLock> guard(lock_);
+  if (!initialized_) return "<ReservationTracker>: uninitialized";
+
+  // Recursing into the parent while holding 'lock_' follows the bottom-up lock order.
+  string parent_debug_string;
+  if (parent_ != NULL) {
+    parent_debug_string = parent_->DebugString();
+  } else {
+    parent_debug_string = "NULL";
+  }
+  return Substitute(
+      "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 "
+      "child_reservations $3 parent:\n$4",
+      reservation_limit_, reservation_, used_reservation_, child_reservations_,
+      parent_debug_string);
+}
+}

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker.h b/be/src/bufferpool/reservation-tracker.h
new file mode 100644
index 0000000..6bdecf0
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker.h
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RESERVATION_TRACKER_H
+#define IMPALA_RESERVATION_TRACKER_H
+
+#include <stdint.h>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/locks.hpp>
+#include <string>
+
+#include "bufferpool/reservation-tracker-counters.h"
+#include "common/status.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class DummyProfile;
+class MemTracker;
+class RuntimeProfile;
+
+/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes.
+/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool
+/// memory and enforcing upper and lower bounds on memory usage.
+///
+/// The root of the tracker tree enforces a global maximum, which is distributed among its
+/// children. Each tracker in the tree has a 'reservation': the total bytes of buffer pool
+/// memory it is entitled to use. The reservation is inclusive of any memory that is
+/// already allocated from the reservation, i.e. using a reservation to allocate memory
+/// does not subtract from the reservation.
+///
+/// A reservation can be used directly at the tracker by calling AllocateFrom(), or
+/// distributed to children of the tracker for the children's reservations. Each tracker
+/// in the tree can use up to its reservation without checking parent trackers. To
+/// increase its reservation, a tracker must use some of its parent's reservation (and
+/// perhaps increase reservations all the way to the root of the tree).
+///
+/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of the
+/// tracker hierarchy is the global tracker for the Impala daemon and the next level of
+/// the hierarchy is made up of per-query trackers, then the maximum reservation
+/// mechanism can enforce both process-level and query-level limits on reservations.
+///
+/// Invariants:
+/// * A tracker's reservation is at most its reservation limit: reservation <= limit
+/// * A tracker's reservation is at least the sum of its children's reservations plus
+///   the amount of the reservation used directly at this tracker. The difference is
+///   the unused reservation:
+///     child_reservations + used_reservation + unused_reservation = reservation.
+///
+/// Thread-safety:
+/// All public ReservationTracker methods are thread-safe. If multiple threads
+/// concurrently invoke methods on a ReservationTracker, each operation is applied
+/// atomically to leave the ReservationTracker in a consistent state. Calling threads
+/// are responsible for coordinating to avoid violating any method preconditions,
+/// e.g. ensuring that there is sufficient unused reservation before calling
+/// AllocateFrom().
+///
+/// Integration with MemTracker hierarchy:
+/// TODO: we will remove MemTracker and this integration once all memory is accounted via
+/// reservations.
+///
+/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec
+/// node's ReservationTracker can be linked with the exec node's MemTracker, so that
+/// reservations are included in query memory consumption for the purposes of enforcing
+/// memory limits, reporting and logging. The reservation is accounted as consumption
+/// against the linked MemTracker and its ancestors because reserved memory is committed.
+/// Allocating from a reservation therefore does not change the consumption reflected in
+/// the MemTracker hierarchy.
+///
+/// MemTracker limits are only checked via the topmost link (i.e. the query-level
+/// trackers): we require that no MemTrackers below this level have limits.
+///
+/// We require that the MemTracker hierarchy is consistent with the ReservationTracker
+/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its parent
+/// is linked to a MemTracker "B", then "B" must be the parent of "A".
+class ReservationTracker {
+ public:
+  ReservationTracker();
+  virtual ~ReservationTracker();
+
+  /// Initializes the root tracker with the given reservation limit in bytes. The initial
+  /// reservation is 0. The tracker must not already be initialized.
+  /// If 'profile' is not NULL, the counters defined in ReservationTrackerCounters are
+  /// added to 'profile'.
+  void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit);
+
+  /// Initializes a new ReservationTracker with a parent. The initial reservation is 0.
+  /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its
+  /// children will be counted as consumption against 'mem_tracker'. If the parent is
+  /// also linked, 'mem_tracker' must be a child of the parent's MemTracker and must not
+  /// impose a lower limit than it.
+  /// 'reservation_limit' is the maximum reservation for this tracker in bytes and must
+  /// be non-negative.
+  /// If 'profile' is not NULL, the counters in 'counters_' are added to 'profile'.
+  void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
+      MemTracker* mem_tracker, int64_t reservation_limit);
+
+  /// If the tracker is initialized, deregister the ReservationTracker from its parent,
+  /// relinquishing all this tracker's reservation. All of the reservation must be unused
+  /// and all the tracker's children must be closed before calling this method.
+  /// Calling Close() on an uninitialized or already-closed tracker is a no-op.
+  void Close();
+
+  /// Request to increase reservation by 'bytes'. The request is either granted in
+  /// full or not at all. Uses any unused reservation on ancestors and increases
+  /// ancestors' reservations if needed to fit the increased reservation.
+  /// Returns true if the reservation increase is granted, or false if not granted.
+  /// If the reservation is not granted, no modifications are made to the state of
+  /// any ReservationTrackers.
+  bool IncreaseReservation(int64_t bytes);
+
+  /// Tries to ensure that 'bytes' of unused reservation is available. If not already
+  /// available, tries to increase the reservation such that the unused reservation is
+  /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increases
+  /// ancestors' reservations if needed to fit the increased reservation.
+  /// Returns true if the reservation increase was successful or not necessary.
+  bool IncreaseReservationToFit(int64_t bytes);
+
+  /// Decrease tracker's reservation by 'bytes'. This tracker must have at least
+  /// 'bytes' of unused reservation before calling this method, i.e. the reservation
+  /// must not drop below the used reservation plus the children's reservations.
+  /// TODO: decide on and implement policy for how far to release the reservation up
+  /// the tree. Currently the reservation is released all the way to the root.
+  void DecreaseReservation(int64_t bytes);
+
+  /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes'
+  /// unused reservation before calling this method.
+  void AllocateFrom(int64_t bytes);
+
+  /// Release 'bytes' of previously allocated memory. The used reservation is
+  /// decreased by 'bytes' and must be at least 'bytes' before calling this method.
+  void ReleaseTo(int64_t bytes);
+
+  /// Returns the amount of the reservation in bytes.
+  int64_t GetReservation();
+
+  /// Returns the current amount of the reservation used at this tracker in bytes, not
+  /// including reservations of children.
+  int64_t GetUsedReservation();
+
+  /// Returns the amount of the reservation in bytes that is neither used at this
+  /// tracker nor given to children's reservations.
+  int64_t GetUnusedReservation();
+
+  /// Returns the total reservations of children in bytes.
+  int64_t GetChildReservations();
+
+  /// Returns a human-readable description of this tracker and all of its ancestors.
+  std::string DebugString();
+
+ private:
+  /// Returns the amount of 'reservation_' that is unused.
+  inline int64_t unused_reservation() const {
+    return reservation_ - used_reservation_ - child_reservations_;
+  }
+
+  /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise.
+  MemTracker* GetParentMemTracker() const {
+    return parent_ == NULL ? NULL : parent_->mem_tracker_;
+  }
+
+  /// Initializes 'counters_', storing the counters in 'profile' and setting the
+  /// reservation limit counter to 'max_reservation'.
+  /// If 'profile' is NULL, creates a dummy profile to store the counters.
+  void InitCounters(RuntimeProfile* profile, int64_t max_reservation);
+
+  /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true,
+  /// increase by the minimum amount so that 'bytes' fits in the unused reservation,
+  /// otherwise just increase by 'bytes'. If 'is_child_reservation' is true, also
+  /// increase 'child_reservations_' by 'bytes'.
+  /// 'lock_' must be held by caller.
+  bool IncreaseReservationInternalLocked(
+      int64_t bytes, bool use_existing_reservation, bool is_child_reservation);
+
+  /// Update consumption on linked MemTracker. For the topmost link, return false if
+  /// this failed because it would exceed a memory limit. If there is no linked
+  /// MemTracker, just returns true.
+  /// TODO: remove once we account all memory via ReservationTrackers.
+  bool TryUpdateMemTracker(int64_t reservation_increase);
+
+  /// Internal helper for DecreaseReservation(). This behaves the same as
+  /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases
+  /// 'child_reservations_' by 'bytes'.
+  void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation);
+
+  /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller.
+  void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation);
+
+  /// Check the internal consistency of the ReservationTracker and DCHECKs if in an
+  /// inconsistent state.
+  /// 'lock_' must be held by caller.
+  void CheckConsistency() const;
+
+  /// Increase or decrease 'used_reservation_' and update profile counters accordingly.
+  /// 'lock_' must be held by caller.
+  void UpdateUsedReservation(int64_t delta);
+
+  /// Increase or decrease 'reservation_' and update profile counters accordingly.
+  /// 'lock_' must be held by caller.
+  void UpdateReservation(int64_t delta);
+
+  /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired
+  /// from the bottom-up, i.e. a tracker's lock before its parent's lock.
+  SpinLock lock_;
+
+  /// True if the tracker is initialized.
+  bool initialized_;
+
+  /// A dummy profile to hold the counters in 'counters_' in the case that no profile
+  /// is provided.
+  boost::scoped_ptr<DummyProfile> dummy_profile_;
+
+  /// The RuntimeProfile counters for this tracker.
+  /// All non-NULL if 'initialized_' is true.
+  ReservationTrackerCounters counters_;
+
+  /// The parent of this tracker in the hierarchy. Does not change after initialization.
+  ReservationTracker* parent_;
+
+  /// If non-NULL, reservations are counted as memory consumption against this tracker.
+  /// Does not change after initialization. Not owned.
+  /// TODO: remove once all memory is accounted via ReservationTrackers.
+  MemTracker* mem_tracker_;
+
+  /// The maximum reservation in bytes that this tracker can have.
+  int64_t reservation_limit_;
+
+  /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'.
+  int64_t reservation_;
+
+  /// Total reservation of children in bytes. This is included in 'reservation_'.
+  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
+  int64_t child_reservations_;
+
+  /// The amount of the reservation currently used by this tracker in bytes.
+  /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
+  int64_t used_reservation_;
+};
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index e1251f3..a9ceb76 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -22,9 +22,10 @@
 #include <gperftools/malloc_extension.h>
 #include <gutil/strings/substitute.h>
 
+#include "bufferpool/reservation-tracker-counters.h"
+#include "resourcebroker/resource-broker.h"
 #include "runtime/exec-env.h"
 #include "runtime/runtime-state.h"
-#include "resourcebroker/resource-broker.h"
 #include "scheduling/query-resource-mgr.h"
 #include "util/debug-util.h"
 #include "util/mem-info.h"
@@ -134,6 +135,11 @@ void MemTracker::UnregisterFromParent() {
   child_tracker_it_ = parent_->child_trackers_.end();
 }
 
+void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) {
+  // Keep a private copy of the struct; the counters it points to stay profile-owned.
+  reservation_counters_.reset(new ReservationTrackerCounters(counters));
+}
+
 int64_t MemTracker::GetPoolMemReserved() const {
   // Pool trackers should have a pool_name_ and no limit.
   DCHECK(!pool_name_.empty());
@@ -205,8 +211,7 @@ shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
     // First time this id registered, make a new object.  Give a shared ptr to
     // the caller and put a weak ptr in the map.
     shared_ptr<MemTracker> tracker = make_shared<MemTracker>(byte_limit,
-        rm_reserved_limit, Substitute("Query($0) Limit", lexical_cast<string>(id)),
-        parent);
+        rm_reserved_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent);
     tracker->auto_unregister_ = true;
     tracker->query_id_ = id;
     request_to_mem_trackers_[id] = tracker;
@@ -246,17 +251,26 @@ void MemTracker::RefreshConsumptionFromMetric() {
 }
 
 // Calling this on the query tracker results in output like:
-// Query Limit: memory limit exceeded. Limit=100.00 MB Consumption=106.19 MB
-//   Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f491:  Consumption=52.00 KB
-//     AGGREGATION_NODE (id=6):  Consumption=44.00 KB
-//     EXCHANGE_NODE (id=5):  Consumption=0.00
-//     DataStreamMgr:  Consumption=0.00
-//   Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f492:  Consumption=100.00 KB
-//     AGGREGATION_NODE (id=2):  Consumption=36.00 KB
-//     AGGREGATION_NODE (id=4):  Consumption=40.00 KB
-//     EXCHANGE_NODE (id=3):  Consumption=0.00
-//     DataStreamMgr:  Consumption=0.00
-//     DataStreamSender:  Consumption=16.00 KB
+//
+//  Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB
+//    Fragment 4a4c81fedaed337d:4acadfda00000000: Total=8.00 KB Peak=8.00 KB
+//      EXCHANGE_NODE (id=4): Total=0 Peak=0
+//      DataStreamRecvr: Total=0 Peak=0
+//    Block Manager: Limit=6.68 GB Total=394.00 MB Peak=394.00 MB
+//    Fragment 4a4c81fedaed337d:4acadfda00000006: Total=233.72 MB Peak=242.24 MB
+//      AGGREGATION_NODE (id=1): Total=139.21 MB Peak=139.84 MB
+//      HDFS_SCAN_NODE (id=0): Total=93.94 MB Peak=102.24 MB
+//      DataStreamSender (dst_id=2): Total=45.99 KB Peak=85.99 KB
+//    Fragment 4a4c81fedaed337d:4acadfda00000003: Total=274.55 MB Peak=274.62 MB
+//      AGGREGATION_NODE (id=3): Total=274.50 MB Peak=274.50 MB
+//      EXCHANGE_NODE (id=2): Total=0 Peak=0
+//      DataStreamRecvr: Total=45.91 KB Peak=684.07 KB
+//      DataStreamSender (dst_id=4): Total=680.00 B Peak=680.00 B
+//
+// If 'reservation_counters_' is set, we get a more granular breakdown:
+//   TrackerName: Limit=5.00 MB BufferPoolUsed/Reservation=0/5.00 MB OtherMemory=1.04 MB
+//                Total=6.04 MB Peak=6.45 MB
+//
 string MemTracker::LogUsage(const string& prefix) const {
   if (!log_usage_if_zero_ && consumption() == 0) return "";
 
@@ -267,7 +281,24 @@ string MemTracker::LogUsage(const string& prefix) const {
   if (rm_reserved_limit_ > 0) {
     ss << " RM Limit=" << PrettyPrinter::Print(rm_reserved_limit_, TUnit::BYTES);
   }
-  ss << " Consumption=" << PrettyPrinter::Print(consumption(), TUnit::BYTES);
+
+  int64_t total = consumption();
+  int64_t peak = consumption_->value();
+  if (reservation_counters_ != NULL) {
+    int64_t reservation = reservation_counters_->peak_reservation->current_value();
+    int64_t used_reservation =
+        reservation_counters_->peak_used_reservation->current_value();
+    int64_t reservation_limit = reservation_counters_->reservation_limit->value();
+    ss << " BufferPoolUsed/Reservation="
+       << PrettyPrinter::Print(used_reservation, TUnit::BYTES) << "/"
+       << PrettyPrinter::Print(reservation, TUnit::BYTES);
+    if (reservation_limit != numeric_limits<int64_t>::max()) {
+      ss << " BufferPoolLimit=" << PrettyPrinter::Print(reservation_limit, TUnit::BYTES);
+    }
+    ss << " OtherMemory=" << PrettyPrinter::Print(total - reservation, TUnit::BYTES);
+  }
+  ss << " Total=" << PrettyPrinter::Print(total, TUnit::BYTES)
+     << " Peak=" << PrettyPrinter::Print(peak, TUnit::BYTES);
 
   stringstream prefix_ss;
   prefix_ss << prefix << "  ";

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 6c13af6..e3548cc 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -37,6 +37,7 @@
 
 namespace impala {
 
+class ReservationTrackerCounters;
 class MemTracker;
 class QueryResourceMgr;
 
@@ -85,6 +86,10 @@ class MemTracker {
   /// Removes this tracker from parent_->child_trackers_.
   void UnregisterFromParent();
 
+  /// Include counters from a ReservationTracker in logs and other diagnostics.
+  /// 'counters' is copied, but the counters it points to are not: they should be
+  /// owned by the fragment's RuntimeProfile and must stay valid for as long as this
+  /// tracker may report usage. Calling this again replaces the previous copy.
+  void EnableReservationReporting(const ReservationTrackerCounters& counters);
+
   /// Returns a MemTracker object for query 'id'.  Calling this with the same id will
   /// return the same MemTracker object.  An example of how this is used is to pass it
   /// the same query id for all fragments of that query running on this machine.  This
@@ -331,6 +336,8 @@ class MemTracker {
   void RegisterMetrics(MetricGroup* metrics, const std::string& prefix);
 
   /// Logs the usage of this tracker and all of its children (recursively).
+  /// TODO: once all memory is accounted in ReservationTracker hierarchy, move
+  /// reporting there.
   std::string LogUsage(const std::string& prefix = "") const;
 
   /// Log the memory usage when memory limit is exceeded and return a status object with
@@ -437,6 +444,11 @@ class MemTracker {
   /// NULL if consumption_metric_ is set.
   UIntGauge* consumption_metric_;
 
+  /// If non-NULL, counters from a corresponding ReservationTracker that should be
+  /// reported in logs and other diagnostics. Set by EnableReservationReporting(), which
+  /// stores a private copy of the struct; the counters it points to are owned by the
+  /// fragment's RuntimeProfile.
+  boost::scoped_ptr<ReservationTrackerCounters> reservation_counters_;
+
   std::vector<MemTracker*> all_trackers_;  // this tracker plus all of its ancestors
   std::vector<MemTracker*> limit_trackers_;  // all_trackers_ with valid limits
 

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/util/dummy-runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h
new file mode 100644
index 0000000..83bccbf
--- /dev/null
+++ b/be/src/util/dummy-runtime-profile.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_DUMMY_RUNTIME_PROFILE_H
+#define IMPALA_UTIL_DUMMY_RUNTIME_PROFILE_H
+
+#include "common/object-pool.h"
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+/// Utility class to create a self-contained profile that does not report anything.
+/// This is useful if there is sometimes a RuntimeProfile associated with an object
+/// but not always, so that the object can still allocate counters in the same way.
+class DummyProfile {
+ public:
+  DummyProfile() : pool_(), profile_(&pool_, "dummy", false) {}
+  RuntimeProfile* profile() { return &profile_; }
+
+ private:
+  ObjectPool pool_;  // Declared first: constructed before, destroyed after 'profile_'.
+  RuntimeProfile profile_;
+};
+}  // namespace impala
+#endif  // IMPALA_UTIL_DUMMY_RUNTIME_PROFILE_H

http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 7d92861..77d7938 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -485,7 +485,6 @@ class ThreadCounterMeasurement {
   MonotonicStopWatch sw_;
   RuntimeProfile::ThreadCounters* counters_;
 };
-
 }
 
 #endif