You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:29 UTC

[17/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from kudu@334ecafd

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/knapsack_solver.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/knapsack_solver.h b/be/src/kudu/util/knapsack_solver.h
new file mode 100644
index 0000000..2c37065
--- /dev/null
+++ b/be/src/kudu/util/knapsack_solver.h
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_KNAPSACK_SOLVER_H
+#define KUDU_UTIL_KNAPSACK_SOLVER_H
+
+#include <glog/logging.h>
+#include <algorithm>
+#include <utility>
+#include <vector>
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+// Solver for the 0-1 knapsack problem. This uses dynamic programming
+// to solve the problem exactly.
+//
+// Given a knapsack capacity of 'W' and a number of potential items 'n',
+// this solver is O(nW) time and space.
+//
+// This implementation is cribbed from wikipedia. The only interesting
+// bit here that doesn't directly match the pseudo-code is that we
+// maintain the "taken" bitmap keeping track of which items were
+// taken, so we can efficiently "trace back" the chosen items.
+template<class Traits>
+class KnapsackSolver {
+ public:
+  typedef typename Traits::item_type item_type;
+  typedef typename Traits::value_type value_type;
+  typedef std::pair<int, value_type> solution_type;
+
+  KnapsackSolver() {}
+  ~KnapsackSolver() {}
+
+  // Solve a knapsack problem in one shot. Finds the set of
+  // items in 'items' such that their weights add up to no
+  // more than 'knapsack_capacity' and maximizes the sum
+  // of their values.
+  // The indexes of the chosen items are stored in 'chosen_items',
+  // and the maximal value is stored in 'optimal_value'.
+  void Solve(std::vector<item_type> &items,
+             int knapsack_capacity,
+             std::vector<int>* chosen_items,
+             value_type* optimal_value);
+
+
+  // The following functions are a more advanced API for solving
+  // knapsack problems, allowing the caller to obtain incremental
+  // results as each item is considered. See the implementation of
+  // Solve() for usage.
+
+  // Prepare to solve a knapsack problem with the given capacity and
+  // item set. The vector of items must remain valid and unchanged
+  // until the next call to Reset().
+  void Reset(int knapsack_capacity,
+             const std::vector<item_type>* items);
+
+  // Process the next item in 'items'. Returns false if there
+  // were no more items to process.
+  bool ProcessNext();
+
+  // Returns the current best solution after the most recent ProcessNext
+  // call. *solution is a pair of (knapsack weight used, value obtained).
+  solution_type GetSolution();
+
+  // Trace the path of item indexes used to achieve the given best
+  // solution as of the latest ProcessNext() call.
+  void TracePath(const solution_type& best,
+                 std::vector<int>* chosen_items);
+
+ private:
+
+  // The state kept by the DP algorithm.
+  class KnapsackBlackboard {
+   public:
+    typedef std::pair<int, value_type> solution_type;
+    KnapsackBlackboard() :
+      n_items_(0),
+      n_weights_(0),
+      cur_item_idx_(0),
+      best_solution_(0, 0) {
+    }
+
+    void ResizeAndClear(int n_items, int max_weight);
+
+    // Current maximum value at the given weight
+    value_type &max_at(int weight) {
+      DCHECK_GE(weight, 0);
+      DCHECK_LT(weight, n_weights_);
+      return max_value_[weight];
+    }
+
+    // Consider the next item to be put into the knapsack
+    // Moves the "state" of the solution forward
+    void Advance(value_type new_val, int new_wt);
+
+    // How many items have been considered
+    int current_item_index() const { return cur_item_idx_; }
+
+    bool item_taken(int item, int weight) const {
+      DCHECK_GE(weight, 0);
+      DCHECK_LT(weight, n_weights_);
+      DCHECK_GE(item, 0);
+      DCHECK_LT(item, n_items_);
+      return item_taken_[index(item, weight)];
+    }
+
+    solution_type best_solution() { return best_solution_; }
+
+    bool done() { return cur_item_idx_ == n_items_; }
+
+   private:
+    void MarkTaken(int item, int weight) {
+      item_taken_[index(item, weight)] = true;
+    }
+
+    // If the dynamic programming matrix has more than this number of cells,
+    // then warn.
+    static const int kWarnDimension = 10000000;
+
+    int index(int item, int weight) const {
+      return n_weights_ * item + weight;
+    }
+
+    // vector with maximum value at the i-th position meaning that it is
+    // the maximum value you can get given a knapsack of weight capacity i
+    // while only considering items 0..cur_item_idx_-1
+    std::vector<value_type> max_value_;
+    std::vector<bool> item_taken_; // TODO: record difference vectors?
+    int n_items_, n_weights_;
+    int cur_item_idx_;
+    // Best current solution
+    solution_type best_solution_;
+
+    DISALLOW_COPY_AND_ASSIGN(KnapsackBlackboard);
+  };
+
+  KnapsackBlackboard bb_;
+  const std::vector<item_type>* items_;
+  int knapsack_capacity_;
+
+  DISALLOW_COPY_AND_ASSIGN(KnapsackSolver);
+};
+
+template<class Traits>
+inline void KnapsackSolver<Traits>::Reset(int knapsack_capacity,
+                                          const std::vector<item_type>* items) {
+  DCHECK_GE(knapsack_capacity, 0);
+  items_ = items;
+  knapsack_capacity_ = knapsack_capacity;
+  bb_.ResizeAndClear(items->size(), knapsack_capacity);
+}
+
+template<class Traits>
+inline bool KnapsackSolver<Traits>::ProcessNext() {
+  if (bb_.done()) return false;
+
+  const item_type& item = (*items_)[bb_.current_item_index()];
+  int item_weight = Traits::get_weight(item);
+  value_type item_value = Traits::get_value(item);
+  bb_.Advance(item_value, item_weight);
+
+  return true;
+}
+
+template<class Traits>
+inline void KnapsackSolver<Traits>::Solve(std::vector<item_type> &items,
+                                          int knapsack_capacity,
+                                          std::vector<int>* chosen_items,
+                                          value_type* optimal_value) {
+  Reset(knapsack_capacity, &items);
+
+  while (ProcessNext()) {
+  }
+
+  solution_type best = GetSolution();
+  *optimal_value = best.second;
+  TracePath(best, chosen_items);
+}
+
+template<class Traits>
+inline typename KnapsackSolver<Traits>::solution_type KnapsackSolver<Traits>::GetSolution() {
+  return bb_.best_solution();
+}
+
+template<class Traits>
+inline void KnapsackSolver<Traits>::TracePath(const solution_type& best,
+                                              std::vector<int>* chosen_items) {
+  chosen_items->clear();
+  // Retrace back which set of items corresponded to this value.
+  int w = best.first;
+  chosen_items->clear();
+  for (int k = bb_.current_item_index() - 1; k >= 0; k--) {
+    if (bb_.item_taken(k, w)) {
+      const item_type& taken = (*items_)[k];
+      chosen_items->push_back(k);
+      w -= Traits::get_weight(taken);
+      DCHECK_GE(w, 0);
+    }
+  }
+}
+
+template<class Traits>
+void KnapsackSolver<Traits>::KnapsackBlackboard::ResizeAndClear(int n_items,
+                                                                int max_weight) {
+  CHECK_GT(n_items, 0);
+  CHECK_GE(max_weight, 0);
+
+  // Rather than zero-indexing the weights, we size the array from
+  // 0 to max_weight. This avoids having to subtract 1 every time
+  // we index into the array.
+  n_weights_ = max_weight + 1;
+  max_value_.resize(n_weights_);
+
+  int dimension = index(n_items, n_weights_);
+  if (dimension > kWarnDimension) {
+    LOG(WARNING) << "Knapsack problem " << n_items << "x" << n_weights_
+                 << " is large: may be inefficient!";
+  }
+  item_taken_.resize(dimension);
+  n_items_ = n_items;
+
+  // Clear
+  std::fill(max_value_.begin(), max_value_.end(), 0);
+  std::fill(item_taken_.begin(), item_taken_.end(), false);
+  best_solution_ = std::make_pair(0, 0);
+
+  cur_item_idx_ = 0;
+}
+
+template<class Traits>
+void KnapsackSolver<Traits>::KnapsackBlackboard::Advance(value_type new_val, int new_wt) {
+  // Use the dynamic programming formula:
+  // Define mv(i, j) as maximum value considering items 0..i-1 with knapsack weight j
+  // Then:
+  // if j - weight(i) >= 0, then:
+  // mv(i, j) = max(mv(i-1, j), mv(i-1, j-weight(i)) + value(j))
+  // else mv(i, j) = mv(i-1, j)
+  // Since the recursive formula requires an access of j-weight(i), we go in reverse.
+  for (int j = n_weights_ - 1; j >= new_wt ; --j) {
+    value_type val_if_taken = max_value_[j - new_wt] + new_val;
+    if (max_value_[j] < val_if_taken) {
+      max_value_[j] = val_if_taken;
+      MarkTaken(cur_item_idx_, j);
+      // Check if new solution found
+      if (best_solution_.second < val_if_taken) {
+        best_solution_ = std::make_pair(j, val_if_taken);
+      }
+    }
+  }
+
+  cur_item_idx_++;
+}
+
+} // namespace kudu
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/locks.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/locks.cc b/be/src/kudu/util/locks.cc
new file mode 100644
index 0000000..380bee4
--- /dev/null
+++ b/be/src/kudu/util/locks.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/locks.h"
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/malloc.h"
+
+namespace kudu {
+
+using base::subtle::Acquire_CompareAndSwap;
+using base::subtle::NoBarrier_Load;
+using base::subtle::Release_Store;
+
+size_t percpu_rwlock::memory_footprint_excluding_this() const {
+  // Because locks_ is a dynamic array of non-trivially-destructable types,
+  // the returned pointer from new[] isn't guaranteed to point at the start of
+  // a memory block, rendering it useless for malloc_usable_size().
+  //
+  // Rather than replace locks_ with a vector or something equivalent, we'll
+  // just measure the memory footprint using sizeof(), with the understanding
+  // that we might be inaccurate due to malloc "slop".
+  //
+  // See https://code.google.com/p/address-sanitizer/issues/detail?id=395 for
+  // more details.
+  return n_cpus_ * sizeof(padded_lock);
+}
+
+size_t percpu_rwlock::memory_footprint_including_this() const {
+  return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/locks.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/locks.h b/be/src/kudu/util/locks.h
new file mode 100644
index 0000000..f70955c
--- /dev/null
+++ b/be/src/kudu/util/locks.h
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_LOCKS_H
+#define KUDU_UTIL_LOCKS_H
+
+#include <sched.h>
+
+#include <algorithm>  // IWYU pragma: keep
+#include <cstddef>
+#include <mutex>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/rw_semaphore.h"
+
+namespace kudu {
+
+// Wrapper around the Google SpinLock class to adapt it to the method names
+// expected by Boost.
+class simple_spinlock {
+ public:
+  simple_spinlock() {}
+
+  void lock() {
+    l_.Lock();
+  }
+
+  void unlock() {
+    l_.Unlock();
+  }
+
+  bool try_lock() {
+    return l_.TryLock();
+  }
+
+  // Return whether the lock is currently held.
+  //
+  // This state can change at any instant, so this is only really useful
+  // for assertions where you expect to hold the lock. The success of
+  // such an assertion isn't a guarantee that the current thread is the
+  // holder, but the failure of such an assertion _is_ a guarantee that
+  // the current thread is _not_ holding the lock!
+  bool is_locked() {
+    return l_.IsHeld();
+  }
+
+ private:
+  base::SpinLock l_;
+
+  DISALLOW_COPY_AND_ASSIGN(simple_spinlock);
+};
+
+struct padded_spinlock : public simple_spinlock {
+  char padding[CACHELINE_SIZE - (sizeof(simple_spinlock) % CACHELINE_SIZE)];
+};
+
+// Reader-writer lock.
+// This is functionally equivalent to rw_semaphore in rw_semaphore.h, but should be
+// used whenever the lock is expected to only be acquired on a single thread.
+// It adds TSAN annotations which will detect misuse of the lock, but those
+// annotations also assume that the same thread the takes the lock will unlock it.
+//
+// See rw_semaphore.h for documentation on the individual methods where unclear.
+class rw_spinlock {
+ public:
+  rw_spinlock() {
+    ANNOTATE_RWLOCK_CREATE(this);
+  }
+  ~rw_spinlock() {
+    ANNOTATE_RWLOCK_DESTROY(this);
+  }
+
+  void lock_shared() {
+    sem_.lock_shared();
+    ANNOTATE_RWLOCK_ACQUIRED(this, 0);
+  }
+
+  void unlock_shared() {
+    ANNOTATE_RWLOCK_RELEASED(this, 0);
+    sem_.unlock_shared();
+  }
+
+  bool try_lock() {
+    bool ret = sem_.try_lock();
+    if (ret) {
+      ANNOTATE_RWLOCK_ACQUIRED(this, 1);
+    }
+    return ret;
+  }
+
+  void lock() {
+    sem_.lock();
+    ANNOTATE_RWLOCK_ACQUIRED(this, 1);
+  }
+
+  void unlock() {
+    ANNOTATE_RWLOCK_RELEASED(this, 1);
+    sem_.unlock();
+  }
+
+  bool is_write_locked() const {
+    return sem_.is_write_locked();
+  }
+
+  bool is_locked() const {
+    return sem_.is_locked();
+  }
+
+ private:
+  rw_semaphore sem_;
+};
+
+// A reader-writer lock implementation which is biased for use cases where
+// the write lock is taken infrequently, but the read lock is used often.
+//
+// Internally, this creates N underlying reader-writer locks, one per CPU. When a thread
+// wants to lock in read (shared) mode, it locks only its own CPU's lock in read
+// mode. When it wants to lock in write (exclusive) mode, it locks all CPUs' rwlocks in
+// write mode. The use of reader-writer locks ensures that, even if a thread gets
+// preempted when holding one of the per-CPU locks in read mode, the next thread
+// scheduled onto that CPU will not need to block on the first thread.
+//
+// This means that in the read-mostly case, different readers will not cause any
+// cacheline contention.
+//
+// Usage:
+//   percpu_rwlock mylock;
+//
+//   // Lock shared:
+//   {
+//     kudu::shared_lock<rw_spinlock> lock(mylock.get_lock());
+//     ...
+//   }
+//
+//   // Lock exclusive:
+//
+//   {
+//     std::lock_guard<percpu_rwlock> lock(mylock);
+//     ...
+//   }
+class percpu_rwlock {
+ public:
+  percpu_rwlock() {
+#if defined(__APPLE__) || defined(THREAD_SANITIZER)
+    // OSX doesn't have a way to get the index of the CPU running this thread, so
+    // we'll just use a single lock.
+    //
+    // TSAN limits the number of simultaneous lock acquisitions to 64, so we
+    // can't create one lock per core on machines with lots of cores. So, we'll
+    // also just use a single lock.
+    n_cpus_ = 1;
+#else
+    n_cpus_ = base::MaxCPUIndex() + 1;
+#endif
+    CHECK_GT(n_cpus_, 0);
+    locks_ = new padded_lock[n_cpus_];
+  }
+
+  ~percpu_rwlock() {
+    delete [] locks_;
+  }
+
+  rw_spinlock &get_lock() {
+#if defined(__APPLE__) || defined(THREAD_SANITIZER)
+    int cpu = 0;
+#else
+    int cpu = sched_getcpu();
+    CHECK_LT(cpu, n_cpus_);
+#endif  // defined(__APPLE__)
+    return locks_[cpu].lock;
+  }
+
+  bool try_lock() {
+    for (int i = 0; i < n_cpus_; i++) {
+      if (!locks_[i].lock.try_lock()) {
+        while (i--) {
+          locks_[i].lock.unlock();
+        }
+        return false;
+      }
+    }
+    return true;
+  }
+
+  // Return true if this lock is held on any CPU.
+  // See simple_spinlock::is_locked() for details about where this is useful.
+  bool is_locked() const {
+    for (int i = 0; i < n_cpus_; i++) {
+      if (locks_[i].lock.is_locked()) return true;
+    }
+    return false;
+  }
+
+  bool is_write_locked() const {
+    for (int i = 0; i < n_cpus_; i++) {
+      if (!locks_[i].lock.is_write_locked()) return false;
+    }
+    return true;
+  }
+
+  void lock() {
+    for (int i = 0; i < n_cpus_; i++) {
+      locks_[i].lock.lock();
+    }
+  }
+
+  void unlock() {
+    for (int i = 0; i < n_cpus_; i++) {
+      locks_[i].lock.unlock();
+    }
+  }
+
+  // Returns the memory usage of this object without the object itself. Should
+  // be used when embedded inside another object.
+  size_t memory_footprint_excluding_this() const;
+
+  // Returns the memory usage of this object including the object itself.
+  // Should be used when allocated on the heap.
+  size_t memory_footprint_including_this() const;
+
+ private:
+  struct padded_lock {
+    rw_spinlock lock;
+    char padding[CACHELINE_SIZE - (sizeof(rw_spinlock) % CACHELINE_SIZE)];
+  };
+
+  int n_cpus_;
+  padded_lock *locks_;
+};
+
+// Simple implementation of the std::shared_lock API, which is not available in
+// the standard library until C++14. Defers error checking to the underlying
+// mutex.
+
+template <typename Mutex>
+class shared_lock {
+ public:
+  shared_lock()
+      : m_(nullptr) {
+  }
+
+  explicit shared_lock(Mutex& m)
+      : m_(&m) {
+    m_->lock_shared();
+  }
+
+  shared_lock(Mutex& m, std::try_to_lock_t t)
+      : m_(nullptr) {
+    if (m.try_lock_shared()) {
+      m_ = &m;
+    }
+  }
+
+  bool owns_lock() const {
+    return m_;
+  }
+
+  void swap(shared_lock& other) {
+    std::swap(m_,other.m_);
+  }
+
+  ~shared_lock() {
+    if (m_ != nullptr) {
+      m_->unlock_shared();
+    }
+  }
+
+ private:
+  Mutex* m_;
+  DISALLOW_COPY_AND_ASSIGN(shared_lock<Mutex>);
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging-test.cc b/be/src/kudu/util/logging-test.cc
new file mode 100644
index 0000000..cceece8
--- /dev/null
+++ b/be/src/kudu/util/logging-test.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <ctime>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_logger.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h"  // IWYU pragma: keep
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Test the KLOG_EVERY_N_SECS(...) macro.
+TEST(LoggingTest, TestThrottledLogging) {
+  StringVectorSink sink;
+  ScopedRegisterSink srs(&sink);
+
+  for (int i = 0; i < 10000; i++) {
+    KLOG_EVERY_N_SECS(INFO, 1) << "test" << THROTTLE_MSG;
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    if (sink.logged_msgs().size() >= 2) break;
+  }
+  const vector<string>& msgs = sink.logged_msgs();
+  ASSERT_GE(msgs.size(), 2);
+
+  // The first log line shouldn't have a suppression count.
+  EXPECT_THAT(msgs[0], testing::ContainsRegex("test$"));
+  // The second one should have suppressed at least three digits worth of log messages.
+  EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]"));
+}
+
+TEST(LoggingTest, TestAdvancedThrottling) {
+  StringVectorSink sink;
+  ScopedRegisterSink srs(&sink);
+
+  logging::LogThrottler throttle_a;
+
+  // First, log only using a single tag and throttler.
+  for (int i = 0; i < 100000; i++) {
+    KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_a") << "test" << THROTTLE_MSG;
+    SleepFor(MonoDelta::FromMilliseconds(1));
+    if (sink.logged_msgs().size() >= 2) break;
+  }
+  auto& msgs = sink.logged_msgs();
+  ASSERT_GE(msgs.size(), 2);
+
+  // The first log line shouldn't have a suppression count.
+  EXPECT_THAT(msgs[0], testing::ContainsRegex("test$"));
+  // The second one should have suppressed at least three digits worth of log messages.
+  EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]"));
+  msgs.clear();
+
+  // Now, try logging using two different tags in rapid succession. This should not
+  // throttle, because the tag is switching.
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_c") << "test c" << THROTTLE_MSG;
+  KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+  ASSERT_EQ(msgs.size(), 3);
+  EXPECT_THAT(msgs[0], testing::ContainsRegex("test b$"));
+  EXPECT_THAT(msgs[1], testing::ContainsRegex("test c$"));
+  EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
+}
+
+// Test Logger implementation that just counts the number of messages
+// and flushes.
+//
+// This is purposefully thread-unsafe because we expect that the
+// AsyncLogger is only accessing the underlying logger from a single
+// thhread.
+class CountingLogger : public google::base::Logger {
+ public:
+  void Write(bool force_flush,
+             time_t /*timestamp*/,
+             const char* /*message*/,
+             int /*message_len*/) override {
+    message_count_++;
+    if (force_flush) {
+      Flush();
+    }
+  }
+
+  void Flush() override {
+    // Simulate a slow disk.
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    flush_count_++;
+  }
+
+  uint32_t LogSize() override {
+    return 0;
+  }
+
+  std::atomic<int> flush_count_ = {0};
+  std::atomic<int> message_count_ = {0};
+};
+
+TEST(LoggingTest, TestAsyncLogger) {
+  const int kNumThreads = 4;
+  const int kNumMessages = 10000;
+  const int kBuffer = 10000;
+  CountingLogger base;
+  AsyncLogger async(&base, kBuffer);
+  async.Start();
+
+  vector<std::thread> threads;
+  Barrier go_barrier(kNumThreads + 1);
+  // Start some threads writing log messages.
+  for (int i = 0; i < kNumThreads; i++) {
+    threads.emplace_back([&]() {
+        go_barrier.Wait();
+        for (int m = 0; m < kNumMessages; m++) {
+          async.Write(true, m, "x", 1);
+        }
+      });
+  }
+
+  // And a thread calling Flush().
+  threads.emplace_back([&]() {
+      go_barrier.Wait();
+      for (int i = 0; i < 10; i++) {
+        async.Flush();
+        SleepFor(MonoDelta::FromMilliseconds(3));
+      }
+    });
+
+  for (auto& t : threads) {
+    t.join();
+  }
+  async.Stop();
+  ASSERT_EQ(base.message_count_, kNumMessages * kNumThreads);
+  // The async logger should only flush once per "batch" rather than
+  // once per message, even though we wrote every message with
+  // 'flush' set to true.
+  ASSERT_LT(base.flush_count_, kNumMessages * kNumThreads);
+  ASSERT_GT(async.app_threads_blocked_count_for_tests(), 0);
+}
+
+TEST(LoggingTest, TestAsyncLoggerAutoFlush) {
+  const int kBuffer = 10000;
+  CountingLogger base;
+  AsyncLogger async(&base, kBuffer);
+
+  FLAGS_logbufsecs = 1;
+  async.Start();
+
+  // Write some log messages with non-force_flush types.
+  async.Write(false, 0, "test-x", 1);
+  async.Write(false, 1, "test-y", 1);
+
+  // The flush wait timeout might take a little bit of time to run.
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_EQ(base.message_count_, 2);
+    // The AsyncLogger should have flushed at least once by the timer automatically
+    // so there should be no more messages in the buffer.
+    ASSERT_GT(base.flush_count_, 0);
+  });
+  async.Stop();
+}
+
+// Basic test that the redaction utilities work as expected.
+TEST(LoggingTest, TestRedactionBasic) {
+  ASSERT_STREQ("<redacted>", KUDU_REDACT("hello"));
+  {
+    ScopedDisableRedaction no_redaction;
+    ASSERT_STREQ("hello", KUDU_REDACT("hello"));
+  }
+  ASSERT_STREQ("hello", KUDU_DISABLE_REDACTION(KUDU_REDACT("hello")));
+}
+
+// Typically, ToString() methods apply to some complex object with a bunch
+// of fields, some of which are user data (need redaction) and others of which
+// are not. This shows an example of a such a function, which will behave
+// differently based on whether the calling scope has explicitly disabled
+// redaction.
+string SomeComplexStringify(const string& public_data, const string& private_data) {
+  return strings::Substitute("public=$0, private=$1",
+                             public_data,
+                             KUDU_REDACT(private_data));
+}
+
+TEST(LoggingTest, TestRedactionIllustrateUsage) {
+  // By default, the private data will be redacted.
+  ASSERT_EQ("public=abc, private=<redacted>", SomeComplexStringify("abc", "def"));
+
+  // We can wrap the expression in KUDU_DISABLE_REDACTION(...) to evaluate it
+  // with redaction temporarily disabled.
+  ASSERT_EQ("public=abc, private=def", KUDU_DISABLE_REDACTION(SomeComplexStringify("abc", "def")));
+
+  // Or we can execute an entire scope with redaction disabled.
+  KUDU_DISABLE_REDACTION({
+    ASSERT_EQ("public=abc, private=def", SomeComplexStringify("abc", "def"));
+  });
+}
+
+
+TEST(LoggingTest, TestLogTiming) {
+  LOG_TIMING(INFO, "foo") {
+  }
+  {
+    SCOPED_LOG_TIMING(INFO, "bar");
+  }
+  LOG_SLOW_EXECUTION(INFO, 1, "baz") {
+  }
+
+  // Previous implementations of the above macro confused clang-tidy's use-after-move
+  // check and generated false positives.
+  string s1 = "hello";
+  string s2;
+  LOG_SLOW_EXECUTION(INFO, 1, "baz") {
+    LOG(INFO) << s1;
+    s2 = std::move(s1);
+  }
+
+  ASSERT_EQ("hello", s2);
+}
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging.cc b/be/src/kudu/util/logging.cc
new file mode 100644
index 0000000..fcf035f
--- /dev/null
+++ b/be/src/kudu/util/logging.cc
@@ -0,0 +1,413 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/logging.h"
+
+#include <unistd.h>
+
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <ctime>
+#include <fstream>
+#include <mutex>
+#include <utility>
+
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/uuid_io.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/callback.h"  // IWYU pragma: keep
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_logger.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging_callback.h"
+#include "kudu/util/minidump.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/status.h"
+
+DEFINE_string(log_filename, "",
+    "Prefix of log filename - "
+    "full path is <log_dir>/<log_filename>.[INFO|WARN|ERROR|FATAL]");
+TAG_FLAG(log_filename, stable);
+
+DEFINE_bool(log_async, true,
+            "Enable asynchronous writing to log files. This improves "
+            "latency and stability.");
+TAG_FLAG(log_async, hidden);
+
+DEFINE_int32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024,
+             "The number of bytes of buffer space used by each log "
+             "level. Only relevant when --log_async is enabled.");
+TAG_FLAG(log_async_buffer_bytes_per_level, hidden);
+
+DEFINE_int32(max_log_files, 10,
+    "Maximum number of log files to retain per severity level. The most recent "
+    "log files are retained. If set to 0, all log files are retained.");
+TAG_FLAG(max_log_files, runtime);
+TAG_FLAG(max_log_files, experimental);
+
+#define PROJ_NAME "kudu"
+
+bool logging_initialized = false;
+
+using namespace std; // NOLINT(*)
+using namespace boost::uuids; // NOLINT(*)
+
+using base::SpinLock;
+using base::SpinLockHolder;
+
+namespace kudu {
+
+__thread bool tls_redact_user_data = true;
+kudu::RedactContext g_should_redact;
+const char* const kRedactionMessage = "<redacted>";
+
+namespace {
+
+class SimpleSink : public google::LogSink {
+ public:
+  explicit SimpleSink(LoggingCallback cb) : cb_(std::move(cb)) {}
+
+  virtual ~SimpleSink() OVERRIDE {
+  }
+
+  virtual void send(google::LogSeverity severity, const char* full_filename,
+                    const char* base_filename, int line,
+                    const struct ::tm* tm_time,
+                    const char* message, size_t message_len) OVERRIDE {
+    LogSeverity kudu_severity;
+    switch (severity) {
+      case google::INFO:
+        kudu_severity = SEVERITY_INFO;
+        break;
+      case google::WARNING:
+        kudu_severity = SEVERITY_WARNING;
+        break;
+      case google::ERROR:
+        kudu_severity = SEVERITY_ERROR;
+        break;
+      case google::FATAL:
+        kudu_severity = SEVERITY_FATAL;
+        break;
+      default:
+        LOG(FATAL) << "Unknown glog severity: " << severity;
+    }
+    cb_.Run(kudu_severity, full_filename, line, tm_time, message, message_len);
+  }
+
+ private:
+
+  LoggingCallback cb_;
+};
+
+SpinLock logging_mutex(base::LINKER_INITIALIZED);
+
+// There can only be a single instance of a SimpleSink.
+//
+// Protected by 'logging_mutex'.
+SimpleSink* registered_sink = nullptr;
+
+// Records the logging severity after the first call to
+// InitGoogleLoggingSafe{Basic}. Calls to UnregisterLoggingCallback()
+// will restore stderr logging back to this severity level.
+//
+// Protected by 'logging_mutex'.
+int initial_stderr_severity;
+
+void EnableAsyncLogging() {
+  debug::ScopedLeakCheckDisabler leaky;
+
+  // Enable Async for every level except for FATAL. Fatal should be synchronous
+  // to ensure that we get the fatal log message written before exiting.
+  for (auto level : { google::INFO, google::WARNING, google::ERROR }) {
+    auto* orig = google::base::GetLogger(level);
+    auto* async = new AsyncLogger(orig, FLAGS_log_async_buffer_bytes_per_level);
+    async->Start();
+    google::base::SetLogger(level, async);
+  }
+}
+
+void UnregisterLoggingCallbackUnlocked() {
+  CHECK(logging_mutex.IsHeld());
+  CHECK(registered_sink);
+
+  // Restore logging to stderr, then remove our sink. This ordering ensures
+  // that no log messages are missed.
+  google::SetStderrLogging(initial_stderr_severity);
+  google::RemoveLogSink(registered_sink);
+  delete registered_sink;
+  registered_sink = nullptr;
+}
+
+void FlushCoverageOnExit() {
+  // Coverage flushing is not re-entrant, but this might be called from a
+  // crash signal context, so avoid re-entrancy.
+  static __thread bool in_call = false;
+  if (in_call) return;
+  in_call = true;
+
+  // The failure writer will be called multiple times per exit.
+  // We only need to flush coverage once. We use a 'once' here so that,
+  // if another thread is already flushing, we'll block and wait for them
+  // to finish before allowing this thread to call abort().
+  static std::once_flag once;
+  std::call_once(once, [] {
+      static const char msg[] = "Flushing coverage data before crash...\n";
+      write(STDERR_FILENO, msg, arraysize(msg));
+      TryFlushCoverage();
+    });
+  in_call = false;
+}
+
+// On SEGVs, etc, glog will call this function to write the error to stderr. This
+// implementation is copied from glog with the exception that we also flush coverage
+// the first time it's called.
+//
+// NOTE: this is only used in coverage builds!
+void FailureWriterWithCoverage(const char* data, int size) {
+  FlushCoverageOnExit();
+
+  // Original implementation from glog:
+  if (write(STDERR_FILENO, data, size) < 0) {
+    // Ignore errors.
+  }
+}
+
+// GLog "failure function". This is called in the case of LOG(FATAL) to
+// ensure that we flush coverage even on crashes.
+//
+// NOTE: this is only used in coverage builds!
+void FlushCoverageAndAbort() {
+  FlushCoverageOnExit();
+  abort();
+}
+} // anonymous namespace
+
+void InitGoogleLoggingSafe(const char* arg) {
+  SpinLockHolder l(&logging_mutex);
+  if (logging_initialized) return;
+
+  google::InstallFailureSignalHandler();
+
+  if (!FLAGS_log_filename.empty()) {
+    for (int severity = google::INFO; severity <= google::FATAL; ++severity) {
+      google::SetLogSymlink(severity, FLAGS_log_filename.c_str());
+    }
+  }
+
+  // This forces our logging to use /tmp rather than looking for a
+  // temporary directory if none is specified. This is done so that we
+  // can reliably construct the log file name without duplicating the
+  // complex logic that glog uses to guess at a temporary dir.
+  if (FLAGS_log_dir.empty()) {
+    FLAGS_log_dir = "/tmp";
+  }
+
+  if (!FLAGS_logtostderr) {
+    // Verify that a log file can be created in log_dir by creating a tmp file.
+    ostringstream ss;
+    random_generator uuid_generator;
+    ss << FLAGS_log_dir << "/" << PROJ_NAME "_test_log." << uuid_generator();
+    const string file_name = ss.str();
+    ofstream test_file(file_name.c_str());
+    if (!test_file.is_open()) {
+      ostringstream error_msg;
+      error_msg << "Could not open file in log_dir " << FLAGS_log_dir;
+      perror(error_msg.str().c_str());
+      // Unlock the mutex before exiting the program to avoid mutex d'tor assert.
+      logging_mutex.Unlock();
+      exit(1);
+    }
+    remove(file_name.c_str());
+  }
+
+  google::InitGoogleLogging(arg);
+
+  // In coverage builds, we should flush coverage before exiting on crash.
+  // This way, fault injection tests still capture coverage of the daemon
+  // that "crashed".
+  if (IsCoverageBuild()) {
+    // We have to use both the "failure writer" and the "FailureFunction".
+    // This allows us to handle both LOG(FATAL) and unintended crashes like
+    // SEGVs.
+    google::InstallFailureWriter(FailureWriterWithCoverage);
+    google::InstallFailureFunction(FlushCoverageAndAbort);
+  }
+
+  // Needs to be done after InitGoogleLogging
+  if (FLAGS_log_filename.empty()) {
+    CHECK_STRNE(google::ProgramInvocationShortName(), "UNKNOWN")
+        << ": must initialize gflags before glog";
+    FLAGS_log_filename = google::ProgramInvocationShortName();
+  }
+
+  // File logging: on.
+  // Stderr logging threshold: FLAGS_stderrthreshold.
+  // Sink logging: off.
+  initial_stderr_severity = FLAGS_stderrthreshold;
+
+  // Ignore SIGPIPE early in the startup process so that threads writing to TLS
+  // sockets do not crash when writing to a closed socket. See KUDU-1910.
+  IgnoreSigPipe();
+
+  // For minidump support. Must be called before logging threads started.
+  CHECK_OK(BlockSigUSR1());
+
+  if (FLAGS_log_async) {
+    EnableAsyncLogging();
+  }
+
+  logging_initialized = true;
+}
+
+void InitGoogleLoggingSafeBasic(const char* arg) {
+  SpinLockHolder l(&logging_mutex);
+  if (logging_initialized) return;
+
+  google::InitGoogleLogging(arg);
+
+  // This also disables file-based logging.
+  google::LogToStderr();
+
+  // File logging: off.
+  // Stderr logging threshold: INFO.
+  // Sink logging: off.
+  initial_stderr_severity = google::INFO;
+  logging_initialized = true;
+}
+
+void RegisterLoggingCallback(const LoggingCallback& cb) {
+  SpinLockHolder l(&logging_mutex);
+  CHECK(logging_initialized);
+
+  if (registered_sink) {
+    LOG(WARNING) << "Cannot register logging callback: one already registered";
+    return;
+  }
+
+  // AddLogSink() claims to take ownership of the sink, but it doesn't
+  // really; it actually expects it to remain valid until
+  // google::ShutdownGoogleLogging() is called.
+  registered_sink = new SimpleSink(cb);
+  google::AddLogSink(registered_sink);
+
+  // Even when stderr logging is ostensibly off, it's still emitting
+  // ERROR-level stuff. This is the default.
+  google::SetStderrLogging(google::ERROR);
+
+  // File logging: yes, if InitGoogleLoggingSafe() was called earlier.
+  // Stderr logging threshold: ERROR.
+  // Sink logging: on.
+}
+
+void UnregisterLoggingCallback() {
+  SpinLockHolder l(&logging_mutex);
+  CHECK(logging_initialized);
+
+  if (!registered_sink) {
+    LOG(WARNING) << "Cannot unregister logging callback: none registered";
+    return;
+  }
+
+  UnregisterLoggingCallbackUnlocked();
+  // File logging: yes, if InitGoogleLoggingSafe() was called earlier.
+  // Stderr logging threshold: initial_stderr_severity.
+  // Sink logging: off.
+}
+
+void GetFullLogFilename(google::LogSeverity severity, string* filename) {
+  ostringstream ss;
+  ss << FLAGS_log_dir << "/" << FLAGS_log_filename << "."
+     << google::GetLogSeverityName(severity);
+  *filename = ss.str();
+}
+
+std::string FormatTimestampForLog(MicrosecondsInt64 micros_since_epoch) {
+  time_t secs_since_epoch = micros_since_epoch / 1000000;
+  int usecs = micros_since_epoch % 1000000;
+  struct tm tm_time;
+  localtime_r(&secs_since_epoch, &tm_time);
+
+  return StringPrintf("%02d%02d %02d:%02d:%02d.%06d",
+                      1 + tm_time.tm_mon,
+                      tm_time.tm_mday,
+                      tm_time.tm_hour,
+                      tm_time.tm_min,
+                      tm_time.tm_sec,
+                      usecs);
+}
+
+void ShutdownLoggingSafe() {
+  SpinLockHolder l(&logging_mutex);
+  if (!logging_initialized) return;
+
+  if (registered_sink) {
+    UnregisterLoggingCallbackUnlocked();
+  }
+
+  google::ShutdownGoogleLogging();
+
+  logging_initialized = false;
+}
+
+Status DeleteExcessLogFiles(Env* env) {
+  int32_t max_log_files = FLAGS_max_log_files;
+  // Ignore bad input or disable log rotation.
+  if (max_log_files <= 0) return Status::OK();
+
+  for (int severity = 0; severity < google::NUM_SEVERITIES; ++severity) {
+    // Build glob pattern for input
+    // e.g. /var/log/kudu/kudu-master.*.INFO.*
+    string pattern = strings::Substitute("$0/$1.*.$2.*", FLAGS_log_dir, FLAGS_log_filename,
+                                         google::GetLogSeverityName(severity));
+
+    // Keep the 'max_log_files' most recent log files, as compared by
+    // modification time. Glog files contain a second-granularity timestamp in
+    // the name, so this could potentially use the filename sort order as
+    // guaranteed by glob, however this code has been adapted from Impala which
+    // uses mtime to determine which files to delete, and there haven't been any
+    // issues in production settings.
+    RETURN_NOT_OK(env_util::DeleteExcessFilesByPattern(env, pattern, max_log_files));
+  }
+  return Status::OK();
+}
+
+// Support for the special THROTTLE_MSG token in a log message stream.
+ostream& operator<<(ostream &os, const PRIVATE_ThrottleMsg& /*unused*/) {
+  using google::LogMessage;
+#ifdef DISABLE_RTTI
+  LogMessage::LogStream *log = static_cast<LogMessage::LogStream*>(&os);
+#else
+  LogMessage::LogStream *log = dynamic_cast<LogMessage::LogStream*>(&os);
+#endif
+  CHECK(log && log == log->self())
+      << "You must not use COUNTER with non-glog ostream";
+  int ctr = log->ctr();
+  if (ctr > 0) {
+    os << " [suppressed " << ctr << " similar messages]";
+  }
+  return os;
+}
+
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging.h b/be/src/kudu/util/logging.h
new file mode 100644
index 0000000..428dadc
--- /dev/null
+++ b/be/src/kudu/util/logging.h
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_LOGGING_H
+#define KUDU_UTIL_LOGGING_H
+
+#include <iosfwd>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/logging_callback.h"
+#include "kudu/util/status.h"
+
+////////////////////////////////////////////////////////////////////////////////
+// Redaction support
+////////////////////////////////////////////////////////////////////////////////
+
+// Disable redaction of user data while evaluating the expression 'expr'.
+// This may be used inline as an expression, such as:
+//
+//   LOG(INFO) << KUDU_DISABLE_REDACTION(schema.DebugRow(my_row));
+//
+// or with a block:
+//
+//  KUDU_DISABLE_REDACTION({
+//    LOG(INFO) << schema.DebugRow(my_row);
+//  });
+//
+// Redaction should be disabled in the following cases:
+//
+// 1) Outputting strings to a "secure" endpoint (for example an authenticated and authorized
+//    web UI)
+//
+// 2) Using methods like schema.DebugRow(...) when the parameter is not in fact a user-provided
+//    row, but instead some piece of metadata such as a partition boundary.
+#define KUDU_DISABLE_REDACTION(expr) ([&]() {        \
+      kudu::ScopedDisableRedaction s;                \
+      return (expr);                                 \
+    })()
+
+// Evaluates to 'true' if the caller should redact any user data in the current scope.
+// Most callers should instead use KUDU_REDACT(...) defined below, but this can be useful
+// to short-circuit expensive logic.
+#define KUDU_SHOULD_REDACT() ((kudu::g_should_redact == kudu::RedactContext::ALL ||    \
+  kudu::g_should_redact == kudu::RedactContext::LOG) && kudu::tls_redact_user_data)
+
+// Either evaluate and return 'expr', or return the string "<redacted>", depending on whether
+// redaction is enabled in the current scope.
+#define KUDU_REDACT(expr) \
+  (KUDU_SHOULD_REDACT() ? kRedactionMessage : (expr))
+
+// Like the above, but with the additional condition that redaction will only
+// be performed if 'cond' must be true.
+#define KUDU_MAYBE_REDACT_IF(cond, expr) \
+  ((KUDU_SHOULD_REDACT() && (cond)) ? kudu::kRedactionMessage : (expr))
+
+////////////////////////////////////////
+// Redaction implementation details follow.
+////////////////////////////////////////
+
+namespace kudu {
+
+// Flag which allows redaction to be enabled or disabled for a thread context.
+// Defaults to enabling redaction, since it's the safer default with respect to
+// leaking user data, and it's easier to identify when data is over-redacted
+// than vice-versa.
+extern __thread bool tls_redact_user_data;
+
+// Redacted log messages are replaced with this constant.
+extern const char* const kRedactionMessage;
+
+enum class RedactContext {
+  ALL,
+  LOG,
+  NONE
+};
+
+// Flag to indicate which redaction context is enabled.
+extern kudu::RedactContext g_should_redact;
+
+class ScopedDisableRedaction {
+ public:
+  ScopedDisableRedaction()
+      : old_val_(tls_redact_user_data) {
+    tls_redact_user_data = false;
+  }
+
+  ~ScopedDisableRedaction() {
+    tls_redact_user_data = old_val_;
+  }
+
+ private:
+  bool old_val_;
+};
+
+} // namespace kudu
+
+////////////////////////////////////////////////////////////////////////////////
+// Throttled logging support
+////////////////////////////////////////////////////////////////////////////////
+
+// Logs a message throttled to appear at most once every 'n_secs' seconds to
+// the given severity.
+//
+// The log message may include the special token 'THROTTLE_MSG' which expands
+// to either an empty string or '[suppressed <n> similar messages]'.
+//
+// Example usage:
+//   KLOG_EVERY_N_SECS(WARNING, 1) << "server is low on memory" << THROTTLE_MSG;
+//
+//
+// Advanced per-instance throttling
+// -----------------------------------
+// For cases where the throttling should be scoped to a given class instance,
+// you may define a logging::LogThrottler object and pass it to the
+// KLOG_EVERY_N_SECS_THROTTLER(...) macro. In addition, you must pass a "tag".
+// Only log messages with equal tags (by pointer equality) will be throttled.
+// For example:
+//
+//    struct MyThing {
+//      string name;
+//      LogThrottler throttler;
+//    };
+//
+//    if (...) {
+//      LOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "coffee") <<
+//        my_thing->name << " needs coffee!";
+//    } else {
+//      LOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "wine") <<
+//        my_thing->name << " needs wine!";
+//    }
+//
+// In this example, the "coffee"-related message will be collapsed into other
+// such messages within the prior one second; however, if the state alternates
+// between the "coffee" message and the "wine" message, then each such alternation
+// will yield a message.
+
+#define KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, throttler, tag) \
+  int VARNAME_LINENUM(num_suppressed) = 0;                            \
+  if ((throttler).ShouldLog(n_secs, tag, &VARNAME_LINENUM(num_suppressed)))  \
+    google::LogMessage( \
+      __FILE__, __LINE__, google::GLOG_ ## severity, VARNAME_LINENUM(num_suppressed), \
+      &google::LogMessage::SendToLog).stream()
+
+#define KLOG_EVERY_N_SECS(severity, n_secs) \
+  static logging::LogThrottler LOG_THROTTLER;  \
+  KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, LOG_THROTTLER, "no-tag")
+
+
+namespace kudu {
+enum PRIVATE_ThrottleMsg {THROTTLE_MSG};
+} // namespace kudu
+
+////////////////////////////////////////////////////////////////////////////////
+// Versions of glog macros for "LOG_EVERY" and "LOG_FIRST" that annotate the
+// benign races on their internal static variables.
+////////////////////////////////////////////////////////////////////////////////
+
+// The "base" macros.
+#define KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, n, what_to_do) \
+  static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
+  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
+  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
+  ++LOG_OCCURRENCES; \
+  if (++LOG_OCCURRENCES_MOD_N > n) LOG_OCCURRENCES_MOD_N -= n; \
+  if (LOG_OCCURRENCES_MOD_N == 1) \
+    google::LogMessage( \
+        __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+        &what_to_do).stream()
+
+#define KUDU_SOME_KIND_OF_LOG_IF_EVERY_N(severity, condition, n, what_to_do) \
+  static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
+  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
+  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
+  ++LOG_OCCURRENCES; \
+  if (condition && \
+      ((LOG_OCCURRENCES_MOD_N=(LOG_OCCURRENCES_MOD_N + 1) % n) == (1 % n))) \
+    google::LogMessage( \
+        __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+                 &what_to_do).stream()
+
+#define KUDU_SOME_KIND_OF_PLOG_EVERY_N(severity, n, what_to_do) \
+  static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
+  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
+  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
+  ++LOG_OCCURRENCES; \
+  if (++LOG_OCCURRENCES_MOD_N > n) LOG_OCCURRENCES_MOD_N -= n; \
+  if (LOG_OCCURRENCES_MOD_N == 1) \
+    google::ErrnoLogMessage( \
+        __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+        &what_to_do).stream()
+
+#define KUDU_SOME_KIND_OF_LOG_FIRST_N(severity, n, what_to_do) \
+  static uint64_t LOG_OCCURRENCES = 0; \
+  ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging the first N is approximate"); \
+  if (LOG_OCCURRENCES++ < n) \
+    google::LogMessage( \
+      __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+      &what_to_do).stream()
+
+// The direct user-facing macros.
+#define KLOG_EVERY_N(severity, n) \
+  GOOGLE_GLOG_COMPILE_ASSERT(google::GLOG_ ## severity < \
+                             google::NUM_SEVERITIES, \
+                             INVALID_REQUESTED_LOG_SEVERITY); \
+  KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToLog)
+
+#define KSYSLOG_EVERY_N(severity, n) \
+  KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToSyslogAndLog)
+
+#define KPLOG_EVERY_N(severity, n) \
+  KUDU_SOME_KIND_OF_PLOG_EVERY_N(severity, (n), google::LogMessage::SendToLog)
+
+#define KLOG_FIRST_N(severity, n) \
+  KUDU_SOME_KIND_OF_LOG_FIRST_N(severity, (n), google::LogMessage::SendToLog)
+
+#define KLOG_IF_EVERY_N(severity, condition, n) \
+  KUDU_SOME_KIND_OF_LOG_IF_EVERY_N(severity, (condition), (n), google::LogMessage::SendToLog)
+
+// We also disable the un-annotated glog macros for anyone who includes this header.
+#undef LOG_EVERY_N
+#define LOG_EVERY_N(severity, n) \
+  GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_EVERY_N is deprecated. Please use KLOG_EVERY_N.")
+
+#undef SYSLOG_EVERY_N
+#define SYSLOG_EVERY_N(severity, n) \
+  GOOGLE_GLOG_COMPILE_ASSERT(false, "SYSLOG_EVERY_N is deprecated. Please use KSYSLOG_EVERY_N.")
+
+#undef PLOG_EVERY_N
+#define PLOG_EVERY_N(severity, n) \
+  GOOGLE_GLOG_COMPILE_ASSERT(false, "PLOG_EVERY_N is deprecated. Please use KPLOG_EVERY_N.")
+
+#undef LOG_FIRST_N
+#define LOG_FIRST_N(severity, n) \
+  GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_FIRST_N is deprecated. Please use KLOG_FIRST_N.")
+
+#undef LOG_IF_EVERY_N
+#define LOG_IF_EVERY_N(severity, condition, n) \
+  GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_IF_EVERY_N is deprecated. Please use KLOG_IF_EVERY_N.")
+
+namespace kudu {
+
+class Env;
+
+// glog doesn't allow multiple invocations of InitGoogleLogging. This method conditionally
+// calls InitGoogleLogging only if it hasn't been called before.
+//
+// It also takes care of installing the google failure signal handler and
+// setting the signal handler for SIGPIPE to SIG_IGN.
+void InitGoogleLoggingSafe(const char* arg);
+
+// Like InitGoogleLoggingSafe() but stripped down: no signal handlers are
+// installed, regular logging is disabled, and log events of any severity
+// will be written to stderr.
+//
+// These properties make it attractive for us in libraries.
+void InitGoogleLoggingSafeBasic(const char* arg);
+
+// Demotes stderr logging to ERROR or higher and registers 'cb' as the
+// recipient for all log events.
+//
+// Subsequent calls to RegisterLoggingCallback no-op (until the callback
+// is unregistered with UnregisterLoggingCallback()).
+void RegisterLoggingCallback(const LoggingCallback& cb);
+
+// Unregisters a callback previously registered with
+// RegisterLoggingCallback() and promotes stderr logging back to all
+// severities.
+//
+// If no callback is registered, this is a no-op.
+void UnregisterLoggingCallback();
+
+// Returns the full pathname of the symlink to the most recent log
+// file corresponding to this severity
+void GetFullLogFilename(google::LogSeverity severity, std::string* filename);
+
+// Format a timestamp in the same format as used by GLog.
+std::string FormatTimestampForLog(MicrosecondsInt64 micros_since_epoch);
+
+// Shuts down the google logging library. Call before exit to ensure that log files are
+// flushed.
+void ShutdownLoggingSafe();
+
+// Deletes excess rotated log files.
+//
+// Keeps at most 'FLAG_max_log_files' of the most recent log files at every
+// severity level, using the file's modified time to determine recency.
+Status DeleteExcessLogFiles(Env* env);
+
+namespace logging {
+
+// A LogThrottler instance tracks the throttling state for a particular
+// log message.
+//
+// This is used internally by KLOG_EVERY_N_SECS, but can also be used
+// explicitly in conjunction with KLOG_EVERY_N_SECS_THROTTLER. See the
+// macro descriptions above for details.
+class LogThrottler {
+ public:
+  LogThrottler() : num_suppressed_(0), last_ts_(0), last_tag_(nullptr) {
+    ANNOTATE_BENIGN_RACE_SIZED(this, sizeof(*this), "OK to be sloppy with log throttling");
+  }
+
+  bool ShouldLog(int n_secs, const char* tag, int* num_suppressed) {
+    MicrosecondsInt64 ts = GetMonoTimeMicros();
+
+    // When we switch tags, we should not show the "suppressed" messages, because
+    // in fact it's a different message that we skipped. So, reset it to zero,
+    // and always log the new message.
+    if (tag != last_tag_) {
+      *num_suppressed = num_suppressed_ = 0;
+      last_tag_ = tag;
+      last_ts_ = ts;
+      return true;
+    }
+
+    if (ts - last_ts_ < n_secs * 1000000) {
+      *num_suppressed = base::subtle::NoBarrier_AtomicIncrement(&num_suppressed_, 1);
+      return false;
+    }
+    last_ts_ = ts;
+    *num_suppressed = base::subtle::NoBarrier_AtomicExchange(&num_suppressed_, 0);
+    return true;
+  }
+ private:
+  Atomic32 num_suppressed_;
+  MicrosecondsInt64 last_ts_;
+  const char* last_tag_;
+};
+} // namespace logging
+
+std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&);
+
+// Convenience macros to prefix log messages with some prefix, these are the unlocked
+// versions and should not obtain a lock (if one is required to obtain the prefix).
+// There must be a LogPrefixUnlocked()/LogPrefixLocked() method available in the current
+// scope in order to use these macros.
+#define LOG_WITH_PREFIX_UNLOCKED(severity) LOG(severity) << LogPrefixUnlocked()
+#define VLOG_WITH_PREFIX_UNLOCKED(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) \
+  << LogPrefixUnlocked()
+
+// Same as the above, but obtain the lock.
+#define LOG_WITH_PREFIX(severity) LOG(severity) << LogPrefix()
+#define VLOG_WITH_PREFIX(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) \
+  << LogPrefix()
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_LOGGING_H

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging_callback.h b/be/src/kudu/util/logging_callback.h
new file mode 100644
index 0000000..83fb973
--- /dev/null
+++ b/be/src/kudu/util/logging_callback.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_LOGGING_CALLBACK_H
+#define KUDU_UTIL_LOGGING_CALLBACK_H
+
+#include <ctime>
+#include <string>
+
+#include "kudu/gutil/callback_forward.h"
+
+namespace kudu {
+
+enum LogSeverity {
+  SEVERITY_INFO,
+  SEVERITY_WARNING,
+  SEVERITY_ERROR,
+  SEVERITY_FATAL
+};
+
+// Callback for simple logging.
+//
+// 'message' is NOT terminated with an endline.
+typedef Callback<void(LogSeverity severity,
+                      const char* filename,
+                      int line_number,
+                      const struct ::tm* time,
+                      const char* message,
+                      size_t message_len)> LoggingCallback;
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging_test_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging_test_util.h b/be/src/kudu/util/logging_test_util.h
new file mode 100644
index 0000000..8102375
--- /dev/null
+++ b/be/src/kudu/util/logging_test_util.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_LOGGING_TEST_UTIL_H
+#define KUDU_LOGGING_TEST_UTIL_H
+
+#include <glog/logging.h>
+#include <string>
+#include <vector>
+
+namespace kudu {
+
+// GLog sink that keeps an internal buffer of messages that have been logged.
+class StringVectorSink : public google::LogSink {
+ public:
+  void send(google::LogSeverity severity, const char* full_filename,
+            const char* base_filename, int line,
+            const struct ::tm* tm_time,
+            const char* message, size_t message_len) override {
+    logged_msgs_.push_back(ToString(severity, base_filename, line,
+                                    tm_time, message, message_len));
+  }
+
+  std::vector<std::string>& logged_msgs() {
+    return logged_msgs_;
+  }
+
+ private:
+  std::vector<std::string> logged_msgs_;
+};
+
+// RAII wrapper around registering a LogSink with GLog.
+struct ScopedRegisterSink {
+  explicit ScopedRegisterSink(google::LogSink* s) : s_(s) {
+    google::AddLogSink(s_);
+  }
+  ~ScopedRegisterSink() {
+    google::RemoveLogSink(s_);
+  }
+
+  google::LogSink* s_;
+};
+
+} // namespace kudu
+
+#endif

http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager-test.cc b/be/src/kudu/util/maintenance_manager-test.cc
new file mode 100644
index 0000000..6777e06
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager-test.cc
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DEFINE_entity(test);
+METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,
+                           "Number of Maintenance Operations Running",
+                           kudu::MetricUnit::kMaintenanceOperations,
+                           "The number of background maintenance operations currently running.");
+METRIC_DEFINE_histogram(test, maintenance_op_duration,
+                        "Maintenance Operation Duration",
+                        kudu::MetricUnit::kSeconds, "", 60000000LU, 2);
+
+DECLARE_int64(log_target_replay_size_mb);
+
+namespace kudu {
+
+static const int kHistorySize = 4;
+static const char kFakeUuid[] = "12345";
+
+class MaintenanceManagerTest : public KuduTest {
+ public:
+  void SetUp() override {
+    MaintenanceManager::Options options;
+    options.num_threads = 2;
+    options.polling_interval_ms = 1;
+    options.history_size = kHistorySize;
+    manager_.reset(new MaintenanceManager(options, kFakeUuid));
+    manager_->set_memory_pressure_func_for_tests(
+        [&](double* consumption) {
+          return indicate_memory_pressure_.load();
+        });
+    ASSERT_OK(manager_->Start());
+  }
+
+  void TearDown() override {
+    manager_->Shutdown();
+  }
+
+ protected:
+  shared_ptr<MaintenanceManager> manager_;
+  std::atomic<bool> indicate_memory_pressure_ { false };
+};
+
+// Just create the MaintenanceManager and then shut it down, to make sure
+// there are no race conditions there.
+TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
+}
+
+class TestMaintenanceOp : public MaintenanceOp {
+ public:
+  TestMaintenanceOp(const std::string& name,
+                    IOUsage io_usage)
+    : MaintenanceOp(name, io_usage),
+      ram_anchored_(500),
+      logs_retained_bytes_(0),
+      perf_improvement_(0),
+      metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
+      maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
+      maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)),
+      remaining_runs_(1),
+      prepared_runs_(0),
+      sleep_time_(MonoDelta::FromSeconds(0)) {
+  }
+
+  virtual ~TestMaintenanceOp() {}
+
+  virtual bool Prepare() OVERRIDE {
+    std::lock_guard<Mutex> guard(lock_);
+    if (remaining_runs_ == 0) {
+      return false;
+    }
+    remaining_runs_--;
+    prepared_runs_++;
+    DLOG(INFO) << "Prepared op " << name();
+    return true;
+  }
+
+  virtual void Perform() OVERRIDE {
+    {
+      std::lock_guard<Mutex> guard(lock_);
+      DLOG(INFO) << "Performing op " << name();
+
+      // Ensure that we don't call Perform() more times than we returned
+      // success from Prepare().
+      CHECK_GE(prepared_runs_, 1);
+      prepared_runs_--;
+    }
+
+    SleepFor(sleep_time_);
+  }
+
+  virtual void UpdateStats(MaintenanceOpStats* stats) OVERRIDE {
+    std::lock_guard<Mutex> guard(lock_);
+    stats->set_runnable(remaining_runs_ > 0);
+    stats->set_ram_anchored(ram_anchored_);
+    stats->set_logs_retained_bytes(logs_retained_bytes_);
+    stats->set_perf_improvement(perf_improvement_);
+  }
+
+  void set_remaining_runs(int runs) {
+    std::lock_guard<Mutex> guard(lock_);
+    remaining_runs_ = runs;
+  }
+
+  void set_sleep_time(MonoDelta time) {
+    std::lock_guard<Mutex> guard(lock_);
+    sleep_time_ = time;
+  }
+
+  void set_ram_anchored(uint64_t ram_anchored) {
+    std::lock_guard<Mutex> guard(lock_);
+    ram_anchored_ = ram_anchored;
+  }
+
+  void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
+    std::lock_guard<Mutex> guard(lock_);
+    logs_retained_bytes_ = logs_retained_bytes;
+  }
+
+  void set_perf_improvement(uint64_t perf_improvement) {
+    std::lock_guard<Mutex> guard(lock_);
+    perf_improvement_ = perf_improvement;
+  }
+
+  virtual scoped_refptr<Histogram> DurationHistogram() const OVERRIDE {
+    return maintenance_op_duration_;
+  }
+
+  virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const OVERRIDE {
+    return maintenance_ops_running_;
+  }
+
+ private:
+  Mutex lock_;
+
+  uint64_t ram_anchored_;
+  uint64_t logs_retained_bytes_;
+  uint64_t perf_improvement_;
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  scoped_refptr<Histogram> maintenance_op_duration_;
+  scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
+
+  // The number of remaining times this operation will run before disabling
+  // itself.
+  int remaining_runs_;
+  // The number of Prepared() operations which have not yet been Perform()ed.
+  int prepared_runs_;
+
+  // The amount of time each op invocation will sleep.
+  MonoDelta sleep_time_;
+};
+
+// Create an op and wait for it to start running.  Unregister it while it is
+// running and verify that UnregisterOp waits for it to finish before
+// proceeding.
+TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
+  // Register initially with no remaining runs. We'll later enable it once it's
+  // already registered.
+  op1.set_remaining_runs(0);
+  manager_->RegisterOp(&op1);
+  scoped_refptr<kudu::Thread> thread;
+  CHECK_OK(Thread::Create(
+      "TestThread", "TestRegisterUnregister",
+      boost::bind(&TestMaintenanceOp::set_remaining_runs, &op1, 1), &thread));
+  ASSERT_EVENTUALLY([&]() {
+      ASSERT_EQ(op1.DurationHistogram()->TotalCount(), 1);
+    });
+  manager_->UnregisterOp(&op1);
+  ThreadJoiner(thread.get()).Join();
+}
+
+// Regression test for KUDU-1495: when an operation is being unregistered,
+// new instances of that operation should not be scheduled.
+TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
+
+  // Set the op to run up to 10 times, and each time should sleep for a second.
+  op1.set_remaining_runs(10);
+  op1.set_sleep_time(MonoDelta::FromSeconds(1));
+  manager_->RegisterOp(&op1);
+
+  // Wait until two instances of the ops start running, since we have two MM threads.
+  ASSERT_EVENTUALLY([&]() {
+      ASSERT_EQ(op1.RunningGauge()->value(), 2);
+    });
+
+  // Trigger Unregister while they are running. This should wait for the currently-
+  // running operations to complete, but no new operations should be scheduled.
+  manager_->UnregisterOp(&op1);
+
+  // Hence, we should have run only the original two that we saw above.
+  ASSERT_LE(op1.DurationHistogram()->TotalCount(), 2);
+}
+
+// Test that we'll run an operation that doesn't improve performance when memory
+// pressure gets high.
+TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
+  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
+  op.set_ram_anchored(100);
+  manager_->RegisterOp(&op);
+
+  // At first, we don't want to run this, since there is no perf_improvement.
+  SleepFor(MonoDelta::FromMilliseconds(20));
+  ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
+
+  // Fake that the server is under memory pressure.
+  indicate_memory_pressure_ = true;
+
+  ASSERT_EVENTUALLY([&]() {
+      ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
+    });
+  manager_->UnregisterOp(&op);
+}
+
+// Test that ops are prioritized correctly when we add log retention.
+TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
+  const int64_t kMB = 1024 * 1024;
+
+  manager_->Shutdown();
+
+  TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
+  op1.set_ram_anchored(0);
+  op1.set_logs_retained_bytes(100 * kMB);
+
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE);
+  op2.set_ram_anchored(100);
+  op2.set_logs_retained_bytes(100 * kMB);
+
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE);
+  op3.set_ram_anchored(200);
+  op3.set_logs_retained_bytes(100 * kMB);
+
+  manager_->RegisterOp(&op1);
+  manager_->RegisterOp(&op2);
+  manager_->RegisterOp(&op3);
+
+  // We want to do the low IO op first since it clears up some log retention.
+  auto op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(&op1, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "free 104857600 bytes of WAL");
+
+  manager_->UnregisterOp(&op1);
+
+  // Low IO is taken care of, now we find the op that clears the most log retention and ram.
+  // However, with the default settings, we won't bother running any of these operations
+  // which only retain 100MB of logs.
+  op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(nullptr, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "no ops with positive improvement");
+
+  // If we change the target WAL size, we will select these ops.
+  FLAGS_log_target_replay_size_mb = 50;
+  op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(&op3, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+
+  manager_->UnregisterOp(&op3);
+
+  op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(&op2, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+
+  manager_->UnregisterOp(&op2);
+}
+
+// Test retrieving a list of an op's running instances
+TEST_F(MaintenanceManagerTest, TestRunningInstances) {
+  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
+  op.set_perf_improvement(10);
+  op.set_remaining_runs(2);
+  op.set_sleep_time(MonoDelta::FromSeconds(1));
+  manager_->RegisterOp(&op);
+
+  // Check that running instances are added to the maintenance manager's collection,
+  // and fields are getting filled.
+  ASSERT_EVENTUALLY([&]() {
+      MaintenanceManagerStatusPB status_pb;
+      manager_->GetMaintenanceManagerStatusDump(&status_pb);
+      ASSERT_EQ(status_pb.running_operations_size(), 2);
+      const MaintenanceManagerStatusPB_OpInstancePB& instance1 = status_pb.running_operations(0);
+      const MaintenanceManagerStatusPB_OpInstancePB& instance2 = status_pb.running_operations(1);
+      ASSERT_EQ(instance1.name(), op.name());
+      ASSERT_NE(instance1.thread_id(), instance2.thread_id());
+    });
+
+  // Wait for instances to complete.
+  manager_->UnregisterOp(&op);
+
+  // Check that running instances are removed from collection after completion.
+  MaintenanceManagerStatusPB status_pb;
+  manager_->GetMaintenanceManagerStatusDump(&status_pb);
+  ASSERT_EQ(status_pb.running_operations_size(), 0);
+}
+// Test adding operations and make sure that the history of recently completed operations
+// is correct in that it wraps around and doesn't grow.
+TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
+  for (int i = 0; i < 5; i++) {
+    string name = Substitute("op$0", i);
+    TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
+    op.set_perf_improvement(1);
+    op.set_ram_anchored(100);
+    manager_->RegisterOp(&op);
+
+    ASSERT_EVENTUALLY([&]() {
+        ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
+      });
+    manager_->UnregisterOp(&op);
+
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    // The size should be at most the history_size.
+    ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+    // The most recently completed op should always be first, even if we wrap
+    // around.
+    ASSERT_EQ(name, status_pb.completed_operations(0).name());
+  }
+}
+
+} // namespace kudu