You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/08/02 02:54:32 UTC
[2/3] impala git commit: IMPALA-7297: soft memory limit for
reservation increases
IMPALA-7297: soft memory limit for reservation increases
https://goo.gl/N9LgQt summarises the memory problems I'm trying to
solve here.
Reject reservation increases that would leave < 10% of the memory
available under a query or process mem_limit, since that would put the
query or impalad into a state with a heightened chance of OOM. This
avoids OOM in some scenarios where reserved memory is increased
too aggressively and starves non-reserved memory.
One way of thinking about this is that it extends the 80%
limit heuristic for reserved memory so that there's a soft
limit of min(80%, 90% - non-reserved memory consumption).
A follow-on will use this for scanner threads to prevent
starting scanner threads that may exceed the soft limit.
Testing:
Added unit tests for MemTracker and for ReservationTracker.
Change-Id: I39dcf2dd870a59c8b8cda4488fe41ce936d715ac
Reviewed-on: http://gerrit.cloudera.org:8080/10988
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/68736bc3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/68736bc3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/68736bc3
Branch: refs/heads/master
Commit: 68736bc3784ad69b7f4f485a7a3ec0fc3d8d3a9b
Parents: 9f20b75
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jul 13 11:03:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 2 01:46:19 2018 +0000
----------------------------------------------------------------------
be/src/exec/exchange-node.cc | 3 +-
be/src/rpc/impala-service-pool.cc | 2 +-
.../runtime/bufferpool/buffer-pool-internal.h | 2 +-
be/src/runtime/bufferpool/buffer-pool.cc | 11 +-
be/src/runtime/bufferpool/buffer-pool.h | 9 +-
.../bufferpool/reservation-tracker-test.cc | 74 ++++++-----
.../runtime/bufferpool/reservation-tracker.cc | 17 ++-
be/src/runtime/bufferpool/reservation-tracker.h | 17 ++-
be/src/runtime/initial-reservations.cc | 9 +-
be/src/runtime/mem-pool-test.cc | 22 ++--
be/src/runtime/mem-tracker-test.cc | 126 +++++++++++++------
be/src/runtime/mem-tracker-types.h | 31 +++++
be/src/runtime/mem-tracker.cc | 37 +++++-
be/src/runtime/mem-tracker.h | 96 +++++++-------
be/src/runtime/query-state.cc | 4 +-
be/src/runtime/runtime-state.cc | 2 +-
be/src/udf/udf.cc | 2 +-
17 files changed, 313 insertions(+), 151 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 07511a7..26579c1 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -82,7 +82,8 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(
Substitute("Exchg Recvr (id=$0)", id_), nullptr,
ExecEnv::GetInstance()->buffer_reservation(), mem_tracker(),
- numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_));
+ numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_,
+ MemLimit::HARD));
// TODO: figure out appropriate buffer size
DCHECK_GT(num_senders_, 0);
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index cb406b9..d00c73d 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -170,7 +170,7 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
// fail. The check and the consumption need to be atomic so as to bound the memory
// usage.
unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_);
- if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded())) {
+ if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
// Discards the transfer early so the transfer size drops to 0. This is to ensure
// the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
// called MemTracker::Consume() at this point.
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 598939e..64c92e5 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -196,7 +196,7 @@ class BufferPool::Client {
public:
Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& name,
ReservationTracker* parent_reservation, MemTracker* mem_tracker,
- int64_t reservation_limit, RuntimeProfile* profile);
+ MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile);
~Client() {
DCHECK_EQ(0, num_pages_);
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 6e6d979..6b51ed3 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -117,11 +117,12 @@ BufferPool::~BufferPool() {}
Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group,
ReservationTracker* parent_reservation, MemTracker* mem_tracker,
- int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client) {
+ int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client,
+ MemLimit mem_limit_mode) {
DCHECK(!client->is_registered());
DCHECK(parent_reservation != NULL);
client->impl_ = new Client(this, file_group, name, parent_reservation, mem_tracker,
- reservation_limit, profile);
+ mem_limit_mode, reservation_limit, profile);
return Status::OK();
}
@@ -385,7 +386,7 @@ void BufferPool::SubReservation::Close() {
BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
const string& name, ReservationTracker* parent_reservation, MemTracker* mem_tracker,
- int64_t reservation_limit, RuntimeProfile* profile)
+ MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile)
: pool_(pool),
file_group_(file_group),
name_(name),
@@ -394,8 +395,8 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
buffers_allocated_bytes_(0) {
// Set up a child profile with buffer pool info.
RuntimeProfile* child_profile = profile->CreateChild("Buffer pool", true, true);
- reservation_.InitChildTracker(
- child_profile, parent_reservation, mem_tracker, reservation_limit);
+ reservation_.InitChildTracker(child_profile, parent_reservation, mem_tracker,
+ reservation_limit, mem_limit_mode);
counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
counters_.cumulative_allocations =
ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 3323e0a..eb658b0 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -29,6 +29,7 @@
#include "common/object-pool.h"
#include "common/status.h"
#include "gutil/macros.h"
+#include "runtime/mem-tracker-types.h"
#include "runtime/tmp-file-mgr.h"
#include "util/aligned-new.h"
#include "util/internal-queue.h"
@@ -37,7 +38,6 @@
namespace impala {
-class MemTracker;
class ReservationTracker;
class RuntimeProfile;
class SystemAllocator;
@@ -175,11 +175,12 @@ class BufferPool : public CacheLineAligned {
///
/// The client's reservation is created as a child of 'parent_reservation' with limit
/// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The initial
- /// reservation is 0 bytes.
+ /// reservation is 0 bytes. 'mem_limit_mode' determines whether reservation
+ /// increases are checked against the soft or hard limit of 'mem_tracker'.
Status RegisterClient(const std::string& name, TmpFileMgr::FileGroup* file_group,
ReservationTracker* parent_reservation, MemTracker* mem_tracker,
- int64_t reservation_limit, RuntimeProfile* profile,
- ClientHandle* client) WARN_UNUSED_RESULT;
+ int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client,
+ MemLimit mem_limit_mode = MemLimit::SOFT) WARN_UNUSED_RESULT;
/// Deregister 'client' if it is registered. All pages must be destroyed and buffers
/// must be freed for the client before calling this. Releases any reservation that
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index e441402..88aa92a 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -340,6 +340,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
// We can only handle MemTracker limits at the topmost linked ReservationTracker,
// so avoid adding limits at lower level.
const int LIMIT = HIERARCHY_DEPTH;
+ const int SOFT_LIMIT = static_cast<int>(LIMIT * 0.9);
vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1});
// Root trackers aren't linked.
@@ -350,47 +351,64 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
}
- vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
+ vector<int> interesting_amounts(
+ {LIMIT - 1, LIMIT, LIMIT + 1, SOFT_LIMIT, SOFT_LIMIT - 1});
// Test that all limits and consumption correctly reported when consuming
// from a non-root ReservationTracker that is connected to a MemTracker.
- for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
- int64_t lowest_limit = mem_trackers[level]->lowest_limit();
- for (int amount : interesting_amounts) {
- // Initialize the tracker, increase reservation, then release reservation by closing
- // the tracker.
- reservations[level].InitChildTracker(
- NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 500);
- bool increased = reservations[level].IncreaseReservation(amount);
- if (lowest_limit == -1 || amount <= lowest_limit) {
- // The increase should go through.
- ASSERT_TRUE(increased) << reservations[level].DebugString();
- ASSERT_EQ(amount, reservations[level].GetReservation());
- ASSERT_EQ(amount, mem_trackers[level]->consumption());
- for (int ancestor = 0; ancestor < level; ++ancestor) {
- ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
- ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+ // Test both soft and hard limits.
+ for (MemLimit limit_mode : {MemLimit::SOFT, MemLimit::HARD}) {
+ for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
+ int64_t lowest_limit = mem_trackers[level]->GetLowestLimit(limit_mode);
+ for (int amount : interesting_amounts) {
+ LOG(INFO) << "level=" << level << " limit_mode=" << static_cast<int>(limit_mode)
+ << " amount=" << amount << " lowest_limit=" << lowest_limit;
+ // Initialize the tracker, increase reservation, then release reservation by
+ // closing the tracker.
+ reservations[level].InitChildTracker(NewProfile(), &reservations[level - 1],
+ mem_trackers[level].get(), 500, limit_mode);
+ bool increased = reservations[level].IncreaseReservation(amount);
+ if (lowest_limit == -1 || amount <= lowest_limit) {
+ // The increase should go through.
+ ASSERT_TRUE(increased)
+ << reservations[level].DebugString() << "\n"
+ << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
+ ASSERT_EQ(amount, reservations[level].GetReservation());
+ ASSERT_EQ(amount, mem_trackers[level]->consumption());
+ for (int ancestor = 0; ancestor < level; ++ancestor) {
+ ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
+ ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+ }
+
+ LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
+ } else {
+ ASSERT_FALSE(increased);
}
-
- LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
reservations[level].Close();
- } else {
- ASSERT_FALSE(increased);
- }
- // Reservations should be released on all ancestors.
- for (int i = 0; i < level; ++i) {
- ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
- << reservations[i].DebugString();
- ASSERT_EQ(0, reservations[i].GetChildReservations());
- ASSERT_EQ(0, mem_trackers[i]->consumption());
+ // Reservations should be released on all ancestors.
+ for (int i = 0; i < level; ++i) {
+ ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
+ << reservations[i].DebugString();
+ ASSERT_EQ(0, reservations[i].GetChildReservations());
+ ASSERT_EQ(0, mem_trackers[i]->consumption());
+ }
}
+ // Set up tracker to be parent for next iteration.
+ reservations[level].InitChildTracker(NewProfile(), &reservations[level - 1],
+ mem_trackers[level].get(), 500, limit_mode);
}
+ for (int level = 1; level < HIERARCHY_DEPTH; ++level) reservations[level].Close();
}
// "Pull down" a reservation from the top of the hierarchy level-by-level to the
// leaves, checking that everything is consistent at each step.
for (int level = 0; level < HIERARCHY_DEPTH; ++level) {
const int amount = LIMIT;
+ if (level > 0) {
+ reservations[level].InitChildTracker(
+ NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 500,
+ MemLimit::HARD);
+ }
ASSERT_TRUE(reservations[level].IncreaseReservation(amount));
ASSERT_EQ(amount, reservations[level].GetReservation());
ASSERT_EQ(0, reservations[level].GetUsedReservation());
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index f0e1839..a920e1c 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -57,7 +57,8 @@ void ReservationTracker::InitRootTracker(
}
void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
- ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) {
+ ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit,
+ MemLimit mem_limit_mode) {
DCHECK(parent != nullptr);
DCHECK_GE(reservation_limit, 0);
@@ -65,6 +66,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
DCHECK(!initialized_);
parent_ = parent;
mem_tracker_ = mem_tracker;
+ mem_limit_mode_ = mem_limit_mode;
reservation_limit_ = reservation_limit;
reservation_ = 0;
@@ -79,7 +81,8 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
// Make sure we don't have a lower limit than the ancestor, since we don't enforce
// limits at lower links.
- DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit());
+ DCHECK_EQ(mem_tracker_->GetLowestLimit(mem_limit_mode_),
+ parent_mem_tracker->GetLowestLimit(mem_limit_mode_));
} else {
// Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
// shouldn't have a MemTracker.
@@ -208,7 +211,8 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
granted = parent_->IncreaseReservationInternalLocked(
reservation_increase, true, true, error_status);
}
- if (granted && !TryConsumeFromMemTracker(reservation_increase)) {
+ if (granted
+ && !TryConsumeFromMemTracker(reservation_increase, mem_limit_mode_)) {
granted = false;
if (error_status != nullptr) {
*error_status = mem_tracker_->MemLimitExceeded(nullptr,
@@ -230,13 +234,14 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
return granted;
}
-bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase) {
+bool ReservationTracker::TryConsumeFromMemTracker(
+ int64_t reservation_increase, MemLimit mem_limit_mode) {
DCHECK_GE(reservation_increase, 0);
if (mem_tracker_ == nullptr) return true;
if (GetParentMemTracker() == nullptr) {
// At the topmost link, which may be a MemTracker with a limit, we need to use
// TryConsume() to check the limit.
- return mem_tracker_->TryConsume(reservation_increase);
+ return mem_tracker_->TryConsume(reservation_increase, mem_limit_mode);
} else {
// For lower links, there shouldn't be a limit to enforce, so we just need to
// update the consumption of the linked MemTracker since the reservation is
@@ -329,7 +334,7 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
// We don't handle MemTrackers with limit in this function - this should always
// succeed.
DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
- bool success = tracker->TryConsumeFromMemTracker(bytes);
+ bool success = tracker->TryConsumeFromMemTracker(bytes, MemLimit::HARD);
DCHECK(success);
if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
}
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 3bf2de1..604db7f 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -23,14 +23,14 @@
#include <boost/thread/locks.hpp>
#include <string>
-#include "runtime/bufferpool/reservation-tracker-counters.h"
#include "common/status.h"
+#include "runtime/bufferpool/reservation-tracker-counters.h"
+#include "runtime/mem-tracker-types.h"
#include "util/spinlock.h"
namespace impala {
class DummyProfile;
-class MemTracker;
class RuntimeProfile;
/// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes.
@@ -101,9 +101,12 @@ class ReservationTracker {
/// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its
/// children will be counted as consumption against 'mem_tracker'.
/// 'reservation_limit' is the maximum reservation for this tracker in bytes.
- /// if 'profile' is not NULL, the counters in 'counters_' are added to 'profile'.
+ /// 'mem_limit_mode' determines whether reservation increases are checked against the
+ /// soft or hard limit of 'mem_tracker'. If 'profile' is not NULL, the counters in
+ /// 'counters_' are added to 'profile'.
void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
- MemTracker* mem_tracker, int64_t reservation_limit);
+ MemTracker* mem_tracker, int64_t reservation_limit,
+ MemLimit mem_limit_mode = MemLimit::SOFT);
/// If the tracker is initialized, deregister the ReservationTracker from its parent,
/// relinquishing all this tracker's reservation. All of the reservation must be unused
@@ -219,7 +222,7 @@ class ReservationTracker {
/// because it would exceed a memory limit. If there is no linked MemTracker, just
/// returns true.
/// TODO: remove once we account all memory via ReservationTrackers.
- bool TryConsumeFromMemTracker(int64_t reservation_increase);
+ bool TryConsumeFromMemTracker(int64_t reservation_increase, MemLimit mem_limit_mode);
/// Decrease consumption on linked MemTracker to reflect a decrease in reservation of
/// 'reservation_decrease'. If there is no linked MemTracker, does nothing.
@@ -301,6 +304,10 @@ class ReservationTracker {
/// TODO: remove once all memory is accounted via ReservationTrackers.
MemTracker* mem_tracker_ = nullptr;
+ /// Determines whether the soft or hard limit of 'mem_tracker_' is checked for
+ /// reservation increases.
+ MemLimit mem_limit_mode_;
+
/// The maximum reservation in bytes that this tracker can have. Can be read with an
/// atomic load without holding lock.
int64_t reservation_limit_;
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/initial-reservations.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc
index 0fe00fe..e5fcd32 100644
--- a/be/src/runtime/initial-reservations.cc
+++ b/be/src/runtime/initial-reservations.cc
@@ -41,10 +41,13 @@ InitialReservations::InitialReservations(ObjectPool* obj_pool,
ReservationTracker* query_reservation, MemTracker* query_mem_tracker,
int64_t initial_reservation_total_claims)
: initial_reservation_mem_tracker_(obj_pool->Add(
- new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
- remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+ new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
+ remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+ // Soft mem_limits should not apply to the initial reservation because we don't want
+ // to fail the query in the case where the initial reservation exceeds the soft
+ // limit.
initial_reservations_.InitChildTracker(nullptr, query_reservation,
- initial_reservation_mem_tracker_, numeric_limits<int64_t>::max());
+ initial_reservation_mem_tracker_, numeric_limits<int64_t>::max(), MemLimit::HARD);
}
Status InitialReservations::Init(
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 26176a5..f939008 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -241,35 +241,35 @@ TEST(MemPoolTest, Limits) {
MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3);
MemPool* p1 = new MemPool(&limit1);
- EXPECT_FALSE(limit1.AnyLimitExceeded());
+ EXPECT_FALSE(limit1.AnyLimitExceeded(MemLimit::HARD));
MemPool* p2 = new MemPool(&limit2);
- EXPECT_FALSE(limit2.AnyLimitExceeded());
+ EXPECT_FALSE(limit2.AnyLimitExceeded(MemLimit::HARD));
// p1 exceeds a non-shared limit
p1->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
- EXPECT_FALSE(limit1.LimitExceeded());
+ EXPECT_FALSE(limit1.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit1.consumption());
- EXPECT_FALSE(limit3.LimitExceeded());
+ EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit3.consumption());
p1->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
- EXPECT_TRUE(limit1.LimitExceeded());
+ EXPECT_TRUE(limit1.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit1.consumption());
- EXPECT_FALSE(limit3.LimitExceeded());
+ EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit3.consumption());
// p2 exceeds a shared limit
p2->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
- EXPECT_FALSE(limit2.LimitExceeded());
+ EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit2.consumption());
- EXPECT_FALSE(limit3.LimitExceeded());
+ EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 4, limit3.consumption());
p2->Allocate(1);
- EXPECT_FALSE(limit2.LimitExceeded());
+ EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit2.consumption());
- EXPECT_TRUE(limit3.LimitExceeded());
+ EXPECT_TRUE(limit3.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 6, limit3.consumption());
// deleting pools reduces consumption
@@ -281,7 +281,7 @@ TEST(MemPoolTest, Limits) {
// Allocate another chunk
p2->FreeAll();
- EXPECT_FALSE(limit2.LimitExceeded());
+ EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
uint8_t* result = p2->TryAllocate(MemPoolTest::INITIAL_CHUNK_SIZE);
ASSERT_TRUE(result != NULL);
ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 285f897..e52c345 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -35,7 +35,7 @@ TEST(MemTestTest, SingleTrackerNoLimit) {
EXPECT_EQ(t.consumption(), 20);
t.Release(15);
EXPECT_EQ(t.consumption(), 5);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
// Clean up.
t.Release(5);
}
@@ -45,17 +45,72 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
EXPECT_TRUE(t.has_limit());
t.Consume(10);
EXPECT_EQ(t.consumption(), 10);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
t.Consume(10);
EXPECT_EQ(t.consumption(), 20);
- EXPECT_TRUE(t.LimitExceeded());
+ EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
t.Release(15);
EXPECT_EQ(t.consumption(), 5);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
// Clean up.
t.Release(5);
}
+/// Exercise individual functions that take MemLimit::SOFT option.
+TEST(MemTestTest, SoftLimit) {
+ MemTracker t(100);
+ MemTracker child(-1, "", &t);
+
+ // Exercise functions that return limits.
+ EXPECT_EQ(90, t.soft_limit());
+ EXPECT_EQ(90, t.GetLimit(MemLimit::SOFT));
+ EXPECT_EQ(100, t.GetLimit(MemLimit::HARD));
+ EXPECT_EQ(-1, child.soft_limit());
+ EXPECT_EQ(90, t.GetLowestLimit(MemLimit::SOFT));
+ EXPECT_EQ(90, child.GetLowestLimit(MemLimit::SOFT));
+ EXPECT_EQ(100, t.GetLowestLimit(MemLimit::HARD));
+
+ // Test SpareCapacity()
+ EXPECT_EQ(100, t.SpareCapacity(MemLimit::HARD));
+ EXPECT_EQ(90, t.SpareCapacity(MemLimit::SOFT));
+ EXPECT_EQ(100, child.SpareCapacity(MemLimit::HARD));
+ EXPECT_EQ(90, child.SpareCapacity(MemLimit::SOFT));
+
+ // Test TryConsume() within soft limit.
+ EXPECT_TRUE(t.TryConsume(90, MemLimit::SOFT));
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::SOFT));
+ EXPECT_FALSE(child.AnyLimitExceeded(MemLimit::SOFT));
+
+ // Test TryConsume() going over soft limit.
+ EXPECT_FALSE(t.TryConsume(1, MemLimit::SOFT));
+ EXPECT_FALSE(child.TryConsume(1, MemLimit::SOFT));
+ EXPECT_TRUE(t.TryConsume(1, MemLimit::HARD));
+ EXPECT_TRUE(t.LimitExceeded(MemLimit::SOFT));
+ EXPECT_FALSE(child.LimitExceeded(MemLimit::SOFT));
+ EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::SOFT));
+ EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::SOFT));
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
+ EXPECT_FALSE(child.AnyLimitExceeded(MemLimit::HARD));
+ EXPECT_EQ(9, child.SpareCapacity(MemLimit::HARD));
+ EXPECT_EQ(-1, child.SpareCapacity(MemLimit::SOFT));
+
+ // Test Consume() going over hard limit.
+ child.Consume(10);
+ EXPECT_TRUE(t.LimitExceeded(MemLimit::SOFT));
+ EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
+ EXPECT_FALSE(child.LimitExceeded(MemLimit::SOFT));
+ EXPECT_FALSE(child.LimitExceeded(MemLimit::HARD));
+ EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::SOFT));
+ EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::HARD));
+ EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::SOFT));
+ EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::HARD));
+ EXPECT_EQ(-1, child.SpareCapacity(MemLimit::HARD));
+ EXPECT_EQ(-11, child.SpareCapacity(MemLimit::SOFT));
+
+ t.Release(91);
+ child.Release(10);
+}
+
TEST(MemTestTest, ConsumptionMetric) {
TMetricDef md;
md.__set_key("test");
@@ -80,12 +135,12 @@ TEST(MemTestTest, ConsumptionMetric) {
t.Consume(150);
EXPECT_EQ(t.consumption(), 0);
EXPECT_EQ(t.peak_consumption(), 0);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(neg_t.consumption(), 0);
t.Release(5);
EXPECT_EQ(t.consumption(), 0);
EXPECT_EQ(t.peak_consumption(), 0);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(neg_t.consumption(), 0);
metric.Increment(10);
@@ -101,21 +156,21 @@ TEST(MemTestTest, ConsumptionMetric) {
neg_t.Consume(1);
EXPECT_EQ(t.consumption(), 5);
EXPECT_EQ(t.peak_consumption(), 10);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(neg_t.consumption(), -5);
metric.Increment(150);
t.Consume(1);
neg_t.Consume(1);
EXPECT_EQ(t.consumption(), 155);
EXPECT_EQ(t.peak_consumption(), 155);
- EXPECT_TRUE(t.LimitExceeded());
+ EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(neg_t.consumption(), -155);
metric.Increment(-150);
t.Consume(-1);
neg_t.Consume(1);
EXPECT_EQ(t.consumption(), 5);
EXPECT_EQ(t.peak_consumption(), 155);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(neg_t.consumption(), -5);
// consumption_ is not updated when Consume()/Release() is called with a zero value
metric.Increment(10);
@@ -123,7 +178,7 @@ TEST(MemTestTest, ConsumptionMetric) {
neg_t.Consume(0);
EXPECT_EQ(t.consumption(), 5);
EXPECT_EQ(t.peak_consumption(), 155);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(neg_t.consumption(), -5);
// Clean up.
metric.Increment(-15);
@@ -139,37 +194,37 @@ TEST(MemTestTest, TrackerHierarchy) {
// everything below limits
c1.Consume(60);
EXPECT_EQ(c1.consumption(), 60);
- EXPECT_FALSE(c1.LimitExceeded());
- EXPECT_FALSE(c1.AnyLimitExceeded());
+ EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+ EXPECT_FALSE(c1.AnyLimitExceeded(MemLimit::HARD));
EXPECT_EQ(c2.consumption(), 0);
- EXPECT_FALSE(c2.LimitExceeded());
- EXPECT_FALSE(c2.AnyLimitExceeded());
+ EXPECT_FALSE(c2.LimitExceeded(MemLimit::HARD));
+ EXPECT_FALSE(c2.AnyLimitExceeded(MemLimit::HARD));
EXPECT_EQ(p.consumption(), 60);
- EXPECT_FALSE(p.LimitExceeded());
- EXPECT_FALSE(p.AnyLimitExceeded());
+ EXPECT_FALSE(p.LimitExceeded(MemLimit::HARD));
+ EXPECT_FALSE(p.AnyLimitExceeded(MemLimit::HARD));
// p goes over limit
c2.Consume(50);
EXPECT_EQ(c1.consumption(), 60);
- EXPECT_FALSE(c1.LimitExceeded());
- EXPECT_TRUE(c1.AnyLimitExceeded());
+ EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+ EXPECT_TRUE(c1.AnyLimitExceeded(MemLimit::HARD));
EXPECT_EQ(c2.consumption(), 50);
- EXPECT_FALSE(c2.LimitExceeded());
- EXPECT_TRUE(c2.AnyLimitExceeded());
+ EXPECT_FALSE(c2.LimitExceeded(MemLimit::HARD));
+ EXPECT_TRUE(c2.AnyLimitExceeded(MemLimit::HARD));
EXPECT_EQ(p.consumption(), 110);
- EXPECT_TRUE(p.LimitExceeded());
+ EXPECT_TRUE(p.LimitExceeded(MemLimit::HARD));
// c2 goes over limit, p drops below limit
c1.Release(20);
c2.Consume(10);
EXPECT_EQ(c1.consumption(), 40);
- EXPECT_FALSE(c1.LimitExceeded());
- EXPECT_FALSE(c1.AnyLimitExceeded());
+ EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+ EXPECT_FALSE(c1.AnyLimitExceeded(MemLimit::HARD));
EXPECT_EQ(c2.consumption(), 60);
- EXPECT_TRUE(c2.LimitExceeded());
- EXPECT_TRUE(c2.AnyLimitExceeded());
+ EXPECT_TRUE(c2.LimitExceeded(MemLimit::HARD));
+ EXPECT_TRUE(c2.AnyLimitExceeded(MemLimit::HARD));
EXPECT_EQ(p.consumption(), 100);
- EXPECT_FALSE(p.LimitExceeded());
+ EXPECT_FALSE(p.LimitExceeded(MemLimit::HARD));
// Clean up.
c1.Release(40);
@@ -254,34 +309,34 @@ TEST(MemTestTest, GcFunctions) {
ASSERT_TRUE(t.has_limit());
t.Consume(9);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
// Test TryConsume()
EXPECT_FALSE(t.TryConsume(2));
EXPECT_EQ(t.consumption(), 9);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
// Attach GcFunction that releases 1 byte
GcFunctionHelper gc_func_helper(&t);
t.AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper));
EXPECT_TRUE(t.TryConsume(2));
EXPECT_EQ(t.consumption(), 10);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
// GcFunction will be called even though TryConsume() fails
EXPECT_FALSE(t.TryConsume(2));
EXPECT_EQ(t.consumption(), 9);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
// GcFunction won't be called
EXPECT_TRUE(t.TryConsume(1));
EXPECT_EQ(t.consumption(), 10);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
- // Test LimitExceeded()
+ // Test LimitExceeded(MemLimit::HARD)
t.Consume(1);
EXPECT_EQ(t.consumption(), 11);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(t.consumption(), 10);
// Add more GcFunctions, test that we only call them until the limit is no longer
@@ -292,13 +347,12 @@ TEST(MemTestTest, GcFunctions) {
t.AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper3));
t.Consume(1);
EXPECT_EQ(t.consumption(), 11);
- EXPECT_FALSE(t.LimitExceeded());
+ EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
EXPECT_EQ(t.consumption(), 10);
- //Clean up.
+ // Clean up.
t.Release(10);
}
-
}
IMPALA_TEST_MAIN();
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker-types.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-types.h b/be/src/runtime/mem-tracker-types.h
new file mode 100644
index 0000000..e199f1b
--- /dev/null
+++ b/be/src/runtime/mem-tracker-types.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_MEM_TRACKER_TYPES_H
+#define IMPALA_RUNTIME_MEM_TRACKER_TYPES_H
+
+namespace impala {
+class MemTracker;
+
+/// Selects which MemTracker limit a method checks: HARD is the tracker's byte limit;
+/// SOFT is derived from (and never above) it, for callers that can tolerate failure.
+enum class MemLimit { HARD, SOFT };
+
+} // namespace impala
+
+#endif // IMPALA_RUNTIME_MEM_TRACKER_TYPES_H
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 237484f..cb615a1 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -38,6 +38,9 @@ using std::priority_queue;
using std::greater;
using namespace strings;
+DEFINE_double_hidden(soft_mem_limit_frac, 0.9, "(Advanced) Soft memory limit as a "
+ "fraction of hard memory limit.");
+
namespace impala {
const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
@@ -45,9 +48,17 @@ const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
// Name for request pool MemTrackers. '$0' is replaced with the pool name.
const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";
+/// Returns -1 if 'limit' < 0 (no limit), else a value in [0, 'limit'] (Init() DCHECKs).
+static int64_t CalcSoftLimit(int64_t limit) {
+ if (limit < 0) return -1;
+ double frac = max(0.0, min(1.0, FLAGS_soft_mem_limit_frac));
+ return frac >= 1.0 ? limit : min(limit, static_cast<int64_t>(limit * frac));
+}
+
MemTracker::MemTracker(
int64_t byte_limit, const string& label, MemTracker* parent, bool log_usage_if_zero)
: limit_(byte_limit),
+ soft_limit_(CalcSoftLimit(byte_limit)),
label_(label),
parent_(parent),
consumption_(&local_counter_),
@@ -64,6 +75,7 @@ MemTracker::MemTracker(
MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
const std::string& label, MemTracker* parent)
: limit_(byte_limit),
+ soft_limit_(CalcSoftLimit(byte_limit)),
label_(label),
parent_(parent),
consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
@@ -80,6 +92,7 @@ MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
MemTracker::MemTracker(IntGauge* consumption_metric,
int64_t byte_limit, const string& label, MemTracker* parent)
: limit_(byte_limit),
+ soft_limit_(CalcSoftLimit(byte_limit)),
label_(label),
parent_(parent),
consumption_(&local_counter_),
@@ -95,6 +108,7 @@ MemTracker::MemTracker(IntGauge* consumption_metric,
void MemTracker::Init() {
DCHECK_GE(limit_, -1);
+ DCHECK_LE(soft_limit_, limit_);
if (parent_ != NULL) parent_->AddChildTracker(this);
// populate all_trackers_ and limit_trackers_
MemTracker* tracker = this;
@@ -133,6 +147,25 @@ void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& co
delete reservation_counters_.Swap(new ReservationTrackerCounters(counters));
}
+int64_t MemTracker::GetLowestLimit(MemLimit mode) const {
+ if (limit_trackers_.empty()) return -1; // No limit on this tracker or its ancestors.
+ int64_t min_limit = numeric_limits<int64_t>::max();
+ for (MemTracker* limit_tracker : limit_trackers_) {
+ DCHECK(limit_tracker->has_limit());
+ min_limit = min(min_limit, limit_tracker->GetLimit(mode));
+ }
+ return min_limit;
+}
+
+int64_t MemTracker::SpareCapacity(MemLimit mode) const {
+ int64_t result = numeric_limits<int64_t>::max(); // Returned unchanged if no limits.
+ for (MemTracker* tracker : limit_trackers_) {
+ int64_t mem_left = tracker->GetLimit(mode) - tracker->consumption(); // < 0 if over.
+ result = std::min(result, mem_left);
+ }
+ return result;
+}
+
int64_t MemTracker::GetPoolMemReserved() {
// Pool trackers should have a pool_name_ and no limit.
DCHECK(!pool_name_.empty());
@@ -274,7 +307,7 @@ string MemTracker::LogUsage(int max_recursive_depth, const string& prefix,
stringstream ss;
ss << prefix << label_ << ":";
- if (CheckLimitExceeded()) ss << " memory limit exceeded.";
+ if (CheckLimitExceeded(MemLimit::HARD)) ss << " memory limit exceeded.";
if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES);
ReservationTrackerCounters* reservation_counters = reservation_counters_.Load();
@@ -392,7 +425,7 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta
ss << endl;
ExecEnv* exec_env = ExecEnv::GetInstance();
MemTracker* process_tracker = exec_env->process_mem_tracker();
- const int64_t process_capacity = process_tracker->SpareCapacity();
+ const int64_t process_capacity = process_tracker->SpareCapacity(MemLimit::HARD);
ss << "Memory left in process limit: "
<< PrettyPrinter::Print(process_capacity, TUnit::BYTES) << endl;
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 01fd331..d4b4dd4 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -29,6 +29,7 @@
#include "common/logging.h"
#include "common/atomic.h"
+#include "runtime/mem-tracker-types.h"
#include "util/debug-util.h"
#include "util/internal-queue.h"
#include "util/metrics.h"
@@ -40,7 +41,6 @@
namespace impala {
class ObjectPool;
-class MemTracker;
struct ReservationTrackerCounters;
class TQueryOptions;
@@ -48,6 +48,12 @@ class TQueryOptions;
/// and can be arranged into a tree structure such that the consumption tracked
/// by a MemTracker is also tracked by its ancestors.
///
+/// A MemTracker has a hard and a soft limit derived from the limit. If the hard limit
+/// is exceeded, all memory allocations and queries should fail until we are under the
+/// limit again. The soft limit can be exceeded without causing query failures, but
+/// consumers of memory that can tolerate running without more memory should not allocate
+/// memory in excess of the soft limit.
+///
/// We use a five-level hierarchy of mem trackers: process, pool, query, fragment
/// instance. Specific parts of the fragment (exec nodes, sinks, etc) will add a
/// fifth level when they are initialized. This function also initializes a user
@@ -77,22 +83,23 @@ class MemTracker {
public:
/// 'byte_limit' < 0 means no limit
/// 'label' is the label used in the usage string (LogUsage())
- /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be included
+ /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be
+ /// included
/// in LogUsage() output if consumption is 0.
MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(),
- MemTracker* parent = NULL, bool log_usage_if_zero = true);
+ MemTracker* parent = nullptr, bool log_usage_if_zero = true);
/// C'tor for tracker for which consumption counter is created as part of a profile.
/// The counter is created with name COUNTER_NAME.
MemTracker(RuntimeProfile* profile, int64_t byte_limit,
- const std::string& label = std::string(), MemTracker* parent = NULL);
+ const std::string& label = std::string(), MemTracker* parent = nullptr);
/// C'tor for tracker that uses consumption_metric as the consumption value.
/// Consume()/Release() can still be called. This is used for the root process tracker
/// (if 'parent' is NULL). It is also to report on other categories of memory under the
/// process tracker, e.g. buffer pool free buffers (if 'parent - non-NULL).
MemTracker(IntGauge* consumption_metric, int64_t byte_limit = -1,
- const std::string& label = std::string(), MemTracker* parent = NULL);
+ const std::string& label = std::string(), MemTracker* parent = nullptr);
~MemTracker();
@@ -129,13 +136,13 @@ class MemTracker {
return;
}
- if (consumption_metric_ != NULL) {
+ if (consumption_metric_ != nullptr) {
RefreshConsumptionFromMetric();
return;
}
for (MemTracker* tracker : all_trackers_) {
tracker->consumption_->Add(bytes);
- if (tracker->consumption_metric_ == NULL) {
+ if (tracker->consumption_metric_ == nullptr) {
DCHECK_GE(tracker->consumption_->current_value(), 0);
}
}
@@ -148,7 +155,7 @@ class MemTracker {
/// the limit recorded in one of its ancestors already happened.
void ConsumeLocal(int64_t bytes, MemTracker* end_tracker) {
DCHECK(!closed_) << label_;
- DCHECK(consumption_metric_ == NULL) << "Should not be called on root.";
+ DCHECK(consumption_metric_ == nullptr) << "Should not be called on root.";
for (MemTracker* tracker : all_trackers_) {
if (tracker == end_tracker) return;
DCHECK(!tracker->has_limit());
@@ -163,19 +170,21 @@ class MemTracker {
}
/// Increases consumption of this tracker and its ancestors by 'bytes' only if
- /// they can all consume 'bytes'. If this brings any of them over, none of them
- /// are updated.
- /// Returns true if the try succeeded.
+ /// they can all consume 'bytes' without exceeding the limit (hard or soft) specified
+ /// by 'mode'. If any limit would be exceeded, no MemTrackers are updated. If the
+ /// caller can tolerate an allocation failing, it should set mode=SOFT so that
+ /// other callers that may not tolerate allocation failures have a better chance
+ /// of success. Returns true if the consumption was successfully updated.
WARN_UNUSED_RESULT
- bool TryConsume(int64_t bytes) {
+ bool TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) {
DCHECK(!closed_) << label_;
- if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
+ if (consumption_metric_ != nullptr) RefreshConsumptionFromMetric();
if (UNLIKELY(bytes <= 0)) return true;
int i;
// Walk the tracker tree top-down.
for (i = all_trackers_.size() - 1; i >= 0; --i) {
MemTracker* tracker = all_trackers_[i];
- const int64_t limit = tracker->limit();
+ const int64_t limit = tracker->GetLimit(mode);
if (limit < 0) {
tracker->consumption_->Add(bytes); // No limit at this tracker.
} else {
@@ -215,7 +224,7 @@ class MemTracker {
return;
}
- if (consumption_metric_ != NULL) {
+ if (consumption_metric_ != nullptr) {
RefreshConsumptionFromMetric();
return;
}
@@ -227,9 +236,10 @@ class MemTracker {
/// metric. Don't blow up in this case. (Note that this doesn't affect non-process
/// trackers since we can enforce that the reported memory usage is internally
/// consistent.)
- if (tracker->consumption_metric_ == NULL) {
+ if (tracker->consumption_metric_ == nullptr) {
DCHECK_GE(tracker->consumption_->current_value(), 0)
- << std::endl << tracker->LogUsage(UNLIMITED_DEPTH);
+ << std::endl
+ << tracker->LogUsage(UNLIMITED_DEPTH);
}
}
}
@@ -242,22 +252,23 @@ class MemTracker {
/// Returns true if a valid limit of this tracker or one of its ancestors is
/// exceeded.
- bool AnyLimitExceeded() {
+ bool AnyLimitExceeded(MemLimit mode) {
for (MemTracker* tracker : limit_trackers_) {
- if (tracker->LimitExceeded()) return true;
+ if (tracker->LimitExceeded(mode)) return true;
}
return false;
}
/// If this tracker has a limit, checks the limit and attempts to free up some memory if
- /// the limit is exceeded by calling any added GC functions. Returns true if the limit is
- /// exceeded after calling the GC functions. Returns false if there is no limit.
- bool LimitExceeded() {
- if (UNLIKELY(CheckLimitExceeded())) {
- if (bytes_over_limit_metric_ != NULL) {
+ /// the hard limit is exceeded and 'mode' is HARD, by calling any added GC functions.
+ /// Returns true if the 'mode' limit is still exceeded (GC only runs for HARD). Returns
+ /// false if there is no limit or consumption is under the limit.
+ bool LimitExceeded(MemLimit mode) {
+ if (UNLIKELY(CheckLimitExceeded(mode))) {
+ if (mode == MemLimit::HARD && bytes_over_limit_metric_ != nullptr) {
bytes_over_limit_metric_->SetValue(consumption() - limit_);
}
- return GcMemory(limit_);
+ return mode == MemLimit::HARD ? GcMemory(limit_) : true;
}
return false;
}
@@ -265,15 +276,7 @@ class MemTracker {
/// Returns the maximum consumption that can be made without exceeding the limit on
/// this tracker or any of its parents. Returns int64_t::max() if there are no
/// limits and a negative value if any limit is already exceeded.
- int64_t SpareCapacity() const {
- int64_t result = std::numeric_limits<int64_t>::max();
- for (std::vector<MemTracker*>::const_iterator tracker = limit_trackers_.begin();
- tracker != limit_trackers_.end(); ++tracker) {
- int64_t mem_left = (*tracker)->limit() - (*tracker)->consumption();
- result = std::min(result, mem_left);
- }
- return result;
- }
+ int64_t SpareCapacity(MemLimit mode) const;
/// Refresh the memory consumption value from the consumption metric. Only valid to
/// call if this tracker has a consumption metric.
@@ -284,19 +287,17 @@ class MemTracker {
int64_t limit() const { return limit_; }
bool has_limit() const { return limit_ >= 0; }
+ int64_t soft_limit() const { return soft_limit_; } // -1 if there is no limit.
+ int64_t GetLimit(MemLimit mode) const { // Soft or hard limit selected by 'mode'.
+ if (mode == MemLimit::SOFT) return soft_limit();
+ DCHECK_ENUM_EQ(mode, MemLimit::HARD); // HARD and SOFT are the only modes.
+ return limit();
+ }
const std::string& label() const { return label_; }
/// Returns the lowest limit for this tracker and its ancestors. Returns
/// -1 if there is no limit.
- int64_t lowest_limit() const {
- if (limit_trackers_.empty()) return -1;
- int64_t v = std::numeric_limits<int64_t>::max();
- for (int i = 0; i < limit_trackers_.size(); ++i) {
- DCHECK(limit_trackers_[i]->has_limit());
- v = std::min(v, limit_trackers_[i]->limit());
- }
- return v;
- }
+ int64_t GetLowestLimit(MemLimit mode) const;
/// Returns the memory 'reserved' by this resource pool mem tracker, which is the sum
/// of the memory reserved by the queries in it (i.e. its child trackers). The mem
@@ -373,7 +374,10 @@ class MemTracker {
friend class PoolMemTrackerRegistry;
/// Returns true if the current memory tracker's limit is exceeded.
- bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); }
+ bool CheckLimitExceeded(MemLimit mode) const {
+ int64_t limit = GetLimit(mode);
+ return limit >= 0 && limit < consumption(); // A negative limit means no limit.
+ }
/// If consumption is higher than max_consumption, attempts to free memory by calling
/// any added GC functions. Returns true if max_consumption is still exceeded. Takes
@@ -427,6 +431,10 @@ class MemTracker {
/// there is no consumption limit.
const int64_t limit_;
+ /// Soft limit on memory consumption, in bytes. Can be exceeded but callers to
+ /// TryConsume() can opt not to exceed this limit. If -1, there is no consumption limit.
+ const int64_t soft_limit_;
+
std::string label_;
/// The parent of this tracker. The pointer is never modified, even after this tracker
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3dbe012..c86095c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -120,7 +120,7 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
// more resources.
ExecEnv* exec_env = ExecEnv::GetInstance();
MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
- if (process_mem_tracker->LimitExceeded()) {
+ if (process_mem_tracker->LimitExceeded(MemLimit::HARD)) {
string msg = Substitute(
"Query $0 could not start because the backend Impala daemon "
"is over its memory limit", PrintId(query_id()));
@@ -162,7 +162,7 @@ void QueryState::InitMemTrackers() {
Status QueryState::InitBufferPoolState() {
ExecEnv* exec_env = ExecEnv::GetInstance();
- int64_t mem_limit = query_mem_tracker_->lowest_limit();
+ int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD);
int64_t max_reservation;
if (query_options().__isset.buffer_pool_limit
&& query_options().buffer_pool_limit > 0) {
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 5cc80ce..5bb5a2d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -255,7 +255,7 @@ void RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
Status RuntimeState::CheckQueryState() {
DCHECK(instance_mem_tracker_ != nullptr);
- if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded())) {
+ if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
SetMemLimitExceeded(instance_mem_tracker_.get());
}
return GetQueryStatus();
http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 032f036..4ed3cc2 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -323,7 +323,7 @@ inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
inline void FunctionContextImpl::CheckMemLimit(const char* fn_name, int64_t byte_size) {
#ifndef IMPALA_UDF_SDK_BUILD
MemTracker* mem_tracker = udf_pool_->mem_tracker();
- if (mem_tracker->AnyLimitExceeded()) {
+ if (mem_tracker->AnyLimitExceeded(MemLimit::HARD)) {
ErrorMsg msg = ErrorMsg(TErrorCode::UDF_MEM_LIMIT_EXCEEDED, string(fn_name));
state_->SetMemLimitExceeded(mem_tracker, byte_size, &msg);
}