You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2016/09/11 04:21:41 UTC
incubator-impala git commit: IMPALA-3201: reservation implementation
for new buffer pool
Repository: incubator-impala
Updated Branches:
refs/heads/master 218019e59 -> f9ac593ff
IMPALA-3201: reservation implementation for new buffer pool
This patch implements the reservation bookkeeping logic for the new
buffer pool. The reservations are always guaranteed and any buffer/page
allocations require a reservation. Reservations are tracked via a
hierarchy of ReservationTrackers.
Eventually ReservationTrackers should become the main mechanism for
memory accounting, once all runtime memory is handled by the buffer
pool. In the meantime, query/process limits are enforced and memory
is reported through the preexisting MemTracker hierarchy.
Reservations are accounted as consumption against a MemTracker because
the memory is committed and cannot be used for other purposes. The
MemTracker then uses the counters from the ReservationTracker to log
information about buffer-pool memory down to the operator level.
Testing:
Includes basic tests for buffer pool client registration and various
reservation operations.
Change-Id: I35cc89e863efb4cc506657bfdaaaf633a10bbab6
Reviewed-on: http://gerrit.cloudera.org:8080/3993
Reviewed-by: Tim Armstrong <ta...@cloudera.com>
Tested-by: Internal Jenkins
Project: http://git-wip-us.apache.org/repos/asf/incubator-impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-impala/commit/f9ac593f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-impala/tree/f9ac593f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-impala/diff/f9ac593f
Branch: refs/heads/master
Commit: f9ac593ff9a9ea3a5b329eed618500c45a1e7400
Parents: 218019e
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Wed Jun 22 18:24:04 2016 -0700
Committer: Internal Jenkins <cl...@gerrit.cloudera.org>
Committed: Sun Sep 11 00:36:12 2016 +0000
----------------------------------------------------------------------
be/CMakeLists.txt | 5 +
be/src/bufferpool/CMakeLists.txt | 31 ++
be/src/bufferpool/buffer-pool-test.cc | 231 +++++++++++
be/src/bufferpool/buffer-pool.cc | 69 ++++
be/src/bufferpool/buffer-pool.h | 34 +-
.../bufferpool/reservation-tracker-counters.h | 41 ++
be/src/bufferpool/reservation-tracker-test.cc | 385 +++++++++++++++++++
be/src/bufferpool/reservation-tracker.cc | 308 +++++++++++++++
be/src/bufferpool/reservation-tracker.h | 248 ++++++++++++
be/src/runtime/mem-tracker.cc | 61 ++-
be/src/runtime/mem-tracker.h | 12 +
be/src/util/dummy-runtime-profile.h | 39 ++
be/src/util/runtime-profile-counters.h | 1 -
13 files changed, 1439 insertions(+), 26 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt
index 6aff0b3..2a46239 100644
--- a/be/CMakeLists.txt
+++ b/be/CMakeLists.txt
@@ -162,6 +162,7 @@ if (DOXYGEN_FOUND)
# Possible to not input the subdirs one by one?
set(CMAKE_DOXYGEN_INPUT
${CMAKE_SOURCE_DIR}/be/src
+ ${CMAKE_SOURCE_DIR}/be/src/bufferpool/
${CMAKE_SOURCE_DIR}/be/src/catalog/
${CMAKE_SOURCE_DIR}/be/src/common/
${CMAKE_SOURCE_DIR}/be/src/exec/
@@ -258,6 +259,7 @@ if (NOT APPLE)
endif()
set (IMPALA_LINK_LIBS
${WL_START_GROUP}
+ BufferPool
Catalog
CodeGen
Common
@@ -283,6 +285,7 @@ set (IMPALA_LINK_LIBS
# libraries when dynamic linking is enabled.
if (BUILD_SHARED_LIBS)
set (IMPALA_LINK_LIBS ${IMPALA_LINK_LIBS}
+ BufferPool
Runtime
Exec
CodeGen
@@ -414,6 +417,7 @@ endfunction(COMPILE_TO_IR)
add_subdirectory(src/gutil)
# compile these subdirs using their own CMakeLists.txt
+add_subdirectory(src/bufferpool)
add_subdirectory(src/catalog)
add_subdirectory(src/codegen)
add_subdirectory(src/common)
@@ -441,6 +445,7 @@ SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-variable")
add_subdirectory(generated-sources/gen-cpp)
link_directories(
+ ${CMAKE_CURRENT_SOURCE_DIR}/build/bufferpool
${CMAKE_CURRENT_SOURCE_DIR}/build/catalog
${CMAKE_CURRENT_SOURCE_DIR}/build/common
${CMAKE_CURRENT_SOURCE_DIR}/build/exec
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/CMakeLists.txt b/be/src/bufferpool/CMakeLists.txt
new file mode 100644
index 0000000..0930f73
--- /dev/null
+++ b/be/src/bufferpool/CMakeLists.txt
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Output directory for the libraries built from this directory.
+set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
+
+# Output directory for the executables (backend tests) built from this directory.
+set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/bufferpool")
+
+# Buffer pool library: client registration plus reservation tracking.
+add_library(BufferPool buffer-pool.cc reservation-tracker.cc)
+# Build the thrift-deps target before compiling BufferPool's sources.
+add_dependencies(BufferPool thrift-deps)
+
+# Backend unit tests for the buffer pool and its reservation trackers.
+add_be_test(buffer-pool-test)
+add_be_test(reservation-tracker-test)
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool-test.cc b/be/src/bufferpool/buffer-pool-test.cc
new file mode 100644
index 0000000..cdb163d
--- /dev/null
+++ b/be/src/bufferpool/buffer-pool-test.cc
@@ -0,0 +1,231 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <boost/bind.hpp>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/thread.hpp>
+#include <boost/unordered_map.hpp>
+#include <cstdlib>
+#include <string>
+#include <vector>
+
+#include "bufferpool/buffer-pool.h"
+#include "bufferpool/reservation-tracker.h"
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "testutil/test-macros.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+class BufferPoolTest : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ virtual void TearDown() {
+ for (auto entry : query_reservations_) { // Close the per-query child trackers first...
+ ReservationTracker* tracker = entry.second;
+ tracker->Close();
+ }
+
+ global_reservations_.Close(); // ...then the root tracker they were children of.
+ obj_pool_.Clear(); // Frees the trackers and profiles added to obj_pool_.
+ }
+
+ /// The minimum buffer size in bytes used in most tests.
+ const static int64_t TEST_PAGE_LEN = 1024;
+
+ /// Test helper to simulate registering then deregistering 'num_queries' queries, each
+ /// with the given initial reservation and reservation limit and a fixed set of clients.
+ void RegisterQueriesAndClients(BufferPool* pool, int query_id_hi, int num_queries,
+ int64_t initial_query_reservation, int64_t query_reservation_limit);
+
+ protected:
+ static int64_t QueryId(int hi, int lo) { return static_cast<int64_t>(hi) << 32 | lo; }
+
+ /// Returns the tracker for 'query_id', creating it in obj_pool_ on first use.
+ ReservationTracker* GetQueryReservationTracker(int64_t query_id) {
+ lock_guard<SpinLock> l(query_reservations_lock_); // Callers may run concurrently.
+ ReservationTracker* tracker = query_reservations_[query_id];
+ if (tracker != NULL) return tracker; // operator[] default-inserts NULL for new ids.
+ tracker = obj_pool_.Add(new ReservationTracker());
+ query_reservations_[query_id] = tracker;
+ return tracker;
+ }
+
+ RuntimeProfile* NewProfile() { // Returns a new profile owned by obj_pool_.
+ return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+ }
+
+ ObjectPool obj_pool_; // Owns the trackers and profiles created by the helpers above.
+
+ ReservationTracker global_reservations_; // Root tracker; query trackers are children.
+
+ /// Map from query_id to the reservation tracker for that query. Reads and modifications
+ /// of the map are protected by query_reservations_lock_.
+ unordered_map<int64_t, ReservationTracker*> query_reservations_;
+ SpinLock query_reservations_lock_;
+};
+
+const int64_t BufferPoolTest::TEST_PAGE_LEN; // Out-of-class definition of the constant.
+
+void BufferPoolTest::RegisterQueriesAndClients(BufferPool* pool, int query_id_hi,
+ int num_queries, int64_t initial_query_reservation, int64_t query_reservation_limit) {
+ // Each query registers this many clients, which split its initial reservation.
+ int clients_per_query = 32;
+ // Per-query arrays of clients and their trackers (std::vector: VLAs aren't standard).
+ std::vector<BufferPool::Client*> clients(num_queries);
+ std::vector<ReservationTracker*> client_reservations(num_queries);
+
+ for (int i = 0; i < num_queries; ++i) {
+ int64_t query_id = QueryId(query_id_hi, i);
+
+ // Initialize a tracker for a new query.
+ ReservationTracker* query_reservation = GetQueryReservationTracker(query_id);
+ query_reservation->InitChildTracker(
+ NULL, &global_reservations_, NULL, query_reservation_limit);
+
+ // Test that closing then reopening child tracker works.
+ query_reservation->Close();
+ query_reservation->InitChildTracker(
+ NULL, &global_reservations_, NULL, query_reservation_limit);
+ EXPECT_TRUE(query_reservation->IncreaseReservationToFit(initial_query_reservation));
+
+ clients[i] = new BufferPool::Client[clients_per_query];
+ client_reservations[i] = new ReservationTracker[clients_per_query];
+
+ for (int j = 0; j < clients_per_query; ++j) {
+ // Split evenly; the first (reservation % clients) clients get one extra byte each.
+ int64_t initial_client_reservation = initial_query_reservation / clients_per_query +
+ (j < initial_query_reservation % clients_per_query ? 1 : 0);
+ // Reservation limit can be anything greater or equal to the initial reservation.
+ int64_t client_reservation_limit = initial_client_reservation + rand() % 100000;
+ client_reservations[i][j].InitChildTracker(
+ NULL, query_reservation, NULL, client_reservation_limit);
+ EXPECT_TRUE(
+ client_reservations[i][j].IncreaseReservationToFit(initial_client_reservation));
+ string name = Substitute("Client $0 for query $1", j, query_id);
+ EXPECT_OK(pool->RegisterClient(name, &client_reservations[i][j], &clients[i][j]));
+ }
+
+ for (int j = 0; j < clients_per_query; ++j) {
+ ASSERT_TRUE(clients[i][j].is_registered());
+ }
+ }
+
+ // Deregister clients then query.
+ for (int i = 0; i < num_queries; ++i) {
+ for (int j = 0; j < clients_per_query; ++j) {
+ pool->DeregisterClient(&clients[i][j]);
+ ASSERT_FALSE(clients[i][j].is_registered());
+ client_reservations[i][j].Close();
+ }
+
+ delete[] clients[i];
+ delete[] client_reservations[i];
+
+ GetQueryReservationTracker(QueryId(query_id_hi, i))->Close();
+ }
+}
+
+/// Test that queries and clients can be registered and deregistered with the reservation
+/// trackers and the buffer pool.
+TEST_F(BufferPoolTest, BasicRegistration) {
+ int num_concurrent_queries = 1024;
+ int64_t sum_initial_reservations = 4;
+ int64_t reservation_limit = 1024;
+ // Need enough buffers for all initial reservations.
+ int64_t total_mem = sum_initial_reservations * num_concurrent_queries;
+ global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+ BufferPool pool(TEST_PAGE_LEN, total_mem);
+
+ RegisterQueriesAndClients(
+ &pool, 0, num_concurrent_queries, sum_initial_reservations, reservation_limit);
+ // Every query and client was deregistered, so no reservation should remain.
+ ASSERT_EQ(0, global_reservations_.GetUsedReservation());
+ ASSERT_EQ(0, global_reservations_.GetChildReservations());
+ ASSERT_EQ(0, global_reservations_.GetReservation());
+ global_reservations_.Close();
+}
+
+/// Test that queries and clients can be registered and deregistered by concurrent
+/// threads.
+TEST_F(BufferPoolTest, ConcurrentRegistration) {
+ int queries_per_thread = 64;
+ int num_threads = 64;
+ int num_concurrent_queries = queries_per_thread * num_threads;
+ int64_t sum_initial_reservations = 4;
+ int64_t reservation_limit = 1024;
+ // Need enough buffers for all initial reservations.
+ int64_t total_mem = num_concurrent_queries * sum_initial_reservations;
+ global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+ BufferPool pool(TEST_PAGE_LEN, total_mem);
+
+ // Launch threads, each with a different set of query IDs.
+ thread_group workers;
+ for (int i = 0; i < num_threads; ++i) {
+ workers.add_thread(new thread(bind(&BufferPoolTest::RegisterQueriesAndClients, this,
+ &pool, i, queries_per_thread, sum_initial_reservations, reservation_limit)));
+ }
+ workers.join_all();
+
+ // All the reservations should be released at this point.
+ ASSERT_EQ(0, global_reservations_.GetUsedReservation());
+ ASSERT_EQ(0, global_reservations_.GetReservation());
+ global_reservations_.Close();
+}
+
+/// Test that reservation setup fails if the initial reservation cannot be fulfilled.
+TEST_F(BufferPoolTest, QueryReservationsUnfulfilled) {
+ // Each query tracker requests 'reservation_per_query' as its initial reservation.
+ int num_queries = 128;
+ int64_t reservation_per_query = 128;
+ // Won't be able to fulfill initial reservation for last query.
+ int64_t total_mem = num_queries * reservation_per_query - 1;
+ global_reservations_.InitRootTracker(NewProfile(), total_mem);
+
+ for (int i = 0; i < num_queries; ++i) {
+ ReservationTracker* query_tracker = GetQueryReservationTracker(i);
+ query_tracker->InitChildTracker(
+ NewProfile(), &global_reservations_, NULL, 2 * reservation_per_query);
+ bool got_initial_reservation =
+ query_tracker->IncreaseReservationToFit(reservation_per_query);
+ if (i < num_queries - 1) {
+ ASSERT_TRUE(got_initial_reservation);
+ } else {
+ ASSERT_FALSE(got_initial_reservation);
+
+ // Getting the initial reservation should succeed after freeing up reservation from
+ // another query.
+ GetQueryReservationTracker(i - 1)->Close();
+ ASSERT_TRUE(query_tracker->IncreaseReservationToFit(reservation_per_query));
+ }
+ }
+}
+}
+
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv); // Strips recognized gtest flags from argv first.
+ impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST); // See init.h.
+ return RUN_ALL_TESTS();
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.cc b/be/src/bufferpool/buffer-pool.cc
new file mode 100644
index 0000000..21893fc
--- /dev/null
+++ b/be/src/bufferpool/buffer-pool.cc
@@ -0,0 +1,69 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bufferpool/buffer-pool.h"
+
+#include <boost/bind.hpp>
+#include <limits>
+#include <sstream>
+
+#include "bufferpool/reservation-tracker.h"
+#include "common/names.h"
+#include "gutil/strings/substitute.h"
+#include "util/bit-util.h"
+#include "util/uid-util.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+BufferPool::BufferPool(int64_t min_buffer_len, int64_t buffer_bytes_limit)
+ : min_buffer_len_(min_buffer_len), buffer_bytes_limit_(buffer_bytes_limit) {
+ DCHECK_GT(min_buffer_len, 0); // Must be positive...
+ DCHECK_EQ(min_buffer_len, BitUtil::RoundUpToPowerOfTwo(min_buffer_len)); // ...and 2^k.
+}
+
+BufferPool::~BufferPool() {} // Members are plain constants; nothing to release.
+
+Status BufferPool::RegisterClient(
+ const string& name, ReservationTracker* reservation, Client* client) {
+ DCHECK(!client->is_registered()); // Registering the same client twice is a bug.
+ DCHECK(reservation != NULL); // A NULL reservation_ would read as "unregistered".
+ client->reservation_ = reservation; // Non-NULL reservation_ marks 'client' registered.
+ client->name_ = name;
+ return Status::OK(); // NOTE(review): invalid args are only DCHECKed, never an error.
+}
+
+void BufferPool::DeregisterClient(Client* client) {
+ if (!client->is_registered()) return; // Idempotent: no-op for unregistered clients.
+ client->reservation_->Close(); // Close the client's tracker before dropping it.
+ client->name_.clear();
+ client->reservation_ = NULL; // Marks 'client' as unregistered (see is_registered()).
+}
+
+string BufferPool::Client::DebugString() const { // Also safe on unregistered clients.
+ return Substitute("<BufferPool::Client> $0 name: $1 reservation: $2", this, name_,
+ is_registered() ? reservation_->DebugString() : string("<unregistered>"));
+}
+
+string BufferPool::DebugString() { // One-line summary of the pool's configuration.
+ stringstream ss;
+ ss << "<BufferPool> " << this << " min_buffer_len: " << min_buffer_len_
+ << " buffer_bytes_limit: " << buffer_bytes_limit_ << "\n";
+ return ss.str();
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/buffer-pool.h b/be/src/bufferpool/buffer-pool.h
index 183e538..e566543 100644
--- a/be/src/bufferpool/buffer-pool.h
+++ b/be/src/bufferpool/buffer-pool.h
@@ -22,7 +22,6 @@
#include <string>
#include <stdint.h>
-#include "bufferpool/buffer-allocator.h"
#include "common/atomic.h"
#include "common/status.h"
#include "gutil/macros.h"
@@ -125,7 +124,7 @@ class ReservationTracker;
/// calling Unpin() on the page. The page may then be written to disk and its buffer
/// repurposed internally by BufferPool.
/// * Once the operator needs the page's contents again and has sufficient unused
-/// reservations, it can call Pin(), which brings the page's contents back into memory,
+/// reservation, it can call Pin(), which brings the page's contents back into memory,
/// perhaps in a different buffer. Therefore the operator must fix up any pointers into
/// the previous buffer.
/// * If the operator is done with the page, it can call FreeBuffer() to destroy the
@@ -157,10 +156,10 @@ class BufferPool {
~BufferPool();
/// Register a client. Returns an error status and does not register the client if the
- /// arguments are invalid. 'name' is an arbitrary used to identify the client in any
- /// errors messages or logging. 'client' is the client to register. 'client' should not
- /// already be registered.
- Status RegisterClient(const std::string& name, ReservationTracker* reservations,
+ /// arguments are invalid. 'name' is an arbitrary name used to identify the client in
+ /// any error messages or logging. 'client' is the client to register. 'client' should
+ /// not already be registered.
+ Status RegisterClient(const std::string& name, ReservationTracker* reservation,
Client* client);
/// Deregister 'client' if it is registered. Idempotent.
@@ -168,7 +167,7 @@ class BufferPool {
/// Create a new page of 'len' bytes with pin count 1. 'len' must be a page length
/// supported by BufferPool (see BufferPool class comment). The client must have
- /// sufficient unused reservations to pin the new page (otherwise it will DCHECK).
+ /// sufficient unused reservation to pin the new page (otherwise it will DCHECK).
/// CreatePage() only fails when a system error prevents the buffer pool from fulfilling
/// the reservation.
/// On success, the handle is mapped to the new page.
@@ -229,6 +228,13 @@ class BufferPool {
private:
DISALLOW_COPY_AND_ASSIGN(BufferPool);
+
+ /// The minimum length of a buffer in bytes. All buffers and pages are a multiple of
+ /// this length. This is always a power of two.
+ const int64_t min_buffer_len_;
+
+ /// The maximum physical memory in bytes that can be used for buffers.
+ const int64_t buffer_bytes_limit_;
};
/// External representation of a client of the BufferPool. Clients are used for
@@ -239,17 +245,25 @@ class BufferPool {
/// Client methods or BufferPool methods with the Client as an argument is not supported.
class BufferPool::Client {
public:
- Client() {}
+ Client() : reservation_(NULL) {}
/// Client must be deregistered.
~Client() { DCHECK(!is_registered()); }
- bool is_registered() const;
- ReservationTracker* reservations();
+ bool is_registered() const { return reservation_ != NULL; }
+ ReservationTracker* reservation() { return reservation_; }
std::string DebugString() const;
private:
+ friend class BufferPool;
DISALLOW_COPY_AND_ASSIGN(Client);
+
+ /// A name identifying the client.
+ std::string name_;
+
+ /// The reservation tracker for the client. NULL means the client isn't registered.
+ /// All pages pinned by the client count as usage against 'reservation_'.
+ ReservationTracker* reservation_;
};
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker-counters.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-counters.h b/be/src/bufferpool/reservation-tracker-counters.h
new file mode 100644
index 0000000..0952a2f
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker-counters.h
@@ -0,0 +1,41 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RESERVATION_TRACKER_COUNTERS_H
+#define IMPALA_RESERVATION_TRACKER_COUNTERS_H
+
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+/// Counters kept for each ReservationTracker, for reporting purposes.
+///
+/// If the ReservationTracker is linked to a profile, these have the same lifetime as that
+/// profile; otherwise they have the same lifetime as the ReservationTracker itself.
+struct ReservationTrackerCounters {
+ /// The tracker's peak reservation in bytes.
+ RuntimeProfile::HighWaterMarkCounter* peak_reservation;
+
+ /// The tracker's peak used reservation in bytes.
+ RuntimeProfile::HighWaterMarkCounter* peak_used_reservation;
+
+ /// The hard limit on the tracker's reservation.
+ RuntimeProfile::Counter* reservation_limit;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker-test.cc b/be/src/bufferpool/reservation-tracker-test.cc
new file mode 100644
index 0000000..e43efb8
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker-test.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <limits>
+#include <string>
+#include <vector>
+
+#include "bufferpool/reservation-tracker.h"
+#include "common/init.h"
+#include "common/object-pool.h"
+#include "runtime/mem-tracker.h"
+#include "testutil/test-macros.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+class ReservationTrackerTest : public ::testing::Test {
+ public:
+ virtual void SetUp() {}
+
+ virtual void TearDown() {
+ root_.Close(); // Close() is idempotent, so tests may also close root_ themselves.
+ obj_pool_.Clear(); // Frees profiles and trackers allocated through obj_pool_.
+ }
+
+ /// The minimum allocation size used in most tests.
+ const static int64_t MIN_BUFFER_LEN = 1024;
+
+ protected:
+ RuntimeProfile* NewProfile() { // Returns a new profile owned by obj_pool_.
+ return obj_pool_.Add(new RuntimeProfile(&obj_pool_, "test profile"));
+ }
+
+ ObjectPool obj_pool_; // Owns the profiles and trackers created by tests.
+
+ ReservationTracker root_; // Root of the hierarchy; tests call InitRootTracker() on it.
+
+ scoped_ptr<RuntimeProfile> profile_; // NOTE(review): unused in the tests shown here.
+};
+
+const int64_t ReservationTrackerTest::MIN_BUFFER_LEN;
+
+TEST_F(ReservationTrackerTest, BasicSingleTracker) {
+ const int64_t limit = 16;
+ root_.InitRootTracker(NULL, limit); // No profile; reservation limit of 'limit'.
+ ASSERT_EQ(0, root_.GetReservation());
+ ASSERT_EQ(0, root_.GetUsedReservation());
+
+ // Fail to increase reservation past the limit; the tracker must be left unchanged.
+ ASSERT_FALSE(root_.IncreaseReservation(limit + 1));
+ ASSERT_EQ(0, root_.GetReservation());
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(0, root_.GetUnusedReservation());
+
+ // Successfully increase reservation to just under the limit.
+ ASSERT_TRUE(root_.IncreaseReservation(limit - 1));
+ ASSERT_EQ(limit - 1, root_.GetReservation());
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
+
+ // Adjust usage: allocate 2 from the reservation, then release it one unit at a time.
+ root_.AllocateFrom(2);
+ ASSERT_EQ(limit - 1, root_.GetReservation());
+ ASSERT_EQ(2, root_.GetUsedReservation());
+ ASSERT_EQ(limit - 3, root_.GetUnusedReservation());
+ root_.ReleaseTo(1);
+ ASSERT_EQ(1, root_.GetUsedReservation());
+ root_.ReleaseTo(1);
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(limit - 1, root_.GetReservation()); // Releasing keeps the reservation.
+ ASSERT_EQ(limit - 1, root_.GetUnusedReservation());
+}
+
+TEST_F(ReservationTrackerTest, BasicTwoLevel) {
+ const int64_t limit = 16;
+ root_.InitRootTracker(NULL, limit);
+
+ const int64_t root_reservation = limit / 2;
+ // Get half of the limit as an initial reservation.
+ ASSERT_TRUE(root_.IncreaseReservation(root_reservation));
+ ASSERT_EQ(root_reservation, root_.GetReservation());
+ ASSERT_EQ(root_reservation, root_.GetUnusedReservation());
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(0, root_.GetChildReservations());
+
+ ReservationTracker child;
+ child.InitChildTracker(NULL, &root_, NULL, numeric_limits<int64_t>::max()); // No limit.
+
+ const int64_t child_reservation = root_reservation + 1;
+ // Ask for more than root_'s unused reservation, so root_ must grow to cover the child.
+ ASSERT_TRUE(child.IncreaseReservation(child_reservation));
+
+ ASSERT_EQ(child_reservation, root_.GetReservation());
+ ASSERT_EQ(0, root_.GetUnusedReservation());
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(child_reservation, root_.GetChildReservations());
+
+ ASSERT_EQ(child_reservation, child.GetReservation());
+ ASSERT_EQ(child_reservation, child.GetUnusedReservation());
+ ASSERT_EQ(0, child.GetUsedReservation());
+ ASSERT_EQ(0, child.GetChildReservations());
+
+ // Check that child allocation counts as the child's usage only, not root_'s.
+ child.AllocateFrom(1);
+ ASSERT_EQ(child_reservation, child.GetReservation());
+ ASSERT_EQ(1, child.GetUsedReservation());
+ ASSERT_EQ(0, root_.GetUsedReservation());
+
+ // Check that an increase in root_'s own reservation is reflected correctly.
+ ASSERT_TRUE(root_.IncreaseReservation(1));
+ ASSERT_EQ(child_reservation + 1, root_.GetReservation());
+ ASSERT_EQ(1, root_.GetUnusedReservation());
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(child_reservation, root_.GetChildReservations());
+ ASSERT_EQ(child_reservation, child.GetReservation());
+
+ // Check that parent allocation draws on root_'s own unused reservation.
+ root_.AllocateFrom(1);
+ ASSERT_EQ(child_reservation + 1, root_.GetReservation());
+ ASSERT_EQ(0, root_.GetUnusedReservation());
+ ASSERT_EQ(1, root_.GetUsedReservation());
+
+ // Release allocations.
+ root_.ReleaseTo(1);
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ child.ReleaseTo(1);
+ ASSERT_EQ(0, child.GetUsedReservation());
+
+ // Decreasing the child's reservation also shrinks root_'s reservation by that amount.
+ child.DecreaseReservation(1);
+ ASSERT_EQ(child_reservation, root_.GetReservation());
+ ASSERT_EQ(child_reservation - 1, child.GetReservation());
+ ASSERT_EQ(child_reservation - 1, root_.GetChildReservations());
+
+ // Closing the child should release its reservation.
+ child.Close();
+ ASSERT_EQ(1, root_.GetReservation());
+ ASSERT_EQ(0, root_.GetChildReservations());
+}
+
+TEST_F(ReservationTrackerTest, CloseIdempotency) {
+ // Check we can close before initializing.
+ root_.Close();
+
+ const int64_t limit = 16;
+ root_.InitRootTracker(NULL, limit);
+
+ // Check we can close twice (TearDown() then closes root_ once more).
+ root_.Close();
+ root_.Close();
+}
+
+// Test that the tracker's reservation limit is enforced.
+TEST_F(ReservationTrackerTest, ReservationLimit) {
+ // The root has room for 3 buffers but the client may only reserve 2, leaving a spare
+ // buffer that the client isn't entitled to.
+ int64_t total_mem = MIN_BUFFER_LEN * 3;
+ int64_t client_limit = MIN_BUFFER_LEN * 2;
+ root_.InitRootTracker(NULL, total_mem);
+
+ ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+ client_tracker->InitChildTracker(NULL, &root_, NULL, client_limit);
+
+ // We can only increase reservation up to the client's limit.
+ ASSERT_FALSE(client_tracker->IncreaseReservation(client_limit + 1));
+ ASSERT_TRUE(client_tracker->IncreaseReservation(client_limit));
+ ASSERT_FALSE(client_tracker->IncreaseReservation(1));
+
+ client_tracker->Close();
+}
+
+// Test that parent's reservation limit is enforced.
+TEST_F(ReservationTrackerTest, ParentReservationLimit) {
+ // Set up reservations so that there is a spare buffer: the root has room for 4
+ // buffers but the query (parent) tracker is limited to 3.
+ int64_t total_mem = MIN_BUFFER_LEN * 4;
+ int64_t parent_limit = MIN_BUFFER_LEN * 3;
+ int64_t other_client_reservation = MIN_BUFFER_LEN;
+ root_.InitRootTracker(NULL, total_mem);
+
+ // The child reservation limit is higher than the parent reservation limit, so the
+ // parent limit is the effective limit.
+ ReservationTracker* query_tracker = obj_pool_.Add(new ReservationTracker());
+ ReservationTracker* client_tracker = obj_pool_.Add(new ReservationTracker());
+ query_tracker->InitChildTracker(NULL, &root_, NULL, parent_limit);
+ client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem * 10);
+
+ ReservationTracker* other_client_tracker = obj_pool_.Add(new ReservationTracker());
+ other_client_tracker->InitChildTracker(NULL, query_tracker, NULL, total_mem);
+ ASSERT_TRUE(other_client_tracker->IncreaseReservationToFit(other_client_reservation));
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(other_client_reservation, root_.GetChildReservations());
+ ASSERT_EQ(0, query_tracker->GetUsedReservation());
+ ASSERT_EQ(0, query_tracker->GetUnusedReservation());
+
+ // Can only increase reservation up to parent limit, excluding other reservations.
+ int64_t effective_limit = parent_limit - other_client_reservation;
+ ASSERT_FALSE(client_tracker->IncreaseReservation(effective_limit + MIN_BUFFER_LEN));
+ ASSERT_TRUE(client_tracker->IncreaseReservation(effective_limit));
+ ASSERT_FALSE(client_tracker->IncreaseReservation(MIN_BUFFER_LEN));
+
+ // Check that tracker hierarchy reports correct usage.
+ ASSERT_EQ(0, root_.GetUsedReservation());
+ ASSERT_EQ(parent_limit, root_.GetChildReservations());
+ ASSERT_EQ(0, query_tracker->GetUsedReservation());
+ ASSERT_EQ(0, query_tracker->GetUnusedReservation());
+
+ client_tracker->Close();
+ other_client_tracker->Close();
+ query_tracker->Close();
+}
+
+/// Test integration of ReservationTracker with MemTracker.
+TEST_F(ReservationTrackerTest, MemTrackerIntegrationTwoLevel) {
+  // Set up a 2-level hierarchy of trackers. The child ReservationTracker is linked to
+  // the child MemTracker. We add various limits at different places to enable testing
+  // of different code paths.
+  root_.InitRootTracker(NewProfile(), MIN_BUFFER_LEN * 100);
+  MemTracker root_mem_tracker;
+  MemTracker child_mem_tracker1(-1, -1, "Child 1", &root_mem_tracker);
+  MemTracker child_mem_tracker2(MIN_BUFFER_LEN * 50, -1, "Child 2", &root_mem_tracker);
+  ReservationTracker child_reservations1, child_reservations2;
+  child_reservations1.InitChildTracker(
+      NewProfile(), &root_, &child_mem_tracker1, 500 * MIN_BUFFER_LEN);
+  child_reservations2.InitChildTracker(
+      NewProfile(), &root_, &child_mem_tracker2, 75 * MIN_BUFFER_LEN);
+
+  // Check that a single buffer reservation is accounted correctly.
+  ASSERT_TRUE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that a buffer reservation from the other child is accounted correctly.
+  ASSERT_TRUE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that hitting the MemTracker limit leaves things in a consistent state.
+  // root_ (limit 100 buffers) grants the increase before child_mem_tracker2's
+  // 50-buffer limit is checked, so this also exercises rolling back root_'s state.
+  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 50));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that hitting the ReservationTracker's local limit (75 buffers) leaves things
+  // in a consistent state.
+  ASSERT_FALSE(child_reservations2.IncreaseReservation(MIN_BUFFER_LEN * 75));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that hitting the parent ReservationTracker's (root_'s) limit leaves things
+  // in a consistent state. The parent's limit is checked before the linked MemTracker
+  // is updated, so no MemTracker consumption may change.
+  ASSERT_FALSE(child_reservations1.IncreaseReservation(MIN_BUFFER_LEN * 100));
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations1.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker1.consumption());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_reservations2.GetReservation());
+  ASSERT_EQ(MIN_BUFFER_LEN, child_mem_tracker2.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_mem_tracker.consumption());
+  ASSERT_EQ(2 * MIN_BUFFER_LEN, root_.GetChildReservations());
+
+  // Check that released memory is decremented from all trackers correctly.
+  child_reservations1.DecreaseReservation(MIN_BUFFER_LEN);
+  child_reservations2.DecreaseReservation(MIN_BUFFER_LEN);
+  ASSERT_EQ(0, child_reservations1.GetReservation());
+  ASSERT_EQ(0, child_mem_tracker1.consumption());
+  ASSERT_EQ(0, child_reservations2.GetReservation());
+  ASSERT_EQ(0, child_mem_tracker2.consumption());
+  ASSERT_EQ(0, root_mem_tracker.consumption());
+  ASSERT_EQ(0, root_.GetUsedReservation());
+  ASSERT_EQ(0, root_.GetChildReservations());
+  ASSERT_EQ(0, root_.GetReservation());
+
+  child_reservations1.Close();
+  child_reservations2.Close();
+  child_mem_tracker1.UnregisterFromParent();
+  child_mem_tracker2.UnregisterFromParent();
+}
+
+TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
+ const int HIERARCHY_DEPTH = 5;
+ // Set up a multi-level hierarchy of trackers and ensure that consumption is reported
+ // correctly at every level.
+ ReservationTracker reservations[HIERARCHY_DEPTH];
+ scoped_ptr<MemTracker> mem_trackers[HIERARCHY_DEPTH];
+
+ // We can only handle MemTracker limits at the topmost linked ReservationTracker
+ // (level 1 here), so avoid adding limits at lower levels. mem_trackers[0] is above
+ // the topmost link and carries a looser limit.
+ const int LIMIT = HIERARCHY_DEPTH;
+ vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1});
+
+ // Root trackers aren't linked: reservations[0] has no MemTracker, but
+ // mem_trackers[0] is still the parent of mem_trackers[1].
+ mem_trackers[0].reset(new MemTracker(mem_limits[0]));
+ reservations[0].InitRootTracker(NewProfile(), 500);
+ for (int i = 1; i < HIERARCHY_DEPTH; ++i) {
+ mem_trackers[i].reset(new MemTracker(
+ mem_limits[i], -1, Substitute("Child $0", i), mem_trackers[i - 1].get()));
+ reservations[i].InitChildTracker(
+ NewProfile(), &reservations[i - 1], mem_trackers[i].get(), 500);
+ }
+
+ // Amounts just below, at and just above the tightest MemTracker limit.
+ vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
+
+ // Test that all limits and consumption are correctly reported when consuming
+ // from a non-root ReservationTracker that is connected to a MemTracker.
+ for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
+ int64_t lowest_limit = mem_trackers[level]->lowest_limit();
+ for (int amount : interesting_amounts) {
+ // The test expects success exactly when 'amount' fits under 'lowest_limit',
+ // treating -1 as "no limit".
+ bool increased = reservations[level].IncreaseReservation(amount);
+ if (lowest_limit == -1 || amount <= lowest_limit) {
+ // The increase should go through.
+ ASSERT_TRUE(increased) << reservations[level].DebugString();
+ ASSERT_EQ(amount, reservations[level].GetReservation());
+ ASSERT_EQ(amount, mem_trackers[level]->consumption());
+ for (int ancestor = 0; ancestor < level; ++ancestor) {
+ ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
+ ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+ }
+
+ LOG(INFO) << "\n" << mem_trackers[0]->LogUsage();
+ reservations[level].DecreaseReservation(amount);
+ } else {
+ ASSERT_FALSE(increased);
+ }
+ // We should be back in the original state, whether or not the increase succeeded.
+ for (int i = 0; i < HIERARCHY_DEPTH; ++i) {
+ ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
+ << reservations[i].DebugString();
+ ASSERT_EQ(0, reservations[i].GetChildReservations());
+ ASSERT_EQ(0, mem_trackers[i]->consumption());
+ }
+ }
+ }
+
+ // Increase a reservation at each level in turn, from the root down to the leaves,
+ // and check that every ancestor reflects it. The reservation is released again at
+ // the end of each iteration, so each level is checked independently.
+ for (int level = 0; level < HIERARCHY_DEPTH; ++level) {
+ const int amount = LIMIT;
+ ASSERT_TRUE(reservations[level].IncreaseReservation(amount));
+ ASSERT_EQ(amount, reservations[level].GetReservation());
+ ASSERT_EQ(0, reservations[level].GetUsedReservation());
+ // The root ReservationTracker has no linked MemTracker to check.
+ if (level != 0) ASSERT_EQ(amount, mem_trackers[level]->consumption());
+ for (int ancestor = 0; ancestor < level; ++ancestor) {
+ ASSERT_EQ(0, reservations[ancestor].GetUsedReservation());
+ ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
+ ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+ }
+ reservations[level].DecreaseReservation(amount);
+ }
+
+ // Tear down leaves first; mem_trackers[0] was created without a parent.
+ for (int i = HIERARCHY_DEPTH - 1; i >= 0; --i) {
+ reservations[i].Close();
+ if (i != 0) mem_trackers[i]->UnregisterFromParent();
+ }
+}
+}
+
+// Test entry point: initialize gtest, then Impala's common runtime in BE_TEST mode,
+// then run every registered test.
+int main(int argc, char** argv) {
+ ::testing::InitGoogleTest(&argc, argv);
+ impala::InitCommonRuntime(argc, argv, true, impala::TestInfo::BE_TEST);
+ return RUN_ALL_TESTS();
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker.cc b/be/src/bufferpool/reservation-tracker.cc
new file mode 100644
index 0000000..1c5a14e
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker.cc
@@ -0,0 +1,308 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "bufferpool/reservation-tracker.h"
+
+#include <algorithm>
+
+#include "common/object-pool.h"
+#include "gutil/strings/substitute.h"
+#include "runtime/mem-tracker.h"
+#include "util/dummy-runtime-profile.h"
+#include "util/runtime-profile-counters.h"
+
+#include "common/names.h"
+
+using strings::Substitute;
+
+namespace impala {
+
+// A freshly constructed tracker is unusable until InitRootTracker() or
+// InitChildTracker() is called.
+ReservationTracker::ReservationTracker()
+  : initialized_(false),
+    mem_tracker_(NULL) {}
+
+// The tracker must not be live at destruction: either it was never initialized or
+// Close() has already reset 'initialized_'.
+ReservationTracker::~ReservationTracker() {
+  DCHECK(!initialized_);
+}
+
+// Sets up this tracker as the root of a hierarchy: no parent, no linked MemTracker,
+// an empty reservation, and a cap of 'reservation_limit' bytes.
+void ReservationTracker::InitRootTracker(
+    RuntimeProfile* profile, int64_t reservation_limit) {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(!initialized_);
+
+  // The root is never linked to a MemTracker and has nothing above it.
+  mem_tracker_ = NULL;
+  parent_ = NULL;
+
+  reservation_limit_ = reservation_limit;
+  child_reservations_ = used_reservation_ = reservation_ = 0;
+  initialized_ = true;
+
+  InitCounters(profile, reservation_limit_);
+  COUNTER_SET(counters_.peak_reservation, reservation_);
+  CheckConsistency();
+}
+
+// Sets up this tracker under 'parent', optionally linked to 'mem_tracker', with an
+// empty reservation capped at 'reservation_limit' bytes. In debug builds, verifies
+// that the MemTracker links mirror the ReservationTracker links.
+void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
+    ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) {
+  DCHECK(parent != NULL);
+  DCHECK_GE(reservation_limit, 0);
+
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(!initialized_);
+  parent_ = parent;
+  mem_tracker_ = mem_tracker;
+  reservation_limit_ = reservation_limit;
+  child_reservations_ = used_reservation_ = reservation_ = 0;
+  initialized_ = true;
+
+  MemTracker* parent_mem_tracker = GetParentMemTracker();
+  if (mem_tracker_ != NULL && parent_mem_tracker != NULL) {
+    // Both this tracker and its parent are linked: the MemTracker parent link must
+    // match ours, and we must not introduce a tighter limit at this lower link.
+    DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
+    DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit());
+  } else if (mem_tracker_ != NULL) {
+    // This is the topmost link: no ancestor further up may be linked either,
+    // otherwise the chain of links would have a gap.
+    for (ReservationTracker* ancestor = parent_; ancestor != NULL;
+         ancestor = ancestor->parent_) {
+      DCHECK(ancestor->mem_tracker_ == NULL);
+    }
+  }
+
+  InitCounters(profile, reservation_limit_);
+  CheckConsistency();
+}
+
+// Registers this tracker's counters in 'profile', or in a private dummy profile when
+// 'profile' is NULL, and records 'reservation_limit' in BufferPoolReservationLimit.
+// Reservation reporting is only enabled on the linked MemTracker (if any) when the
+// caller supplied a real profile.
+void ReservationTracker::InitCounters(
+    RuntimeProfile* profile, int64_t reservation_limit) {
+  bool profile_provided = profile != NULL;
+  if (profile == NULL) {
+    dummy_profile_.reset(new DummyProfile);
+    profile = dummy_profile_->profile();
+  }
+
+  // Check that another tracker's counters aren't already registered in the profile.
+  // Probe a counter that is actually registered below; probing a name that is never
+  // added would make this check vacuous.
+  DCHECK(profile->GetCounter("BufferPoolReservationLimit") == NULL);
+  counters_.reservation_limit =
+      ADD_COUNTER(profile, "BufferPoolReservationLimit", TUnit::BYTES);
+  counters_.peak_reservation =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakReservation", TUnit::BYTES);
+  counters_.peak_used_reservation =
+      profile->AddHighWaterMarkCounter("BufferPoolPeakUsedReservation", TUnit::BYTES);
+
+  COUNTER_SET(counters_.reservation_limit, reservation_limit);
+
+  if (mem_tracker_ != NULL && profile_provided) {
+    mem_tracker_->EnableReservationReporting(counters_);
+  }
+}
+
+// Returns this tracker's whole reservation to its parent (if any) and detaches it
+// from the hierarchy. A no-op on an uninitialized or already-closed tracker.
+void ReservationTracker::Close() {
+  lock_guard<SpinLock> guard(lock_);
+  if (!initialized_) return;
+
+  CheckConsistency();
+  // Nothing may still be allocated from, or handed down by, this tracker.
+  DCHECK_EQ(used_reservation_, 0);
+  DCHECK_EQ(child_reservations_, 0);
+
+  // The root has no parent to give its reservation back to.
+  if (parent_ != NULL) {
+    DecreaseReservationInternalLocked(reservation_, false);
+  }
+
+  parent_ = NULL;
+  mem_tracker_ = NULL;
+  initialized_ = false;
+}
+
+// Grows the reservation by exactly 'bytes', all or nothing.
+bool ReservationTracker::IncreaseReservation(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  const bool use_existing_reservation = false;
+  const bool is_child_reservation = false;
+  return IncreaseReservationInternalLocked(
+      bytes, use_existing_reservation, is_child_reservation);
+}
+
+// Grows the reservation only by as much as needed for 'bytes' of unused reservation.
+bool ReservationTracker::IncreaseReservationToFit(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  const bool use_existing_reservation = true;
+  const bool is_child_reservation = false;
+  return IncreaseReservationInternalLocked(
+      bytes, use_existing_reservation, is_child_reservation);
+}
+
+// Core of the increase path; 'lock_' is held by the caller. Checks this tracker's
+// limit, then recursively asks the parent (taking the parent's lock, bottom-up) to
+// fit the increase, then updates the linked MemTracker. Only when all of those
+// succeed is this tracker's own state modified.
+bool ReservationTracker::IncreaseReservationInternalLocked(
+ int64_t bytes, bool use_existing_reservation, bool is_child_reservation) {
+ DCHECK(initialized_);
+ // With 'use_existing_reservation', unused reservation already held here counts
+ // towards 'bytes', so only the shortfall (never negative) is requested.
+ int64_t reservation_increase =
+ use_existing_reservation ? max<int64_t>(0, bytes - unused_reservation()) : bytes;
+ DCHECK_GE(reservation_increase, 0);
+
+ bool granted;
+ // Check if the increase is allowed, starting at the bottom of hierarchy.
+ if (reservation_ + reservation_increase > reservation_limit_) {
+ granted = false;
+ } else if (reservation_increase == 0) {
+ granted = true;
+ } else {
+ if (parent_ == NULL) {
+ granted = true;
+ } else {
+ // The parent's lock is released at the end of this scope, before the
+ // MemTracker update below.
+ lock_guard<SpinLock> l(parent_->lock_);
+ granted =
+ parent_->IncreaseReservationInternalLocked(reservation_increase, true, true);
+ }
+ if (granted && !TryUpdateMemTracker(reservation_increase)) {
+ granted = false;
+ // Roll back changes to ancestors if MemTracker update fails.
+ // parent_ is non-NULL here: TryUpdateMemTracker() only fails when mem_tracker_
+ // is set, and InitRootTracker() leaves the root's mem_tracker_ NULL.
+ // NOTE(review): the parent may have satisfied part of the request from its own
+ // unused reservation, but this releases the full 'reservation_increase' from it,
+ // so the parent can end up with less reservation than before the call. Confirm
+ // this against the "no modifications on failure" guarantee in the header.
+ parent_->DecreaseReservationInternal(reservation_increase, true);
+ }
+ }
+
+ if (granted) {
+ // The reservation was granted and state updated in all ancestors: we can modify
+ // this tracker's state now.
+ UpdateReservation(reservation_increase);
+ // When called on behalf of a child, the child's whole requested increase is now
+ // held for it here.
+ if (is_child_reservation) child_reservations_ += bytes;
+ }
+
+ CheckConsistency();
+ return granted;
+}
+
+// Charges 'reservation_increase' to the linked MemTracker, if any. Only the topmost
+// link can fail (limit exceeded); lower links always succeed.
+bool ReservationTracker::TryUpdateMemTracker(int64_t reservation_increase) {
+  if (mem_tracker_ == NULL) return true;
+
+  MemTracker* linked_parent = GetParentMemTracker();
+  if (linked_parent != NULL) {
+    // Lower link: no limit to enforce here, and the parent's MemTracker already
+    // reflects the reservation, so only local consumption is updated.
+    mem_tracker_->ConsumeLocal(reservation_increase, linked_parent);
+    return true;
+  }
+  // Topmost link: may carry a limit, so it must be checked via TryConsume().
+  return mem_tracker_->TryConsume(reservation_increase);
+}
+
+// Public release path: the bytes being released are this tracker's own, not held on
+// behalf of a child.
+void ReservationTracker::DecreaseReservation(int64_t bytes) {
+  const bool is_child_reservation = false;
+  DecreaseReservationInternal(bytes, is_child_reservation);
+}
+
+// Acquires 'lock_' and delegates to the locked variant.
+void ReservationTracker::DecreaseReservationInternal(
+    int64_t bytes, bool is_child_reservation) {
+  lock_guard<SpinLock> guard(lock_);
+  DecreaseReservationInternalLocked(bytes, is_child_reservation);
+}
+
+// Releases 'bytes' of reservation with 'lock_' held by the caller, then propagates
+// the release to the linked MemTracker and up the ReservationTracker tree.
+void ReservationTracker::DecreaseReservationInternalLocked(
+    int64_t bytes, bool is_child_reservation) {
+  DCHECK(initialized_);
+  DCHECK_GE(reservation_, bytes);
+  if (bytes == 0) return;
+
+  if (is_child_reservation) child_reservations_ -= bytes;
+  UpdateReservation(-bytes);
+
+  // The topmost linked MemTracker uses Release(); lower links use ReleaseLocal() up
+  // to the parent's MemTracker, which the parent releases itself as the release
+  // propagates below.
+  if (mem_tracker_ != NULL) {
+    MemTracker* linked_parent = GetParentMemTracker();
+    if (linked_parent == NULL) {
+      mem_tracker_->Release(bytes);
+    } else {
+      mem_tracker_->ReleaseLocal(bytes, linked_parent);
+    }
+  }
+
+  // Reservation is released all the way up the tree (parent locks taken bottom-up).
+  if (parent_ != NULL) parent_->DecreaseReservationInternal(bytes, true);
+  CheckConsistency();
+}
+
+// Marks 'bytes' of the unused reservation as used at this tracker.
+void ReservationTracker::AllocateFrom(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  DCHECK_GE(bytes, 0);
+  DCHECK_LE(bytes, unused_reservation());
+  const int64_t delta = bytes;
+  UpdateUsedReservation(delta);
+  CheckConsistency();
+}
+
+// Returns 'bytes' of previously used reservation to the unused pool.
+void ReservationTracker::ReleaseTo(int64_t bytes) {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  DCHECK_GE(bytes, 0);
+  DCHECK_LE(bytes, used_reservation_);
+  const int64_t delta = -bytes;
+  UpdateUsedReservation(delta);
+  CheckConsistency();
+}
+
+// Total reservation held by this tracker (used + unused + given to children).
+int64_t ReservationTracker::GetReservation() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t total = reservation_;
+  return total;
+}
+
+// Portion of the reservation allocated directly at this tracker.
+int64_t ReservationTracker::GetUsedReservation() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t used = used_reservation_;
+  return used;
+}
+
+// Portion of the reservation neither used here nor handed to children.
+int64_t ReservationTracker::GetUnusedReservation() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t unused = unused_reservation();
+  return unused;
+}
+
+// Sum of the reservations handed down to child trackers.
+int64_t ReservationTracker::GetChildReservations() {
+  lock_guard<SpinLock> guard(lock_);
+  DCHECK(initialized_);
+  const int64_t children = child_reservations_;
+  return children;
+}
+
+// Debug-only validation of this tracker's invariants; 'lock_' must be held.
+void ReservationTracker::CheckConsistency() const {
+ // Check internal invariants: the reservation is within [0, limit] and covers both
+ // the used amount and the amount handed to children.
+ DCHECK_GE(reservation_, 0);
+ DCHECK_LE(reservation_, reservation_limit_);
+ DCHECK_GE(child_reservations_, 0);
+ DCHECK_GE(used_reservation_, 0);
+ DCHECK_LE(used_reservation_ + child_reservations_, reservation_);
+
+ // The profile counters must mirror the current values, and the high-water marks
+ // must never be below them.
+ DCHECK_EQ(reservation_, counters_.peak_reservation->current_value());
+ DCHECK_LE(reservation_, counters_.peak_reservation->value());
+ DCHECK_EQ(used_reservation_, counters_.peak_used_reservation->current_value());
+ DCHECK_LE(used_reservation_, counters_.peak_used_reservation->value());
+ DCHECK_EQ(reservation_limit_, counters_.reservation_limit->value());
+}
+
+// Applies 'delta' to the used reservation and mirrors the new value into its
+// high-water-mark counter. 'lock_' must be held.
+void ReservationTracker::UpdateUsedReservation(int64_t delta) {
+  used_reservation_ = used_reservation_ + delta;
+  COUNTER_SET(counters_.peak_used_reservation, used_reservation_);
+  CheckConsistency();
+}
+
+// Applies 'delta' to the total reservation and mirrors the new value into its
+// high-water-mark counter. 'lock_' must be held.
+void ReservationTracker::UpdateReservation(int64_t delta) {
+  reservation_ = reservation_ + delta;
+  COUNTER_SET(counters_.peak_reservation, reservation_);
+  CheckConsistency();
+}
+
+// Describes this tracker followed by the chain of its ancestors.
+string ReservationTracker::DebugString() {
+  lock_guard<SpinLock> guard(lock_);
+  if (!initialized_) return "<ReservationTracker>: uninitialized";
+
+  // Recursing while holding our own lock takes locks bottom-up, matching the
+  // hierarchy's lock order.
+  string parent_debug_string;
+  if (parent_ != NULL) {
+    parent_debug_string = parent_->DebugString();
+  } else {
+    parent_debug_string = "NULL";
+  }
+  return Substitute(
+      "<ReservationTracker>: reservation_limit $0 reservation $1 used_reservation $2 "
+      "child_reservations $3 parent:\n$4",
+      reservation_limit_, reservation_, used_reservation_, child_reservations_,
+      parent_debug_string);
+}
+}
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/bufferpool/reservation-tracker.h b/be/src/bufferpool/reservation-tracker.h
new file mode 100644
index 0000000..6bdecf0
--- /dev/null
+++ b/be/src/bufferpool/reservation-tracker.h
@@ -0,0 +1,248 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_RESERVATION_TRACKER_H
+#define IMPALA_RESERVATION_TRACKER_H
+
+#include <stdint.h>
+#include <boost/scoped_ptr.hpp>
+#include <boost/thread/locks.hpp>
+#include <string>
+
+#include "bufferpool/reservation-tracker-counters.h"
+#include "common/status.h"
+#include "util/spinlock.h"
+
+namespace impala {
+
+class DummyProfile;
+class MemTracker;
+class RuntimeProfile;
+
+/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes.
+/// A hierarchy of ReservationTrackers provides a mechanism for subdividing buffer pool
+/// memory and enforcing upper and lower bounds on memory usage.
+///
+/// The root of the tracker tree enforces a global maximum, which is distributed among its
+/// children. Each tracker in the tree has a 'reservation': the total bytes of buffer pool
+/// memory it is entitled to use. The reservation is inclusive of any memory that is
+/// already allocated from the reservation, i.e. using a reservation to allocate memory
+/// does not subtract from the reservation.
+///
+/// A reservation can be used directly at the tracker by calling AllocateFrom(), or
+/// distributed to children of the tracker for the children's reservations. Each tracker
+/// in the tree can use up to its reservation without checking parent trackers. To
+/// increase its reservation, a tracker must use some of its parent's reservation (and
+/// perhaps increase reservations all the way to the root of the tree).
+///
+/// Each tracker also has a maximum reservation that is enforced. E.g. if the root of the
+/// tracker hierarchy is the global tracker for the Impala daemon and the next level of
+/// the hierarchy is made up of per-query trackers, then the maximum reservation
+/// mechanism can enforce both process-level and query-level limits on reservations.
+///
+/// Invariants:
+/// * A tracker's reservation is at most its reservation limit: reservation <= limit
+/// * A tracker's reservation is at least the sum of its children's reservations plus
+/// the amount of the reservation used directly at this tracker. The difference is
+/// the unused reservation:
+/// child_reservations + used_reservation + unused_reservation = reservation.
+///
+/// Thread-safety:
+/// All public ReservationTracker methods are thread-safe. If multiple threads
+/// concurrently invoke methods on a ReservationTracker, each operation is applied
+/// atomically to leave the ReservationTracker in a consistent state. Calling threads
+/// are responsible for coordinating to avoid violating any method preconditions,
+/// e.g. ensuring that there is sufficient unused reservation before calling
+/// AllocateFrom().
+///
+/// Integration with MemTracker hierarchy:
+/// TODO: we will remove MemTracker and this integration once all memory is accounted via
+/// reservations.
+///
+/// Each ReservationTracker can optionally have a linked MemTracker. E.g. an exec
+/// node's ReservationTracker can be linked with the exec node's MemTracker, so that
+/// reservations are included in query memory consumption for the purposes of enforcing
+/// memory limits, reporting and logging. The reservation is accounted as consumption
+/// against the linked MemTracker and its ancestors because reserved memory is committed.
+/// Allocating from a reservation therefore does not change the consumption reflected in
+/// the MemTracker hierarchy.
+///
+/// MemTracker limits are only checked via the topmost link (i.e. the query-level
+/// trackers): we require that no MemTrackers below this level have limits.
+///
+/// We require that the MemTracker hierarchy is consistent with the ReservationTracker
+/// hierarchy. I.e. if a ReservationTracker is linked to a MemTracker "A", and its parent
+/// is linked to a MemTracker "B", then "B" must be the parent of "A".
+class ReservationTracker {
+ public:
+ ReservationTracker();
+ virtual ~ReservationTracker();
+
+ /// Initializes the root tracker with the given reservation limit in bytes. The initial
+ /// reservation is 0. The root has no parent and no linked MemTracker.
+ /// If 'profile' is not NULL, the counters defined in ReservationTrackerCounters are
+ /// added to 'profile'; otherwise they are kept in an internal dummy profile.
+ void InitRootTracker(RuntimeProfile* profile, int64_t reservation_limit);
+
+ /// Initializes a new ReservationTracker with a parent.
+ /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its
+ /// children will be counted as consumption against 'mem_tracker'.
+ /// 'reservation_limit' is the maximum reservation for this tracker in bytes.
+ /// If 'profile' is not NULL, the counters in 'counters_' are added to 'profile';
+ /// otherwise they are kept in an internal dummy profile.
+ void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
+ MemTracker* mem_tracker, int64_t reservation_limit);
+
+ /// If the tracker is initialized, deregister the ReservationTracker from its parent,
+ /// relinquishing all this tracker's reservation. All of the reservation must be unused
+ /// and all the tracker's children must be closed before calling this method.
+ void Close();
+
+ /// Request to increase reservation by 'bytes'. The request is either granted in
+ /// full or not at all. Uses any unused reservation on ancestors and increases
+ /// ancestors' reservations if needed to fit the increased reservation.
+ /// Returns true if the reservation increase is granted, or false if not granted.
+ /// If the reservation is not granted, no modifications are made to the state of
+ /// any ReservationTrackers.
+ /// NOTE(review): when the linked MemTracker rejects the increase, the rollback
+ /// releases the full increase from the parent, which may also release unused
+ /// reservation the parent held before the call. Confirm the guarantee above.
+ bool IncreaseReservation(int64_t bytes);
+
+ /// Tries to ensure that 'bytes' of unused reservation is available. If not already
+ /// available, tries to increase the reservation such that the unused reservation is
+ /// exactly equal to 'bytes'. Uses any unused reservation on ancestors and increases
+ /// ancestors' reservations if needed to fit the increased reservation.
+ /// Returns true if the reservation increase was successful or not necessary.
+ bool IncreaseReservationToFit(int64_t bytes);
+
+ /// Decrease tracker's reservation by 'bytes'. This tracker's reservation must be at
+ /// least 'bytes' before calling this method.
+ /// TODO: decide on and implement policy for how far to release the reservation up
+ /// the tree. Currently the reservation is released all the way to the root.
+ void DecreaseReservation(int64_t bytes);
+
+ /// Allocate 'bytes' from the reservation. The tracker must have at least 'bytes'
+ /// unused reservation before calling this method.
+ void AllocateFrom(int64_t bytes);
+
+ /// Release 'bytes' of previously allocated memory. The used reservation is
+ /// decreased by 'bytes'. The used reservation must be at least 'bytes' before
+ /// calling this method.
+ void ReleaseTo(int64_t bytes);
+
+ /// Returns the amount of the reservation in bytes.
+ int64_t GetReservation();
+
+ /// Returns the current amount of the reservation used at this tracker, not including
+ /// reservations of children in bytes.
+ int64_t GetUsedReservation();
+
+ /// Returns the amount of the reservation neither used nor given to children's
+ /// reservations at this tracker in bytes.
+ int64_t GetUnusedReservation();
+
+ /// Returns the total reservations of children in bytes.
+ int64_t GetChildReservations();
+
+ /// Returns a human-readable description of this tracker's limit, reservation, used
+ /// and child reservations, followed by the same for each ancestor.
+ std::string DebugString();
+
+ private:
+ /// Returns the amount of 'reservation_' that is unused.
+ inline int64_t unused_reservation() const {
+ return reservation_ - used_reservation_ - child_reservations_;
+ }
+
+ /// Returns the parent's memtracker if 'parent_' is non-NULL, or NULL otherwise.
+ MemTracker* GetParentMemTracker() const {
+ return parent_ == NULL ? NULL : parent_->mem_tracker_;
+ }
+
+ /// Initializes 'counters_', storing the counters in 'profile'.
+ /// If 'profile' is NULL, creates a dummy profile to store the counters.
+ /// 'max_reservation' is the reservation limit recorded in the limit counter.
+ void InitCounters(RuntimeProfile* profile, int64_t max_reservation);
+
+ /// Internal helper for IncreaseReservation(). If 'use_existing_reservation' is true,
+ /// increase by the minimum amount so that 'bytes' fits in the reservation, otherwise
+ /// just increase by 'bytes'. If 'is_child_reservation' is true, also increase
+ /// 'child_reservations_' by 'bytes'.
+ /// 'lock_' must be held by caller.
+ bool IncreaseReservationInternalLocked(
+ int64_t bytes, bool use_existing_reservation, bool is_child_reservation);
+
+ /// Update consumption on linked MemTracker. For the topmost link, return false if
+ /// this failed because it would exceed a memory limit. If there is no linked
+ /// MemTracker, just returns true.
+ /// TODO: remove once we account all memory via ReservationTrackers.
+ bool TryUpdateMemTracker(int64_t reservation_increase);
+
+ /// Internal helper for DecreaseReservation(). This behaves the same as
+ /// DecreaseReservation(), except when 'is_child_reservation' is true it decreases
+ /// 'child_reservations_' by 'bytes'. Acquires 'lock_'.
+ void DecreaseReservationInternal(int64_t bytes, bool is_child_reservation);
+
+ /// Same as DecreaseReservationInternal(), but 'lock_' must be held by caller.
+ void DecreaseReservationInternalLocked(int64_t bytes, bool is_child_reservation);
+
+ /// Check the internal consistency of the ReservationTracker and DCHECKs if in an
+ /// inconsistent state.
+ /// 'lock_' must be held by caller.
+ void CheckConsistency() const;
+
+ /// Increase or decrease 'used_reservation_' and update profile counters accordingly.
+ /// 'lock_' must be held by caller.
+ void UpdateUsedReservation(int64_t delta);
+
+ /// Increase or decrease 'reservation_' and update profile counters accordingly.
+ /// 'lock_' must be held by caller.
+ void UpdateReservation(int64_t delta);
+
+ /// lock_ protects all members. In a hierarchy of trackers, locks must be acquired
+ /// from the bottom-up.
+ SpinLock lock_;
+
+ /// True if the tracker is initialized.
+ bool initialized_;
+
+ /// A dummy profile to hold the counters in 'counters_' in the case that no profile
+ /// is provided.
+ boost::scoped_ptr<DummyProfile> dummy_profile_;
+
+ /// The RuntimeProfile counters for this tracker.
+ /// All non-NULL if 'initialized_' is true.
+ ReservationTrackerCounters counters_;
+
+ /// The parent of this tracker in the hierarchy. Does not change after initialization.
+ ReservationTracker* parent_;
+
+ /// If non-NULL, reservations are counted as memory consumption against this tracker.
+ /// Does not change after initialization. Not owned.
+ /// TODO: remove once all memory is accounted via ReservationTrackers.
+ MemTracker* mem_tracker_;
+
+ /// The maximum reservation in bytes that this tracker can have.
+ int64_t reservation_limit_;
+
+ /// This tracker's current reservation in bytes. 'reservation_' <= 'reservation_limit_'.
+ int64_t reservation_;
+
+ /// Total reservation of children in bytes. This is included in 'reservation_'.
+ /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
+ int64_t child_reservations_;
+
+ /// The amount of the reservation currently used by this tracker in bytes.
+ /// 'used_reservation_' + 'child_reservations_' <= 'reservation_'.
+ int64_t used_reservation_;
+};
+}
+
+#endif
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index e1251f3..a9ceb76 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -22,9 +22,10 @@
#include <gperftools/malloc_extension.h>
#include <gutil/strings/substitute.h>
+#include "bufferpool/reservation-tracker-counters.h"
+#include "resourcebroker/resource-broker.h"
#include "runtime/exec-env.h"
#include "runtime/runtime-state.h"
-#include "resourcebroker/resource-broker.h"
#include "scheduling/query-resource-mgr.h"
#include "util/debug-util.h"
#include "util/mem-info.h"
@@ -134,6 +135,11 @@ void MemTracker::UnregisterFromParent() {
child_tracker_it_ = parent_->child_trackers_.end();
}
+// Keeps an owned copy of 'counters' (a set of counter pointers) on this MemTracker.
+void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& counters) {
+  reservation_counters_.reset(new ReservationTrackerCounters(counters));
+}
+
int64_t MemTracker::GetPoolMemReserved() const {
// Pool trackers should have a pool_name_ and no limit.
DCHECK(!pool_name_.empty());
@@ -205,8 +211,7 @@ shared_ptr<MemTracker> MemTracker::GetQueryMemTracker(
// First time this id registered, make a new object. Give a shared ptr to
// the caller and put a weak ptr in the map.
shared_ptr<MemTracker> tracker = make_shared<MemTracker>(byte_limit,
- rm_reserved_limit, Substitute("Query($0) Limit", lexical_cast<string>(id)),
- parent);
+ rm_reserved_limit, Substitute("Query($0)", lexical_cast<string>(id)), parent);
tracker->auto_unregister_ = true;
tracker->query_id_ = id;
request_to_mem_trackers_[id] = tracker;
@@ -246,17 +251,26 @@ void MemTracker::RefreshConsumptionFromMetric() {
}
// Calling this on the query tracker results in output like:
-// Query Limit: memory limit exceeded. Limit=100.00 MB Consumption=106.19 MB
-// Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f491: Consumption=52.00 KB
-// AGGREGATION_NODE (id=6): Consumption=44.00 KB
-// EXCHANGE_NODE (id=5): Consumption=0.00
-// DataStreamMgr: Consumption=0.00
-// Fragment 5b45e83bbc2d92bd:d3ff8a7df7a2f492: Consumption=100.00 KB
-// AGGREGATION_NODE (id=2): Consumption=36.00 KB
-// AGGREGATION_NODE (id=4): Consumption=40.00 KB
-// EXCHANGE_NODE (id=3): Consumption=0.00
-// DataStreamMgr: Consumption=0.00
-// DataStreamSender: Consumption=16.00 KB
+//
+// Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB
+// Fragment 4a4c81fedaed337d:4acadfda00000000: Total=8.00 KB Peak=8.00 KB
+// EXCHANGE_NODE (id=4): Total=0 Peak=0
+// DataStreamRecvr: Total=0 Peak=0
+// Block Manager: Limit=6.68 GB Total=394.00 MB Peak=394.00 MB
+// Fragment 4a4c81fedaed337d:4acadfda00000006: Total=233.72 MB Peak=242.24 MB
+// AGGREGATION_NODE (id=1): Total=139.21 MB Peak=139.84 MB
+// HDFS_SCAN_NODE (id=0): Total=93.94 MB Peak=102.24 MB
+// DataStreamSender (dst_id=2): Total=45.99 KB Peak=85.99 KB
+// Fragment 4a4c81fedaed337d:4acadfda00000003: Total=274.55 MB Peak=274.62 MB
+// AGGREGATION_NODE (id=3): Total=274.50 MB Peak=274.50 MB
+// EXCHANGE_NODE (id=2): Total=0 Peak=0
+// DataStreamRecvr: Total=45.91 KB Peak=684.07 KB
+// DataStreamSender (dst_id=4): Total=680.00 B Peak=680.00 B
+//
+// If 'reservation_counters_' is set, we get a more granular breakdown:
+// TrackerName: Limit=5.00 MB BufferPoolUsed/Reservation=0/5.00 MB OtherMemory=1.04 MB
+// Total=6.04 MB Peak=6.45 MB
+//
string MemTracker::LogUsage(const string& prefix) const {
if (!log_usage_if_zero_ && consumption() == 0) return "";
@@ -267,7 +281,24 @@ string MemTracker::LogUsage(const string& prefix) const {
if (rm_reserved_limit_ > 0) {
ss << " RM Limit=" << PrettyPrinter::Print(rm_reserved_limit_, TUnit::BYTES);
}
- ss << " Consumption=" << PrettyPrinter::Print(consumption(), TUnit::BYTES);
+
+ int64_t total = consumption();
+ int64_t peak = consumption_->value();
+ if (reservation_counters_ != NULL) {
+ int64_t reservation = reservation_counters_->peak_reservation->current_value();
+ int64_t used_reservation =
+ reservation_counters_->peak_used_reservation->current_value();
+ int64_t reservation_limit = reservation_counters_->reservation_limit->value();
+ ss << " BufferPoolUsed/Reservation="
+ << PrettyPrinter::Print(used_reservation, TUnit::BYTES) << "/"
+ << PrettyPrinter::Print(reservation, TUnit::BYTES);
+ if (reservation_limit != numeric_limits<int64_t>::max()) {
+ ss << " BufferPoolLimit=" << PrettyPrinter::Print(reservation_limit, TUnit::BYTES);
+ }
+ ss << " OtherMemory=" << PrettyPrinter::Print(total - reservation, TUnit::BYTES);
+ }
+ ss << " Total=" << PrettyPrinter::Print(total, TUnit::BYTES)
+ << " Peak=" << PrettyPrinter::Print(peak, TUnit::BYTES);
stringstream prefix_ss;
prefix_ss << prefix << " ";
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 6c13af6..e3548cc 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -37,6 +37,7 @@
namespace impala {
+class ReservationTrackerCounters;
class MemTracker;
class QueryResourceMgr;
@@ -85,6 +86,10 @@ class MemTracker {
/// Removes this tracker from parent_->child_trackers_.
void UnregisterFromParent();
+ /// Include counters from a ReservationTracker in logs and other diagnostics. Keeps a
+ /// copy of 'counters'; the counters should be owned by the fragment's RuntimeProfile.
+ void EnableReservationReporting(const ReservationTrackerCounters& counters);
+
/// Returns a MemTracker object for query 'id'. Calling this with the same id will
/// return the same MemTracker object. An example of how this is used is to pass it
/// the same query id for all fragments of that query running on this machine. This
@@ -331,6 +336,8 @@ class MemTracker {
void RegisterMetrics(MetricGroup* metrics, const std::string& prefix);
/// Logs the usage of this tracker and all of its children (recursively).
+ /// TODO: once all memory is accounted in ReservationTracker hierarchy, move
+ /// reporting there.
std::string LogUsage(const std::string& prefix = "") const;
/// Log the memory usage when memory limit is exceeded and return a status object with
@@ -437,6 +444,11 @@ class MemTracker {
/// NULL if consumption_metric_ is set.
UIntGauge* consumption_metric_;
+ /// If non-NULL, counters from a corresponding ReservationTracker that should be
+ /// reported in logs and other diagnostics. Set by EnableReservationReporting(). The
+ /// pointed-to counters are owned by the fragment's RuntimeProfile.
+ boost::scoped_ptr<ReservationTrackerCounters> reservation_counters_;
+
std::vector<MemTracker*> all_trackers_; // this tracker plus all of its ancestors
std::vector<MemTracker*> limit_trackers_; // all_trackers_ with valid limits
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/util/dummy-runtime-profile.h
----------------------------------------------------------------------
diff --git a/be/src/util/dummy-runtime-profile.h b/be/src/util/dummy-runtime-profile.h
new file mode 100644
index 0000000..83bccbf
--- /dev/null
+++ b/be/src/util/dummy-runtime-profile.h
@@ -0,0 +1,39 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef IMPALA_UTIL_DUMMY_RUNTIME_PROFILE_H
+#define IMPALA_UTIL_DUMMY_RUNTIME_PROFILE_H
+
+#include "common/object-pool.h"
+#include "util/runtime-profile.h"
+
+namespace impala {
+
+/// Utility class that owns a self-contained RuntimeProfile which is never reported.
+/// Useful for objects that only sometimes have a RuntimeProfile: when they don't, they
+/// can still allocate counters the same way by using this profile.
+class DummyProfile {
+ public:
+ DummyProfile() : pool_(), profile_(&pool_, "dummy", false) {}
+ RuntimeProfile* profile() { return &profile_; }
+
+ private:
+ ObjectPool pool_; // Declared before 'profile_', whose constructor takes &pool_.
+ RuntimeProfile profile_;
+};
+}
+#endif // IMPALA_UTIL_DUMMY_RUNTIME_PROFILE_H
http://git-wip-us.apache.org/repos/asf/incubator-impala/blob/f9ac593f/be/src/util/runtime-profile-counters.h
----------------------------------------------------------------------
diff --git a/be/src/util/runtime-profile-counters.h b/be/src/util/runtime-profile-counters.h
index 7d92861..77d7938 100644
--- a/be/src/util/runtime-profile-counters.h
+++ b/be/src/util/runtime-profile-counters.h
@@ -485,7 +485,6 @@ class ThreadCounterMeasurement {
MonotonicStopWatch sw_;
RuntimeProfile::ThreadCounters* counters_;
};
-
}
#endif