You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:28 UTC

[16/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.cc b/be/src/kudu/util/maintenance_manager.cc
new file mode 100644
index 0000000..9a42464
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.cc
@@ -0,0 +1,550 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/maintenance_manager.h"
+
+#include <cinttypes>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <utility>
+
+#include <boost/bind.hpp>
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/debug/trace_logging.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+using std::pair;
+using std::string;
+using strings::Substitute;
+
+DEFINE_int32(maintenance_manager_num_threads, 1,
+             "Size of the maintenance manager thread pool. "
+             "For spinning disks, the number of threads should "
+             "not be above the number of devices.");
+TAG_FLAG(maintenance_manager_num_threads, stable);
+
+DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
+       "Polling interval for the maintenance manager scheduler, "
+       "in milliseconds.");
+TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
+
+DEFINE_int32(maintenance_manager_history_size, 8,
+       "Number of completed operations the manager is keeping track of.");
+TAG_FLAG(maintenance_manager_history_size, hidden);
+
+DEFINE_bool(enable_maintenance_manager, true,
+       "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
+TAG_FLAG(enable_maintenance_manager, unsafe);
+
+DEFINE_int64(log_target_replay_size_mb, 1024,
+             "The target maximum size of logs to be replayed at startup. If a tablet "
+             "has in-memory operations that are causing more than this size of logs "
+             "to be retained, then the maintenance manager will prioritize flushing "
+             "these operations to disk.");
+TAG_FLAG(log_target_replay_size_mb, experimental);
+
+DEFINE_int64(data_gc_min_size_mb, 0,
+             "The (exclusive) minimum number of megabytes of ancient data on "
+             "disk, per tablet, needed to prioritize deletion of that data.");
+TAG_FLAG(data_gc_min_size_mb, experimental);
+
+DEFINE_double(data_gc_prioritization_prob, 0.5,
+             "The probability that we will prioritize data GC over performance "
+             "improvement operations. If set to 1.0, we will always prefer to "
+             "delete old data before running performance improvement operations "
+             "such as delta compaction.");
+TAG_FLAG(data_gc_prioritization_prob, experimental);
+
+namespace kudu {
+
+// Constructs a stats object in the cleared state: Clear() marks it invalid
+// and zeroes every field.
+MaintenanceOpStats::MaintenanceOpStats() {
+  Clear();
+}
+
+// Resets this object to its pristine state: every estimate goes back to zero,
+// the timestamp becomes a default-constructed MonoTime, and the stats are
+// flagged as invalid and not runnable.
+void MaintenanceOpStats::Clear() {
+  // Timestamp and numeric estimates.
+  last_modified_ = MonoTime();
+  perf_improvement_ = 0;
+  data_retained_bytes_ = 0;
+  logs_retained_bytes_ = 0;
+  ram_anchored_ = 0;
+  // Status flags.
+  runnable_ = false;
+  valid_ = false;
+}
+
+// Creates an unregistered op. 'name' is moved into the op; the op starts with
+// no running instances and is not cancelled. manager_ and cond_ are left
+// default-constructed here; MaintenanceManager::RegisterOp() assigns them.
+MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+    : name_(std::move(name)),
+      running_(0),
+      cancel_(false),
+      io_usage_(io_usage) {
+}
+
+// An op must be detached from its manager before it is destroyed; crash
+// loudly if it still holds a manager reference.
+MaintenanceOp::~MaintenanceOp() {
+  const bool still_registered = (manager_.get() != nullptr);
+  CHECK(!still_registered) << "You must unregister the " << name_
+         << " Op before destroying it.";
+}
+
+// Detaches this op from the manager it is registered with. Crashes if the op
+// has no manager. The actual work (including waiting for running instances)
+// is delegated to MaintenanceManager::UnregisterOp().
+void MaintenanceOp::Unregister() {
+  MaintenanceManager* const mgr = manager_.get();
+  CHECK(mgr) << "Op " << name_ << " was never registered.";
+  mgr->UnregisterOp(this);
+}
+
+// Builds the protobuf view of this op instance. The duration is only filled
+// in when it has been initialized; the time elapsed since the op started is
+// always reported, measured against the current monotonic time.
+MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const {
+  MaintenanceManagerStatusPB_OpInstancePB instance_pb;
+  instance_pb.set_thread_id(thread_id);
+  instance_pb.set_name(name);
+  if (duration.Initialized()) {
+    instance_pb.set_duration_millis(duration.ToMilliseconds());
+  }
+  const MonoDelta since_start = MonoTime::Now() - start_mono_time;
+  instance_pb.set_millis_since_start(since_start.ToMilliseconds());
+  return instance_pb;
+}
+
+// Default options. Every field is zero, which the MaintenanceManager
+// constructor treats as "use the value of the corresponding
+// --maintenance_manager_* flag".
+const MaintenanceManager::Options MaintenanceManager::kDefaultOptions = {
+  .num_threads = 0,
+  .polling_interval_ms = 0,
+  .history_size = 0,
+};
+
+// Builds a manager for the server identified by 'server_uuid'.
+//
+// A non-positive 'num_threads' or 'polling_interval_ms', or a zero
+// 'history_size', in 'options' falls back to the corresponding
+// --maintenance_manager_* flag. The worker thread pool is created here with
+// min and max thread counts both equal to num_threads_; pool creation failure
+// is fatal. The scheduler thread is not started until Start() is called.
+MaintenanceManager::MaintenanceManager(const Options& options,
+                                       std::string server_uuid)
+  : server_uuid_(std::move(server_uuid)),
+    num_threads_(options.num_threads <= 0 ?
+                 FLAGS_maintenance_manager_num_threads : options.num_threads),
+    cond_(&lock_),
+    shutdown_(false),
+    polling_interval_ms_(options.polling_interval_ms <= 0 ?
+          FLAGS_maintenance_manager_polling_interval_ms :
+          options.polling_interval_ms),
+    running_ops_(0),
+    completed_ops_count_(0),
+    rand_(GetRandomSeed32()),
+    memory_pressure_func_(&process_memory::UnderMemoryPressure) {
+  CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
+               .set_max_threads(num_threads_).Build(&thread_pool_));
+  // NOTE(review): the flag is a signed int32; a negative value would wrap when
+  // converted to uint32_t here, and a zero value leaves the history empty.
+  uint32_t history_size = options.history_size == 0 ?
+                          FLAGS_maintenance_manager_history_size :
+                          options.history_size;
+  // Pre-size the circular buffer of completed ops.
+  completed_ops_.resize(history_size);
+}
+
+// Shuts the manager down on destruction. Shutdown() returns early if it has
+// already been called, so an explicit earlier Shutdown() is fine.
+MaintenanceManager::~MaintenanceManager() {
+  Shutdown();
+}
+
+// Spawns the scheduler thread, which runs RunSchedulerThread(). Crashes if a
+// scheduler thread already exists. Returns the status of thread creation.
+Status MaintenanceManager::Start() {
+  CHECK(!monitor_thread_);
+  return Thread::Create("maintenance", "maintenance_scheduler",
+                        boost::bind(&MaintenanceManager::RunSchedulerThread, this),
+                        &monitor_thread_);
+}
+
+// Stops the scheduler and drains the worker pool.
+//
+// Only the first call does any work; later calls see shutdown_ already set and
+// return immediately. The thread pool is only waited on and shut down here if
+// the scheduler thread was started (i.e. monitor_thread_ is non-null).
+void MaintenanceManager::Shutdown() {
+  {
+    std::lock_guard<Mutex> guard(lock_);
+    if (shutdown_) {
+      return;
+    }
+    shutdown_ = true;
+    // Wake the scheduler thread so that it notices shutdown_ and exits.
+    cond_.Broadcast();
+  }
+  if (monitor_thread_.get()) {
+    // Join without holding lock_: the scheduler thread needs the lock to exit.
+    CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
+    monitor_thread_.reset();
+    // Wait for all the running and queued tasks before shutting down. Otherwise,
+    // Shutdown() can remove a queued task silently. We count on eventually running the
+    // queued tasks to decrement their "running" count, which is incremented at the time
+    // they are enqueued.
+    thread_pool_->Wait();
+    thread_pool_->Shutdown();
+  }
+}
+
+// Adds 'op' to the set of ops this manager schedules. Crashes if 'op' is null,
+// already has a manager, or is already present in ops_. On success the op
+// holds a shared reference to this manager and gets a condition variable
+// bound to the manager's lock.
+void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
+  CHECK(op);
+  std::lock_guard<Mutex> guard(lock_);
+  CHECK(!op->manager_) << "Tried to register " << op->name()
+          << ", but it was already registered.";
+  // Start the op off with freshly-cleared (invalid) stats.
+  const bool newly_inserted =
+      ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())).second;
+  CHECK(newly_inserted)
+      << "Tried to register " << op->name()
+      << ", but it already exists in ops_.";
+  op->manager_ = shared_from_this();
+  op->cond_.reset(new ConditionVariable(&lock_));
+  VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Registered " << op->name();
+}
+
+// Removes 'op' from this manager, blocking until no instance of it is running.
+//
+// Crashes if 'op' is not registered with this manager. The op is marked
+// cancelled first so that FindBestOp() stops selecting it, then this thread
+// waits on the op's condition variable until its running count drops to zero.
+void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
+  {
+    std::lock_guard<Mutex> guard(lock_);
+    CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
+          << ", but it is not currently registered with this maintenance manager.";
+    auto iter = ops_.find(op);
+    CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+        << ", but it was never registered";
+    // While the op is running, wait for it to be finished.
+    if (iter->first->running_ > 0) {
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Waiting for op " << op->name()
+                                       << " to finish so we can unregister it.";
+    }
+    op->CancelAndDisable();
+    while (iter->first->running_ > 0) {
+      op->cond_->Wait();
+      // Look the op up again after every wakeup: the iterator obtained before
+      // waiting is not reused, and the op must still be present in ops_.
+      iter = ops_.find(op);
+      CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+          << ", but another thread unregistered it while we were "
+          << "waiting for it to complete";
+    }
+    ops_.erase(iter);
+  }
+  LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name();
+  op->cond_.reset();
+  // Remove the op's shared_ptr reference to us.  This might 'delete this'.
+  op->manager_.reset();
+}
+
+// Returns true when --enable_maintenance_manager is currently false. The flag
+// is read through ANNOTATE_UNPROTECTED_READ, i.e. without taking any lock.
+bool MaintenanceManager::disabled_for_tests() const {
+  return !ANNOTATE_UNPROTECTED_READ(FLAGS_enable_maintenance_manager);
+}
+
+// Body of the scheduler thread created by Start().
+//
+// Loops until shutdown_ is set: sleeps while all worker threads are busy, the
+// previous pass found nothing to run, or the manager is disabled; otherwise
+// picks the best op via FindBestOp(), calls its Prepare() and, if that
+// succeeds, submits it to the worker pool where LaunchOp() runs it.
+//
+// lock_ is held for the whole loop except while the op's Prepare() runs.
+// The op's running count and running_ops_ are incremented before Prepare() and
+// are decremented either here (Prepare() failed) or at the end of LaunchOp().
+void MaintenanceManager::RunSchedulerThread() {
+  // Use the annotated accessor so this read of the runtime-mutable flag is
+  // treated the same way as the reads inside the loop below.
+  if (disabled_for_tests()) {
+    LOG(INFO) << "Maintenance manager is disabled. Stopping thread.";
+    return;
+  }
+
+  MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_);
+
+  std::unique_lock<Mutex> guard(lock_);
+
+  // Set to true if the scheduler runs and finds that there is no work to do.
+  bool prev_iter_found_no_work = false;
+
+  while (true) {
+    // We'll keep sleeping if:
+    //    1) there are no free threads available to perform a maintenance op.
+    // or 2) we just tried to schedule an op but found nothing to run.
+    // or 3) the manager has been disabled at runtime via the gflag.
+    // However, if it's time to shut down, we want to do so immediately.
+    while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || disabled_for_tests()) &&
+           !shutdown_) {
+      cond_.WaitFor(polling_interval);
+      prev_iter_found_no_work = false;
+    }
+    if (shutdown_) {
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Shutting down maintenance manager.";
+      return;
+    }
+
+    // Find the best op.
+    pair<MaintenanceOp*, string> op_and_note = FindBestOp();
+    auto* op = op_and_note.first;
+    const auto& note = op_and_note.second;
+
+    // If we found no work to do, then we should sleep before trying again to schedule.
+    // Otherwise, we can go right into trying to find the next op.
+    prev_iter_found_no_work = (op == nullptr);
+    if (!op) {
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix()
+                                       << "No maintenance operations look worth doing.";
+      continue;
+    }
+
+    // Prepare the maintenance operation. The running counts are bumped first
+    // so that UnregisterOp() waits for this op while the lock is released.
+    op->running_++;
+    running_ops_++;
+    guard.unlock();
+    bool ready = op->Prepare();
+    guard.lock();
+    if (!ready) {
+      LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+                            << ".  Re-running scheduler.";
+      op->running_--;
+      running_ops_--;
+      // Wake a thread that may be blocked in UnregisterOp() on this op.
+      op->cond_->Signal();
+      continue;
+    }
+
+    LOG_AND_TRACE("maintenance", INFO) << LogPrefix() << "Scheduling "
+                                       << op->name() << ": " << note;
+    // Run the maintenance operation.
+    Status s = thread_pool_->SubmitFunc(boost::bind(
+        &MaintenanceManager::LaunchOp, this, op));
+    // CHECK_OK (unlike CHECK(s.ok())) includes the failing status in the crash message.
+    CHECK_OK(s);
+  }
+}
+
+// Finding the best operation goes through five filters, in priority order:
+// 1. If there's a low-IO Op that we can run quickly that frees log retention, we run it.
+// 2. If we've hit the overall process memory limit (note: this includes memory that the Ops
+//    cannot free), we run the Op with the highest RAM usage.
+// 3. If there are Ops that are retaining logs past our target replay size, we run the one that
+//    has the highest retention (and if many qualify, then we run the one that also frees up the
+//    most RAM).
+// 4. If an Op retains more than --data_gc_min_size_mb of data on disk, we run the one retaining
+//    the most. When some Op also offers a positive perf improvement, data GC is only chosen with
+//    probability --data_gc_prioritization_prob.
+// 5. Finally, if there's nothing else that we really need to do, we run the Op that will improve
+//    performance the most.
+//
+// The reason it's done this way is that we want to prioritize limiting the amount of resources we
+// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
+// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
+//
+// From the third priority onwards we're at a point where nothing's urgent and there's nothing we
+// can run quickly.
+// TODO We currently optimize for freeing log retention but we could consider having some sort of
+// sliding priority between log retention and RAM usage. For example, is an Op that frees
+// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
+// and 128MB of RAM? Maybe a more holistic approach would be better.
+//
+// Every call clears and refreshes the cached stats of all registered ops via
+// MaintenanceOp::UpdateStats(). Both callers in this file invoke it with lock_ held.
+//
+// Returns the chosen op together with a human-readable reason, or a null op (plus the reason
+// nothing was chosen) when there are no free threads or nothing is worth running.
+pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
+  TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
+
+  size_t free_threads = num_threads_ - running_ops_;
+  if (free_threads == 0) {
+    return {nullptr, "no free threads"};
+  }
+
+  int64_t low_io_most_logs_retained_bytes = 0;
+  MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
+
+  uint64_t most_mem_anchored = 0;
+  MaintenanceOp* most_mem_anchored_op = nullptr;
+
+  int64_t most_logs_retained_bytes = 0;
+  // Unsigned to match MaintenanceOpStats::ram_anchored(), which it stores and is compared with.
+  uint64_t most_logs_retained_bytes_ram_anchored = 0;
+  MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+
+  int64_t most_data_retained_bytes = 0;
+  MaintenanceOp* most_data_retained_bytes_op = nullptr;
+
+  double best_perf_improvement = 0;
+  MaintenanceOp* best_perf_improvement_op = nullptr;
+  for (OpMapTy::value_type &val : ops_) {
+    MaintenanceOp* op(val.first);
+    MaintenanceOpStats& stats(val.second);
+    VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name();
+    // Update op stats.
+    stats.Clear();
+    op->UpdateStats(&stats);
+    // Skip ops that are being unregistered, reported no stats, or cannot run right now.
+    if (op->cancelled() || !stats.valid() || !stats.runnable()) {
+      continue;
+    }
+    if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
+        op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
+      low_io_most_logs_retained_bytes_op = op;
+      low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+                                       << stats.logs_retained_bytes() << " bytes of logs";
+    }
+
+    if (stats.ram_anchored() > most_mem_anchored) {
+      most_mem_anchored_op = op;
+      most_mem_anchored = stats.ram_anchored();
+    }
+    // We prioritize ops that can free more logs, but when it's the same we pick the one that
+    // also frees up the most memory.
+    if (stats.logs_retained_bytes() > 0 &&
+        (stats.logs_retained_bytes() > most_logs_retained_bytes ||
+            (stats.logs_retained_bytes() == most_logs_retained_bytes &&
+                stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) {
+      most_logs_retained_bytes_op = op;
+      most_logs_retained_bytes = stats.logs_retained_bytes();
+      most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
+    }
+
+    if (stats.data_retained_bytes() > most_data_retained_bytes) {
+      most_data_retained_bytes_op = op;
+      most_data_retained_bytes = stats.data_retained_bytes();
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+                                       << stats.data_retained_bytes() << " bytes of data";
+    }
+
+    // The first eligible op is always recorded, even with a non-positive score; the final
+    // filters below only act on it when the score is positive.
+    if ((!best_perf_improvement_op) ||
+        (stats.perf_improvement() > best_perf_improvement)) {
+      best_perf_improvement_op = op;
+      best_perf_improvement = stats.perf_improvement();
+    }
+  }
+
+  // Look at ops that we can run quickly that free up log retention.
+  if (low_io_most_logs_retained_bytes_op && low_io_most_logs_retained_bytes > 0) {
+    string notes = Substitute("free $0 bytes of WAL", low_io_most_logs_retained_bytes);
+    return {low_io_most_logs_retained_bytes_op, std::move(notes)};
+  }
+
+  // Look at free memory. If it is dangerously low, we must select something
+  // that frees memory-- the op with the most anchored memory.
+  // Initialized so the messages below never print an indeterminate value should the
+  // (replaceable) memory pressure function not write to its out-parameter.
+  double capacity_pct = 0;
+  if (memory_pressure_func_(&capacity_pct)) {
+    if (!most_mem_anchored_op) {
+      std::string msg = StringPrintf("System under memory pressure "
+          "(%.2f%% of limit used). However, there are no ops currently "
+          "runnable which would free memory.", capacity_pct);
+      LOG_WITH_PREFIX(INFO) << msg;
+      return {nullptr, msg};
+    }
+    string note = StringPrintf("under memory pressure (%.2f%% used, "
+                               "can flush %" PRIu64 " bytes)",
+                               capacity_pct, most_mem_anchored);
+    return {most_mem_anchored_op, std::move(note)};
+  }
+
+  // Look at ops that retain at least the target replay size worth of logs.
+  if (most_logs_retained_bytes_op &&
+      most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) {
+    string note = Substitute("$0 bytes log retention", most_logs_retained_bytes);
+    return {most_logs_retained_bytes_op, std::move(note)};
+  }
+
+  // Look at ops that free up data on disk. If a perf-improving op is also available, data GC is
+  // only preferred with probability --data_gc_prioritization_prob.
+  if (most_data_retained_bytes_op &&
+      most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) {
+    if (!best_perf_improvement_op || best_perf_improvement <= 0 ||
+        rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) {
+      string note = Substitute("$0 bytes on disk", most_data_retained_bytes);
+      return {most_data_retained_bytes_op, std::move(note)};
+    }
+    VLOG(1) << "Skipping data GC due to prioritizing perf improvement";
+  }
+
+  if (best_perf_improvement_op && best_perf_improvement > 0) {
+    string note = StringPrintf("perf score=%.6f", best_perf_improvement);
+    return {best_perf_improvement_op, std::move(note)};
+  }
+  return {nullptr, "no ops with positive improvement"};
+}
+
+// Runs one instance of 'op' on the calling (worker pool) thread.
+//
+// The scheduler has already incremented op->running_ and running_ops_ before
+// submitting this call; the cleanup block below undoes both once Perform()
+// returns. While the op runs, its OpInstance is published in
+// running_instances_ keyed by the current thread id; afterwards it is copied
+// into the completed-ops history.
+void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
+  int64_t thread_id = Thread::CurrentThreadId();
+  OpInstance op_instance;
+  op_instance.thread_id = thread_id;
+  op_instance.name = op->name();
+  op_instance.start_mono_time = MonoTime::Now();
+  op->RunningGauge()->Increment();
+  {
+    std::lock_guard<Mutex> lock(running_instances_lock_);
+    InsertOrDie(&running_instances_, thread_id, &op_instance);
+  }
+
+  SCOPED_CLEANUP({
+    op->RunningGauge()->Decrement();
+
+    // Lock order: lock_ first, then running_instances_lock_.
+    std::lock_guard<Mutex> l(lock_);
+    {
+      std::lock_guard<Mutex> lock(running_instances_lock_);
+      running_instances_.erase(thread_id);
+    }
+    op_instance.duration = MonoTime::Now() - op_instance.start_mono_time;
+    // The history buffer is empty when the configured history size is 0;
+    // skip the store in that case rather than computing a modulo by zero.
+    if (!completed_ops_.empty()) {
+      completed_ops_[completed_ops_count_ % completed_ops_.size()] = op_instance;
+    }
+    completed_ops_count_++;
+
+    op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds());
+
+    running_ops_--;
+    op->running_--;
+    op->cond_->Signal(); // wake up a thread waiting in UnregisterOp()
+    cond_.Signal(); // wake up scheduler
+  });
+
+  scoped_refptr<Trace> trace(new Trace);
+  Stopwatch sw;
+  sw.start();
+  {
+    ADOPT_TRACE(trace.get());
+    TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
+                 "name", op->name());
+    op->Perform();
+    sw.stop();
+  }
+  LOG_WITH_PREFIX(INFO) << op->name() << " complete. "
+                        << "Timing: " << sw.elapsed().ToString()
+                        << " Metrics: " << trace->MetricsAsJSON();
+}
+
+// Fills 'out_pb' with a snapshot of the manager's state: every registered op
+// with its stats, the op FindBestOp() would currently pick (if any), the
+// instances running right now, and the most recently completed instances
+// (newest first, at most completed_ops_.size() of them).
+//
+// Takes lock_ for the whole call. Because it calls FindBestOp(), it also
+// refreshes the cached stats of every registered op.
+void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
+  DCHECK(out_pb != nullptr);
+  std::lock_guard<Mutex> guard(lock_);
+  pair<MaintenanceOp*, string> best_op_and_why = FindBestOp();
+  auto* best_op = best_op_and_why.first;
+
+  for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
+    MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
+    MaintenanceOp* op(val.first);
+    MaintenanceOpStats& stat(val.second);
+    op_pb->set_name(op->name());
+    op_pb->set_running(op->running());
+    if (stat.valid()) {
+      op_pb->set_runnable(stat.runnable());
+      op_pb->set_ram_anchored_bytes(stat.ram_anchored());
+      op_pb->set_logs_retained_bytes(stat.logs_retained_bytes());
+      op_pb->set_perf_improvement(stat.perf_improvement());
+    } else {
+      // The stat accessors DCHECK validity, so report neutral values instead.
+      op_pb->set_runnable(false);
+      op_pb->set_ram_anchored_bytes(0);
+      op_pb->set_logs_retained_bytes(0);
+      op_pb->set_perf_improvement(0.0);
+    }
+
+    if (best_op == op) {
+      out_pb->mutable_best_op()->CopyFrom(*op_pb);
+    }
+  }
+
+  {
+    std::lock_guard<Mutex> lock(running_instances_lock_);
+    for (const auto& running_instance : running_instances_) {
+      *out_pb->add_running_operations() = running_instance.second->DumpToPB();
+    }
+  }
+
+  // Walk the circular history backwards from the most recently completed op.
+  // All index arithmetic is done in int64_t, the type of completed_ops_count_,
+  // so that a large counter is never truncated and no signed/unsigned
+  // comparison is involved.
+  const int64_t history_size = static_cast<int64_t>(completed_ops_.size());
+  for (int64_t n = 1; n <= history_size; n++) {
+    const int64_t i = completed_ops_count_ - n;
+    if (i < 0) break;
+    const auto& completed_op = completed_ops_[i % history_size];
+
+    if (!completed_op.name.empty()) {
+      *out_pb->add_completed_operations() = completed_op.DumpToPB();
+    }
+  }
+}
+
+// Returns the prefix put in front of this manager's log messages:
+// "P <server_uuid>: ".
+std::string MaintenanceManager::LogPrefix() const {
+  return Substitute("P $0: ", server_uuid_);
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.h b/be/src/kudu/util/maintenance_manager.h
new file mode 100644
index 0000000..7d20c8a
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.h
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+template<class T>
+class AtomicGauge;
+class Histogram;
+class MaintenanceManager;
+class MaintenanceManagerStatusPB;
+class MaintenanceManagerStatusPB_OpInstancePB;
+class Thread;
+class ThreadPool;
+
+// Statistics that a MaintenanceOp reports about itself and that the
+// MaintenanceManager uses to decide which op to run next.
+//
+// A freshly constructed or cleared object is invalid. Calling any setter marks
+// it valid and refreshes last_modified(). Except for valid(), the getters
+// DCHECK that the stats are valid.
+class MaintenanceOpStats {
+ public:
+  MaintenanceOpStats();
+
+  // Zero all stats. They are invalid until the first setter is called.
+  void Clear();
+
+  // Whether the op can be run now.
+  bool runnable() const {
+    DCHECK(valid_);
+    return runnable_;
+  }
+
+  void set_runnable(bool runnable) {
+    UpdateLastModified();
+    runnable_ = runnable;
+  }
+
+  // Approximate number of bytes of memory kept around by not running the op.
+  uint64_t ram_anchored() const {
+    DCHECK(valid_);
+    return ram_anchored_;
+  }
+
+  void set_ram_anchored(uint64_t ram_anchored) {
+    UpdateLastModified();
+    ram_anchored_ = ram_anchored;
+  }
+
+  // Approximate number of bytes of WAL files that running the op would free.
+  int64_t logs_retained_bytes() const {
+    DCHECK(valid_);
+    return logs_retained_bytes_;
+  }
+
+  void set_logs_retained_bytes(int64_t logs_retained_bytes) {
+    UpdateLastModified();
+    logs_retained_bytes_ = logs_retained_bytes;
+  }
+
+  // Approximate number of bytes of data blocks that running the op would free.
+  int64_t data_retained_bytes() const {
+    DCHECK(valid_);
+    return data_retained_bytes_;
+  }
+
+  void set_data_retained_bytes(int64_t data_retained_bytes) {
+    UpdateLastModified();
+    data_retained_bytes_ = data_retained_bytes;
+  }
+
+  // Estimated performance improvement from running the op.
+  double perf_improvement() const {
+    DCHECK(valid_);
+    return perf_improvement_;
+  }
+
+  void set_perf_improvement(double perf_improvement) {
+    UpdateLastModified();
+    perf_improvement_ = perf_improvement;
+  }
+
+  // Time of the most recent setter call.
+  const MonoTime& last_modified() const {
+    DCHECK(valid_);
+    return last_modified_;
+  }
+
+  // True once any setter has been called since construction or the last Clear().
+  bool valid() const {
+    return valid_;
+  }
+
+ private:
+  // Marks the stats valid and stamps them with the current monotonic time.
+  // Called by every setter.
+  void UpdateLastModified() {
+    valid_ = true;
+    last_modified_ = MonoTime::Now();
+  }
+
+  // Important: Update Clear() when adding fields to this class.
+
+  // True if these stats are valid.
+  bool valid_;
+
+  // True if this op can be run now.
+  bool runnable_;
+
+  // The approximate amount of memory that not doing this operation keeps
+  // around.  This number is used to decide when to start freeing memory, so it
+  // should be fairly accurate.  May be 0.
+  uint64_t ram_anchored_;
+
+  // Approximate amount of disk space in WAL files that would be freed if this
+  // operation ran. May be 0.
+  int64_t logs_retained_bytes_;
+
+  // Approximate amount of disk space in data blocks that would be freed if
+  // this operation ran. May be 0.
+  int64_t data_retained_bytes_;
+
+  // The estimated performance improvement-- how good it is to do this on some
+  // absolute scale (yet TBD).
+  double perf_improvement_;
+
+  // The last time that the stats were modified.
+  MonoTime last_modified_;
+};
+
+// Represents an instance of a maintenance operation, i.e. one run of an op on
+// a worker thread. Used both for instances that are still running and for the
+// history of completed instances.
+struct OpInstance {
+  // Id of thread the instance ran on.
+  int64_t thread_id;
+  // Name of operation.
+  std::string name;
+  // Time the operation took to run. Value is uninitialized if instance is still running.
+  MonoDelta duration;
+  // Monotonic time at which the instance started running.
+  MonoTime start_mono_time;
+
+  // Converts this instance to its protobuf representation.
+  MaintenanceManagerStatusPB_OpInstancePB DumpToPB() const;
+};
+
+// MaintenanceOp objects represent background operations that the
+// MaintenanceManager can schedule.  Once a MaintenanceOp is registered, the
+// manager will periodically poll it for statistics.  The registrant is
+// responsible for managing the memory associated with the MaintenanceOp object.
+// Op objects should be unregistered before being de-allocated.
+class MaintenanceOp {
+ public:
+  friend class MaintenanceManager;
+
+  // General indicator of how much IO the Op will use.
+  enum IOUsage {
+    LOW_IO_USAGE, // Low impact operations like removing a file, updating metadata.
+    HIGH_IO_USAGE // Everything else.
+  };
+
+  // Creates an op that is not yet registered with any manager.
+  explicit MaintenanceOp(std::string name, IOUsage io_usage);
+
+  // The op must have been unregistered before it is destroyed; the destructor
+  // CHECK-fails otherwise.
+  virtual ~MaintenanceOp();
+
+  // Unregister this op. CHECK-fails if the op is not currently registered.
+  // Blocks until no instance of the op is running.
+  void Unregister();
+
+  // Update the op statistics.  This will be called every scheduling period
+  // (about a few times a second), so it should not be too expensive.  It's
+  // possible for the returned statistics to be invalid; the caller should
+  // call MaintenanceOpStats::valid() before using them.  This will be run
+  // under the MaintenanceManager lock.
+  virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
+
+  // Prepare to perform the operation.  This will be run without holding the
+  // maintenance manager lock.  It should be short, since it is run from the
+  // context of the maintenance op scheduler thread rather than a worker thread.
+  // If this returns false, we will abort the operation.
+  virtual bool Prepare() = 0;
+
+  // Perform the operation.  This will be run without holding the maintenance
+  // manager lock, and may take a long time.
+  virtual void Perform() = 0;
+
+  // Returns the histogram for this op that tracks duration. Cannot be NULL.
+  virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
+
+  // Returns the gauge for this op that tracks when this op is running. Cannot be NULL.
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
+
+  // Returns the number of instances of this op that are currently running.
+  // This is a plain read of a counter that the MaintenanceManager modifies
+  // under its own lock. It is const so that it can also be called through a
+  // const MaintenanceOp.
+  uint32_t running() const { return running_; }
+
+  // Returns a copy of the op's name.
+  std::string name() const { return name_; }
+
+  IOUsage io_usage() const { return io_usage_; }
+
+  // Return true if the operation has been cancelled due to Unregister() pending.
+  bool cancelled() const {
+    return cancel_.Load();
+  }
+
+  // Cancel this operation, which prevents new instances of it from being scheduled
+  // regardless of whether the statistics indicate it is runnable. Instances may also
+  // optionally poll 'cancelled()' on a periodic basis to know if they should abort a
+  // lengthy operation in the middle of Perform().
+  void CancelAndDisable() {
+    cancel_.Store(true);
+  }
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
+
+  // The name of the operation.  Op names must be unique.
+  const std::string name_;
+
+  // The number of times that this op is currently running.
+  uint32_t running_;
+
+  // Set when we are trying to unregister the maintenance operation.
+  // Ongoing operations could read this boolean and cancel themselves.
+  // New operations will not be scheduled when this boolean is set.
+  AtomicBool cancel_;
+
+  // Condition variable which the UnregisterOp function can wait on.
+  //
+  // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
+  // it only exists when the op is registered.
+  gscoped_ptr<ConditionVariable> cond_;
+
+  // The MaintenanceManager with which this op is registered, or null
+  // if it is not registered.
+  std::shared_ptr<MaintenanceManager> manager_;
+
+  // How IO-intensive the op is; set at construction.
+  IOUsage io_usage_;
+};
+
+// Orders MaintenanceOp pointers by the lexicographic order of the ops' names
+// rather than by pointer value.
+struct MaintenanceOpComparator {
+  bool operator() (const MaintenanceOp* lhs,
+                   const MaintenanceOp* rhs) const {
+    return lhs->name() < rhs->name();
+  }
+};
+
+// The MaintenanceManager manages the scheduling of background operations such
+// as flushes or compactions.  It runs these operations in the background, in a
+// thread pool.  It uses information provided in MaintenanceOpStats objects to
+// decide which operations, if any, to run.
+//
+// RegisterOp() calls shared_from_this(), so instances are expected to be owned
+// by a std::shared_ptr.
+class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> {
+ public:
+  // Construction-time settings. For each field, the value 0 (and, for the two
+  // signed fields, any negative value) means "use the corresponding
+  // --maintenance_manager_* flag".
+  struct Options {
+    // Number of worker threads in the pool.
+    int32_t num_threads;
+    // How long the scheduler sleeps between polls, in milliseconds.
+    int32_t polling_interval_ms;
+    // Number of completed op instances kept for the status dump.
+    uint32_t history_size;
+  };
+
+  MaintenanceManager(const Options& options, std::string server_uuid);
+  ~MaintenanceManager();
+
+  // Start running the maintenance manager.
+  // Must be called at most once.
+  Status Start();
+
+  // Stops the scheduler thread and, if it had been started, waits for queued
+  // and running ops and shuts down the thread pool. Calls after the first are
+  // no-ops. Also called by the destructor.
+  void Shutdown();
+
+  // Register an op with the manager.
+  void RegisterOp(MaintenanceOp* op);
+
+  // Unregister an op with the manager.
+  // If the Op is currently running, it will not be interrupted.  However, this
+  // function will block until the Op is finished.
+  void UnregisterOp(MaintenanceOp* op);
+
+  // Fills 'out_pb' with the registered ops and their stats, the op that would
+  // run next, the currently running instances and the recently completed ones.
+  void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
+
+  // Replaces the function used to detect global memory pressure.
+  void set_memory_pressure_func_for_tests(std::function<bool(double*)> f) {
+    std::lock_guard<Mutex> guard(lock_);
+    memory_pressure_func_ = std::move(f);
+  }
+
+  static const Options kDefaultOptions;
+
+ private:
+  FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
+  // Registered ops and their most recently computed stats, ordered by op name.
+  typedef std::map<MaintenanceOp*, MaintenanceOpStats,
+          MaintenanceOpComparator> OpMapTy;
+
+  // Return true if tests have currently disabled the maintenance
+  // manager by way of changing the gflags at runtime.
+  bool disabled_for_tests() const;
+
+  // Body of the scheduler thread started by Start().
+  void RunSchedulerThread();
+
+  // Find the best op, or null if there is nothing we want to run.
+  //
+  // Returns the op, as well as a string explanation of why that op was chosen,
+  // suitable for logging.
+  std::pair<MaintenanceOp*, std::string> FindBestOp();
+
+  // Runs 'op' on a worker thread and records the instance in the history.
+  void LaunchOp(MaintenanceOp* op);
+
+  std::string LogPrefix() const;
+
+  const std::string server_uuid_;
+  const int32_t num_threads_;
+  OpMapTy ops_; // registered operations
+  // NOTE: lock_ must stay declared before cond_, which is constructed from it.
+  Mutex lock_;
+  // The scheduler thread; null until Start() and after Shutdown().
+  scoped_refptr<kudu::Thread> monitor_thread_;
+  // Worker pool on which ops are performed.
+  gscoped_ptr<ThreadPool> thread_pool_;
+  // Signalled to wake the scheduler thread (op finished, or shutdown).
+  ConditionVariable cond_;
+  bool shutdown_;
+  int32_t polling_interval_ms_;
+  // Number of op instances that are prepared, queued or running.
+  uint64_t running_ops_;
+  // Vector used as a circular buffer for recently completed ops. Elements need to be added at
+  // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
+  std::vector<OpInstance> completed_ops_;
+  int64_t completed_ops_count_;
+  // Used for the probabilistic choice between data GC and perf improvement ops.
+  Random rand_;
+
+  // Function which should return true if the server is under global memory pressure.
+  // This is indirected for testing purposes.
+  std::function<bool(double*)> memory_pressure_func_;
+
+  // Running instances lock.
+  //
+  // This is separate of lock_ so that worker threads don't need to take the
+  // global MM lock when beginning operations. When taking both
+  // running_instances_lock_ and lock_, lock_ must be acquired first.
+  Mutex running_instances_lock_;
+
+  // Maps thread ids to instances of an op that they're running. Instances should be added
+  // right before MaintenanceOp::Perform() is called, and should be removed right after
+  // MaintenanceOp::Perform() completes. Any thread that adds an instance to this map
+  // owns that instance, and the instance should exist until the same thread removes it.
+  //
+  // Protected by running_instances_lock_;
+  std::unordered_map<int64_t, OpInstance*> running_instances_;
+
+  DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
+};
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.proto b/be/src/kudu/util/maintenance_manager.proto
new file mode 100644
index 0000000..b6b1203
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.proto
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+// Used to present the maintenance manager's internal state.
+message MaintenanceManagerStatusPB {
+  // Describes one maintenance operation. Used for 'best_op' and for each
+  // entry of 'registered_operations' below.
+  message MaintenanceOpPB {
+    required string name = 1;
+    // Number of times this operation is currently running.
+    required uint32 running = 2;
+    required bool runnable = 3;
+    required uint64 ram_anchored_bytes = 4;
+    required int64 logs_retained_bytes = 5;
+    required double perf_improvement = 6;
+  }
+
+  // Describes one instance of an operation. Used for the entries of
+  // 'running_operations' and 'completed_operations' below.
+  message OpInstancePB {
+    required int64 thread_id = 1;
+    required string name = 2;
+    // How long the op took to run. Only present if the instance completed.
+    optional int32 duration_millis = 3;
+    // Number of milliseconds since this operation started.
+    required int32 millis_since_start = 4;
+  }
+
+  // The next operation that would run.
+  optional MaintenanceOpPB best_op = 1;
+
+  // List of all the operations.
+  repeated MaintenanceOpPB registered_operations = 2;
+
+  // This list is in no particular order. Can contain the same operation multiple times.
+  repeated OpInstancePB running_operations = 3;
+
+  // This list is in no particular order. Can contain the same operation multiple times.
+  repeated OpInstancePB completed_operations = 4;
+}

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/make_shared.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/make_shared.h b/be/src/kudu/util/make_shared.h
new file mode 100644
index 0000000..649cae7
--- /dev/null
+++ b/be/src/kudu/util/make_shared.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+// It isn't possible to use 'std::make_shared' on a class with private or protected
+// constructors. Using friends as a workaround worked in some earlier libc++/libstdcxx
+// versions, but in the latest versions there are some static_asserts that seem to defeat
+// this trickery. So, instead, we rely on the "curiously recurring template pattern" (CRTP)
+// to inject a static 'make_shared' function inside the class.
+//
+// See https://stackoverflow.com/questions/8147027/how-do-i-call-stdmake-shared-on-a-class-with-only-protected-or-private-const
+// for some details.
+//
+// Usage:
+//
+//  class MyClass : public enable_make_shared<MyClass> {
+//   public:
+//     ...
+//
+//   protected:
+//    // The constructor must be protected rather than private.
+//    MyClass(Foo arg1, Bar arg2) {
+//    }
+//
+//  };
+//
+//    shared_ptr<MyClass> foo = MyClass::make_shared(arg1, arg2);
+template<class T>
+class enable_make_shared { // NOLINT
+ public:
+  // Constructs a T from 'params' and returns it as a std::shared_ptr<T>.
+  //
+  // The object actually allocated is a function-local subclass of T whose
+  // public constructor forwards to T's constructor; the resulting pointer is
+  // then converted back to a pointer to T.
+  template<typename... Params>
+  static std::shared_ptr<T> make_shared(Params&&... params) {
+    // Subclass with a public constructor, so that std::make_shared is able
+    // to construct it.
+    struct make_shared_enabler : public T { // NOLINT
+      explicit make_shared_enabler(Params&&... ctor_params)
+          : T(std::forward<Params>(ctor_params)...) {
+      }
+    };
+
+    std::shared_ptr<T> instance =
+        ::std::make_shared<make_shared_enabler>(::std::forward<Params>(params)...);
+    return instance;
+  }
+};

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/malloc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/malloc.cc b/be/src/kudu/util/malloc.cc
new file mode 100644
index 0000000..3fec2db
--- /dev/null
+++ b/be/src/kudu/util/malloc.cc
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/malloc.h"
+
+#if defined(__linux__)
+#include <malloc.h>
+#else
+#include <malloc/malloc.h>
+#endif // defined(__linux__)
+
+namespace kudu {
+
+// Returns the size reported by the platform allocator for the block at 'obj':
+// malloc_usable_size() on Linux, malloc_size() on other platforms.
+int64_t kudu_malloc_usable_size(const void* obj) {
+#if !defined(__linux__)
+  return malloc_size(obj);
+#else
+  // malloc_usable_size() takes a non-const pointer, so strip the constness here.
+  void* mutable_obj = const_cast<void*>(obj);
+  return malloc_usable_size(mutable_obj);
+#endif // !defined(__linux__)
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/malloc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/malloc.h b/be/src/kudu/util/malloc.h
new file mode 100644
index 0000000..e8a27c5
--- /dev/null
+++ b/be/src/kudu/util/malloc.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MALLOC_H
+#define KUDU_UTIL_MALLOC_H
+
+#include <stdint.h>
+
+namespace kudu {
+
+// Simple wrapper for malloc_usable_size() (malloc_size() on non-Linux platforms).
+//
+// Really just centralizes the const_cast, as this function is often called
+// on const pointers (e.g. "this" in a const method).
+int64_t kudu_malloc_usable_size(const void* obj);
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_MALLOC_H

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/map-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/map-util-test.cc b/be/src/kudu/util/map-util-test.cc
new file mode 100644
index 0000000..3aa9448
--- /dev/null
+++ b/be/src/kudu/util/map-util-test.cc
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This unit test belongs in gutil, but it depends on test_main which is
+// part of util.
+#include "kudu/gutil/map-util.h"
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest.h>
+
+using std::map;
+using std::string;
+using std::shared_ptr;
+using std::unique_ptr;
+
+namespace kudu {
+
+// Checks FindFloorOrNull() against an ordered map as keys are added to it.
+TEST(FloorTest, TestMapUtil) {
+  map<int, int> floors;
+
+  // Empty map: there is nothing to find.
+  ASSERT_EQ(nullptr, FindFloorOrNull(floors, 5));
+
+  // Only key 5 present: lookups at or above 5 find it, lookups below find nothing.
+  floors[5] = 5;
+  ASSERT_EQ(5, *FindFloorOrNull(floors, 6));
+  ASSERT_EQ(5, *FindFloorOrNull(floors, 5));
+  ASSERT_EQ(nullptr, FindFloorOrNull(floors, 4));
+
+  // Keys 1 and 5 present: lookups in [1, 5) now find key 1, and lookups
+  // below 1 still find nothing.
+  floors[1] = 1;
+  ASSERT_EQ(5, *FindFloorOrNull(floors, 6));
+  ASSERT_EQ(5, *FindFloorOrNull(floors, 5));
+  ASSERT_EQ(1, *FindFloorOrNull(floors, 4));
+  ASSERT_EQ(1, *FindFloorOrNull(floors, 1));
+  ASSERT_EQ(nullptr, FindFloorOrNull(floors, 0));
+}
+
+// ComputeIfAbsent() must store the computed value for a missing key and must
+// leave the stored value untouched when the key is already present.
+TEST(ComputeIfAbsentTest, TestComputeIfAbsent) {
+  map<string, string> cache;
+
+  // First call: "key" is absent, so the value comes from the functor.
+  auto inserted = ComputeIfAbsent(&cache, "key", []{ return "hello_world"; });
+  ASSERT_EQ(*inserted, "hello_world");
+
+  // Second call: "key" is present, so the original value is still returned.
+  auto existing = ComputeIfAbsent(&cache, "key", [] { return "hello_world2"; });
+  ASSERT_EQ(*existing, "hello_world");
+}
+
+// Same scenario as above for ComputeIfAbsentReturnAbsense(), which additionally
+// reports (in '.second') whether the key was absent before the call.
+TEST(ComputeIfAbsentTest, TestComputeIfAbsentAndReturnAbsense) {
+  map<string, string> cache;
+
+  // Key is absent: '.second' is true and the computed value is stored.
+  auto first_call = ComputeIfAbsentReturnAbsense(&cache, "key", []{ return "hello_world"; });
+  ASSERT_TRUE(first_call.second);
+  ASSERT_EQ(*first_call.first, "hello_world");
+
+  // Key is present: '.second' is false and the original value is kept.
+  auto second_call = ComputeIfAbsentReturnAbsense(&cache, "key", [] { return "hello_world2"; });
+  ASSERT_FALSE(second_call.second);
+  ASSERT_EQ(*second_call.first, "hello_world");
+}
+
+// FindPointeeOrNull() on a map of unique_ptr values: yields the pointee while
+// the key is present and nullptr once the entry has been erased.
+TEST(FindPointeeOrNullTest, TestFindPointeeOrNull) {
+  map<string, unique_ptr<string>> owned;
+  auto emplaced = owned.emplace("key", unique_ptr<string>(new string("hello_world")));
+  ASSERT_TRUE(emplaced.second);
+
+  // Present key: a non-null pointer to the stored string.
+  string* pointee = FindPointeeOrNull(owned, "key");
+  ASSERT_TRUE(pointee != nullptr);
+  ASSERT_EQ(*pointee, "hello_world");
+
+  // Erased key: nullptr.
+  owned.erase(emplaced.first);
+  pointee = FindPointeeOrNull(owned, "key");
+  ASSERT_TRUE(pointee == nullptr);
+}
+
+// Runs EraseKeyReturnValuePtr() against maps whose values are unique_ptr,
+// shared_ptr and raw pointers.
+TEST(EraseKeyReturnValuePtrTest, TestRawAndSmartSmartPointers) {
+  // unique_ptr values: null for a missing key, the stored value for a present
+  // key, and null again once that key has been erased.
+  map<string, unique_ptr<string>> unique_map;
+  unique_ptr<string> taken = EraseKeyReturnValuePtr(&unique_map, "key");
+  ASSERT_TRUE(taken.get() == nullptr);
+  unique_map.emplace("key", unique_ptr<string>(new string("hello_world")));
+  taken = EraseKeyReturnValuePtr(&unique_map, "key");
+  ASSERT_EQ(*taken, "hello_world");
+  taken.reset();
+  taken = EraseKeyReturnValuePtr(&unique_map, "key");
+  ASSERT_TRUE(taken.get() == nullptr);
+
+  // shared_ptr values: null for a missing key, the stored value otherwise.
+  map<string, shared_ptr<string>> shared_map;
+  shared_ptr<string> taken_shared = EraseKeyReturnValuePtr(&shared_map, "key");
+  ASSERT_TRUE(taken_shared.get() == nullptr);
+  shared_map.emplace("key", std::make_shared<string>("hello_world"));
+  taken_shared = EraseKeyReturnValuePtr(&shared_map, "key");
+  ASSERT_EQ(*taken_shared, "hello_world");
+
+  // Raw pointer values: the returned pointer is handed to 'taken', which
+  // deletes the string when the test ends.
+  map<string, string*> raw_map;
+  raw_map.emplace("key", new string("hello_world"));
+  taken.reset(EraseKeyReturnValuePtr(&raw_map, "key"));
+  ASSERT_EQ(*taken, "hello_world");
+}
+
+// EmplaceIfNotPresent() must accept a move-only value, report true on insertion,
+// and report false when the key already exists.
+TEST(EmplaceTest, TestEmplace) {
+  // Map with move-only value type.
+  map<string, unique_ptr<string>> move_only_map;
+  unique_ptr<string> payload(new string("foo"));
+
+  const bool inserted = EmplaceIfNotPresent(&move_only_map, "k", std::move(payload));
+  ASSERT_TRUE(inserted);
+  ASSERT_TRUE(ContainsKey(move_only_map, "k"));
+
+  ASSERT_FALSE(EmplaceIfNotPresent(&move_only_map, "k", nullptr))
+      << "Should return false for already-present";
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mem_tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker-test.cc b/be/src/kudu/util/mem_tracker-test.cc
new file mode 100644
index 0000000..dbadd09
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker-test.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/mem_tracker.h"
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <string>
+#include <system_error>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+using std::equal_to;
+using std::hash;
+using std::pair;
+using std::shared_ptr;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+// A tracker created with limit -1 reports no limit; Consume()/Release() simply
+// move its consumption up and down.
+TEST(MemTrackerTest, SingleTrackerNoLimit) {
+  shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "t");
+  EXPECT_FALSE(tracker->has_limit());
+
+  // Two consumptions of 10 accumulate to 20.
+  tracker->Consume(10);
+  EXPECT_EQ(tracker->consumption(), 10);
+  tracker->Consume(10);
+  EXPECT_EQ(tracker->consumption(), 20);
+
+  // Releasing 15 and then 5 brings the tracker back to zero.
+  tracker->Release(15);
+  EXPECT_EQ(tracker->consumption(), 5);
+  EXPECT_FALSE(tracker->LimitExceeded());
+  tracker->Release(5);
+  EXPECT_EQ(tracker->consumption(), 0);
+}
+
+// A tracker with a limit of 11 bytes: Consume() may push consumption past the
+// limit, and LimitExceeded() reflects whether it currently is past it.
+TEST(MemTrackerTest, SingleTrackerWithLimit) {
+  shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(11, "t");
+  EXPECT_TRUE(tracker->has_limit());
+
+  // 10 bytes: under the limit.
+  tracker->Consume(10);
+  EXPECT_EQ(tracker->consumption(), 10);
+  EXPECT_FALSE(tracker->LimitExceeded());
+
+  // 20 bytes: over the limit.
+  tracker->Consume(10);
+  EXPECT_EQ(tracker->consumption(), 20);
+  EXPECT_TRUE(tracker->LimitExceeded());
+
+  // Back down to 5 bytes: under the limit again.
+  tracker->Release(15);
+  EXPECT_EQ(tracker->consumption(), 5);
+  EXPECT_FALSE(tracker->LimitExceeded());
+
+  // Release the remainder before the tracker is destroyed.
+  tracker->Release(5);
+}
+
+// A parent (limit 100) with two children (limits 80 and 50): consumption on a
+// child is also counted on the parent, LimitExceeded() looks at one tracker
+// only, and AnyLimitExceeded() also reflects the parent's state.
+TEST(MemTrackerTest, TrackerHierarchy) {
+  shared_ptr<MemTracker> parent = MemTracker::CreateTracker(100, "p");
+  shared_ptr<MemTracker> child1 = MemTracker::CreateTracker(80, "c1", parent);
+  shared_ptr<MemTracker> child2 = MemTracker::CreateTracker(50, "c2", parent);
+
+  // everything below limits
+  child1->Consume(60);
+  EXPECT_EQ(child1->consumption(), 60);
+  EXPECT_FALSE(child1->LimitExceeded());
+  EXPECT_FALSE(child1->AnyLimitExceeded());
+  EXPECT_EQ(child2->consumption(), 0);
+  EXPECT_FALSE(child2->LimitExceeded());
+  EXPECT_FALSE(child2->AnyLimitExceeded());
+  EXPECT_EQ(parent->consumption(), 60);
+  EXPECT_FALSE(parent->LimitExceeded());
+  EXPECT_FALSE(parent->AnyLimitExceeded());
+
+  // parent goes over limit (60 + 50 = 110 > 100); neither child exceeds its own
+  child2->Consume(50);
+  EXPECT_EQ(child1->consumption(), 60);
+  EXPECT_FALSE(child1->LimitExceeded());
+  EXPECT_TRUE(child1->AnyLimitExceeded());
+  EXPECT_EQ(child2->consumption(), 50);
+  EXPECT_FALSE(child2->LimitExceeded());
+  EXPECT_TRUE(child2->AnyLimitExceeded());
+  EXPECT_EQ(parent->consumption(), 110);
+  EXPECT_TRUE(parent->LimitExceeded());
+
+  // child2 goes over limit (60 > 50), parent drops back to exactly its limit
+  child1->Release(20);
+  child2->Consume(10);
+  EXPECT_EQ(child1->consumption(), 40);
+  EXPECT_FALSE(child1->LimitExceeded());
+  EXPECT_FALSE(child1->AnyLimitExceeded());
+  EXPECT_EQ(child2->consumption(), 60);
+  EXPECT_TRUE(child2->LimitExceeded());
+  EXPECT_TRUE(child2->AnyLimitExceeded());
+  EXPECT_EQ(parent->consumption(), 100);
+  EXPECT_FALSE(parent->LimitExceeded());
+
+  // Release everything before the trackers are destroyed.
+  child1->Release(40);
+  child2->Release(60);
+}
+
+// Helper holding a MemTracker pointer; GcFunc() releases a fixed number of
+// bytes (kNumReleaseBytes) from that tracker each time it is called.
+class GcFunctionHelper {
+ public:
+  // Number of bytes released by each GcFunc() call.
+  static const int kNumReleaseBytes = 1;
+
+  // Does not take ownership of 'tracker'.
+  explicit GcFunctionHelper(MemTracker* tracker)
+      : tracker_(tracker) {
+  }
+
+  void GcFunc() {
+    tracker_->Release(kNumReleaseBytes);
+  }
+
+ private:
+  MemTracker* tracker_;
+};
+
+// MemTrackerAllocator must charge container allocations to its tracker and
+// give them back when the container is destroyed.
+TEST(MemTrackerTest, STLContainerAllocator) {
+  typedef MemTrackerAllocator<int> IntAlloc;
+  typedef MemTrackerAllocator<pair<const int, int>> PairAlloc;
+
+  shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "t");
+  IntAlloc vec_alloc(tracker);
+  PairAlloc map_alloc(tracker);
+
+  // Simple test: use the allocator in a vector.
+  {
+    vector<int, IntAlloc> ints(vec_alloc);
+    ASSERT_EQ(0, tracker->consumption());
+    ints.reserve(5);
+    ASSERT_EQ(5 * sizeof(int), tracker->consumption());
+    ints.reserve(10);
+    ASSERT_EQ(10 * sizeof(int), tracker->consumption());
+  }
+  // The vector is gone, so everything it reserved must have been released.
+  ASSERT_EQ(0, tracker->consumption());
+
+  // Complex test: use it in an unordered_map, where it must be rebound in
+  // order to allocate the map's buckets.
+  {
+    unordered_map<int, int, hash<int>, equal_to<int>, PairAlloc> pairs(
+        10,
+        hash<int>(),
+        equal_to<int>(),
+        map_alloc);
+
+    // Don't care about the value (it depends on map internals).
+    ASSERT_GT(tracker->consumption(), 0);
+  }
+  ASSERT_EQ(0, tracker->consumption());
+}
+
+// Each lookup function must hand back an owning reference: the tracker has to
+// stay usable after the shared_ptr that created it has gone out of scope.
+TEST(MemTrackerTest, FindFunctionsTakeOwnership) {
+  // In each test, ToString() would crash if the MemTracker is destroyed when
+  // 'creator' goes out of scope.
+
+  // Case 1: FindTracker().
+  shared_ptr<MemTracker> kept;
+  {
+    shared_ptr<MemTracker> creator = MemTracker::CreateTracker(-1, "test");
+    ASSERT_TRUE(MemTracker::FindTracker(creator->id(), &kept));
+  }
+  LOG(INFO) << kept->ToString();
+  kept.reset();
+
+  // Case 2: FindOrCreateGlobalTracker().
+  {
+    shared_ptr<MemTracker> creator = MemTracker::CreateTracker(-1, "test");
+    kept = MemTracker::FindOrCreateGlobalTracker(-1, creator->id());
+  }
+  LOG(INFO) << kept->ToString();
+  kept.reset();
+
+  // Case 3: ListTrackers().
+  vector<shared_ptr<MemTracker> > listed;
+  {
+    shared_ptr<MemTracker> creator = MemTracker::CreateTracker(-1, "test");
+    MemTracker::ListTrackers(&listed);
+  }
+  for (const shared_ptr<MemTracker>& tracker : listed) {
+    LOG(INFO) << tracker->ToString();
+  }
+  listed.clear();
+}
+
+// ScopedTrackedConsumption charges the tracker on construction, re-adjusts on
+// Reset(), and leaves the tracker at zero once it goes out of scope.
+TEST(MemTrackerTest, ScopedTrackedConsumption) {
+  shared_ptr<MemTracker> tracker = MemTracker::CreateTracker(-1, "test");
+  ASSERT_EQ(0, tracker->consumption());
+  {
+    ScopedTrackedConsumption scoped(tracker, 1);
+    ASSERT_EQ(1, tracker->consumption());
+
+    // Reset() changes the tracked amount to the new value.
+    scoped.Reset(3);
+    ASSERT_EQ(3, tracker->consumption());
+  }
+  ASSERT_EQ(0, tracker->consumption());
+}
+
+// Trackers with duplicate ids under the same parent may be created, but looking
+// one up by id is fatal in debug builds.
+TEST(MemTrackerTest, CollisionDetection) {
+  shared_ptr<MemTracker> parent = MemTracker::CreateTracker(-1, "parent");
+  shared_ptr<MemTracker> child = MemTracker::CreateTracker(-1, "child", parent);
+  vector<shared_ptr<MemTracker>> listed;
+
+  // Three trackers: root, parent, and child.
+  MemTracker::ListTrackers(&listed);
+  ASSERT_EQ(3, listed.size());
+
+  // Now only two because the child has been destroyed.
+  child.reset();
+  MemTracker::ListTrackers(&listed);
+  ASSERT_EQ(2, listed.size());
+  shared_ptr<MemTracker> missing;
+  ASSERT_FALSE(MemTracker::FindTracker("child", &missing, parent));
+
+  // Let's duplicate the parent. It's not recommended, but it's allowed.
+  shared_ptr<MemTracker> parent_dup = MemTracker::CreateTracker(-1, "parent");
+  ASSERT_EQ(parent->ToString(), parent_dup->ToString());
+
+  // Only when we do a Find() operation do we crash.
+#ifndef NDEBUG
+  const string kDeathMsg = "Multiple memtrackers with same id";
+  EXPECT_DEATH({
+    shared_ptr<MemTracker> found;
+    MemTracker::FindTracker("parent", &found);
+  }, kDeathMsg);
+  EXPECT_DEATH({
+    MemTracker::FindOrCreateGlobalTracker(-1, "parent");
+  }, kDeathMsg);
+#endif
+}
+
+// Ten threads repeatedly find-or-create the global tracker "foo" and drop their
+// reference right away, until told to stop.
+TEST(MemTrackerTest, TestMultiThreadedRegisterAndDestroy) {
+  const int kNumThreads = 10;
+  std::atomic<bool> stop(false);
+  vector<std::thread> workers;
+  for (int i = 0; i < kNumThreads; i++) {
+    workers.emplace_back([&stop]{
+        while (!stop.load()) {
+          // The reference is dropped at the end of every iteration.
+          shared_ptr<MemTracker> tracker = MemTracker::FindOrCreateGlobalTracker(
+              1000, "foo");
+        }
+      });
+  }
+
+  // Let the workers run for a while, then stop and join them.
+  SleepFor(MonoDelta::FromSeconds(AllowSlowTests() ? 5 : 1));
+  stop.store(true);
+  for (auto& worker : workers) {
+    worker.join();
+  }
+}
+
+// One thread keeps looking up child "c1" under parent "p" while five other
+// threads keep creating and destroying sibling trackers under the same parent.
+TEST(MemTrackerTest, TestMultiThreadedCreateFind) {
+  const int kNumCreatorThreads = 5;
+  shared_ptr<MemTracker> parent = MemTracker::CreateTracker(-1, "p");
+  shared_ptr<MemTracker> child = MemTracker::CreateTracker(-1, "c1", parent);
+  std::atomic<bool> stop(false);
+  vector<std::thread> workers;
+
+  // Finder: 'child' stays alive for the whole test, so the lookup must succeed.
+  workers.emplace_back([&]{
+    while (!stop.load()) {
+      shared_ptr<MemTracker> child_copy;
+      CHECK(MemTracker::FindTracker(child->id(), &child_copy, parent));
+    }
+  });
+
+  // Creators: each one churns a tracker with its own id under 'parent'.
+  for (int i = 0; i < kNumCreatorThreads; i++) {
+    workers.emplace_back([&, i]{
+      while (!stop.load()) {
+        shared_ptr<MemTracker> sibling =
+            MemTracker::CreateTracker(-1, Substitute("ci-$0", i), parent);
+      }
+    });
+  }
+
+  SleepFor(MonoDelta::FromMilliseconds(500));
+  stop.store(true);
+  for (auto& worker : workers) {
+    worker.join();
+  }
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mem_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker.cc b/be/src/kudu/util/mem_tracker.cc
new file mode 100644
index 0000000..f7294d1
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker.cc
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/mem_tracker.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <deque>
+#include <limits>
+#include <list>
+#include <memory>
+#include <ostream>
+
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/process_memory.h"
+
+namespace kudu {
+
+// NOTE: this class has been adapted from Impala, so the code style varies
+// somewhat from kudu.
+
+using std::deque;
+using std::list;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using std::weak_ptr;
+
+using strings::Substitute;
+
+// The ancestor for all trackers. Every tracker is visible from the root down.
+static shared_ptr<MemTracker> root_tracker;
+static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT;
+
+// Creates the root tracker: limit -1, id "root", and no parent. Passed to
+// GoogleOnceInit() by GetRootTracker().
+void MemTracker::CreateRootTracker() {
+  const shared_ptr<MemTracker> no_parent;
+  root_tracker.reset(new MemTracker(-1, "root", no_parent));
+  root_tracker->Init();
+}
+
+// Creates a tracker with the given limit and id, registers it as a child of
+// 'parent' (or of the root tracker when 'parent' is null), initializes it and
+// returns it.
+shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit,
+                                                 const string& id,
+                                                 shared_ptr<MemTracker> parent) {
+  // Fall back to the root tracker when the caller did not supply a parent.
+  shared_ptr<MemTracker> effective_parent =
+      parent ? std::move(parent) : GetRootTracker();
+
+  shared_ptr<MemTracker> created(new MemTracker(byte_limit, id, effective_parent));
+  effective_parent->AddChildTracker(created);
+  created->Init();
+  return created;
+}
+
+// Stores the limit, id and parent, and starts with zero consumption. The
+// creation functions in this file call Init() on the new tracker afterwards.
+MemTracker::MemTracker(int64_t byte_limit,
+                       const string& id,
+                       shared_ptr<MemTracker> parent)
+    : limit_(byte_limit),
+      id_(id),
+      descr_(Substitute("memory consumption for $0", id)),
+      parent_(std::move(parent)),
+      consumption_(0) {
+  VLOG(1) << "Creating tracker " << ToString();
+}
+
+// Releases any remaining consumption from the ancestors and removes this
+// tracker from its parent's list of children. A tracker without a parent has
+// nothing to undo.
+MemTracker::~MemTracker() {
+  VLOG(1) << "Destroying tracker " << ToString();
+  if (!parent_) {
+    return;
+  }
+
+  // Consumption is expected to be zero by now (checked in debug builds);
+  // whatever is left is released from the parent chain.
+  DCHECK(consumption() == 0) << "Memory tracker " << ToString()
+      << " has unreleased consumption " << consumption();
+  parent_->Release(consumption());
+
+  // Unlink from the parent's child list, under the parent's lock.
+  MutexLock l(parent_->child_trackers_lock_);
+  const auto not_linked = parent_->child_trackers_.end();
+  if (child_tracker_it_ != not_linked) {
+    parent_->child_trackers_.erase(child_tracker_it_);
+    child_tracker_it_ = parent_->child_trackers_.end();
+  }
+}
+
+// Returns the ids on the path from this tracker up to the topmost ancestor,
+// joined with "->" (this tracker's id comes first).
+string MemTracker::ToString() const {
+  string path;
+  for (const MemTracker* cur = this; cur != nullptr; cur = cur->parent_.get()) {
+    if (!path.empty()) {
+      path += "->";
+    }
+    path += cur->id();
+  }
+  return path;
+}
+
+// Looks for a direct child of 'parent' whose id is 'id'; a null 'parent' means
+// the root tracker. On success stores the child in '*tracker' and returns true.
+bool MemTracker::FindTracker(const string& id,
+                             shared_ptr<MemTracker>* tracker,
+                             const shared_ptr<MemTracker>& parent) {
+  if (parent) {
+    return FindTrackerInternal(id, tracker, parent);
+  }
+  return FindTrackerInternal(id, tracker, GetRootTracker());
+}
+
+// Searches the direct children of 'parent' for a live tracker whose id is 'id'.
+//
+// Returns true and stores the tracker in '*tracker' when at least one match
+// exists; returns false (leaving '*tracker' untouched) otherwise. Finding more
+// than one match is reported with LOG(DFATAL); if execution continues past
+// that, the first match is returned.
+bool MemTracker::FindTrackerInternal(const string& id,
+                                     shared_ptr<MemTracker>* tracker,
+                                     const shared_ptr<MemTracker>& parent) {
+  DCHECK(parent != nullptr);
+
+  // Snapshot the child list so that the search can run without the lock.
+  list<weak_ptr<MemTracker>> children;
+  {
+    MutexLock l(parent->child_trackers_lock_);
+    children = parent->child_trackers_;
+  }
+
+  // Search for the matching child without holding the parent's lock.
+  //
+  // If the lock were held while searching, it'd be possible for 'child' to be
+  // the last live ref to a tracker, which would lead to a recursive
+  // acquisition of the parent lock during the 'child' destructor call.
+  vector<shared_ptr<MemTracker>> found;
+  for (const auto& child_weak : children) {
+    shared_ptr<MemTracker> child = child_weak.lock();
+    if (child && child->id() == id) {
+      found.emplace_back(std::move(child));
+    }
+  }
+
+  if (found.empty()) {
+    return false;
+  }
+  if (found.size() > 1) {
+    LOG(DFATAL) <<
+        Substitute("Multiple memtrackers with same id ($0) found on parent $1",
+                   id, parent->ToString());
+  }
+  // 'found' is discarded right after this, so hand over the reference instead
+  // of copying it.
+  *tracker = std::move(found.front());
+  return true;
+}
+
+// Returns the child of the root tracker whose id is 'id', creating it with
+// 'byte_limit' if no such child exists.
+shared_ptr<MemTracker> MemTracker::FindOrCreateGlobalTracker(
+    int64_t byte_limit,
+    const string& id) {
+  // The calls below comprise a critical section, but we can't use the root
+  // tracker's child_trackers_lock_ to synchronize it as the lock must be
+  // released during FindTrackerInternal(). Since this function creates
+  // globally-visible MemTrackers which are the exception rather than the rule,
+  // it's reasonable to synchronize their creation on a singleton lock.
+  static Mutex find_or_create_lock;
+  MutexLock l(find_or_create_lock);
+
+  shared_ptr<MemTracker> existing;
+  const bool already_exists = FindTrackerInternal(id, &existing, GetRootTracker());
+  if (!already_exists) {
+    return CreateTracker(byte_limit, id, GetRootTracker());
+  }
+  return existing;
+}
+
+// Replaces the contents of '*trackers' with every live tracker reachable from
+// the root, the root itself included.
+void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) {
+  trackers->clear();
+
+  // Trackers discovered but not yet visited; the most recently discovered one
+  // is visited next.
+  deque<shared_ptr<MemTracker> > pending;
+  pending.push_front(GetRootTracker());
+  while (!pending.empty()) {
+    shared_ptr<MemTracker> current = pending.back();
+    pending.pop_back();
+    trackers->push_back(current);
+
+    // Queue the children that are still alive, under this tracker's lock.
+    MutexLock l(current->child_trackers_lock_);
+    for (const auto& weak_child : current->child_trackers_) {
+      if (shared_ptr<MemTracker> child = weak_child.lock()) {
+        pending.emplace_back(std::move(child));
+      }
+    }
+  }
+}
+
+// Adds 'bytes' to the consumption of every tracker in all_trackers_. Limits
+// are not consulted here. A negative amount is forwarded to Release(); zero is
+// a no-op.
+void MemTracker::Consume(int64_t bytes) {
+  if (bytes == 0) {
+    return;
+  }
+  if (bytes < 0) {
+    Release(-bytes);
+    return;
+  }
+
+  for (MemTracker* t : all_trackers_) {
+    t->consumption_.IncrementBy(bytes);
+  }
+}
+
+// Attempts to add 'bytes' to every tracker in all_trackers_. Returns true if
+// all of them accepted the increase; otherwise undoes the increases already
+// applied and returns false. A non-positive amount is forwarded to Release()
+// and reported as success.
+bool MemTracker::TryConsume(int64_t bytes) {
+  if (bytes <= 0) {
+    Release(-bytes);
+    return true;
+  }
+
+  const int last_idx = all_trackers_.size() - 1;
+
+  // Walk the tracker tree top-down, consuming memory from each in turn.
+  // 'failed_idx' stays at -1 unless some tracker refuses the increase.
+  int failed_idx = -1;
+  for (int idx = last_idx; idx >= 0; --idx) {
+    MemTracker* tracker = all_trackers_[idx];
+    if (tracker->limit_ < 0) {
+      // Negative limit: increment without checking against a limit.
+      tracker->consumption_.IncrementBy(bytes);
+      continue;
+    }
+    if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) {
+      failed_idx = idx;
+      break;
+    }
+  }
+
+  // Everyone succeeded, return.
+  if (failed_idx == -1) {
+    return true;
+  }
+
+  // Someone failed, roll back the ones that succeeded.
+  // TODO(todd): this doesn't roll it back completely since the max values for
+  // the updated trackers aren't decremented. The max values are only used
+  // for error reporting so this is probably okay. Rolling those back is
+  // pretty hard; we'd need something like 2PC.
+  for (int idx = last_idx; idx > failed_idx; --idx) {
+    all_trackers_[idx]->consumption_.IncrementBy(-bytes);
+  }
+  return false;
+}
+
+// Subtracts 'bytes' from the consumption of every tracker in all_trackers_ and
+// then notifies process_memory of the release. A negative amount is forwarded
+// to Consume(); zero is a no-op.
+void MemTracker::Release(int64_t bytes) {
+  if (bytes == 0) {
+    return;
+  }
+  if (bytes < 0) {
+    Consume(-bytes);
+    return;
+  }
+
+  for (MemTracker* t : all_trackers_) {
+    t->consumption_.IncrementBy(-bytes);
+  }
+  process_memory::MaybeGCAfterRelease(bytes);
+}
+
+// Returns true if LimitExceeded() holds for at least one tracker in
+// limit_trackers_.
+bool MemTracker::AnyLimitExceeded() {
+  return std::any_of(limit_trackers_.begin(), limit_trackers_.end(),
+                     [](MemTracker* t) { return t->LimitExceeded(); });
+}
+
+// Returns the smallest (limit - consumption) among the trackers in
+// limit_trackers_, or the maximum int64_t value when that list is empty.
+int64_t MemTracker::SpareCapacity() const {
+  int64_t smallest = std::numeric_limits<int64_t>::max();
+  for (const auto& tracker : limit_trackers_) {
+    const int64_t headroom = tracker->limit() - tracker->consumption();
+    if (headroom < smallest) {
+      smallest = headroom;
+    }
+  }
+  return smallest;
+}
+
+
+// Fills all_trackers_ with this tracker followed by each of its ancestors in
+// turn, and limit_trackers_ with those of them for which has_limit() is true.
+void MemTracker::Init() {
+  for (MemTracker* cur = this; cur != nullptr; cur = cur->parent_.get()) {
+    all_trackers_.push_back(cur);
+    if (cur->has_limit()) {
+      limit_trackers_.push_back(cur);
+    }
+  }
+  DCHECK_GT(all_trackers_.size(), 0);
+  DCHECK_EQ(all_trackers_[0], this);
+}
+
+// Appends 'tracker' to this tracker's child list and records the resulting
+// list position in the child, which uses it in its destructor to remove itself.
+void MemTracker::AddChildTracker(const shared_ptr<MemTracker>& tracker) {
+  MutexLock l(child_trackers_lock_);
+  auto position = child_trackers_.insert(child_trackers_.end(), tracker);
+  tracker->child_tracker_it_ = position;
+}
+
+// Returns the root tracker, creating it first via CreateRootTracker() if needed.
+shared_ptr<MemTracker> MemTracker::GetRootTracker() {
+  // NOTE(review): GoogleOnceInit is presumably what guarantees that
+  // CreateRootTracker() runs only once -- confirm against kudu/gutil/once.h.
+  GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker);
+  return root_tracker;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mem_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker.h b/be/src/kudu/util/mem_tracker.h
new file mode 100644
index 0000000..14db374
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker.h
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MEM_TRACKER_H
+#define KUDU_UTIL_MEM_TRACKER_H
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/util/high_water_mark.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+// A MemTracker tracks memory consumption; it contains an optional limit and is
+// arranged into a tree structure such that the consumption tracked by a
+// MemTracker is also tracked by its ancestors.
+//
+// The MemTracker hierarchy is rooted in a single static MemTracker.
+// The root MemTracker always exists, and it is the common
+// ancestor to all MemTrackers. All operations that discover MemTrackers begin
+// at the root and work their way down the tree, while operations that deal
+// with adjusting memory consumption begin at a particular MemTracker and work
+// their way up the tree to the root. All MemTrackers (except the root) must
+// have a parent. As a rule, all children belonging to a parent should have
+// unique ids, but this is only enforced during a Find() operation to allow for
+// transient duplicates (e.g. the web UI grabbing very short-lived references
+// to all MemTrackers while rendering a web page). This also means id
+// uniqueness only exists where it's actually needed.
+//
+// When a MemTracker begins its life, it has a strong reference to its parent
+// and the parent has a weak reference to it. Both remain for the lifetime of
+// the MemTracker.
+//
+// Memory consumption is tracked via calls to Consume()/Release(), either to
+// the tracker itself or to one of its descendants.
+//
+// This class is thread-safe.
+class MemTracker : public std::enable_shared_from_this<MemTracker> {
+ public:
+  ~MemTracker();
+
+  // Creates and adds the tracker to the tree so that it can be retrieved with
+  // FindTracker/FindOrCreateGlobalTracker.
+  //
+  // byte_limit < 0 means no limit; 'id' is used as a label to uniquely identify
+  // the MemTracker for the below Find...() calls as well as the web UI.
+  //
+  // Use the two-argument form if there is no parent.
+  static std::shared_ptr<MemTracker> CreateTracker(
+      int64_t byte_limit,
+      const std::string& id,
+      std::shared_ptr<MemTracker> parent = std::shared_ptr<MemTracker>());
+
+  // If a tracker with the specified 'id' and 'parent' exists in the tree, sets
+  // 'tracker' to reference that instance. Returns false if no such tracker
+  // exists.
+  //
+  // Use the two-argument form if there is no parent.
+  //
+  // Note: this function will enforce that 'id' is unique amongst the children
+  // of 'parent'.
+  static bool FindTracker(
+      const std::string& id,
+      std::shared_ptr<MemTracker>* tracker,
+      const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>());
+
+  // If a global tracker with the specified 'id' exists in the tree, returns a
+  // shared_ptr to that instance. Otherwise, creates a new MemTracker with the
+  // specified byte_limit and id, parented to the root MemTracker.
+  //
+  // Note: this function will enforce that 'id' is unique amongst the children
+  // of the root MemTracker.
+  static std::shared_ptr<MemTracker> FindOrCreateGlobalTracker(
+      int64_t byte_limit, const std::string& id);
+
+  // Appends all the valid trackers to 'trackers'.
+  static void ListTrackers(std::vector<std::shared_ptr<MemTracker> >* trackers);
+
+  // Gets a shared_ptr to the "root" tracker, creating it if necessary.
+  static std::shared_ptr<MemTracker> GetRootTracker();
+
+  // Increases consumption of this tracker and its ancestors by 'bytes'.
+  void Consume(int64_t bytes);
+
+  // Increases consumption of this tracker and its ancestors by 'bytes' only if
+  // they can all consume 'bytes'. If this brings any of them over, none of them
+  // are updated.
+  // Returns true if the try succeeded.
+  bool TryConsume(int64_t bytes);
+
+  // Decreases consumption of this tracker and its ancestors by 'bytes'.
+  //
+  // This will also cause the process to periodically trigger tcmalloc "ReleaseMemory"
+  // to ensure that memory is released to the OS.
+  void Release(int64_t bytes);
+
+  // Returns true if a valid limit of this tracker or one of its ancestors is
+  // exceeded.
+  bool AnyLimitExceeded();
+
+  // Returns true if this tracker has a limit (limit_ >= 0) and its current
+  // consumption is strictly greater than that limit. Returns false if there is
+  // no limit. Only this tracker's own limit is examined.
+  bool LimitExceeded() {
+    return limit_ >= 0 && limit_ < consumption();
+  }
+
+  // Returns the maximum consumption that can be made without exceeding the limit on
+  // this tracker or any of its parents. Returns int64_t::max() if there are no
+  // limits and a negative value if any limit is already exceeded.
+  int64_t SpareCapacity() const;
+
+
+  // Returns the configured byte limit; a negative value means no limit.
+  int64_t limit() const { return limit_; }
+  // Returns true if a limit is set, i.e. limit_ is non-negative.
+  bool has_limit() const { return limit_ >= 0; }
+  // Returns the label given to this tracker at construction.
+  const std::string& id() const { return id_; }
+
+  // Returns the memory consumed in bytes.
+  int64_t consumption() const {
+    return consumption_.current_value();
+  }
+
+  // Returns the maximum value recorded by the consumption counter.
+  int64_t peak_consumption() const { return consumption_.max_value(); }
+
+  // Retrieve the parent tracker, or NULL if one is not set.
+  std::shared_ptr<MemTracker> parent() const { return parent_; }
+
+  // Returns a textual representation of the tracker that is likely (but not
+  // guaranteed) to be globally unique.
+  std::string ToString() const;
+
+ private:
+  // byte_limit < 0 means no limit
+  // 'id' is the label for LogUsage() and web UI.
+  MemTracker(int64_t byte_limit, const std::string& id, std::shared_ptr<MemTracker> parent);
+
+  // Further initializes the tracker.
+  void Init();
+
+  // Adds tracker to child_trackers_.
+  void AddChildTracker(const std::shared_ptr<MemTracker>& tracker);
+
+  // Variant of FindTracker() that must be called with a non-NULL parent.
+  static bool FindTrackerInternal(
+      const std::string& id,
+      std::shared_ptr<MemTracker>* tracker,
+      const std::shared_ptr<MemTracker>& parent);
+
+  // Creates the root tracker.
+  static void CreateRootTracker();
+
+  // Byte limit for this tracker; a negative value means no limit.
+  int64_t limit_;
+  // Label for this tracker, exposed through id().
+  const std::string id_;
+  const std::string descr_;
+  // Strong reference to the parent tracker; empty if no parent is set.
+  std::shared_ptr<MemTracker> parent_;
+
+  // Tracks the current and maximum consumption values, in bytes.
+  HighWaterMark consumption_;
+
+  // this tracker plus all of its ancestors
+  std::vector<MemTracker*> all_trackers_;
+  // all_trackers_ with valid limits
+  std::vector<MemTracker*> limit_trackers_;
+
+  // All the child trackers of this tracker. Used for error reporting and
+  // listing only (i.e. updating the consumption of a parent tracker does not
+  // update that of its children).
+  mutable Mutex child_trackers_lock_;
+  std::list<std::weak_ptr<MemTracker>> child_trackers_;
+
+  // Iterator into parent_->child_trackers_ for this object. Stored to have O(1)
+  // remove.
+  std::list<std::weak_ptr<MemTracker>>::iterator child_tracker_it_;
+};
+
+// An allocator, layered on top of 'Alloc' (std::allocator<T> by default), that
+// charges a MemTracker on every allocation and credits it back on every
+// deallocation.
+template<typename T, typename Alloc = std::allocator<T> >
+class MemTrackerAllocator : public Alloc {
+ public:
+  using pointer = typename Alloc::pointer;
+  using const_pointer = typename Alloc::const_pointer;
+  using size_type = typename Alloc::size_type;
+
+  // Takes shared ownership of 'mem_tracker', which is dereferenced by every
+  // allocate()/deallocate() call.
+  explicit MemTrackerAllocator(std::shared_ptr<MemTracker> mem_tracker)
+      : mem_tracker_(std::move(mem_tracker)) {}
+
+  // Converting constructor used when rebinding to another element type; the
+  // new allocator shares the source allocator's tracker.
+  template <typename U>
+  MemTrackerAllocator(const MemTrackerAllocator<U>& allocator)
+      : Alloc(allocator),
+        mem_tracker_(allocator.mem_tracker()) {
+  }
+
+  ~MemTrackerAllocator() {}
+
+  // Records n * sizeof(T) bytes on the tracker, then delegates to 'Alloc'.
+  pointer allocate(size_type n, const_pointer hint = 0) {
+    // Enforcing the tracker's limit via TryConsume() would be preferable, but
+    // a failed attempt would have to surface as bad_alloc, and it is unclear
+    // whether the rest of Kudu copes with that.
+    const auto num_bytes = n * sizeof(T);
+    mem_tracker_->Consume(num_bytes);
+    return Alloc::allocate(n, hint);
+  }
+
+  // Delegates to 'Alloc', then releases n * sizeof(T) bytes from the tracker.
+  void deallocate(pointer p, size_type n) {
+    const auto num_bytes = n * sizeof(T);
+    Alloc::deallocate(p, n);
+    mem_tracker_->Release(num_bytes);
+  }
+
+  // Lets an allocator for T be re-targeted at a different element type U.
+  template <class U>
+  struct rebind {
+    using other = MemTrackerAllocator<U, typename Alloc::template rebind<U>::other>;
+  };
+
+  // Exposes the tracker so that rebound copies can share it.
+  const std::shared_ptr<MemTracker>& mem_tracker() const { return mem_tracker_; }
+
+ private:
+  std::shared_ptr<MemTracker> mem_tracker_;
+};
+
+// Scope guard for tracked memory: the constructor consumes 'to_consume' bytes
+// on the given tracker and the destructor releases whatever amount is
+// currently recorded.
+class ScopedTrackedConsumption {
+ public:
+  ScopedTrackedConsumption(std::shared_ptr<MemTracker> tracker,
+                           int64_t to_consume)
+      : tracker_(std::move(tracker)), consumption_(to_consume) {
+    DCHECK(tracker_);
+    tracker_->Consume(consumption_);
+  }
+
+  ~ScopedTrackedConsumption() {
+    tracker_->Release(consumption_);
+  }
+
+  // Changes the amount held by this guard to 'new_consumption', adjusting the
+  // tracker by the difference.
+  void Reset(int64_t new_consumption) {
+    // A negative delta works too: Consume(-x) is the same as Release(x).
+    const int64_t delta = new_consumption - consumption_;
+    tracker_->Consume(delta);
+    consumption_ = new_consumption;
+  }
+
+  // Returns the number of bytes this guard currently accounts for.
+  int64_t consumption() const { return consumption_; }
+
+ private:
+  std::shared_ptr<MemTracker> tracker_;
+  int64_t consumption_;
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_MEM_TRACKER_H

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint-test.cc b/be/src/kudu/util/memcmpable_varint-test.cc
new file mode 100644
index 0000000..fcbe25d
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint-test.cc
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/hexdump.h"
+#include "kudu/util/memcmpable_varint.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/stopwatch.h"  // IWYU pragma: keep
+#include "kudu/util/test_util.h"
+
+// Stream-insertion support for std::pair, needed by a test below that traces
+// pairs. The overload lives in namespace 'std' because that is where
+// template name resolution looks for it.
+namespace std {
+template<typename T1, typename T2>
+ostream &operator <<(ostream &os, const pair<T1, T2> &p) {
+  // Rendered as "(first, second)".
+  os << "(" << p.first << ", " << p.second << ")";
+  return os;
+}
+}
+
+using std::make_pair;
+using std::pair;
+using std::vector;
+
+namespace kudu {
+
+// Test fixture that owns the random number generator used by the tests below.
+class TestMemcmpableVarint : public KuduTest {
+ protected:
+  // Seeds 'random_' with the value returned by SeedRandom().
+  TestMemcmpableVarint() : random_(SeedRandom()) {}
+
+  // Random number generator that generates different length integers
+  // with equal probability -- i.e. it is as likely to generate
+  // a number with 8 bits as it is to generate one with 64 bits.
+  // This is useful for testing varint implementations, where a uniform
+  // random is skewed towards generating longer integers.
+  uint64_t Rand64WithRandomBitLength() {
+    // Shift a random 64-bit value right by a random amount to vary its bit
+    // length (assumes Uniform(64) yields a shift in [0, 64) -- TODO confirm).
+    return random_.Next64() >> random_.Uniform(64);
+  }
+
+  Random random_;
+};
+
+// Encodes 'to_encode' with PutMemcmpableVarint64(), decodes it again with
+// GetMemcmpableVarint64(), and asserts that decoding succeeded, produced the
+// original value, and consumed the entire encoded buffer.
+static void DoRoundTripTest(uint64_t to_encode) {
+  // Static so the same buffer object is reused (after clear()) on every call.
+  static faststring buf;
+  buf.clear();
+  PutMemcmpableVarint64(&buf, to_encode);
+
+  uint64_t decoded;
+  Slice slice(buf);
+  bool success = GetMemcmpableVarint64(&slice, &decoded);
+  ASSERT_TRUE(success);
+  ASSERT_EQ(to_encode, decoded);
+  // Nothing may be left over once the single value has been decoded.
+  ASSERT_TRUE(slice.empty());
+}
+
+
+TEST_F(TestMemcmpableVarint, TestRoundTrip) {
+  // Round-trip every integer in [0, 100000); this range covers the values
+  // <= 67823 that the code special-cases.
+  for (uint64_t value = 0; value < 100000; ++value) {
+    DoRoundTripTest(value);
+  }
+
+  // Round-trip 100K random 64-bit integers (likely to need many bytes).
+  for (int remaining = 100000; remaining > 0; --remaining) {
+    DoRoundTripTest(random_.Next64());
+  }
+}
+
+
+// Test that a composite key can be made up of multiple memcmpable
+// varints strung together, and that the resulting key compares the
+// same as the original pair of integers (i.e. left-to-right).
+TEST_F(TestMemcmpableVarint, TestCompositeKeys) {
+  faststring buf1;
+  faststring buf2;
+
+  const int n_trials = 1000;
+
+  for (int i = 0; i < n_trials; i++) {
+    // The buffers are reused across trials, so empty them first.
+    buf1.clear();
+    buf2.clear();
+
+    // Encode the first random pair, component by component, into buf1.
+    pair<uint64_t, uint64_t> p1 =
+        make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength());
+    PutMemcmpableVarint64(&buf1, p1.first);
+    PutMemcmpableVarint64(&buf1, p1.second);
+
+    // Encode the second random pair the same way into buf2.
+    pair<uint64_t, uint64_t> p2 =
+        make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength());
+    PutMemcmpableVarint64(&buf2, p2.first);
+    PutMemcmpableVarint64(&buf2, p2.second);
+
+    // Attach both pairs and their encodings to any failure reported below.
+    SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1))
+                 << "  vs\n" << p2 << "\n" << HexDump(Slice(buf2)));
+    // The encoded buffers must order the same way the pairs themselves do.
+    if (p1 < p2) {
+      ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0);
+    } else if (p1 > p2) {
+      ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0);
+    } else {
+      ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0);
+    }
+  }
+}
+
+// Similar to the above test, but instead of being randomized, specifically
+// tests "interesting" values -- i.e. values around the boundaries of where
+// the encoding changes its number of bytes.
+TEST_F(TestMemcmpableVarint, TestInterestingCompositeKeys) {
+  // Each row lists values expected to share one encoded length.
+  const vector<uint64_t> interesting_values = {
+    0, 1, 240, // 1 byte
+    241, 2000, 2287, // 2 bytes
+    2288, 40000, 67823, // 3 bytes
+    67824, 1ULL << 23, (1ULL << 24) - 1, // 4 bytes
+    1ULL << 24, 1ULL << 30, (1ULL << 32) - 1, // 5 bytes
+  };
+
+  faststring buf1;
+  faststring buf2;
+
+  // Compare every (v1, v2) pair against every (v3, v4) pair.
+  for (uint64_t v1 : interesting_values) {
+    for (uint64_t v2 : interesting_values) {
+      // Encode the left-hand pair once per (v1, v2) combination.
+      buf1.clear();
+      pair<uint64_t, uint64_t> p1 = make_pair(v1, v2);
+      PutMemcmpableVarint64(&buf1, p1.first);
+      PutMemcmpableVarint64(&buf1, p1.second);
+
+      for (uint64_t v3 : interesting_values) {
+        for (uint64_t v4 : interesting_values) {
+          // Encode the right-hand pair for this (v3, v4) combination.
+          buf2.clear();
+          pair<uint64_t, uint64_t> p2 = make_pair(v3, v4);
+          PutMemcmpableVarint64(&buf2, p2.first);
+          PutMemcmpableVarint64(&buf2, p2.second);
+
+          // Attach both pairs and their encodings to any failure below.
+          SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1))
+                       << "  vs\n" << p2 << "\n" << HexDump(Slice(buf2)));
+          // The encoded buffers must order the same way the pairs do.
+          if (p1 < p2) {
+            ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0);
+          } else if (p1 > p2) {
+            ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0);
+          } else {
+            ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0);
+          }
+        }
+      }
+    }
+  }
+}
+
+////////////////////////////////////////////////////////////
+// Benchmarks
+////////////////////////////////////////////////////////////
+
+#ifdef NDEBUG
+// Times 100 passes of encoding the integers [0, 1000000), one at a time, into
+// a buffer that is cleared before each value.
+TEST_F(TestMemcmpableVarint, BenchmarkEncode) {
+  faststring buf;
+
+  int sum_sizes = 0; // need to do something with results to force evaluation
+
+  LOG_TIMING(INFO, "Encoding integers") {
+    for (int trial = 0; trial < 100; trial++) {
+      for (uint64_t i = 0; i < 1000000; i++) {
+        // Clearing each time means buf.size() is the length of this one value.
+        buf.clear();
+        PutMemcmpableVarint64(&buf, i);
+        sum_sizes += buf.size();
+      }
+    }
+  }
+  ASSERT_GT(sum_sizes, 1); // use 'sum_sizes' to avoid optimizing it out.
+}
+
+// Times 100 passes of decoding a buffer that holds the encodings of the
+// integers [0, 1000000) laid out back to back.
+TEST_F(TestMemcmpableVarint, BenchmarkDecode) {
+  faststring buf;
+
+  // Encode 1M integers into the buffer
+  for (uint64_t i = 0; i < 1000000; i++) {
+    PutMemcmpableVarint64(&buf, i);
+  }
+
+  // Decode the whole buffer 100 times.
+  LOG_TIMING(INFO, "Decoding integers") {
+    uint64_t sum_vals = 0;
+    for (int trial = 0; trial < 100; trial++) {
+      // A fresh Slice over the full buffer for each pass; it is drained below.
+      Slice s(buf);
+      while (!s.empty()) {
+        uint64_t decoded;
+        CHECK(GetMemcmpableVarint64(&s, &decoded));
+        sum_vals += decoded;
+      }
+    }
+    ASSERT_GT(sum_vals, 1); // use 'sum_vals' to avoid optimizing it out.
+  }
+}
+
+#endif
+
+} // namespace kudu