You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/08/02 02:54:32 UTC

[2/3] impala git commit: IMPALA-7297: soft memory limit for reservation increases

IMPALA-7297: soft memory limit for reservation increases

https://goo.gl/N9LgQt summarises the memory problems I'm trying to
solve here.

Reject reservation increases that would leave < 10% of the memory
available under a query or process mem_limit, which would put the query
or impalad into a state with a heightened chance of OOM. This
avoids OOM in some scenarios where reserved memory is increased
too aggressively and starves non-reserved memory.

One way of thinking about this is that it's extending the 80%
limit heuristic for reserved memory so that there's a soft
limit of min(80%, 90% - non-reserved memory consumption).

A follow-on will use this for scanner threads to prevent
starting scanner threads that may exceed the soft limit.

Testing:
Added unit tests for MemTracker and for ReservationTracker.

Change-Id: I39dcf2dd870a59c8b8cda4488fe41ce936d715ac
Reviewed-on: http://gerrit.cloudera.org:8080/10988
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/68736bc3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/68736bc3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/68736bc3

Branch: refs/heads/master
Commit: 68736bc3784ad69b7f4f485a7a3ec0fc3d8d3a9b
Parents: 9f20b75
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jul 13 11:03:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 2 01:46:19 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc                    |   3 +-
 be/src/rpc/impala-service-pool.cc               |   2 +-
 .../runtime/bufferpool/buffer-pool-internal.h   |   2 +-
 be/src/runtime/bufferpool/buffer-pool.cc        |  11 +-
 be/src/runtime/bufferpool/buffer-pool.h         |   9 +-
 .../bufferpool/reservation-tracker-test.cc      |  74 ++++++-----
 .../runtime/bufferpool/reservation-tracker.cc   |  17 ++-
 be/src/runtime/bufferpool/reservation-tracker.h |  17 ++-
 be/src/runtime/initial-reservations.cc          |   9 +-
 be/src/runtime/mem-pool-test.cc                 |  22 ++--
 be/src/runtime/mem-tracker-test.cc              | 126 +++++++++++++------
 be/src/runtime/mem-tracker-types.h              |  31 +++++
 be/src/runtime/mem-tracker.cc                   |  37 +++++-
 be/src/runtime/mem-tracker.h                    |  96 +++++++-------
 be/src/runtime/query-state.cc                   |   4 +-
 be/src/runtime/runtime-state.cc                 |   2 +-
 be/src/udf/udf.cc                               |   2 +-
 17 files changed, 313 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 07511a7..26579c1 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -82,7 +82,8 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(
       Substitute("Exchg Recvr (id=$0)", id_), nullptr,
       ExecEnv::GetInstance()->buffer_reservation(), mem_tracker(),
-      numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_));
+      numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_,
+      MemLimit::HARD));
 
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index cb406b9..d00c73d 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -170,7 +170,7 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
     // fail. The check and the consumption need to be atomic so as to bound the memory
     // usage.
     unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_);
-    if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded())) {
+    if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
       // Discards the transfer early so the transfer size drops to 0. This is to ensure
       // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
       // called MemTracker::Consume() at this point.

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 598939e..64c92e5 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -196,7 +196,7 @@ class BufferPool::Client {
  public:
   Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& name,
       ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-      int64_t reservation_limit, RuntimeProfile* profile);
+      MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile);
 
   ~Client() {
     DCHECK_EQ(0, num_pages_);

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 6e6d979..6b51ed3 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -117,11 +117,12 @@ BufferPool::~BufferPool() {}
 
 Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group,
     ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-    int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client) {
+    int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client,
+    MemLimit mem_limit_mode) {
   DCHECK(!client->is_registered());
   DCHECK(parent_reservation != NULL);
   client->impl_ = new Client(this, file_group, name, parent_reservation, mem_tracker,
-      reservation_limit, profile);
+      mem_limit_mode, reservation_limit, profile);
   return Status::OK();
 }
 
@@ -385,7 +386,7 @@ void BufferPool::SubReservation::Close() {
 
 BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     const string& name, ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-    int64_t reservation_limit, RuntimeProfile* profile)
+    MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile)
   : pool_(pool),
     file_group_(file_group),
     name_(name),
@@ -394,8 +395,8 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     buffers_allocated_bytes_(0) {
   // Set up a child profile with buffer pool info.
   RuntimeProfile* child_profile = profile->CreateChild("Buffer pool", true, true);
-  reservation_.InitChildTracker(
-      child_profile, parent_reservation, mem_tracker, reservation_limit);
+  reservation_.InitChildTracker(child_profile, parent_reservation, mem_tracker,
+      reservation_limit, mem_limit_mode);
   counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
   counters_.cumulative_allocations =
       ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 3323e0a..eb658b0 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -29,6 +29,7 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gutil/macros.h"
+#include "runtime/mem-tracker-types.h"
 #include "runtime/tmp-file-mgr.h"
 #include "util/aligned-new.h"
 #include "util/internal-queue.h"
@@ -37,7 +38,6 @@
 
 namespace impala {
 
-class MemTracker;
 class ReservationTracker;
 class RuntimeProfile;
 class SystemAllocator;
@@ -175,11 +175,12 @@ class BufferPool : public CacheLineAligned {
   ///
   /// The client's reservation is created as a child of 'parent_reservation' with limit
   /// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The initial
-  /// reservation is 0 bytes.
+  /// reservation is 0 bytes. 'mem_limit_mode' determines whether reservation
+  /// increases are checked against the soft or hard limit of 'mem_tracker'.
   Status RegisterClient(const std::string& name, TmpFileMgr::FileGroup* file_group,
       ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-      int64_t reservation_limit, RuntimeProfile* profile,
-      ClientHandle* client) WARN_UNUSED_RESULT;
+      int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client,
+      MemLimit mem_limit_mode = MemLimit::SOFT) WARN_UNUSED_RESULT;
 
   /// Deregister 'client' if it is registered. All pages must be destroyed and buffers
   /// must be freed for the client before calling this. Releases any reservation that

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index e441402..88aa92a 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -340,6 +340,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
   // We can only handle MemTracker limits at the topmost linked ReservationTracker,
   // so avoid adding limits at lower level.
   const int LIMIT = HIERARCHY_DEPTH;
+  const int SOFT_LIMIT = static_cast<int>(LIMIT * 0.9);
   vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1});
 
   // Root trackers aren't linked.
@@ -350,47 +351,64 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
         mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
   }
 
-  vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
+  vector<int> interesting_amounts(
+      {LIMIT - 1, LIMIT, LIMIT + 1, SOFT_LIMIT, SOFT_LIMIT - 1});
 
   // Test that all limits and consumption correctly reported when consuming
   // from a non-root ReservationTracker that is connected to a MemTracker.
-  for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
-    int64_t lowest_limit = mem_trackers[level]->lowest_limit();
-    for (int amount : interesting_amounts) {
-      // Initialize the tracker, increase reservation, then release reservation by closing
-      // the tracker.
-      reservations[level].InitChildTracker(
-          NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 500);
-      bool increased = reservations[level].IncreaseReservation(amount);
-      if (lowest_limit == -1 || amount <= lowest_limit) {
-        // The increase should go through.
-        ASSERT_TRUE(increased) << reservations[level].DebugString();
-        ASSERT_EQ(amount, reservations[level].GetReservation());
-        ASSERT_EQ(amount, mem_trackers[level]->consumption());
-        for (int ancestor = 0; ancestor < level; ++ancestor) {
-          ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
-          ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+  // Test both soft and hard limits.
+  for (MemLimit limit_mode : {MemLimit::SOFT, MemLimit::HARD}) {
+    for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
+      int64_t lowest_limit = mem_trackers[level]->GetLowestLimit(limit_mode);
+      for (int amount : interesting_amounts) {
+        LOG(INFO) << "level=" << level << " limit_mode=" << static_cast<int>(limit_mode)
+                  << " amount=" << amount << " lowest_limit=" << lowest_limit;
+        // Initialize the tracker, increase reservation, then release reservation by
+        // closing the tracker.
+        reservations[level].InitChildTracker(NewProfile(), &reservations[level - 1],
+            mem_trackers[level].get(), 500, limit_mode);
+        bool increased = reservations[level].IncreaseReservation(amount);
+        if (lowest_limit == -1 || amount <= lowest_limit) {
+          // The increase should go through.
+          ASSERT_TRUE(increased)
+              << reservations[level].DebugString() << "\n"
+              << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
+          ASSERT_EQ(amount, reservations[level].GetReservation());
+          ASSERT_EQ(amount, mem_trackers[level]->consumption());
+          for (int ancestor = 0; ancestor < level; ++ancestor) {
+            ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
+            ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+          }
+
+          LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
+        } else {
+          ASSERT_FALSE(increased);
         }
-
-        LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
         reservations[level].Close();
-      } else {
-        ASSERT_FALSE(increased);
-      }
-      // Reservations should be released on all ancestors.
-      for (int i = 0; i < level; ++i) {
-        ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
-                                                       << reservations[i].DebugString();
-        ASSERT_EQ(0, reservations[i].GetChildReservations());
-        ASSERT_EQ(0, mem_trackers[i]->consumption());
+        // Reservations should be released on all ancestors.
+        for (int i = 0; i < level; ++i) {
+          ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
+                                                         << reservations[i].DebugString();
+          ASSERT_EQ(0, reservations[i].GetChildReservations());
+          ASSERT_EQ(0, mem_trackers[i]->consumption());
+        }
       }
+      // Set up tracker to be parent for next iteration.
+      reservations[level].InitChildTracker(NewProfile(), &reservations[level - 1],
+          mem_trackers[level].get(), 500, limit_mode);
     }
+    for (int level = 1; level < HIERARCHY_DEPTH; ++level) reservations[level].Close();
   }
 
   // "Pull down" a reservation from the top of the hierarchy level-by-level to the
   // leaves, checking that everything is consistent at each step.
   for (int level = 0; level < HIERARCHY_DEPTH; ++level) {
     const int amount = LIMIT;
+    if (level > 0) {
+      reservations[level].InitChildTracker(
+          NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 500,
+          MemLimit::HARD);
+    }
     ASSERT_TRUE(reservations[level].IncreaseReservation(amount));
     ASSERT_EQ(amount, reservations[level].GetReservation());
     ASSERT_EQ(0, reservations[level].GetUsedReservation());

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index f0e1839..a920e1c 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -57,7 +57,8 @@ void ReservationTracker::InitRootTracker(
 }
 
 void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
-    ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) {
+    ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit,
+    MemLimit mem_limit_mode) {
   DCHECK(parent != nullptr);
   DCHECK_GE(reservation_limit, 0);
 
@@ -65,6 +66,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
   DCHECK(!initialized_);
   parent_ = parent;
   mem_tracker_ = mem_tracker;
+  mem_limit_mode_ = mem_limit_mode;
 
   reservation_limit_ = reservation_limit;
   reservation_ = 0;
@@ -79,7 +81,8 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
       DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
       // Make sure we don't have a lower limit than the ancestor, since we don't enforce
       // limits at lower links.
-      DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit());
+      DCHECK_EQ(mem_tracker_->GetLowestLimit(mem_limit_mode_),
+          parent_mem_tracker->GetLowestLimit(mem_limit_mode_));
     } else {
       // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
       // shouldn't have a MemTracker.
@@ -208,7 +211,8 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
       granted = parent_->IncreaseReservationInternalLocked(
           reservation_increase, true, true, error_status);
     }
-    if (granted && !TryConsumeFromMemTracker(reservation_increase)) {
+    if (granted
+        && !TryConsumeFromMemTracker(reservation_increase, mem_limit_mode_)) {
       granted = false;
       if (error_status != nullptr) {
         *error_status = mem_tracker_->MemLimitExceeded(nullptr,
@@ -230,13 +234,14 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
   return granted;
 }
 
-bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase) {
+bool ReservationTracker::TryConsumeFromMemTracker(
+    int64_t reservation_increase, MemLimit mem_limit_mode) {
   DCHECK_GE(reservation_increase, 0);
   if (mem_tracker_ == nullptr) return true;
   if (GetParentMemTracker() == nullptr) {
     // At the topmost link, which may be a MemTracker with a limit, we need to use
     // TryConsume() to check the limit.
-    return mem_tracker_->TryConsume(reservation_increase);
+    return mem_tracker_->TryConsume(reservation_increase, mem_limit_mode);
   } else {
     // For lower links, there shouldn't be a limit to enforce, so we just need to
     // update the consumption of the linked MemTracker since the reservation is
@@ -329,7 +334,7 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
     // We don't handle MemTrackers with limit in this function - this should always
     // succeed.
     DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
-    bool success = tracker->TryConsumeFromMemTracker(bytes);
+    bool success = tracker->TryConsumeFromMemTracker(bytes, MemLimit::HARD);
     DCHECK(success);
     if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 3bf2de1..604db7f 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -23,14 +23,14 @@
 #include <boost/thread/locks.hpp>
 #include <string>
 
-#include "runtime/bufferpool/reservation-tracker-counters.h"
 #include "common/status.h"
+#include "runtime/bufferpool/reservation-tracker-counters.h"
+#include "runtime/mem-tracker-types.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
 class DummyProfile;
-class MemTracker;
 class RuntimeProfile;
 
 /// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes.
@@ -101,9 +101,12 @@ class ReservationTracker {
   /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its
   /// children will be counted as consumption against 'mem_tracker'.
   /// 'reservation_limit' is the maximum reservation for this tracker in bytes.
-  /// if 'profile' is not NULL, the counters in 'counters_' are added to 'profile'.
+  /// 'mem_limit_mode' determines whether reservation increases are checked against the
+  /// soft or hard limit of 'mem_tracker'. If 'profile' is not NULL, the counters in
+  /// 'counters_' are added to 'profile'.
   void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
-      MemTracker* mem_tracker, int64_t reservation_limit);
+      MemTracker* mem_tracker, int64_t reservation_limit,
+      MemLimit mem_limit_mode = MemLimit::SOFT);
 
   /// If the tracker is initialized, deregister the ReservationTracker from its parent,
   /// relinquishing all this tracker's reservation. All of the reservation must be unused
@@ -219,7 +222,7 @@ class ReservationTracker {
   /// because it would exceed a memory limit. If there is no linked MemTracker, just
   /// returns true.
   /// TODO: remove once we account all memory via ReservationTrackers.
-  bool TryConsumeFromMemTracker(int64_t reservation_increase);
+  bool TryConsumeFromMemTracker(int64_t reservation_increase, MemLimit mem_limit_mode);
 
   /// Decrease consumption on linked MemTracker to reflect a decrease in reservation of
   /// 'reservation_decrease'. If there is no linked MemTracker, does nothing.
@@ -301,6 +304,10 @@ class ReservationTracker {
   /// TODO: remove once all memory is accounted via ReservationTrackers.
   MemTracker* mem_tracker_ = nullptr;
 
+  /// Determines whether the soft or hard limit of 'mem_tracker_' is checked for
+  /// reservation increases.
+  MemLimit mem_limit_mode_;
+
   /// The maximum reservation in bytes that this tracker can have. Can be read with an
   /// atomic load without holding lock.
   int64_t reservation_limit_;

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/initial-reservations.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc
index 0fe00fe..e5fcd32 100644
--- a/be/src/runtime/initial-reservations.cc
+++ b/be/src/runtime/initial-reservations.cc
@@ -41,10 +41,13 @@ InitialReservations::InitialReservations(ObjectPool* obj_pool,
     ReservationTracker* query_reservation, MemTracker* query_mem_tracker,
     int64_t initial_reservation_total_claims)
   : initial_reservation_mem_tracker_(obj_pool->Add(
-      new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
-      remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+        new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
+    remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+  // Soft mem_limits should not apply to the initial reservation because we don't want
+  // to fail the query in the case where the initial reservation exceeds the soft
+  // limit.
   initial_reservations_.InitChildTracker(nullptr, query_reservation,
-      initial_reservation_mem_tracker_, numeric_limits<int64_t>::max());
+      initial_reservation_mem_tracker_, numeric_limits<int64_t>::max(), MemLimit::HARD);
 }
 
 Status InitialReservations::Init(

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 26176a5..f939008 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -241,35 +241,35 @@ TEST(MemPoolTest, Limits) {
   MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3);
 
   MemPool* p1 = new MemPool(&limit1);
-  EXPECT_FALSE(limit1.AnyLimitExceeded());
+  EXPECT_FALSE(limit1.AnyLimitExceeded(MemLimit::HARD));
 
   MemPool* p2 = new MemPool(&limit2);
-  EXPECT_FALSE(limit2.AnyLimitExceeded());
+  EXPECT_FALSE(limit2.AnyLimitExceeded(MemLimit::HARD));
 
   // p1 exceeds a non-shared limit
   p1->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
-  EXPECT_FALSE(limit1.LimitExceeded());
+  EXPECT_FALSE(limit1.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit1.consumption());
-  EXPECT_FALSE(limit3.LimitExceeded());
+  EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit3.consumption());
 
   p1->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
-  EXPECT_TRUE(limit1.LimitExceeded());
+  EXPECT_TRUE(limit1.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit1.consumption());
-  EXPECT_FALSE(limit3.LimitExceeded());
+  EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit3.consumption());
 
   // p2 exceeds a shared limit
   p2->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
-  EXPECT_FALSE(limit2.LimitExceeded());
+  EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit2.consumption());
-  EXPECT_FALSE(limit3.LimitExceeded());
+  EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 4, limit3.consumption());
 
   p2->Allocate(1);
-  EXPECT_FALSE(limit2.LimitExceeded());
+  EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit2.consumption());
-  EXPECT_TRUE(limit3.LimitExceeded());
+  EXPECT_TRUE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 6, limit3.consumption());
 
   // deleting pools reduces consumption
@@ -281,7 +281,7 @@ TEST(MemPoolTest, Limits) {
 
   // Allocate another chunk
   p2->FreeAll();
-  EXPECT_FALSE(limit2.LimitExceeded());
+  EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
   uint8_t* result = p2->TryAllocate(MemPoolTest::INITIAL_CHUNK_SIZE);
   ASSERT_TRUE(result != NULL);
   ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 285f897..e52c345 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -35,7 +35,7 @@ TEST(MemTestTest, SingleTrackerNoLimit) {
   EXPECT_EQ(t.consumption(), 20);
   t.Release(15);
   EXPECT_EQ(t.consumption(), 5);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   // Clean up.
   t.Release(5);
 }
@@ -45,17 +45,72 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
   EXPECT_TRUE(t.has_limit());
   t.Consume(10);
   EXPECT_EQ(t.consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   t.Consume(10);
   EXPECT_EQ(t.consumption(), 20);
-  EXPECT_TRUE(t.LimitExceeded());
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
   t.Release(15);
   EXPECT_EQ(t.consumption(), 5);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   // Clean up.
   t.Release(5);
 }
 
+/// Exercise individual functions that take MemLimit::SOFT option.
+TEST(MemTestTest, SoftLimit) {
+  MemTracker t(100);
+  MemTracker child(-1, "", &t);
+
+  // Exercise functions that return limits.
+  EXPECT_EQ(90, t.soft_limit());
+  EXPECT_EQ(90, t.GetLimit(MemLimit::SOFT));
+  EXPECT_EQ(100, t.GetLimit(MemLimit::HARD));
+  EXPECT_EQ(-1, child.soft_limit());
+  EXPECT_EQ(90, t.GetLowestLimit(MemLimit::SOFT));
+  EXPECT_EQ(90, child.GetLowestLimit(MemLimit::SOFT));
+  EXPECT_EQ(100, t.GetLowestLimit(MemLimit::HARD));
+
+  // Test SpareCapacity()
+  EXPECT_EQ(100, t.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(90, t.SpareCapacity(MemLimit::SOFT));
+  EXPECT_EQ(100, child.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(90, child.SpareCapacity(MemLimit::SOFT));
+
+  // Test TryConsume() within soft limit.
+  EXPECT_TRUE(t.TryConsume(90, MemLimit::SOFT));
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(child.AnyLimitExceeded(MemLimit::SOFT));
+
+  // Test TryConsume() going over soft limit.
+  EXPECT_FALSE(t.TryConsume(1, MemLimit::SOFT));
+  EXPECT_FALSE(child.TryConsume(1, MemLimit::SOFT));
+  EXPECT_TRUE(t.TryConsume(1, MemLimit::HARD));
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(child.LimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(child.AnyLimitExceeded(MemLimit::HARD));
+  EXPECT_EQ(9, child.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(-1, child.SpareCapacity(MemLimit::SOFT));
+
+  // Test Consume() going over hard limit.
+  child.Consume(10);
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(child.LimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(child.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::HARD));
+  EXPECT_EQ(-1, child.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(-11, child.SpareCapacity(MemLimit::SOFT));
+
+  t.Release(91);
+  child.Release(10);
+}
+
 TEST(MemTestTest, ConsumptionMetric) {
   TMetricDef md;
   md.__set_key("test");
@@ -80,12 +135,12 @@ TEST(MemTestTest, ConsumptionMetric) {
   t.Consume(150);
   EXPECT_EQ(t.consumption(), 0);
   EXPECT_EQ(t.peak_consumption(), 0);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), 0);
   t.Release(5);
   EXPECT_EQ(t.consumption(), 0);
   EXPECT_EQ(t.peak_consumption(), 0);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), 0);
 
   metric.Increment(10);
@@ -101,21 +156,21 @@ TEST(MemTestTest, ConsumptionMetric) {
   neg_t.Consume(1);
   EXPECT_EQ(t.consumption(), 5);
   EXPECT_EQ(t.peak_consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -5);
   metric.Increment(150);
   t.Consume(1);
   neg_t.Consume(1);
   EXPECT_EQ(t.consumption(), 155);
   EXPECT_EQ(t.peak_consumption(), 155);
-  EXPECT_TRUE(t.LimitExceeded());
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -155);
   metric.Increment(-150);
   t.Consume(-1);
   neg_t.Consume(1);
   EXPECT_EQ(t.consumption(), 5);
   EXPECT_EQ(t.peak_consumption(), 155);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -5);
   // consumption_ is not updated when Consume()/Release() is called with a zero value
   metric.Increment(10);
@@ -123,7 +178,7 @@ TEST(MemTestTest, ConsumptionMetric) {
   neg_t.Consume(0);
   EXPECT_EQ(t.consumption(), 5);
   EXPECT_EQ(t.peak_consumption(), 155);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -5);
   // Clean up.
   metric.Increment(-15);
@@ -139,37 +194,37 @@ TEST(MemTestTest, TrackerHierarchy) {
   // everything below limits
   c1.Consume(60);
   EXPECT_EQ(c1.consumption(), 60);
-  EXPECT_FALSE(c1.LimitExceeded());
-  EXPECT_FALSE(c1.AnyLimitExceeded());
+  EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(c1.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(c2.consumption(), 0);
-  EXPECT_FALSE(c2.LimitExceeded());
-  EXPECT_FALSE(c2.AnyLimitExceeded());
+  EXPECT_FALSE(c2.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(c2.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(p.consumption(), 60);
-  EXPECT_FALSE(p.LimitExceeded());
-  EXPECT_FALSE(p.AnyLimitExceeded());
+  EXPECT_FALSE(p.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(p.AnyLimitExceeded(MemLimit::HARD));
 
   // p goes over limit
   c2.Consume(50);
   EXPECT_EQ(c1.consumption(), 60);
-  EXPECT_FALSE(c1.LimitExceeded());
-  EXPECT_TRUE(c1.AnyLimitExceeded());
+  EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(c1.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(c2.consumption(), 50);
-  EXPECT_FALSE(c2.LimitExceeded());
-  EXPECT_TRUE(c2.AnyLimitExceeded());
+  EXPECT_FALSE(c2.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(c2.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(p.consumption(), 110);
-  EXPECT_TRUE(p.LimitExceeded());
+  EXPECT_TRUE(p.LimitExceeded(MemLimit::HARD));
 
   // c2 goes over limit, p drops below limit
   c1.Release(20);
   c2.Consume(10);
   EXPECT_EQ(c1.consumption(), 40);
-  EXPECT_FALSE(c1.LimitExceeded());
-  EXPECT_FALSE(c1.AnyLimitExceeded());
+  EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(c1.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(c2.consumption(), 60);
-  EXPECT_TRUE(c2.LimitExceeded());
-  EXPECT_TRUE(c2.AnyLimitExceeded());
+  EXPECT_TRUE(c2.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(c2.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(p.consumption(), 100);
-  EXPECT_FALSE(p.LimitExceeded());
+  EXPECT_FALSE(p.LimitExceeded(MemLimit::HARD));
 
   // Clean up.
   c1.Release(40);
@@ -254,34 +309,34 @@ TEST(MemTestTest, GcFunctions) {
   ASSERT_TRUE(t.has_limit());
 
   t.Consume(9);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // Test TryConsume()
   EXPECT_FALSE(t.TryConsume(2));
   EXPECT_EQ(t.consumption(), 9);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // Attach GcFunction that releases 1 byte
   GcFunctionHelper gc_func_helper(&t);
   t.AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper));
   EXPECT_TRUE(t.TryConsume(2));
   EXPECT_EQ(t.consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // GcFunction will be called even though TryConsume() fails
   EXPECT_FALSE(t.TryConsume(2));
   EXPECT_EQ(t.consumption(), 9);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // GcFunction won't be called
   EXPECT_TRUE(t.TryConsume(1));
   EXPECT_EQ(t.consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
-  // Test LimitExceeded()
+  // Test LimitExceeded(MemLimit::HARD)
   t.Consume(1);
   EXPECT_EQ(t.consumption(), 11);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(t.consumption(), 10);
 
   // Add more GcFunctions, test that we only call them until the limit is no longer
@@ -292,13 +347,12 @@ TEST(MemTestTest, GcFunctions) {
   t.AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper3));
   t.Consume(1);
   EXPECT_EQ(t.consumption(), 11);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(t.consumption(), 10);
 
-  //Clean up.
+  // Clean up.
   t.Release(10);
 }
-
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker-types.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-types.h b/be/src/runtime/mem-tracker-types.h
new file mode 100644
index 0000000..e199f1b
--- /dev/null
+++ b/be/src/runtime/mem-tracker-types.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_MEM_TRACKER_TYPES_H
+#define IMPALA_RUNTIME_MEM_TRACKER_TYPES_H
+
+namespace impala {
+class MemTracker;
+
+/// Mode argument passed to various MemTracker methods to indicate whether a soft or hard
+/// limit should be used.
+enum class MemLimit { HARD, SOFT };
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 237484f..cb615a1 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -38,6 +38,9 @@ using std::priority_queue;
 using std::greater;
 using namespace strings;
 
+DEFINE_double_hidden(soft_mem_limit_frac, 0.9, "(Advanced) Soft memory limit as a "
+    "fraction of hard memory limit.");
+
 namespace impala {
 
 const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
@@ -45,9 +48,17 @@ const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
 // Name for request pool MemTrackers. '$0' is replaced with the pool name.
 const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";
 
+/// Calculate the soft limit for a MemTracker based on the hard limit 'limit'.
+static int64_t CalcSoftLimit(int64_t limit) {
+  if (limit < 0) return -1;
+  double frac = max(0.0, min(1.0, FLAGS_soft_mem_limit_frac));
+  return static_cast<int64_t>(limit * frac);
+}
+
 MemTracker::MemTracker(
     int64_t byte_limit, const string& label, MemTracker* parent, bool log_usage_if_zero)
   : limit_(byte_limit),
+    soft_limit_(CalcSoftLimit(byte_limit)),
     label_(label),
     parent_(parent),
     consumption_(&local_counter_),
@@ -64,6 +75,7 @@ MemTracker::MemTracker(
 MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
     const std::string& label, MemTracker* parent)
   : limit_(byte_limit),
+    soft_limit_(CalcSoftLimit(byte_limit)),
     label_(label),
     parent_(parent),
     consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
@@ -80,6 +92,7 @@ MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
 MemTracker::MemTracker(IntGauge* consumption_metric,
     int64_t byte_limit, const string& label, MemTracker* parent)
   : limit_(byte_limit),
+    soft_limit_(CalcSoftLimit(byte_limit)),
     label_(label),
     parent_(parent),
     consumption_(&local_counter_),
@@ -95,6 +108,7 @@ MemTracker::MemTracker(IntGauge* consumption_metric,
 
 void MemTracker::Init() {
   DCHECK_GE(limit_, -1);
+  DCHECK_LE(soft_limit_, limit_);
   if (parent_ != NULL) parent_->AddChildTracker(this);
   // populate all_trackers_ and limit_trackers_
   MemTracker* tracker = this;
@@ -133,6 +147,25 @@ void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& co
   delete reservation_counters_.Swap(new ReservationTrackerCounters(counters));
 }
 
+int64_t MemTracker::GetLowestLimit(MemLimit mode) const {
+  if (limit_trackers_.empty()) return -1;
+  int64_t min_limit = numeric_limits<int64_t>::max();
+  for (MemTracker* limit_tracker : limit_trackers_) {
+    DCHECK(limit_tracker->has_limit());
+    min_limit = min(min_limit, limit_tracker->GetLimit(mode));
+  }
+  return min_limit;
+}
+
+int64_t MemTracker::SpareCapacity(MemLimit mode) const {
+  int64_t result = numeric_limits<int64_t>::max();
+  for (MemTracker* tracker : limit_trackers_) {
+    int64_t mem_left = tracker->GetLimit(mode) - tracker->consumption();
+    result = std::min(result, mem_left);
+  }
+  return result;
+}
+
 int64_t MemTracker::GetPoolMemReserved() {
   // Pool trackers should have a pool_name_ and no limit.
   DCHECK(!pool_name_.empty());
@@ -274,7 +307,7 @@ string MemTracker::LogUsage(int max_recursive_depth, const string& prefix,
 
   stringstream ss;
   ss << prefix << label_ << ":";
-  if (CheckLimitExceeded()) ss << " memory limit exceeded.";
+  if (CheckLimitExceeded(MemLimit::HARD)) ss << " memory limit exceeded.";
   if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES);
 
   ReservationTrackerCounters* reservation_counters = reservation_counters_.Load();
@@ -392,7 +425,7 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta
   ss << endl;
   ExecEnv* exec_env = ExecEnv::GetInstance();
   MemTracker* process_tracker = exec_env->process_mem_tracker();
-  const int64_t process_capacity = process_tracker->SpareCapacity();
+  const int64_t process_capacity = process_tracker->SpareCapacity(MemLimit::HARD);
   ss << "Memory left in process limit: "
      << PrettyPrinter::Print(process_capacity, TUnit::BYTES) << endl;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 01fd331..d4b4dd4 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -29,6 +29,7 @@
 
 #include "common/logging.h"
 #include "common/atomic.h"
+#include "runtime/mem-tracker-types.h"
 #include "util/debug-util.h"
 #include "util/internal-queue.h"
 #include "util/metrics.h"
@@ -40,7 +41,6 @@
 namespace impala {
 
 class ObjectPool;
-class MemTracker;
 struct ReservationTrackerCounters;
 class TQueryOptions;
 
@@ -48,6 +48,12 @@ class TQueryOptions;
 /// and can be arranged into a tree structure such that the consumption tracked
 /// by a MemTracker is also tracked by its ancestors.
 ///
+/// A MemTracker has a hard and a soft limit derived from the limit. If the hard limit
+/// is exceeded, all memory allocations and queries should fail until we are under the
+/// limit again. The soft limit can be exceeded without causing query failures, but
+/// consumers of memory that can tolerate running without more memory should not allocate
+/// memory in excess of the soft limit.
+///
 /// We use a five-level hierarchy of mem trackers: process, pool, query, fragment
 /// instance. Specific parts of the fragment (exec nodes, sinks, etc) will add a
 /// fifth level when they are initialized. This function also initializes a user
@@ -77,22 +83,23 @@ class MemTracker {
  public:
   /// 'byte_limit' < 0 means no limit
   /// 'label' is the label used in the usage string (LogUsage())
-  /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be included
+  /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be
+  /// included
   /// in LogUsage() output if consumption is 0.
   MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(),
-      MemTracker* parent = NULL, bool log_usage_if_zero = true);
+      MemTracker* parent = nullptr, bool log_usage_if_zero = true);
 
   /// C'tor for tracker for which consumption counter is created as part of a profile.
   /// The counter is created with name COUNTER_NAME.
   MemTracker(RuntimeProfile* profile, int64_t byte_limit,
-      const std::string& label = std::string(), MemTracker* parent = NULL);
+      const std::string& label = std::string(), MemTracker* parent = nullptr);
 
   /// C'tor for tracker that uses consumption_metric as the consumption value.
   /// Consume()/Release() can still be called. This is used for the root process tracker
   /// (if 'parent' is NULL). It is also to report on other categories of memory under the
   /// process tracker, e.g. buffer pool free buffers (if 'parent - non-NULL).
   MemTracker(IntGauge* consumption_metric, int64_t byte_limit = -1,
-      const std::string& label = std::string(), MemTracker* parent = NULL);
+      const std::string& label = std::string(), MemTracker* parent = nullptr);
 
   ~MemTracker();
 
@@ -129,13 +136,13 @@ class MemTracker {
       return;
     }
 
-    if (consumption_metric_ != NULL) {
+    if (consumption_metric_ != nullptr) {
       RefreshConsumptionFromMetric();
       return;
     }
     for (MemTracker* tracker : all_trackers_) {
       tracker->consumption_->Add(bytes);
-      if (tracker->consumption_metric_ == NULL) {
+      if (tracker->consumption_metric_ == nullptr) {
         DCHECK_GE(tracker->consumption_->current_value(), 0);
       }
     }
@@ -148,7 +155,7 @@ class MemTracker {
   /// the limit recorded in one of its ancestors already happened.
   void ConsumeLocal(int64_t bytes, MemTracker* end_tracker) {
     DCHECK(!closed_) << label_;
-    DCHECK(consumption_metric_ == NULL) << "Should not be called on root.";
+    DCHECK(consumption_metric_ == nullptr) << "Should not be called on root.";
     for (MemTracker* tracker : all_trackers_) {
       if (tracker == end_tracker) return;
       DCHECK(!tracker->has_limit());
@@ -163,19 +170,21 @@ class MemTracker {
   }
 
   /// Increases consumption of this tracker and its ancestors by 'bytes' only if
-  /// they can all consume 'bytes'. If this brings any of them over, none of them
-  /// are updated.
-  /// Returns true if the try succeeded.
+  /// they can all consume 'bytes' without exceeding the limit (hard or soft) specified
+  /// by 'mode'. If any limit would be exceeded, no MemTrackers are updated. If the
+  /// caller can tolerate an allocation failing, it should set mode=SOFT so that
+  /// other callers that may not tolerate allocation failures have a better chance
+  /// of success. Returns true if the consumption was successfully updated.
   WARN_UNUSED_RESULT
-  bool TryConsume(int64_t bytes) {
+  bool TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) {
     DCHECK(!closed_) << label_;
-    if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
+    if (consumption_metric_ != nullptr) RefreshConsumptionFromMetric();
     if (UNLIKELY(bytes <= 0)) return true;
     int i;
     // Walk the tracker tree top-down.
     for (i = all_trackers_.size() - 1; i >= 0; --i) {
       MemTracker* tracker = all_trackers_[i];
-      const int64_t limit = tracker->limit();
+      const int64_t limit = tracker->GetLimit(mode);
       if (limit < 0) {
         tracker->consumption_->Add(bytes); // No limit at this tracker.
       } else {
@@ -215,7 +224,7 @@ class MemTracker {
       return;
     }
 
-    if (consumption_metric_ != NULL) {
+    if (consumption_metric_ != nullptr) {
       RefreshConsumptionFromMetric();
       return;
     }
@@ -227,9 +236,10 @@ class MemTracker {
       /// metric. Don't blow up in this case. (Note that this doesn't affect non-process
       /// trackers since we can enforce that the reported memory usage is internally
       /// consistent.)
-      if (tracker->consumption_metric_ == NULL) {
+      if (tracker->consumption_metric_ == nullptr) {
         DCHECK_GE(tracker->consumption_->current_value(), 0)
-          << std::endl << tracker->LogUsage(UNLIMITED_DEPTH);
+            << std::endl
+            << tracker->LogUsage(UNLIMITED_DEPTH);
       }
     }
   }
@@ -242,22 +252,23 @@ class MemTracker {
 
   /// Returns true if a valid limit of this tracker or one of its ancestors is
   /// exceeded.
-  bool AnyLimitExceeded() {
+  bool AnyLimitExceeded(MemLimit mode) {
     for (MemTracker* tracker : limit_trackers_) {
-      if (tracker->LimitExceeded()) return true;
+      if (tracker->LimitExceeded(mode)) return true;
     }
     return false;
   }
 
   /// If this tracker has a limit, checks the limit and attempts to free up some memory if
-  /// the limit is exceeded by calling any added GC functions. Returns true if the limit is
-  /// exceeded after calling the GC functions. Returns false if there is no limit.
-  bool LimitExceeded() {
-    if (UNLIKELY(CheckLimitExceeded())) {
-      if (bytes_over_limit_metric_ != NULL) {
+  /// the hard limit is exceeded by calling any added GC functions. Returns true if the
+  /// limit is exceeded after calling the GC functions. Returns false if there is no limit
+  /// or consumption is under the limit.
+  bool LimitExceeded(MemLimit mode) {
+    if (UNLIKELY(CheckLimitExceeded(mode))) {
+      if (mode == MemLimit::HARD && bytes_over_limit_metric_ != nullptr) {
         bytes_over_limit_metric_->SetValue(consumption() - limit_);
       }
-      return GcMemory(limit_);
+      return mode == MemLimit::HARD ? GcMemory(limit_) : true;
     }
     return false;
   }
@@ -265,15 +276,7 @@ class MemTracker {
   /// Returns the maximum consumption that can be made without exceeding the limit on
   /// this tracker or any of its parents. Returns int64_t::max() if there are no
   /// limits and a negative value if any limit is already exceeded.
-  int64_t SpareCapacity() const {
-    int64_t result = std::numeric_limits<int64_t>::max();
-    for (std::vector<MemTracker*>::const_iterator tracker = limit_trackers_.begin();
-         tracker != limit_trackers_.end(); ++tracker) {
-      int64_t mem_left = (*tracker)->limit() - (*tracker)->consumption();
-      result = std::min(result, mem_left);
-    }
-    return result;
-  }
+  int64_t SpareCapacity(MemLimit mode) const;
 
   /// Refresh the memory consumption value from the consumption metric. Only valid to
   /// call if this tracker has a consumption metric.
@@ -284,19 +287,17 @@ class MemTracker {
 
   int64_t limit() const { return limit_; }
   bool has_limit() const { return limit_ >= 0; }
+  int64_t soft_limit() const { return soft_limit_; }
+  int64_t GetLimit(MemLimit mode) const {
+    if (mode == MemLimit::SOFT) return soft_limit();
+    DCHECK_ENUM_EQ(mode, MemLimit::HARD);
+    return limit();
+  }
   const std::string& label() const { return label_; }
 
   /// Returns the lowest limit for this tracker and its ancestors. Returns
   /// -1 if there is no limit.
-  int64_t lowest_limit() const {
-    if (limit_trackers_.empty()) return -1;
-    int64_t v = std::numeric_limits<int64_t>::max();
-    for (int i = 0; i < limit_trackers_.size(); ++i) {
-      DCHECK(limit_trackers_[i]->has_limit());
-      v = std::min(v, limit_trackers_[i]->limit());
-    }
-    return v;
-  }
+  int64_t GetLowestLimit(MemLimit mode) const;
 
   /// Returns the memory 'reserved' by this resource pool mem tracker, which is the sum
   /// of the memory reserved by the queries in it (i.e. its child trackers). The mem
@@ -373,7 +374,10 @@ class MemTracker {
   friend class PoolMemTrackerRegistry;
 
   /// Returns true if the current memory tracker's limit is exceeded.
-  bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); }
+  bool CheckLimitExceeded(MemLimit mode) const {
+    int64_t limit = GetLimit(mode);
+    return limit >= 0 && limit < consumption();
+  }
 
   /// If consumption is higher than max_consumption, attempts to free memory by calling
   /// any added GC functions.  Returns true if max_consumption is still exceeded. Takes
@@ -427,6 +431,10 @@ class MemTracker {
   /// there is no consumption limit.
   const int64_t limit_;
 
+  /// Soft limit on memory consumption, in bytes. Can be exceeded but callers to
+  /// TryConsume() can opt not to exceed this limit. If -1, there is no consumption limit.
+  const int64_t soft_limit_;
+
   std::string label_;
 
   /// The parent of this tracker. The pointer is never modified, even after this tracker

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3dbe012..c86095c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -120,7 +120,7 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   // more resources.
   ExecEnv* exec_env = ExecEnv::GetInstance();
   MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
-  if (process_mem_tracker->LimitExceeded()) {
+  if (process_mem_tracker->LimitExceeded(MemLimit::HARD)) {
     string msg = Substitute(
         "Query $0 could not start because the backend Impala daemon "
         "is over its memory limit", PrintId(query_id()));
@@ -162,7 +162,7 @@ void QueryState::InitMemTrackers() {
 
 Status QueryState::InitBufferPoolState() {
   ExecEnv* exec_env = ExecEnv::GetInstance();
-  int64_t mem_limit = query_mem_tracker_->lowest_limit();
+  int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD);
   int64_t max_reservation;
   if (query_options().__isset.buffer_pool_limit
       && query_options().buffer_pool_limit > 0) {

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 5cc80ce..5bb5a2d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -255,7 +255,7 @@ void RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
 
 Status RuntimeState::CheckQueryState() {
   DCHECK(instance_mem_tracker_ != nullptr);
-  if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded())) {
+  if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
     SetMemLimitExceeded(instance_mem_tracker_.get());
   }
   return GetQueryStatus();

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 032f036..4ed3cc2 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -323,7 +323,7 @@ inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
 inline void FunctionContextImpl::CheckMemLimit(const char* fn_name, int64_t byte_size) {
 #ifndef IMPALA_UDF_SDK_BUILD
   MemTracker* mem_tracker = udf_pool_->mem_tracker();
-  if (mem_tracker->AnyLimitExceeded()) {
+  if (mem_tracker->AnyLimitExceeded(MemLimit::HARD)) {
     ErrorMsg msg = ErrorMsg(TErrorCode::UDF_MEM_LIMIT_EXCEEDED, string(fn_name));
     state_->SetMemLimitExceeded(mem_tracker, byte_size, &msg);
   }