You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by sa...@apache.org on 2018/08/02 02:54:31 UTC

[1/3] impala git commit: Bump Kudu version to 5211897

Repository: impala
Updated Branches:
  refs/heads/master 87b02e733 -> a76ea5c2e


Bump Kudu version to 5211897

This requires changing one error message in a test due to a change in
the error message on the Kudu side.

Change-Id: Ic118b8ddbea8cbf0412516a0450e315a7a3c62e3
Reviewed-on: http://gerrit.cloudera.org:8080/11071
Reviewed-by: Impala Public Jenkins <im...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/9f20b758
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/9f20b758
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/9f20b758

Branch: refs/heads/master
Commit: 9f20b7582a695b802fc4edb740a63a7eebe2bce4
Parents: 87b02e7
Author: Thomas Tauber-Marshall <tm...@cloudera.com>
Authored: Mon Jul 23 16:57:58 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 2 01:18:02 2018 +0000

----------------------------------------------------------------------
 bin/impala-config.sh          | 4 ++--
 tests/query_test/test_kudu.py | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/9f20b758/bin/impala-config.sh
----------------------------------------------------------------------
diff --git a/bin/impala-config.sh b/bin/impala-config.sh
index 31b554c..65eae3b 100755
--- a/bin/impala-config.sh
+++ b/bin/impala-config.sh
@@ -68,7 +68,7 @@ fi
 # moving to a different build of the toolchain, e.g. when a version is bumped or a
 # compile option is changed. The build id can be found in the output of the toolchain
 # build jobs, it is constructed from the build number and toolchain git hash prefix.
-export IMPALA_TOOLCHAIN_BUILD_ID=155-3c4a3251df
+export IMPALA_TOOLCHAIN_BUILD_ID=167-64c5af8f71
 # Versions of toolchain dependencies.
 # -----------------------------------
 export IMPALA_AVRO_VERSION=1.7.4-p4
@@ -154,7 +154,7 @@ if [[ $OSTYPE == "darwin"* ]]; then
 fi
 
 # Kudu version in the toolchain; provides libkudu_client.so and minicluster binaries.
-export IMPALA_KUDU_VERSION=a954418
+export IMPALA_KUDU_VERSION=5211897
 unset IMPALA_KUDU_URL
 
 : ${CDH_DOWNLOAD_HOST:=native-toolchain.s3.amazonaws.com}

http://git-wip-us.apache.org/repos/asf/impala/blob/9f20b758/tests/query_test/test_kudu.py
----------------------------------------------------------------------
diff --git a/tests/query_test/test_kudu.py b/tests/query_test/test_kudu.py
index 3efea37..636c6c0 100644
--- a/tests/query_test/test_kudu.py
+++ b/tests/query_test/test_kudu.py
@@ -1048,7 +1048,7 @@ class TestImpalaKuduIntegration(KuduTestSuite):
       # Drop the underlying Kudu table
       kudu_client.delete_table(kudu_table.name)
       assert not kudu_client.table_exists(kudu_table.name)
-      err_msg = 'The table does not exist: table_name: "%s"' % (kudu_table.name)
+      err_msg = 'the table does not exist: table_name: "%s"' % (kudu_table.name)
       try:
         cursor.execute("REFRESH %s" % (impala_table_name))
       except Exception as e:


[2/3] impala git commit: IMPALA-7297: soft memory limit for reservation increases

Posted by sa...@apache.org.
IMPALA-7297: soft memory limit for reservation increases

https://goo.gl/N9LgQt summarises the memory problems I'm trying to
solve here.

Reject reservation increases that would leave < 10% of the memory
available under a query or process mem_limit, which would put the query
or impalad into a state with a heightened chance of OOM. This
avoids OOM in some scenarios where reserved memory is increased
too aggressively and starves non-reserved memory.

One way of thinking about this is that it extends the 80%
limit heuristic for reserved memory so that there's a soft
limit of min(80%, 90% - non-reserved memory consumption).

A follow-on change will use this soft limit to avoid
starting scanner threads that may exceed it.

Testing:
Added unit tests for MemTracker and for ReservationTracker.

Change-Id: I39dcf2dd870a59c8b8cda4488fe41ce936d715ac
Reviewed-on: http://gerrit.cloudera.org:8080/10988
Reviewed-by: Todd Lipcon <to...@apache.org>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/68736bc3
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/68736bc3
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/68736bc3

Branch: refs/heads/master
Commit: 68736bc3784ad69b7f4f485a7a3ec0fc3d8d3a9b
Parents: 9f20b75
Author: Tim Armstrong <ta...@cloudera.com>
Authored: Fri Jul 13 11:03:08 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 2 01:46:19 2018 +0000

----------------------------------------------------------------------
 be/src/exec/exchange-node.cc                    |   3 +-
 be/src/rpc/impala-service-pool.cc               |   2 +-
 .../runtime/bufferpool/buffer-pool-internal.h   |   2 +-
 be/src/runtime/bufferpool/buffer-pool.cc        |  11 +-
 be/src/runtime/bufferpool/buffer-pool.h         |   9 +-
 .../bufferpool/reservation-tracker-test.cc      |  74 ++++++-----
 .../runtime/bufferpool/reservation-tracker.cc   |  17 ++-
 be/src/runtime/bufferpool/reservation-tracker.h |  17 ++-
 be/src/runtime/initial-reservations.cc          |   9 +-
 be/src/runtime/mem-pool-test.cc                 |  22 ++--
 be/src/runtime/mem-tracker-test.cc              | 126 +++++++++++++------
 be/src/runtime/mem-tracker-types.h              |  31 +++++
 be/src/runtime/mem-tracker.cc                   |  37 +++++-
 be/src/runtime/mem-tracker.h                    |  96 +++++++-------
 be/src/runtime/query-state.cc                   |   4 +-
 be/src/runtime/runtime-state.cc                 |   2 +-
 be/src/udf/udf.cc                               |   2 +-
 17 files changed, 313 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/exec/exchange-node.cc
----------------------------------------------------------------------
diff --git a/be/src/exec/exchange-node.cc b/be/src/exec/exchange-node.cc
index 07511a7..26579c1 100644
--- a/be/src/exec/exchange-node.cc
+++ b/be/src/exec/exchange-node.cc
@@ -82,7 +82,8 @@ Status ExchangeNode::Prepare(RuntimeState* state) {
   RETURN_IF_ERROR(ExecEnv::GetInstance()->buffer_pool()->RegisterClient(
       Substitute("Exchg Recvr (id=$0)", id_), nullptr,
       ExecEnv::GetInstance()->buffer_reservation(), mem_tracker(),
-      numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_));
+      numeric_limits<int64_t>::max(), runtime_profile(), &recvr_buffer_pool_client_,
+      MemLimit::HARD));
 
   // TODO: figure out appropriate buffer size
   DCHECK_GT(num_senders_, 0);

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/rpc/impala-service-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/rpc/impala-service-pool.cc b/be/src/rpc/impala-service-pool.cc
index cb406b9..d00c73d 100644
--- a/be/src/rpc/impala-service-pool.cc
+++ b/be/src/rpc/impala-service-pool.cc
@@ -170,7 +170,7 @@ kudu::Status ImpalaServicePool::QueueInboundCall(
     // fail. The check and the consumption need to be atomic so as to bound the memory
     // usage.
     unique_lock<SpinLock> mem_tracker_lock(mem_tracker_lock_);
-    if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded())) {
+    if (UNLIKELY(service_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
       // Discards the transfer early so the transfer size drops to 0. This is to ensure
       // the MemTracker::Release() call in FailAndReleaseRpc() is correct as we haven't
       // called MemTracker::Consume() at this point.

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool-internal.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 598939e..64c92e5 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -196,7 +196,7 @@ class BufferPool::Client {
  public:
   Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group, const string& name,
       ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-      int64_t reservation_limit, RuntimeProfile* profile);
+      MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile);
 
   ~Client() {
     DCHECK_EQ(0, num_pages_);

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index 6e6d979..6b51ed3 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -117,11 +117,12 @@ BufferPool::~BufferPool() {}
 
 Status BufferPool::RegisterClient(const string& name, TmpFileMgr::FileGroup* file_group,
     ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-    int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client) {
+    int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client,
+    MemLimit mem_limit_mode) {
   DCHECK(!client->is_registered());
   DCHECK(parent_reservation != NULL);
   client->impl_ = new Client(this, file_group, name, parent_reservation, mem_tracker,
-      reservation_limit, profile);
+      mem_limit_mode, reservation_limit, profile);
   return Status::OK();
 }
 
@@ -385,7 +386,7 @@ void BufferPool::SubReservation::Close() {
 
 BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     const string& name, ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-    int64_t reservation_limit, RuntimeProfile* profile)
+    MemLimit mem_limit_mode, int64_t reservation_limit, RuntimeProfile* profile)
   : pool_(pool),
     file_group_(file_group),
     name_(name),
@@ -394,8 +395,8 @@ BufferPool::Client::Client(BufferPool* pool, TmpFileMgr::FileGroup* file_group,
     buffers_allocated_bytes_(0) {
   // Set up a child profile with buffer pool info.
   RuntimeProfile* child_profile = profile->CreateChild("Buffer pool", true, true);
-  reservation_.InitChildTracker(
-      child_profile, parent_reservation, mem_tracker, reservation_limit);
+  reservation_.InitChildTracker(child_profile, parent_reservation, mem_tracker,
+      reservation_limit, mem_limit_mode);
   counters_.alloc_time = ADD_TIMER(child_profile, "AllocTime");
   counters_.cumulative_allocations =
       ADD_COUNTER(child_profile, "CumulativeAllocations", TUnit::UNIT);

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/buffer-pool.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 3323e0a..eb658b0 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -29,6 +29,7 @@
 #include "common/object-pool.h"
 #include "common/status.h"
 #include "gutil/macros.h"
+#include "runtime/mem-tracker-types.h"
 #include "runtime/tmp-file-mgr.h"
 #include "util/aligned-new.h"
 #include "util/internal-queue.h"
@@ -37,7 +38,6 @@
 
 namespace impala {
 
-class MemTracker;
 class ReservationTracker;
 class RuntimeProfile;
 class SystemAllocator;
@@ -175,11 +175,12 @@ class BufferPool : public CacheLineAligned {
   ///
   /// The client's reservation is created as a child of 'parent_reservation' with limit
   /// 'reservation_limit' and associated with MemTracker 'mem_tracker'. The initial
-  /// reservation is 0 bytes.
+  /// reservation is 0 bytes. 'mem_limit_mode' determines whether reservation
+  /// increases are checked against the soft or hard limit of 'mem_tracker'.
   Status RegisterClient(const std::string& name, TmpFileMgr::FileGroup* file_group,
       ReservationTracker* parent_reservation, MemTracker* mem_tracker,
-      int64_t reservation_limit, RuntimeProfile* profile,
-      ClientHandle* client) WARN_UNUSED_RESULT;
+      int64_t reservation_limit, RuntimeProfile* profile, ClientHandle* client,
+      MemLimit mem_limit_mode = MemLimit::SOFT) WARN_UNUSED_RESULT;
 
   /// Deregister 'client' if it is registered. All pages must be destroyed and buffers
   /// must be freed for the client before calling this. Releases any reservation that

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker-test.cc b/be/src/runtime/bufferpool/reservation-tracker-test.cc
index e441402..88aa92a 100644
--- a/be/src/runtime/bufferpool/reservation-tracker-test.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker-test.cc
@@ -340,6 +340,7 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
   // We can only handle MemTracker limits at the topmost linked ReservationTracker,
   // so avoid adding limits at lower level.
   const int LIMIT = HIERARCHY_DEPTH;
+  const int SOFT_LIMIT = static_cast<int>(LIMIT * 0.9);
   vector<int> mem_limits({LIMIT * 10, LIMIT, -1, -1, -1});
 
   // Root trackers aren't linked.
@@ -350,47 +351,64 @@ TEST_F(ReservationTrackerTest, MemTrackerIntegrationMultiLevel) {
         mem_limits[i], Substitute("Child $0", i), mem_trackers[i - 1].get()));
   }
 
-  vector<int> interesting_amounts({LIMIT - 1, LIMIT, LIMIT + 1});
+  vector<int> interesting_amounts(
+      {LIMIT - 1, LIMIT, LIMIT + 1, SOFT_LIMIT, SOFT_LIMIT - 1});
 
   // Test that all limits and consumption correctly reported when consuming
   // from a non-root ReservationTracker that is connected to a MemTracker.
-  for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
-    int64_t lowest_limit = mem_trackers[level]->lowest_limit();
-    for (int amount : interesting_amounts) {
-      // Initialize the tracker, increase reservation, then release reservation by closing
-      // the tracker.
-      reservations[level].InitChildTracker(
-          NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 500);
-      bool increased = reservations[level].IncreaseReservation(amount);
-      if (lowest_limit == -1 || amount <= lowest_limit) {
-        // The increase should go through.
-        ASSERT_TRUE(increased) << reservations[level].DebugString();
-        ASSERT_EQ(amount, reservations[level].GetReservation());
-        ASSERT_EQ(amount, mem_trackers[level]->consumption());
-        for (int ancestor = 0; ancestor < level; ++ancestor) {
-          ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
-          ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+  // Test both soft and hard limits.
+  for (MemLimit limit_mode : {MemLimit::SOFT, MemLimit::HARD}) {
+    for (int level = 1; level < HIERARCHY_DEPTH; ++level) {
+      int64_t lowest_limit = mem_trackers[level]->GetLowestLimit(limit_mode);
+      for (int amount : interesting_amounts) {
+        LOG(INFO) << "level=" << level << " limit_mode=" << static_cast<int>(limit_mode)
+                  << " amount=" << amount << " lowest_limit=" << lowest_limit;
+        // Initialize the tracker, increase reservation, then release reservation by
+        // closing the tracker.
+        reservations[level].InitChildTracker(NewProfile(), &reservations[level - 1],
+            mem_trackers[level].get(), 500, limit_mode);
+        bool increased = reservations[level].IncreaseReservation(amount);
+        if (lowest_limit == -1 || amount <= lowest_limit) {
+          // The increase should go through.
+          ASSERT_TRUE(increased)
+              << reservations[level].DebugString() << "\n"
+              << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
+          ASSERT_EQ(amount, reservations[level].GetReservation());
+          ASSERT_EQ(amount, mem_trackers[level]->consumption());
+          for (int ancestor = 0; ancestor < level; ++ancestor) {
+            ASSERT_EQ(amount, reservations[ancestor].GetChildReservations());
+            ASSERT_EQ(amount, mem_trackers[ancestor]->consumption());
+          }
+
+          LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
+        } else {
+          ASSERT_FALSE(increased);
         }
-
-        LOG(INFO) << "\n" << mem_trackers[0]->LogUsage(MemTracker::UNLIMITED_DEPTH);
         reservations[level].Close();
-      } else {
-        ASSERT_FALSE(increased);
-      }
-      // Reservations should be released on all ancestors.
-      for (int i = 0; i < level; ++i) {
-        ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
-                                                       << reservations[i].DebugString();
-        ASSERT_EQ(0, reservations[i].GetChildReservations());
-        ASSERT_EQ(0, mem_trackers[i]->consumption());
+        // Reservations should be released on all ancestors.
+        for (int i = 0; i < level; ++i) {
+          ASSERT_EQ(0, reservations[i].GetReservation()) << i << ": "
+                                                         << reservations[i].DebugString();
+          ASSERT_EQ(0, reservations[i].GetChildReservations());
+          ASSERT_EQ(0, mem_trackers[i]->consumption());
+        }
       }
+      // Set up tracker to be parent for next iteration.
+      reservations[level].InitChildTracker(NewProfile(), &reservations[level - 1],
+          mem_trackers[level].get(), 500, limit_mode);
     }
+    for (int level = 1; level < HIERARCHY_DEPTH; ++level) reservations[level].Close();
   }
 
   // "Pull down" a reservation from the top of the hierarchy level-by-level to the
   // leaves, checking that everything is consistent at each step.
   for (int level = 0; level < HIERARCHY_DEPTH; ++level) {
     const int amount = LIMIT;
+    if (level > 0) {
+      reservations[level].InitChildTracker(
+          NewProfile(), &reservations[level - 1], mem_trackers[level].get(), 500,
+          MemLimit::HARD);
+    }
     ASSERT_TRUE(reservations[level].IncreaseReservation(amount));
     ASSERT_EQ(amount, reservations[level].GetReservation());
     ASSERT_EQ(0, reservations[level].GetUsedReservation());

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.cc b/be/src/runtime/bufferpool/reservation-tracker.cc
index f0e1839..a920e1c 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.cc
+++ b/be/src/runtime/bufferpool/reservation-tracker.cc
@@ -57,7 +57,8 @@ void ReservationTracker::InitRootTracker(
 }
 
 void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
-    ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit) {
+    ReservationTracker* parent, MemTracker* mem_tracker, int64_t reservation_limit,
+    MemLimit mem_limit_mode) {
   DCHECK(parent != nullptr);
   DCHECK_GE(reservation_limit, 0);
 
@@ -65,6 +66,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
   DCHECK(!initialized_);
   parent_ = parent;
   mem_tracker_ = mem_tracker;
+  mem_limit_mode_ = mem_limit_mode;
 
   reservation_limit_ = reservation_limit;
   reservation_ = 0;
@@ -79,7 +81,8 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile,
       DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
       // Make sure we don't have a lower limit than the ancestor, since we don't enforce
       // limits at lower links.
-      DCHECK_EQ(mem_tracker_->lowest_limit(), parent_mem_tracker->lowest_limit());
+      DCHECK_EQ(mem_tracker_->GetLowestLimit(mem_limit_mode_),
+          parent_mem_tracker->GetLowestLimit(mem_limit_mode_));
     } else {
       // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
       // shouldn't have a MemTracker.
@@ -208,7 +211,8 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
       granted = parent_->IncreaseReservationInternalLocked(
           reservation_increase, true, true, error_status);
     }
-    if (granted && !TryConsumeFromMemTracker(reservation_increase)) {
+    if (granted
+        && !TryConsumeFromMemTracker(reservation_increase, mem_limit_mode_)) {
       granted = false;
       if (error_status != nullptr) {
         *error_status = mem_tracker_->MemLimitExceeded(nullptr,
@@ -230,13 +234,14 @@ bool ReservationTracker::IncreaseReservationInternalLocked(int64_t bytes,
   return granted;
 }
 
-bool ReservationTracker::TryConsumeFromMemTracker(int64_t reservation_increase) {
+bool ReservationTracker::TryConsumeFromMemTracker(
+    int64_t reservation_increase, MemLimit mem_limit_mode) {
   DCHECK_GE(reservation_increase, 0);
   if (mem_tracker_ == nullptr) return true;
   if (GetParentMemTracker() == nullptr) {
     // At the topmost link, which may be a MemTracker with a limit, we need to use
     // TryConsume() to check the limit.
-    return mem_tracker_->TryConsume(reservation_increase);
+    return mem_tracker_->TryConsume(reservation_increase, mem_limit_mode);
   } else {
     // For lower links, there shouldn't be a limit to enforce, so we just need to
     // update the consumption of the linked MemTracker since the reservation is
@@ -329,7 +334,7 @@ bool ReservationTracker::TransferReservationTo(ReservationTracker* other, int64_
     // We don't handle MemTrackers with limit in this function - this should always
     // succeed.
     DCHECK(tracker->mem_tracker_ == nullptr || !tracker->mem_tracker_->has_limit());
-    bool success = tracker->TryConsumeFromMemTracker(bytes);
+    bool success = tracker->TryConsumeFromMemTracker(bytes, MemLimit::HARD);
     DCHECK(success);
     if (tracker != other_path_to_common[0]) tracker->child_reservations_ += bytes;
   }

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/bufferpool/reservation-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/bufferpool/reservation-tracker.h b/be/src/runtime/bufferpool/reservation-tracker.h
index 3bf2de1..604db7f 100644
--- a/be/src/runtime/bufferpool/reservation-tracker.h
+++ b/be/src/runtime/bufferpool/reservation-tracker.h
@@ -23,14 +23,14 @@
 #include <boost/thread/locks.hpp>
 #include <string>
 
-#include "runtime/bufferpool/reservation-tracker-counters.h"
 #include "common/status.h"
+#include "runtime/bufferpool/reservation-tracker-counters.h"
+#include "runtime/mem-tracker-types.h"
 #include "util/spinlock.h"
 
 namespace impala {
 
 class DummyProfile;
-class MemTracker;
 class RuntimeProfile;
 
 /// A tracker for a hierarchy of buffer pool memory reservations, denominated in bytes.
@@ -101,9 +101,12 @@ class ReservationTracker {
   /// If 'mem_tracker' is not NULL, reservations for this ReservationTracker and its
   /// children will be counted as consumption against 'mem_tracker'.
   /// 'reservation_limit' is the maximum reservation for this tracker in bytes.
-  /// if 'profile' is not NULL, the counters in 'counters_' are added to 'profile'.
+  /// 'mem_limit_mode' determines whether reservation increases are checked against the
+  /// soft or hard limit of 'mem_tracker'. If 'profile' is not NULL, the counters in
+  /// 'counters_' are added to 'profile'.
   void InitChildTracker(RuntimeProfile* profile, ReservationTracker* parent,
-      MemTracker* mem_tracker, int64_t reservation_limit);
+      MemTracker* mem_tracker, int64_t reservation_limit,
+      MemLimit mem_limit_mode = MemLimit::SOFT);
 
   /// If the tracker is initialized, deregister the ReservationTracker from its parent,
   /// relinquishing all this tracker's reservation. All of the reservation must be unused
@@ -219,7 +222,7 @@ class ReservationTracker {
   /// because it would exceed a memory limit. If there is no linked MemTracker, just
   /// returns true.
   /// TODO: remove once we account all memory via ReservationTrackers.
-  bool TryConsumeFromMemTracker(int64_t reservation_increase);
+  bool TryConsumeFromMemTracker(int64_t reservation_increase, MemLimit mem_limit_mode);
 
   /// Decrease consumption on linked MemTracker to reflect a decrease in reservation of
   /// 'reservation_decrease'. If there is no linked MemTracker, does nothing.
@@ -301,6 +304,10 @@ class ReservationTracker {
   /// TODO: remove once all memory is accounted via ReservationTrackers.
   MemTracker* mem_tracker_ = nullptr;
 
+  /// Determines whether the soft or hard limit of 'mem_tracker_' is checked for
+  /// reservation increases.
+  MemLimit mem_limit_mode_;
+
   /// The maximum reservation in bytes that this tracker can have. Can be read with an
   /// atomic load without holding lock.
   int64_t reservation_limit_;

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/initial-reservations.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/initial-reservations.cc b/be/src/runtime/initial-reservations.cc
index 0fe00fe..e5fcd32 100644
--- a/be/src/runtime/initial-reservations.cc
+++ b/be/src/runtime/initial-reservations.cc
@@ -41,10 +41,13 @@ InitialReservations::InitialReservations(ObjectPool* obj_pool,
     ReservationTracker* query_reservation, MemTracker* query_mem_tracker,
     int64_t initial_reservation_total_claims)
   : initial_reservation_mem_tracker_(obj_pool->Add(
-      new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
-      remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+        new MemTracker(-1, "Unclaimed reservations", query_mem_tracker, false))),
+    remaining_initial_reservation_claims_(initial_reservation_total_claims) {
+  // Soft mem_limits should not apply to the initial reservation because we don't want
+  // to fail the query in the case where the initial reservation exceeds the soft
+  // limit.
   initial_reservations_.InitChildTracker(nullptr, query_reservation,
-      initial_reservation_mem_tracker_, numeric_limits<int64_t>::max());
+      initial_reservation_mem_tracker_, numeric_limits<int64_t>::max(), MemLimit::HARD);
 }
 
 Status InitialReservations::Init(

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-pool-test.cc b/be/src/runtime/mem-pool-test.cc
index 26176a5..f939008 100644
--- a/be/src/runtime/mem-pool-test.cc
+++ b/be/src/runtime/mem-pool-test.cc
@@ -241,35 +241,35 @@ TEST(MemPoolTest, Limits) {
   MemTracker limit2(3 * MemPoolTest::INITIAL_CHUNK_SIZE, "", &limit3);
 
   MemPool* p1 = new MemPool(&limit1);
-  EXPECT_FALSE(limit1.AnyLimitExceeded());
+  EXPECT_FALSE(limit1.AnyLimitExceeded(MemLimit::HARD));
 
   MemPool* p2 = new MemPool(&limit2);
-  EXPECT_FALSE(limit2.AnyLimitExceeded());
+  EXPECT_FALSE(limit2.AnyLimitExceeded(MemLimit::HARD));
 
   // p1 exceeds a non-shared limit
   p1->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
-  EXPECT_FALSE(limit1.LimitExceeded());
+  EXPECT_FALSE(limit1.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit1.consumption());
-  EXPECT_FALSE(limit3.LimitExceeded());
+  EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit3.consumption());
 
   p1->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
-  EXPECT_TRUE(limit1.LimitExceeded());
+  EXPECT_TRUE(limit1.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit1.consumption());
-  EXPECT_FALSE(limit3.LimitExceeded());
+  EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit3.consumption());
 
   // p2 exceeds a shared limit
   p2->Allocate(MemPoolTest::INITIAL_CHUNK_SIZE);
-  EXPECT_FALSE(limit2.LimitExceeded());
+  EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE, limit2.consumption());
-  EXPECT_FALSE(limit3.LimitExceeded());
+  EXPECT_FALSE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 4, limit3.consumption());
 
   p2->Allocate(1);
-  EXPECT_FALSE(limit2.LimitExceeded());
+  EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 3, limit2.consumption());
-  EXPECT_TRUE(limit3.LimitExceeded());
+  EXPECT_TRUE(limit3.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(MemPoolTest::INITIAL_CHUNK_SIZE * 6, limit3.consumption());
 
   // deleting pools reduces consumption
@@ -281,7 +281,7 @@ TEST(MemPoolTest, Limits) {
 
   // Allocate another chunk
   p2->FreeAll();
-  EXPECT_FALSE(limit2.LimitExceeded());
+  EXPECT_FALSE(limit2.LimitExceeded(MemLimit::HARD));
   uint8_t* result = p2->TryAllocate(MemPoolTest::INITIAL_CHUNK_SIZE);
   ASSERT_TRUE(result != NULL);
   ASSERT_TRUE(MemPoolTest::CheckIntegrity(p2, false));

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-test.cc b/be/src/runtime/mem-tracker-test.cc
index 285f897..e52c345 100644
--- a/be/src/runtime/mem-tracker-test.cc
+++ b/be/src/runtime/mem-tracker-test.cc
@@ -35,7 +35,7 @@ TEST(MemTestTest, SingleTrackerNoLimit) {
   EXPECT_EQ(t.consumption(), 20);
   t.Release(15);
   EXPECT_EQ(t.consumption(), 5);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   // Clean up.
   t.Release(5);
 }
@@ -45,17 +45,72 @@ TEST(MemTestTest, SingleTrackerWithLimit) {
   EXPECT_TRUE(t.has_limit());
   t.Consume(10);
   EXPECT_EQ(t.consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   t.Consume(10);
   EXPECT_EQ(t.consumption(), 20);
-  EXPECT_TRUE(t.LimitExceeded());
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
   t.Release(15);
   EXPECT_EQ(t.consumption(), 5);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   // Clean up.
   t.Release(5);
 }
 
+/// Exercise individual functions that take MemLimit::SOFT option.
+TEST(MemTestTest, SoftLimit) {
+  MemTracker t(100);
+  MemTracker child(-1, "", &t);
+
+  // Exercise functions that return limits.
+  EXPECT_EQ(90, t.soft_limit());
+  EXPECT_EQ(90, t.GetLimit(MemLimit::SOFT));
+  EXPECT_EQ(100, t.GetLimit(MemLimit::HARD));
+  EXPECT_EQ(-1, child.soft_limit());
+  EXPECT_EQ(90, t.GetLowestLimit(MemLimit::SOFT));
+  EXPECT_EQ(90, child.GetLowestLimit(MemLimit::SOFT));
+  EXPECT_EQ(100, t.GetLowestLimit(MemLimit::HARD));
+
+  // Test SpareCapacity()
+  EXPECT_EQ(100, t.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(90, t.SpareCapacity(MemLimit::SOFT));
+  EXPECT_EQ(100, child.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(90, child.SpareCapacity(MemLimit::SOFT));
+
+  // Test TryConsume() within soft limit.
+  EXPECT_TRUE(t.TryConsume(90, MemLimit::SOFT));
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(child.AnyLimitExceeded(MemLimit::SOFT));
+
+  // Test TryConsume() going over soft limit.
+  EXPECT_FALSE(t.TryConsume(1, MemLimit::SOFT));
+  EXPECT_FALSE(child.TryConsume(1, MemLimit::SOFT));
+  EXPECT_TRUE(t.TryConsume(1, MemLimit::HARD));
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(child.LimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(child.AnyLimitExceeded(MemLimit::HARD));
+  EXPECT_EQ(9, child.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(-1, child.SpareCapacity(MemLimit::SOFT));
+
+  // Test Consume() going over hard limit.
+  child.Consume(10);
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(child.LimitExceeded(MemLimit::SOFT));
+  EXPECT_FALSE(child.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(t.AnyLimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::SOFT));
+  EXPECT_TRUE(child.AnyLimitExceeded(MemLimit::HARD));
+  EXPECT_EQ(-1, child.SpareCapacity(MemLimit::HARD));
+  EXPECT_EQ(-11, child.SpareCapacity(MemLimit::SOFT));
+
+  t.Release(91);
+  child.Release(10);
+}
+
 TEST(MemTestTest, ConsumptionMetric) {
   TMetricDef md;
   md.__set_key("test");
@@ -80,12 +135,12 @@ TEST(MemTestTest, ConsumptionMetric) {
   t.Consume(150);
   EXPECT_EQ(t.consumption(), 0);
   EXPECT_EQ(t.peak_consumption(), 0);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), 0);
   t.Release(5);
   EXPECT_EQ(t.consumption(), 0);
   EXPECT_EQ(t.peak_consumption(), 0);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), 0);
 
   metric.Increment(10);
@@ -101,21 +156,21 @@ TEST(MemTestTest, ConsumptionMetric) {
   neg_t.Consume(1);
   EXPECT_EQ(t.consumption(), 5);
   EXPECT_EQ(t.peak_consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -5);
   metric.Increment(150);
   t.Consume(1);
   neg_t.Consume(1);
   EXPECT_EQ(t.consumption(), 155);
   EXPECT_EQ(t.peak_consumption(), 155);
-  EXPECT_TRUE(t.LimitExceeded());
+  EXPECT_TRUE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -155);
   metric.Increment(-150);
   t.Consume(-1);
   neg_t.Consume(1);
   EXPECT_EQ(t.consumption(), 5);
   EXPECT_EQ(t.peak_consumption(), 155);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -5);
   // consumption_ is not updated when Consume()/Release() is called with a zero value
   metric.Increment(10);
@@ -123,7 +178,7 @@ TEST(MemTestTest, ConsumptionMetric) {
   neg_t.Consume(0);
   EXPECT_EQ(t.consumption(), 5);
   EXPECT_EQ(t.peak_consumption(), 155);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(neg_t.consumption(), -5);
   // Clean up.
   metric.Increment(-15);
@@ -139,37 +194,37 @@ TEST(MemTestTest, TrackerHierarchy) {
   // everything below limits
   c1.Consume(60);
   EXPECT_EQ(c1.consumption(), 60);
-  EXPECT_FALSE(c1.LimitExceeded());
-  EXPECT_FALSE(c1.AnyLimitExceeded());
+  EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(c1.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(c2.consumption(), 0);
-  EXPECT_FALSE(c2.LimitExceeded());
-  EXPECT_FALSE(c2.AnyLimitExceeded());
+  EXPECT_FALSE(c2.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(c2.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(p.consumption(), 60);
-  EXPECT_FALSE(p.LimitExceeded());
-  EXPECT_FALSE(p.AnyLimitExceeded());
+  EXPECT_FALSE(p.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(p.AnyLimitExceeded(MemLimit::HARD));
 
   // p goes over limit
   c2.Consume(50);
   EXPECT_EQ(c1.consumption(), 60);
-  EXPECT_FALSE(c1.LimitExceeded());
-  EXPECT_TRUE(c1.AnyLimitExceeded());
+  EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(c1.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(c2.consumption(), 50);
-  EXPECT_FALSE(c2.LimitExceeded());
-  EXPECT_TRUE(c2.AnyLimitExceeded());
+  EXPECT_FALSE(c2.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(c2.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(p.consumption(), 110);
-  EXPECT_TRUE(p.LimitExceeded());
+  EXPECT_TRUE(p.LimitExceeded(MemLimit::HARD));
 
   // c2 goes over limit, p drops below limit
   c1.Release(20);
   c2.Consume(10);
   EXPECT_EQ(c1.consumption(), 40);
-  EXPECT_FALSE(c1.LimitExceeded());
-  EXPECT_FALSE(c1.AnyLimitExceeded());
+  EXPECT_FALSE(c1.LimitExceeded(MemLimit::HARD));
+  EXPECT_FALSE(c1.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(c2.consumption(), 60);
-  EXPECT_TRUE(c2.LimitExceeded());
-  EXPECT_TRUE(c2.AnyLimitExceeded());
+  EXPECT_TRUE(c2.LimitExceeded(MemLimit::HARD));
+  EXPECT_TRUE(c2.AnyLimitExceeded(MemLimit::HARD));
   EXPECT_EQ(p.consumption(), 100);
-  EXPECT_FALSE(p.LimitExceeded());
+  EXPECT_FALSE(p.LimitExceeded(MemLimit::HARD));
 
   // Clean up.
   c1.Release(40);
@@ -254,34 +309,34 @@ TEST(MemTestTest, GcFunctions) {
   ASSERT_TRUE(t.has_limit());
 
   t.Consume(9);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // Test TryConsume()
   EXPECT_FALSE(t.TryConsume(2));
   EXPECT_EQ(t.consumption(), 9);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // Attach GcFunction that releases 1 byte
   GcFunctionHelper gc_func_helper(&t);
   t.AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper));
   EXPECT_TRUE(t.TryConsume(2));
   EXPECT_EQ(t.consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // GcFunction will be called even though TryConsume() fails
   EXPECT_FALSE(t.TryConsume(2));
   EXPECT_EQ(t.consumption(), 9);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
   // GcFunction won't be called
   EXPECT_TRUE(t.TryConsume(1));
   EXPECT_EQ(t.consumption(), 10);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
 
-  // Test LimitExceeded()
+  // Test LimitExceeded(MemLimit::HARD)
   t.Consume(1);
   EXPECT_EQ(t.consumption(), 11);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(t.consumption(), 10);
 
   // Add more GcFunctions, test that we only call them until the limit is no longer
@@ -292,13 +347,12 @@ TEST(MemTestTest, GcFunctions) {
   t.AddGcFunction(boost::bind(&GcFunctionHelper::GcFunc, &gc_func_helper3));
   t.Consume(1);
   EXPECT_EQ(t.consumption(), 11);
-  EXPECT_FALSE(t.LimitExceeded());
+  EXPECT_FALSE(t.LimitExceeded(MemLimit::HARD));
   EXPECT_EQ(t.consumption(), 10);
 
-  //Clean up.
+  // Clean up.
   t.Release(10);
 }
-
 }
 
 IMPALA_TEST_MAIN();

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker-types.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker-types.h b/be/src/runtime/mem-tracker-types.h
new file mode 100644
index 0000000..e199f1b
--- /dev/null
+++ b/be/src/runtime/mem-tracker-types.h
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef IMPALA_RUNTIME_MEM_TRACKER_TYPES_H
+#define IMPALA_RUNTIME_MEM_TRACKER_TYPES_H
+
+namespace impala {
+class MemTracker;
+
+/// Mode argument passed to various MemTracker methods to indicate whether a soft or hard
+/// limit should be used.
+enum class MemLimit { HARD, SOFT };
+
+}
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.cc b/be/src/runtime/mem-tracker.cc
index 237484f..cb615a1 100644
--- a/be/src/runtime/mem-tracker.cc
+++ b/be/src/runtime/mem-tracker.cc
@@ -38,6 +38,9 @@ using std::priority_queue;
 using std::greater;
 using namespace strings;
 
+DEFINE_double_hidden(soft_mem_limit_frac, 0.9, "(Advanced) Soft memory limit as a "
+    "fraction of hard memory limit.");
+
 namespace impala {
 
 const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
@@ -45,9 +48,17 @@ const string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
 // Name for request pool MemTrackers. '$0' is replaced with the pool name.
 const string REQUEST_POOL_MEM_TRACKER_LABEL_FORMAT = "RequestPool=$0";
 
+/// Calculate the soft limit for a MemTracker based on the hard limit 'limit'.
+static int64_t CalcSoftLimit(int64_t limit) {
+  if (limit < 0) return -1;
+  double frac = max(0.0, min(1.0, FLAGS_soft_mem_limit_frac));
+  return static_cast<int64_t>(limit * frac);
+}
+
 MemTracker::MemTracker(
     int64_t byte_limit, const string& label, MemTracker* parent, bool log_usage_if_zero)
   : limit_(byte_limit),
+    soft_limit_(CalcSoftLimit(byte_limit)),
     label_(label),
     parent_(parent),
     consumption_(&local_counter_),
@@ -64,6 +75,7 @@ MemTracker::MemTracker(
 MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
     const std::string& label, MemTracker* parent)
   : limit_(byte_limit),
+    soft_limit_(CalcSoftLimit(byte_limit)),
     label_(label),
     parent_(parent),
     consumption_(profile->AddHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES)),
@@ -80,6 +92,7 @@ MemTracker::MemTracker(RuntimeProfile* profile, int64_t byte_limit,
 MemTracker::MemTracker(IntGauge* consumption_metric,
     int64_t byte_limit, const string& label, MemTracker* parent)
   : limit_(byte_limit),
+    soft_limit_(CalcSoftLimit(byte_limit)),
     label_(label),
     parent_(parent),
     consumption_(&local_counter_),
@@ -95,6 +108,7 @@ MemTracker::MemTracker(IntGauge* consumption_metric,
 
 void MemTracker::Init() {
   DCHECK_GE(limit_, -1);
+  DCHECK_LE(soft_limit_, limit_);
   if (parent_ != NULL) parent_->AddChildTracker(this);
   // populate all_trackers_ and limit_trackers_
   MemTracker* tracker = this;
@@ -133,6 +147,25 @@ void MemTracker::EnableReservationReporting(const ReservationTrackerCounters& co
   delete reservation_counters_.Swap(new ReservationTrackerCounters(counters));
 }
 
+int64_t MemTracker::GetLowestLimit(MemLimit mode) const {
+  if (limit_trackers_.empty()) return -1;
+  int64_t min_limit = numeric_limits<int64_t>::max();
+  for (MemTracker* limit_tracker : limit_trackers_) {
+    DCHECK(limit_tracker->has_limit());
+    min_limit = min(min_limit, limit_tracker->GetLimit(mode));
+  }
+  return min_limit;
+}
+
+int64_t MemTracker::SpareCapacity(MemLimit mode) const {
+  int64_t result = numeric_limits<int64_t>::max();
+  for (MemTracker* tracker : limit_trackers_) {
+    int64_t mem_left = tracker->GetLimit(mode) - tracker->consumption();
+    result = std::min(result, mem_left);
+  }
+  return result;
+}
+
 int64_t MemTracker::GetPoolMemReserved() {
   // Pool trackers should have a pool_name_ and no limit.
   DCHECK(!pool_name_.empty());
@@ -274,7 +307,7 @@ string MemTracker::LogUsage(int max_recursive_depth, const string& prefix,
 
   stringstream ss;
   ss << prefix << label_ << ":";
-  if (CheckLimitExceeded()) ss << " memory limit exceeded.";
+  if (CheckLimitExceeded(MemLimit::HARD)) ss << " memory limit exceeded.";
   if (limit_ > 0) ss << " Limit=" << PrettyPrinter::Print(limit_, TUnit::BYTES);
 
   ReservationTrackerCounters* reservation_counters = reservation_counters_.Load();
@@ -392,7 +425,7 @@ Status MemTracker::MemLimitExceeded(RuntimeState* state, const std::string& deta
   ss << endl;
   ExecEnv* exec_env = ExecEnv::GetInstance();
   MemTracker* process_tracker = exec_env->process_mem_tracker();
-  const int64_t process_capacity = process_tracker->SpareCapacity();
+  const int64_t process_capacity = process_tracker->SpareCapacity(MemLimit::HARD);
   ss << "Memory left in process limit: "
      << PrettyPrinter::Print(process_capacity, TUnit::BYTES) << endl;
 

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/mem-tracker.h
----------------------------------------------------------------------
diff --git a/be/src/runtime/mem-tracker.h b/be/src/runtime/mem-tracker.h
index 01fd331..d4b4dd4 100644
--- a/be/src/runtime/mem-tracker.h
+++ b/be/src/runtime/mem-tracker.h
@@ -29,6 +29,7 @@
 
 #include "common/logging.h"
 #include "common/atomic.h"
+#include "runtime/mem-tracker-types.h"
 #include "util/debug-util.h"
 #include "util/internal-queue.h"
 #include "util/metrics.h"
@@ -40,7 +41,6 @@
 namespace impala {
 
 class ObjectPool;
-class MemTracker;
 struct ReservationTrackerCounters;
 class TQueryOptions;
 
@@ -48,6 +48,12 @@ class TQueryOptions;
 /// and can be arranged into a tree structure such that the consumption tracked
 /// by a MemTracker is also tracked by its ancestors.
 ///
+/// A MemTracker has a hard and a soft limit derived from the limit. If the hard limit
+/// is exceeded, all memory allocations and queries should fail until we are under the
+/// limit again. The soft limit can be exceeded without causing query failures, but
+/// consumers of memory that can tolerate running without more memory should not allocate
+/// memory in excess of the soft limit.
+///
 /// We use a five-level hierarchy of mem trackers: process, pool, query, fragment
 /// instance. Specific parts of the fragment (exec nodes, sinks, etc) will add a
 /// fifth level when they are initialized. This function also initializes a user
@@ -77,22 +83,23 @@ class MemTracker {
  public:
   /// 'byte_limit' < 0 means no limit
   /// 'label' is the label used in the usage string (LogUsage())
-  /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be included
+  /// If 'log_usage_if_zero' is false, this tracker (and its children) will not be
+  /// included
   /// in LogUsage() output if consumption is 0.
   MemTracker(int64_t byte_limit = -1, const std::string& label = std::string(),
-      MemTracker* parent = NULL, bool log_usage_if_zero = true);
+      MemTracker* parent = nullptr, bool log_usage_if_zero = true);
 
   /// C'tor for tracker for which consumption counter is created as part of a profile.
   /// The counter is created with name COUNTER_NAME.
   MemTracker(RuntimeProfile* profile, int64_t byte_limit,
-      const std::string& label = std::string(), MemTracker* parent = NULL);
+      const std::string& label = std::string(), MemTracker* parent = nullptr);
 
   /// C'tor for tracker that uses consumption_metric as the consumption value.
   /// Consume()/Release() can still be called. This is used for the root process tracker
   /// (if 'parent' is NULL). It is also to report on other categories of memory under the
   /// process tracker, e.g. buffer pool free buffers (if 'parent - non-NULL).
   MemTracker(IntGauge* consumption_metric, int64_t byte_limit = -1,
-      const std::string& label = std::string(), MemTracker* parent = NULL);
+      const std::string& label = std::string(), MemTracker* parent = nullptr);
 
   ~MemTracker();
 
@@ -129,13 +136,13 @@ class MemTracker {
       return;
     }
 
-    if (consumption_metric_ != NULL) {
+    if (consumption_metric_ != nullptr) {
       RefreshConsumptionFromMetric();
       return;
     }
     for (MemTracker* tracker : all_trackers_) {
       tracker->consumption_->Add(bytes);
-      if (tracker->consumption_metric_ == NULL) {
+      if (tracker->consumption_metric_ == nullptr) {
         DCHECK_GE(tracker->consumption_->current_value(), 0);
       }
     }
@@ -148,7 +155,7 @@ class MemTracker {
   /// the limit recorded in one of its ancestors already happened.
   void ConsumeLocal(int64_t bytes, MemTracker* end_tracker) {
     DCHECK(!closed_) << label_;
-    DCHECK(consumption_metric_ == NULL) << "Should not be called on root.";
+    DCHECK(consumption_metric_ == nullptr) << "Should not be called on root.";
     for (MemTracker* tracker : all_trackers_) {
       if (tracker == end_tracker) return;
       DCHECK(!tracker->has_limit());
@@ -163,19 +170,21 @@ class MemTracker {
   }
 
   /// Increases consumption of this tracker and its ancestors by 'bytes' only if
-  /// they can all consume 'bytes'. If this brings any of them over, none of them
-  /// are updated.
-  /// Returns true if the try succeeded.
+  /// they can all consume 'bytes' without exceeding the limit (hard or soft) specified
+  /// by 'mode'. If any limit would be exceeded, no MemTrackers are updated. If the
+  /// caller can tolerate an allocation failing, it should set mode=SOFT so that
+  /// other callers that may not tolerate allocation failures have a better chance
+  /// of success. Returns true if the consumption was successfully updated.
   WARN_UNUSED_RESULT
-  bool TryConsume(int64_t bytes) {
+  bool TryConsume(int64_t bytes, MemLimit mode = MemLimit::HARD) {
     DCHECK(!closed_) << label_;
-    if (consumption_metric_ != NULL) RefreshConsumptionFromMetric();
+    if (consumption_metric_ != nullptr) RefreshConsumptionFromMetric();
     if (UNLIKELY(bytes <= 0)) return true;
     int i;
     // Walk the tracker tree top-down.
     for (i = all_trackers_.size() - 1; i >= 0; --i) {
       MemTracker* tracker = all_trackers_[i];
-      const int64_t limit = tracker->limit();
+      const int64_t limit = tracker->GetLimit(mode);
       if (limit < 0) {
         tracker->consumption_->Add(bytes); // No limit at this tracker.
       } else {
@@ -215,7 +224,7 @@ class MemTracker {
       return;
     }
 
-    if (consumption_metric_ != NULL) {
+    if (consumption_metric_ != nullptr) {
       RefreshConsumptionFromMetric();
       return;
     }
@@ -227,9 +236,10 @@ class MemTracker {
       /// metric. Don't blow up in this case. (Note that this doesn't affect non-process
       /// trackers since we can enforce that the reported memory usage is internally
       /// consistent.)
-      if (tracker->consumption_metric_ == NULL) {
+      if (tracker->consumption_metric_ == nullptr) {
         DCHECK_GE(tracker->consumption_->current_value(), 0)
-          << std::endl << tracker->LogUsage(UNLIMITED_DEPTH);
+            << std::endl
+            << tracker->LogUsage(UNLIMITED_DEPTH);
       }
     }
   }
@@ -242,22 +252,23 @@ class MemTracker {
 
   /// Returns true if a valid limit of this tracker or one of its ancestors is
   /// exceeded.
-  bool AnyLimitExceeded() {
+  bool AnyLimitExceeded(MemLimit mode) {
     for (MemTracker* tracker : limit_trackers_) {
-      if (tracker->LimitExceeded()) return true;
+      if (tracker->LimitExceeded(mode)) return true;
     }
     return false;
   }
 
   /// If this tracker has a limit, checks the limit and attempts to free up some memory if
-  /// the limit is exceeded by calling any added GC functions. Returns true if the limit is
-  /// exceeded after calling the GC functions. Returns false if there is no limit.
-  bool LimitExceeded() {
-    if (UNLIKELY(CheckLimitExceeded())) {
-      if (bytes_over_limit_metric_ != NULL) {
+  /// the hard limit is exceeded by calling any added GC functions. Returns true if the
+  /// limit selected by 'mode' is exceeded after any GC. Returns false if there is no
+  /// limit or consumption is under the limit. GC is never attempted for MemLimit::SOFT.
+  bool LimitExceeded(MemLimit mode) {
+    if (UNLIKELY(CheckLimitExceeded(mode))) {
+      if (mode == MemLimit::HARD && bytes_over_limit_metric_ != nullptr) {
         bytes_over_limit_metric_->SetValue(consumption() - limit_);
       }
-      return GcMemory(limit_);
+      return mode == MemLimit::HARD ? GcMemory(limit_) : true;
     }
     return false;
   }
@@ -265,15 +276,7 @@ class MemTracker {
   /// Returns the maximum consumption that can be made without exceeding the limit on
   /// this tracker or any of its parents. Returns int64_t::max() if there are no
   /// limits and a negative value if any limit is already exceeded.
-  int64_t SpareCapacity() const {
-    int64_t result = std::numeric_limits<int64_t>::max();
-    for (std::vector<MemTracker*>::const_iterator tracker = limit_trackers_.begin();
-         tracker != limit_trackers_.end(); ++tracker) {
-      int64_t mem_left = (*tracker)->limit() - (*tracker)->consumption();
-      result = std::min(result, mem_left);
-    }
-    return result;
-  }
+  int64_t SpareCapacity(MemLimit mode) const;
 
   /// Refresh the memory consumption value from the consumption metric. Only valid to
   /// call if this tracker has a consumption metric.
@@ -284,19 +287,17 @@ class MemTracker {
 
   int64_t limit() const { return limit_; }
   bool has_limit() const { return limit_ >= 0; }
+  int64_t soft_limit() const { return soft_limit_; }
+  int64_t GetLimit(MemLimit mode) const {
+    if (mode == MemLimit::SOFT) return soft_limit();
+    DCHECK_ENUM_EQ(mode, MemLimit::HARD);
+    return limit();
+  }
   const std::string& label() const { return label_; }
 
   /// Returns the lowest limit for this tracker and its ancestors. Returns
   /// -1 if there is no limit.
-  int64_t lowest_limit() const {
-    if (limit_trackers_.empty()) return -1;
-    int64_t v = std::numeric_limits<int64_t>::max();
-    for (int i = 0; i < limit_trackers_.size(); ++i) {
-      DCHECK(limit_trackers_[i]->has_limit());
-      v = std::min(v, limit_trackers_[i]->limit());
-    }
-    return v;
-  }
+  int64_t GetLowestLimit(MemLimit mode) const;
 
   /// Returns the memory 'reserved' by this resource pool mem tracker, which is the sum
   /// of the memory reserved by the queries in it (i.e. its child trackers). The mem
@@ -373,7 +374,10 @@ class MemTracker {
   friend class PoolMemTrackerRegistry;
 
   /// Returns true if the current memory tracker's limit is exceeded.
-  bool CheckLimitExceeded() const { return limit_ >= 0 && limit_ < consumption(); }
+  bool CheckLimitExceeded(MemLimit mode) const {
+    int64_t limit = GetLimit(mode);
+    return limit >= 0 && limit < consumption();
+  }
 
   /// If consumption is higher than max_consumption, attempts to free memory by calling
   /// any added GC functions.  Returns true if max_consumption is still exceeded. Takes
@@ -427,6 +431,10 @@ class MemTracker {
   /// there is no consumption limit.
   const int64_t limit_;
 
+  /// Soft limit on memory consumption, in bytes. Can be exceeded but callers to
+  /// TryConsume() can opt not to exceed this limit. If -1, there is no consumption limit.
+  const int64_t soft_limit_;
+
   std::string label_;
 
   /// The parent of this tracker. The pointer is never modified, even after this tracker

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/query-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/query-state.cc b/be/src/runtime/query-state.cc
index 3dbe012..c86095c 100644
--- a/be/src/runtime/query-state.cc
+++ b/be/src/runtime/query-state.cc
@@ -120,7 +120,7 @@ Status QueryState::Init(const TExecQueryFInstancesParams& rpc_params) {
   // more resources.
   ExecEnv* exec_env = ExecEnv::GetInstance();
   MemTracker* process_mem_tracker = exec_env->process_mem_tracker();
-  if (process_mem_tracker->LimitExceeded()) {
+  if (process_mem_tracker->LimitExceeded(MemLimit::HARD)) {
     string msg = Substitute(
         "Query $0 could not start because the backend Impala daemon "
         "is over its memory limit", PrintId(query_id()));
@@ -162,7 +162,7 @@ void QueryState::InitMemTrackers() {
 
 Status QueryState::InitBufferPoolState() {
   ExecEnv* exec_env = ExecEnv::GetInstance();
-  int64_t mem_limit = query_mem_tracker_->lowest_limit();
+  int64_t mem_limit = query_mem_tracker_->GetLowestLimit(MemLimit::HARD);
   int64_t max_reservation;
   if (query_options().__isset.buffer_pool_limit
       && query_options().buffer_pool_limit > 0) {

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/runtime/runtime-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/runtime-state.cc b/be/src/runtime/runtime-state.cc
index 5cc80ce..5bb5a2d 100644
--- a/be/src/runtime/runtime-state.cc
+++ b/be/src/runtime/runtime-state.cc
@@ -255,7 +255,7 @@ void RuntimeState::SetMemLimitExceeded(MemTracker* tracker,
 
 Status RuntimeState::CheckQueryState() {
   DCHECK(instance_mem_tracker_ != nullptr);
-  if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded())) {
+  if (UNLIKELY(instance_mem_tracker_->AnyLimitExceeded(MemLimit::HARD))) {
     SetMemLimitExceeded(instance_mem_tracker_.get());
   }
   return GetQueryStatus();

http://git-wip-us.apache.org/repos/asf/impala/blob/68736bc3/be/src/udf/udf.cc
----------------------------------------------------------------------
diff --git a/be/src/udf/udf.cc b/be/src/udf/udf.cc
index 032f036..4ed3cc2 100644
--- a/be/src/udf/udf.cc
+++ b/be/src/udf/udf.cc
@@ -323,7 +323,7 @@ inline bool FunctionContextImpl::CheckAllocResult(const char* fn_name,
 inline void FunctionContextImpl::CheckMemLimit(const char* fn_name, int64_t byte_size) {
 #ifndef IMPALA_UDF_SDK_BUILD
   MemTracker* mem_tracker = udf_pool_->mem_tracker();
-  if (mem_tracker->AnyLimitExceeded()) {
+  if (mem_tracker->AnyLimitExceeded(MemLimit::HARD)) {
     ErrorMsg msg = ErrorMsg(TErrorCode::UDF_MEM_LIMIT_EXCEEDED, string(fn_name));
     state_->SetMemLimitExceeded(mem_tracker, byte_size, &msg);
   }


[3/3] impala git commit: IMPALA-7376: DCHECK hit if a fragment instance fails to initialize the filter bank

Posted by sa...@apache.org.
IMPALA-7376: DCHECK hit if a fragment instance fails to initialize the filter bank

While Prepare()-ing a fragment instance, if we fail to initialize the
runtime filter bank, we exit FIS::Prepare() without having acquired a
thread token (AcquireThreadToken()).

FIS::Finalize() is always called, regardless of whether the fragment
instance succeeded or failed, and it calls ReleaseThreadToken() even
though the token may never have been acquired, causing a DCHECK to be hit.

This patch fixes the problem by ensuring that no code that can fail runs
before the thread token is acquired, so the token is always acquired and
the above crash is avoided.

A test is also added to confirm the fix. This test crashes without the
code changes in this patch.

Change-Id: I1d6e7afc18fe2f0e1e29d2bd8a5f804a78f7043a
Reviewed-on: http://gerrit.cloudera.org:8080/11096
Reviewed-by: Sailesh Mukil <sa...@cloudera.com>
Tested-by: Impala Public Jenkins <im...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/impala/repo
Commit: http://git-wip-us.apache.org/repos/asf/impala/commit/a76ea5c2
Tree: http://git-wip-us.apache.org/repos/asf/impala/tree/a76ea5c2
Diff: http://git-wip-us.apache.org/repos/asf/impala/diff/a76ea5c2

Branch: refs/heads/master
Commit: a76ea5c2ea055b8ba1aee9df9e574fc31cdebbc0
Parents: 68736bc
Author: Sailesh Mukil <sa...@apache.org>
Authored: Tue Jul 31 16:27:11 2018 -0700
Committer: Impala Public Jenkins <im...@cloudera.com>
Committed: Thu Aug 2 02:20:58 2018 +0000

----------------------------------------------------------------------
 be/src/runtime/fragment-instance-state.cc |  8 ++++++--
 tests/failure/test_failpoints.py          | 10 ++++++++++
 2 files changed, 16 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/impala/blob/a76ea5c2/be/src/runtime/fragment-instance-state.cc
----------------------------------------------------------------------
diff --git a/be/src/runtime/fragment-instance-state.cc b/be/src/runtime/fragment-instance-state.cc
index 11122c7..cbae601 100644
--- a/be/src/runtime/fragment-instance-state.cc
+++ b/be/src/runtime/fragment-instance-state.cc
@@ -132,11 +132,15 @@ Status FragmentInstanceState::Prepare() {
   event_sequence_->Start(query_state_->fragment_events_start_time());
   UpdateState(StateEvent::PREPARE_START);
 
+  // Reserve one main thread from the pool
+  runtime_state_->resource_pool()->AcquireThreadToken();
+
+  // Exercise debug actions at the first point where errors are possible in Prepare().
+  RETURN_IF_ERROR(DebugAction(query_state_->query_options(), "FIS_IN_PREPARE"));
+
   RETURN_IF_ERROR(runtime_state_->InitFilterBank(
       fragment_ctx_.fragment.runtime_filters_reservation_bytes));
 
-  // Reserve one main thread from the pool
-  runtime_state_->resource_pool()->AcquireThreadToken();
   avg_thread_tokens_ = profile()->AddSamplingCounter("AverageThreadTokens",
       bind<int64_t>(mem_fn(&ThreadResourcePool::num_threads),
           runtime_state_->resource_pool()));

http://git-wip-us.apache.org/repos/asf/impala/blob/a76ea5c2/tests/failure/test_failpoints.py
----------------------------------------------------------------------
diff --git a/tests/failure/test_failpoints.py b/tests/failure/test_failpoints.py
index 7366900..278c2f1 100644
--- a/tests/failure/test_failpoints.py
+++ b/tests/failure/test_failpoints.py
@@ -136,6 +136,16 @@ class TestFailpoints(ImpalaTestSuite):
         node_ids.append(int(match.group('node_id')))
     return node_ids
 
+  def test_lifecycle_failures(self):
+    """Test that targeted failure injections in the query lifecycle do not cause crashes
+    or hangs"""
+    query = "select * from tpch.lineitem limit 10000"
+
+    # Fail the Prepare() phase of all fragment instances.
+    debug_action = 'FIS_IN_PREPARE:FAIL@1.0'
+    self.execute_query_expect_failure(self.client, query,
+        query_options={'debug_action':debug_action})
+
   def __execute_fail_action(self, query, vector):
     try:
       self.execute_query(query, vector.get_value('exec_option'),