You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:29 UTC
[17/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/knapsack_solver.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/knapsack_solver.h b/be/src/kudu/util/knapsack_solver.h
new file mode 100644
index 0000000..2c37065
--- /dev/null
+++ b/be/src/kudu/util/knapsack_solver.h
@@ -0,0 +1,269 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_KNAPSACK_SOLVER_H
+#define KUDU_UTIL_KNAPSACK_SOLVER_H
+
+#include <glog/logging.h>
+#include <algorithm>
+#include <utility>
+#include <vector>
+#include "kudu/gutil/macros.h"
+
+namespace kudu {
+
+// Solver for the 0-1 knapsack problem. This uses dynamic programming
+// to solve the problem exactly.
+//
+// Given a knapsack capacity of 'W' and a number of potential items 'n',
+// this solver is O(nW) time and space.
+//
+// This implementation is cribbed from wikipedia. The only interesting
+// bit here that doesn't directly match the pseudo-code is that we
+// maintain the "taken" bitmap keeping track of which items were
+// taken, so we can efficiently "trace back" the chosen items.
+template<class Traits>
+class KnapsackSolver {
+ public:
+ typedef typename Traits::item_type item_type;
+ typedef typename Traits::value_type value_type;
+ typedef std::pair<int, value_type> solution_type;
+
+ KnapsackSolver() {}
+ ~KnapsackSolver() {}
+
+ // Solve a knapsack problem in one shot. Finds the set of
+ // items in 'items' such that their weights add up to no
+ // more than 'knapsack_capacity' and maximizes the sum
+ // of their values.
+ // The indexes of the chosen items are stored in 'chosen_items',
+ // and the maximal value is stored in 'optimal_value'.
+ void Solve(std::vector<item_type> &items,
+ int knapsack_capacity,
+ std::vector<int>* chosen_items,
+ value_type* optimal_value);
+
+
+ // The following functions are a more advanced API for solving
+ // knapsack problems, allowing the caller to obtain incremental
+ // results as each item is considered. See the implementation of
+ // Solve() for usage.
+
+ // Prepare to solve a knapsack problem with the given capacity and
+ // item set. The vector of items must remain valid and unchanged
+ // until the next call to Reset().
+ void Reset(int knapsack_capacity,
+ const std::vector<item_type>* items);
+
+ // Process the next item in 'items'. Returns false if there
+ // were no more items to process.
+ bool ProcessNext();
+
+ // Returns the current best solution after the most recent ProcessNext()
+ // call, as a pair of (knapsack weight used, value obtained).
+ solution_type GetSolution();
+
+ // Trace the path of item indexes used to achieve the given best
+ // solution as of the latest ProcessNext() call.
+ void TracePath(const solution_type& best,
+ std::vector<int>* chosen_items);
+
+ private:
+
+ // The state kept by the DP algorithm.
+ class KnapsackBlackboard {
+ public:
+ typedef std::pair<int, value_type> solution_type;
+ KnapsackBlackboard() :
+ n_items_(0),
+ n_weights_(0),
+ cur_item_idx_(0),
+ best_solution_(0, 0) {
+ }
+
+ void ResizeAndClear(int n_items, int max_weight);
+
+ // Current maximum value at the given weight
+ value_type &max_at(int weight) {
+ DCHECK_GE(weight, 0);
+ DCHECK_LT(weight, n_weights_);
+ return max_value_[weight];
+ }
+
+ // Consider the next item to be put into the knapsack
+ // Moves the "state" of the solution forward
+ void Advance(value_type new_val, int new_wt);
+
+ // How many items have been considered
+ int current_item_index() const { return cur_item_idx_; }
+
+ bool item_taken(int item, int weight) const {
+ DCHECK_GE(weight, 0);
+ DCHECK_LT(weight, n_weights_);
+ DCHECK_GE(item, 0);
+ DCHECK_LT(item, n_items_);
+ return item_taken_[index(item, weight)];
+ }
+
+ solution_type best_solution() { return best_solution_; }
+
+ bool done() { return cur_item_idx_ == n_items_; }
+
+ private:
+ void MarkTaken(int item, int weight) {
+ item_taken_[index(item, weight)] = true;
+ }
+
+ // If the dynamic programming matrix has more than this number of cells,
+ // then warn.
+ static const int kWarnDimension = 10000000;
+
+ int index(int item, int weight) const {
+ return n_weights_ * item + weight;
+ }
+
+ // vector with maximum value at the i-th position meaning that it is
+ // the maximum value you can get given a knapsack of weight capacity i
+ // while only considering items 0..cur_item_idx_-1
+ std::vector<value_type> max_value_;
+ std::vector<bool> item_taken_; // TODO: record difference vectors?
+ int n_items_, n_weights_;
+ int cur_item_idx_;
+ // Best current solution
+ solution_type best_solution_;
+
+ DISALLOW_COPY_AND_ASSIGN(KnapsackBlackboard);
+ };
+
+ KnapsackBlackboard bb_;
+ const std::vector<item_type>* items_;
+ int knapsack_capacity_;
+
+ DISALLOW_COPY_AND_ASSIGN(KnapsackSolver);
+};
+
+template<class Traits>
+inline void KnapsackSolver<Traits>::Reset(int knapsack_capacity,
+ const std::vector<item_type>* items) {
+ DCHECK_GE(knapsack_capacity, 0);
+ items_ = items;
+ knapsack_capacity_ = knapsack_capacity;
+ bb_.ResizeAndClear(items->size(), knapsack_capacity);
+}
+
+template<class Traits>
+inline bool KnapsackSolver<Traits>::ProcessNext() {
+ if (bb_.done()) return false;
+
+ const item_type& item = (*items_)[bb_.current_item_index()];
+ int item_weight = Traits::get_weight(item);
+ value_type item_value = Traits::get_value(item);
+ bb_.Advance(item_value, item_weight);
+
+ return true;
+}
+
+template<class Traits>
+inline void KnapsackSolver<Traits>::Solve(std::vector<item_type> &items,
+ int knapsack_capacity,
+ std::vector<int>* chosen_items,
+ value_type* optimal_value) {
+ Reset(knapsack_capacity, &items);
+
+ while (ProcessNext()) {
+ }
+
+ solution_type best = GetSolution();
+ *optimal_value = best.second;
+ TracePath(best, chosen_items);
+}
+
+template<class Traits>
+inline typename KnapsackSolver<Traits>::solution_type KnapsackSolver<Traits>::GetSolution() {
+ return bb_.best_solution();
+}
+
+template<class Traits>
+inline void KnapsackSolver<Traits>::TracePath(const solution_type& best,
+ std::vector<int>* chosen_items) {
+ chosen_items->clear();
+ // Retrace which items produced 'best', starting at its weight and walking the
+ // processed items from last to first (so indexes are appended in descending order).
+ int w = best.first;
+ for (int k = bb_.current_item_index() - 1; k >= 0; k--) {
+ if (bb_.item_taken(k, w)) {
+ const item_type& taken = (*items_)[k];
+ chosen_items->push_back(k);
+ w -= Traits::get_weight(taken);
+ DCHECK_GE(w, 0);
+ }
+ }
+}
+
+template<class Traits>
+void KnapsackSolver<Traits>::KnapsackBlackboard::ResizeAndClear(int n_items,
+ int max_weight) {
+ CHECK_GT(n_items, 0);
+ CHECK_GE(max_weight, 0);
+
+ // Rather than zero-indexing the weights, we size the array from
+ // 0 to max_weight. This avoids having to subtract 1 every time
+ // we index into the array.
+ n_weights_ = max_weight + 1;
+ max_value_.resize(n_weights_);
+
+ int dimension = index(n_items, n_weights_);
+ if (dimension > kWarnDimension) {
+ LOG(WARNING) << "Knapsack problem " << n_items << "x" << n_weights_
+ << " is large: may be inefficient!";
+ }
+ item_taken_.resize(dimension);
+ n_items_ = n_items;
+
+ // Clear
+ std::fill(max_value_.begin(), max_value_.end(), 0);
+ std::fill(item_taken_.begin(), item_taken_.end(), false);
+ best_solution_ = std::make_pair(0, 0);
+
+ cur_item_idx_ = 0;
+}
+
+template<class Traits>
+void KnapsackSolver<Traits>::KnapsackBlackboard::Advance(value_type new_val, int new_wt) {
+ // Use the dynamic programming formula:
+ // Define mv(i, j) as maximum value considering items 0..i-1 with knapsack weight j
+ // Then:
+ // if j - weight(i) >= 0, then:
+ // mv(i, j) = max(mv(i-1, j), mv(i-1, j-weight(i)) + value(i))
+ // else mv(i, j) = mv(i-1, j)
+ // Since the recursive formula requires an access of j-weight(i), we go in reverse.
+ for (int j = n_weights_ - 1; j >= new_wt ; --j) {
+ value_type val_if_taken = max_value_[j - new_wt] + new_val;
+ if (max_value_[j] < val_if_taken) {
+ max_value_[j] = val_if_taken;
+ MarkTaken(cur_item_idx_, j);
+ // Check if new solution found
+ if (best_solution_.second < val_if_taken) {
+ best_solution_ = std::make_pair(j, val_if_taken);
+ }
+ }
+ }
+
+ cur_item_idx_++;
+}
+
+} // namespace kudu
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/locks.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/locks.cc b/be/src/kudu/util/locks.cc
new file mode 100644
index 0000000..380bee4
--- /dev/null
+++ b/be/src/kudu/util/locks.cc
@@ -0,0 +1,47 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/locks.h"
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/util/malloc.h"
+
+namespace kudu {
+
+using base::subtle::Acquire_CompareAndSwap;
+using base::subtle::NoBarrier_Load;
+using base::subtle::Release_Store;
+
+size_t percpu_rwlock::memory_footprint_excluding_this() const {
+ // Because locks_ is a dynamic array of non-trivially-destructible types,
+ // the returned pointer from new[] isn't guaranteed to point at the start of
+ // a memory block, rendering it useless for malloc_usable_size().
+ //
+ // Rather than replace locks_ with a vector or something equivalent, we'll
+ // just measure the memory footprint using sizeof(), with the understanding
+ // that we might be inaccurate due to malloc "slop".
+ //
+ // See https://code.google.com/p/address-sanitizer/issues/detail?id=395 for
+ // more details.
+ return n_cpus_ * sizeof(padded_lock);
+}
+
+size_t percpu_rwlock::memory_footprint_including_this() const {
+ return kudu_malloc_usable_size(this) + memory_footprint_excluding_this();
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/locks.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/locks.h b/be/src/kudu/util/locks.h
new file mode 100644
index 0000000..f70955c
--- /dev/null
+++ b/be/src/kudu/util/locks.h
@@ -0,0 +1,294 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_LOCKS_H
+#define KUDU_UTIL_LOCKS_H
+
+#include <sched.h>
+
+#include <algorithm> // IWYU pragma: keep
+#include <cstddef>
+#include <mutex>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/sysinfo.h"
+#include "kudu/util/rw_semaphore.h"
+
+namespace kudu {
+
+// Wrapper around the Google SpinLock class to adapt it to the method names
+// expected by Boost.
+class simple_spinlock {
+ public:
+ simple_spinlock() {}
+
+ void lock() {
+ l_.Lock();
+ }
+
+ void unlock() {
+ l_.Unlock();
+ }
+
+ bool try_lock() {
+ return l_.TryLock();
+ }
+
+ // Return whether the lock is currently held.
+ //
+ // This state can change at any instant, so this is only really useful
+ // for assertions where you expect to hold the lock. The success of
+ // such an assertion isn't a guarantee that the current thread is the
+ // holder, but the failure of such an assertion _is_ a guarantee that
+ // the current thread is _not_ holding the lock!
+ bool is_locked() {
+ return l_.IsHeld();
+ }
+
+ private:
+ base::SpinLock l_;
+
+ DISALLOW_COPY_AND_ASSIGN(simple_spinlock);
+};
+
+struct padded_spinlock : public simple_spinlock {
+ char padding[CACHELINE_SIZE - (sizeof(simple_spinlock) % CACHELINE_SIZE)];
+};
+
+// Reader-writer lock.
+// This is functionally equivalent to rw_semaphore in rw_semaphore.h, but should be
+// used whenever the lock is expected to only be acquired on a single thread.
+// It adds TSAN annotations which will detect misuse of the lock, but those
+// annotations also assume that the same thread that takes the lock will unlock it.
+//
+// See rw_semaphore.h for documentation on the individual methods where unclear.
+class rw_spinlock {
+ public:
+ rw_spinlock() {
+ ANNOTATE_RWLOCK_CREATE(this);
+ }
+ ~rw_spinlock() {
+ ANNOTATE_RWLOCK_DESTROY(this);
+ }
+
+ void lock_shared() {
+ sem_.lock_shared();
+ ANNOTATE_RWLOCK_ACQUIRED(this, 0);
+ }
+
+ void unlock_shared() {
+ ANNOTATE_RWLOCK_RELEASED(this, 0);
+ sem_.unlock_shared();
+ }
+
+ bool try_lock() {
+ bool ret = sem_.try_lock();
+ if (ret) {
+ ANNOTATE_RWLOCK_ACQUIRED(this, 1);
+ }
+ return ret;
+ }
+
+ void lock() {
+ sem_.lock();
+ ANNOTATE_RWLOCK_ACQUIRED(this, 1);
+ }
+
+ void unlock() {
+ ANNOTATE_RWLOCK_RELEASED(this, 1);
+ sem_.unlock();
+ }
+
+ bool is_write_locked() const {
+ return sem_.is_write_locked();
+ }
+
+ bool is_locked() const {
+ return sem_.is_locked();
+ }
+
+ private:
+ rw_semaphore sem_;
+};
+
+// A reader-writer lock implementation which is biased for use cases where
+// the write lock is taken infrequently, but the read lock is used often.
+//
+// Internally, this creates N underlying reader-writer locks, one per CPU. When a thread
+// wants to lock in read (shared) mode, it locks only its own CPU's lock in read
+// mode. When it wants to lock in write (exclusive) mode, it locks all CPUs' rwlocks in
+// write mode. The use of reader-writer locks ensures that, even if a thread gets
+// preempted when holding one of the per-CPU locks in read mode, the next thread
+// scheduled onto that CPU will not need to block on the first thread.
+//
+// This means that in the read-mostly case, different readers will not cause any
+// cacheline contention.
+//
+// Usage:
+// percpu_rwlock mylock;
+//
+// // Lock shared:
+// {
+// kudu::shared_lock<rw_spinlock> lock(mylock.get_lock());
+// ...
+// }
+//
+// // Lock exclusive:
+//
+// {
+// std::lock_guard<percpu_rwlock> lock(mylock);
+// ...
+// }
+class percpu_rwlock {
+ public:
+ percpu_rwlock() {
+#if defined(__APPLE__) || defined(THREAD_SANITIZER)
+ // OSX doesn't have a way to get the index of the CPU running this thread, so
+ // we'll just use a single lock.
+ //
+ // TSAN limits the number of simultaneous lock acquisitions to 64, so we
+ // can't create one lock per core on machines with lots of cores. So, we'll
+ // also just use a single lock.
+ n_cpus_ = 1;
+#else
+ n_cpus_ = base::MaxCPUIndex() + 1;
+#endif
+ CHECK_GT(n_cpus_, 0);
+ locks_ = new padded_lock[n_cpus_];
+ }
+
+ ~percpu_rwlock() {
+ delete [] locks_;
+ }
+
+ rw_spinlock &get_lock() {  // Lock for the calling thread's current CPU.
+#if defined(__APPLE__) || defined(THREAD_SANITIZER)
+ int cpu = 0;  // Only one lock exists in this configuration.
+#else
+ int cpu = sched_getcpu();  // -1 on failure, so both bounds are checked.
+ CHECK(cpu >= 0 && cpu < n_cpus_) << "unexpected CPU index: " << cpu;
+#endif // defined(__APPLE__) || defined(THREAD_SANITIZER)
+ return locks_[cpu].lock;
+ }
+
+ bool try_lock() {
+ for (int i = 0; i < n_cpus_; i++) {
+ if (!locks_[i].lock.try_lock()) {
+ while (i--) {
+ locks_[i].lock.unlock();
+ }
+ return false;
+ }
+ }
+ return true;
+ }
+
+ // Return true if this lock is held on any CPU.
+ // See simple_spinlock::is_locked() for details about where this is useful.
+ bool is_locked() const {
+ for (int i = 0; i < n_cpus_; i++) {
+ if (locks_[i].lock.is_locked()) return true;
+ }
+ return false;
+ }
+
+ bool is_write_locked() const {
+ for (int i = 0; i < n_cpus_; i++) {
+ if (!locks_[i].lock.is_write_locked()) return false;
+ }
+ return true;
+ }
+
+ void lock() {
+ for (int i = 0; i < n_cpus_; i++) {
+ locks_[i].lock.lock();
+ }
+ }
+
+ void unlock() {
+ for (int i = 0; i < n_cpus_; i++) {
+ locks_[i].lock.unlock();
+ }
+ }
+
+ // Returns the memory usage of this object without the object itself. Should
+ // be used when embedded inside another object.
+ size_t memory_footprint_excluding_this() const;
+
+ // Returns the memory usage of this object including the object itself.
+ // Should be used when allocated on the heap.
+ size_t memory_footprint_including_this() const;
+
+ private:
+ struct padded_lock {
+ rw_spinlock lock;
+ char padding[CACHELINE_SIZE - (sizeof(rw_spinlock) % CACHELINE_SIZE)];
+ };
+
+ int n_cpus_;
+ padded_lock *locks_;
+};
+
+// Simple implementation of the std::shared_lock API, which is not available in
+// the standard library until C++14. Defers error checking to the underlying
+// mutex.
+
+template <typename Mutex>
+class shared_lock {
+ public:
+ shared_lock()
+ : m_(nullptr) {
+ }
+
+ explicit shared_lock(Mutex& m)
+ : m_(&m) {
+ m_->lock_shared();
+ }
+
+ shared_lock(Mutex& m, std::try_to_lock_t t)
+ : m_(nullptr) {
+ if (m.try_lock_shared()) {
+ m_ = &m;
+ }
+ }
+
+ bool owns_lock() const {
+ return m_;
+ }
+
+ void swap(shared_lock& other) {
+ std::swap(m_,other.m_);
+ }
+
+ ~shared_lock() {
+ if (m_ != nullptr) {
+ m_->unlock_shared();
+ }
+ }
+
+ private:
+ Mutex* m_;
+ DISALLOW_COPY_AND_ASSIGN(shared_lock<Mutex>);
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging-test.cc b/be/src/kudu/util/logging-test.cc
new file mode 100644
index 0000000..cceece8
--- /dev/null
+++ b/be/src/kudu/util/logging-test.cc
@@ -0,0 +1,249 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <ctime>
+#include <ostream>
+#include <string>
+#include <thread>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gmock/gmock-matchers.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_logger.h"
+#include "kudu/util/barrier.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/logging_test_util.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/test_macros.h" // IWYU pragma: keep
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Test the KLOG_EVERY_N_SECS(...) macro.
+TEST(LoggingTest, TestThrottledLogging) {
+ StringVectorSink sink;
+ ScopedRegisterSink srs(&sink);
+
+ for (int i = 0; i < 10000; i++) {
+ KLOG_EVERY_N_SECS(INFO, 1) << "test" << THROTTLE_MSG;
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ if (sink.logged_msgs().size() >= 2) break;
+ }
+ const vector<string>& msgs = sink.logged_msgs();
+ ASSERT_GE(msgs.size(), 2);
+
+ // The first log line shouldn't have a suppression count.
+ EXPECT_THAT(msgs[0], testing::ContainsRegex("test$"));
+ // The second one should have suppressed at least three digits worth of log messages.
+ EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]"));
+}
+
+TEST(LoggingTest, TestAdvancedThrottling) {
+ StringVectorSink sink;
+ ScopedRegisterSink srs(&sink);
+
+ logging::LogThrottler throttle_a;
+
+ // First, log only using a single tag and throttler.
+ for (int i = 0; i < 100000; i++) {
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_a") << "test" << THROTTLE_MSG;
+ SleepFor(MonoDelta::FromMilliseconds(1));
+ if (sink.logged_msgs().size() >= 2) break;
+ }
+ auto& msgs = sink.logged_msgs();
+ ASSERT_GE(msgs.size(), 2);
+
+ // The first log line shouldn't have a suppression count.
+ EXPECT_THAT(msgs[0], testing::ContainsRegex("test$"));
+ // The second one should have suppressed at least three digits worth of log messages.
+ EXPECT_THAT(msgs[1], testing::ContainsRegex("\\[suppressed [0-9]{3,} similar messages\\]"));
+ msgs.clear();
+
+ // Now, try logging using two different tags in rapid succession. This should not
+ // throttle, because the tag is switching.
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_c") << "test c" << THROTTLE_MSG;
+ KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, throttle_a, "tag_b") << "test b" << THROTTLE_MSG;
+ ASSERT_EQ(msgs.size(), 3);
+ EXPECT_THAT(msgs[0], testing::ContainsRegex("test b$"));
+ EXPECT_THAT(msgs[1], testing::ContainsRegex("test c$"));
+ EXPECT_THAT(msgs[2], testing::ContainsRegex("test b$"));
+}
+
+// Test Logger implementation that just counts the number of messages
+// and flushes.
+//
+// This is purposefully thread-unsafe because we expect that the
+// AsyncLogger is only accessing the underlying logger from a single
+// thread.
+class CountingLogger : public google::base::Logger {
+ public:
+ void Write(bool force_flush,
+ time_t /*timestamp*/,
+ const char* /*message*/,
+ int /*message_len*/) override {
+ message_count_++;
+ if (force_flush) {
+ Flush();
+ }
+ }
+
+ void Flush() override {
+ // Simulate a slow disk.
+ SleepFor(MonoDelta::FromMilliseconds(5));
+ flush_count_++;
+ }
+
+ uint32_t LogSize() override {
+ return 0;
+ }
+
+ std::atomic<int> flush_count_ = {0};
+ std::atomic<int> message_count_ = {0};
+};
+
+TEST(LoggingTest, TestAsyncLogger) {
+ const int kNumThreads = 4;
+ const int kNumMessages = 10000;
+ const int kBuffer = 10000;
+ CountingLogger base;
+ AsyncLogger async(&base, kBuffer);
+ async.Start();
+
+ vector<std::thread> threads;
+ Barrier go_barrier(kNumThreads + 1);
+ // Start some threads writing log messages.
+ for (int i = 0; i < kNumThreads; i++) {
+ threads.emplace_back([&]() {
+ go_barrier.Wait();
+ for (int m = 0; m < kNumMessages; m++) {
+ async.Write(true, m, "x", 1);
+ }
+ });
+ }
+
+ // And a thread calling Flush().
+ threads.emplace_back([&]() {
+ go_barrier.Wait();
+ for (int i = 0; i < 10; i++) {
+ async.Flush();
+ SleepFor(MonoDelta::FromMilliseconds(3));
+ }
+ });
+
+ for (auto& t : threads) {
+ t.join();
+ }
+ async.Stop();
+ ASSERT_EQ(base.message_count_, kNumMessages * kNumThreads);
+ // The async logger should only flush once per "batch" rather than
+ // once per message, even though we wrote every message with
+ // 'flush' set to true.
+ ASSERT_LT(base.flush_count_, kNumMessages * kNumThreads);
+ ASSERT_GT(async.app_threads_blocked_count_for_tests(), 0);
+}
+
+TEST(LoggingTest, TestAsyncLoggerAutoFlush) {
+ const int kBuffer = 10000;
+ CountingLogger base;
+ AsyncLogger async(&base, kBuffer);
+
+ FLAGS_logbufsecs = 1;
+ async.Start();
+
+ // Write some log messages with non-force_flush types.
+ async.Write(false, 0, "test-x", 1);
+ async.Write(false, 1, "test-y", 1);
+
+ // The flush wait timeout might take a little bit of time to run.
+ ASSERT_EVENTUALLY([&]() {
+ ASSERT_EQ(base.message_count_, 2);
+ // The AsyncLogger should have flushed at least once by the timer automatically
+ // so there should be no more messages in the buffer.
+ ASSERT_GT(base.flush_count_, 0);
+ });
+ async.Stop();
+}
+
+// Basic test that the redaction utilities work as expected.
+TEST(LoggingTest, TestRedactionBasic) {
+ ASSERT_STREQ("<redacted>", KUDU_REDACT("hello"));
+ {
+ ScopedDisableRedaction no_redaction;
+ ASSERT_STREQ("hello", KUDU_REDACT("hello"));
+ }
+ ASSERT_STREQ("hello", KUDU_DISABLE_REDACTION(KUDU_REDACT("hello")));
+}
+
+// Typically, ToString() methods apply to some complex object with a bunch
+// of fields, some of which are user data (need redaction) and others of which
+// are not. This shows an example of such a function, which will behave
+// differently based on whether the calling scope has explicitly disabled
+// redaction.
+string SomeComplexStringify(const string& public_data, const string& private_data) {
+ return strings::Substitute("public=$0, private=$1",
+ public_data,
+ KUDU_REDACT(private_data));
+}
+
+TEST(LoggingTest, TestRedactionIllustrateUsage) {
+ // By default, the private data will be redacted.
+ ASSERT_EQ("public=abc, private=<redacted>", SomeComplexStringify("abc", "def"));
+
+ // We can wrap the expression in KUDU_DISABLE_REDACTION(...) to evaluate it
+ // with redaction temporarily disabled.
+ ASSERT_EQ("public=abc, private=def", KUDU_DISABLE_REDACTION(SomeComplexStringify("abc", "def")));
+
+ // Or we can execute an entire scope with redaction disabled.
+ KUDU_DISABLE_REDACTION({
+ ASSERT_EQ("public=abc, private=def", SomeComplexStringify("abc", "def"));
+ });
+}
+
+
+TEST(LoggingTest, TestLogTiming) {
+ LOG_TIMING(INFO, "foo") {
+ }
+ {
+ SCOPED_LOG_TIMING(INFO, "bar");
+ }
+ LOG_SLOW_EXECUTION(INFO, 1, "baz") {
+ }
+
+ // Previous implementations of the above macro confused clang-tidy's use-after-move
+ // check and generated false positives.
+ string s1 = "hello";
+ string s2;
+ LOG_SLOW_EXECUTION(INFO, 1, "baz") {
+ LOG(INFO) << s1;
+ s2 = std::move(s1);
+ }
+
+ ASSERT_EQ("hello", s2);
+}
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging.cc b/be/src/kudu/util/logging.cc
new file mode 100644
index 0000000..fcf035f
--- /dev/null
+++ b/be/src/kudu/util/logging.cc
@@ -0,0 +1,413 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/logging.h"
+
+#include <unistd.h>
+
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <ctime>
+#include <fstream>
+#include <mutex>
+#include <utility>
+
+#include <boost/uuid/random_generator.hpp>
+#include <boost/uuid/uuid_io.hpp>
+#include <gflags/gflags.h>
+#include <glog/logging.h>
+
+#include "kudu/gutil/callback.h" // IWYU pragma: keep
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/spinlock.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/async_logger.h"
+#include "kudu/util/debug-util.h"
+#include "kudu/util/debug/leakcheck_disabler.h"
+#include "kudu/util/env_util.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging_callback.h"
+#include "kudu/util/minidump.h"
+#include "kudu/util/signal.h"
+#include "kudu/util/status.h"
+
+// Command-line flags controlling log file naming, asynchronous writing and
+// retention of rotated log files.
+
+// Left empty by default; InitGoogleLoggingSafe() then fills it in with the
+// program's short invocation name.
+DEFINE_string(log_filename, "",
+ "Prefix of log filename - "
+ "full path is <log_dir>/<log_filename>.[INFO|WARN|ERROR|FATAL]");
+TAG_FLAG(log_filename, stable);
+
+// Checked once, at the end of InitGoogleLoggingSafe().
+DEFINE_bool(log_async, true,
+ "Enable asynchronous writing to log files. This improves "
+ "latency and stability.");
+TAG_FLAG(log_async, hidden);
+
+// Passed to each AsyncLogger created by EnableAsyncLogging().
+DEFINE_int32(log_async_buffer_bytes_per_level, 2 * 1024 * 1024,
+ "The number of bytes of buffer space used by each log "
+ "level. Only relevant when --log_async is enabled.");
+TAG_FLAG(log_async_buffer_bytes_per_level, hidden);
+
+// Read by DeleteExcessLogFiles(), which treats any value <= 0 as
+// "retain everything".
+DEFINE_int32(max_log_files, 10,
+ "Maximum number of log files to retain per severity level. The most recent "
+ "log files are retained. If set to 0, all log files are retained.");
+TAG_FLAG(max_log_files, runtime);
+TAG_FLAG(max_log_files, experimental);
+
+#define PROJ_NAME "kudu"
+
+// True once InitGoogleLoggingSafe() or InitGoogleLoggingSafeBasic() has
+// completed, and reset to false by ShutdownLoggingSafe(). The functions in this
+// file read and write it while holding 'logging_mutex'.
+bool logging_initialized = false;
+
+using namespace std; // NOLINT(*)
+using namespace boost::uuids; // NOLINT(*)
+
+using base::SpinLock;
+using base::SpinLockHolder;
+
+namespace kudu {
+
+// Per-thread redaction switch; starts out enabled (true).
+__thread bool tls_redact_user_data = true;
+// Process-wide redaction context. No initializer is given, so as a
+// namespace-scope object it starts out zero-initialized, which corresponds to
+// the first enumerator of RedactContext (ALL).
+kudu::RedactContext g_should_redact;
+// Replacement text substituted for redacted values.
+const char* const kRedactionMessage = "<redacted>";
+
+namespace {
+
+// glog LogSink adapter that forwards every message it receives to a
+// LoggingCallback, translating the glog severity into a kudu::LogSeverity.
+class SimpleSink : public google::LogSink {
+ public:
+  explicit SimpleSink(LoggingCallback cb) : cb_(std::move(cb)) {}
+
+  virtual ~SimpleSink() OVERRIDE {}
+
+  // Called by glog for each log message. 'base_filename' is not forwarded to
+  // the callback; everything else is passed through after the severity has
+  // been translated. An unrecognized severity is a fatal error.
+  virtual void send(google::LogSeverity severity, const char* full_filename,
+                    const char* base_filename, int line,
+                    const struct ::tm* tm_time,
+                    const char* message, size_t message_len) OVERRIDE {
+    LogSeverity translated;
+    if (severity == google::INFO) {
+      translated = SEVERITY_INFO;
+    } else if (severity == google::WARNING) {
+      translated = SEVERITY_WARNING;
+    } else if (severity == google::ERROR) {
+      translated = SEVERITY_ERROR;
+    } else if (severity == google::FATAL) {
+      translated = SEVERITY_FATAL;
+    } else {
+      LOG(FATAL) << "Unknown glog severity: " << severity;
+    }
+    cb_.Run(translated, full_filename, line, tm_time, message, message_len);
+  }
+
+ private:
+  // The user-supplied callback that receives every message.
+  LoggingCallback cb_;
+};
+
+// Serializes logging initialization, shutdown and callback (un)registration
+// in this file.
+SpinLock logging_mutex(base::LINKER_INITIALIZED);
+
+// There can only be a single instance of a SimpleSink.
+//
+// Protected by 'logging_mutex'.
+SimpleSink* registered_sink = nullptr;
+
+// Records the logging severity after the first call to
+// InitGoogleLoggingSafe{Basic}. Calls to UnregisterLoggingCallback()
+// will restore stderr logging back to this severity level.
+//
+// Protected by 'logging_mutex'.
+int initial_stderr_severity;
+
+void EnableAsyncLogging() {
+ debug::ScopedLeakCheckDisabler leaky;
+
+ // Enable Async for every level except for FATAL. Fatal should be synchronous
+ // to ensure that we get the fatal log message written before exiting.
+ for (auto level : { google::INFO, google::WARNING, google::ERROR }) {
+ auto* orig = google::base::GetLogger(level);
+ auto* async = new AsyncLogger(orig, FLAGS_log_async_buffer_bytes_per_level);
+ async->Start();
+ google::base::SetLogger(level, async);
+ }
+}
+
+// Removes and deletes the currently registered sink and restores the stderr
+// threshold recorded at initialization time.
+//
+// Preconditions (both CHECKed): the caller holds 'logging_mutex' and a sink is
+// currently registered.
+void UnregisterLoggingCallbackUnlocked() {
+ CHECK(logging_mutex.IsHeld());
+ CHECK(registered_sink);
+
+ // Restore logging to stderr, then remove our sink. This ordering ensures
+ // that no log messages are missed.
+ google::SetStderrLogging(initial_stderr_severity);
+ google::RemoveLogSink(registered_sink);
+ delete registered_sink;
+ registered_sink = nullptr;
+}
+
+// Flushes code coverage data, at most once per process, printing a short
+// notice to stderr first. Safe to call repeatedly and from several threads:
+// later callers wait for the first flush to finish and then return.
+void FlushCoverageOnExit() {
+  // Coverage flushing is not re-entrant, but this might be called from a
+  // crash signal context, so avoid re-entrancy.
+  static __thread bool in_call = false;
+  if (in_call) return;
+  in_call = true;
+
+  // The failure writer will be called multiple times per exit.
+  // We only need to flush coverage once. We use a 'once' here so that,
+  // if another thread is already flushing, we'll block and wait for them
+  // to finish before allowing this thread to call abort().
+  static std::once_flag once;
+  std::call_once(once, [] {
+    static const char msg[] = "Flushing coverage data before crash...\n";
+    // Write the text only: sizeof(msg) includes the terminating NUL, which
+    // must not be emitted to stderr.
+    if (write(STDERR_FILENO, msg, sizeof(msg) - 1) < 0) {
+      // Ignore errors: this is a best-effort notice on the way to a crash.
+    }
+    TryFlushCoverage();
+  });
+  in_call = false;
+}
+
+// Failure writer installed into glog for coverage builds. glog calls it to
+// emit crash output (SEGVs and the like) to stderr. It mirrors glog's own
+// writer, except that coverage data is flushed first (the flush itself only
+// happens on the first call).
+//
+// NOTE: this is only used in coverage builds!
+void FailureWriterWithCoverage(const char* data, int size) {
+  FlushCoverageOnExit();
+
+  // Same as glog's stock implementation: a best-effort write to stderr.
+  const ssize_t written = write(STDERR_FILENO, data, size);
+  if (written < 0) {
+    // Ignore errors.
+  }
+}
+
+// GLog "failure function". This is called in the case of LOG(FATAL) to
+// ensure that we flush coverage even on crashes. Installed via
+// google::InstallFailureFunction() in InitGoogleLoggingSafe().
+//
+// NOTE: this is only used in coverage builds!
+void FlushCoverageAndAbort() {
+ FlushCoverageOnExit();
+ // Terminate the process once coverage has been flushed.
+ abort();
+}
+} // anonymous namespace
+
+// Full-featured glog initialization that is safe to call more than once: the
+// work is done under 'logging_mutex' and skipped if logging is already
+// initialized. In order, it installs glog's failure signal handler, applies
+// the --log_filename / --log_dir defaults, verifies that a file can be created
+// in the log directory (exiting with status 1 if not), initializes glog,
+// installs the coverage hooks in coverage builds, records the initial stderr
+// threshold, calls IgnoreSigPipe() and BlockSigUSR1(), and finally enables
+// asynchronous logging if --log_async is set.
+//
+// 'arg' is passed through to google::InitGoogleLogging().
+void InitGoogleLoggingSafe(const char* arg) {
+ SpinLockHolder l(&logging_mutex);
+ if (logging_initialized) return;
+
+ google::InstallFailureSignalHandler();
+
+ // If a filename prefix was given explicitly, use it as the symlink base name
+ // for every severity.
+ if (!FLAGS_log_filename.empty()) {
+ for (int severity = google::INFO; severity <= google::FATAL; ++severity) {
+ google::SetLogSymlink(severity, FLAGS_log_filename.c_str());
+ }
+ }
+
+ // This forces our logging to use /tmp rather than looking for a
+ // temporary directory if none is specified. This is done so that we
+ // can reliably construct the log file name without duplicating the
+ // complex logic that glog uses to guess at a temporary dir.
+ if (FLAGS_log_dir.empty()) {
+ FLAGS_log_dir = "/tmp";
+ }
+
+ if (!FLAGS_logtostderr) {
+ // Verify that a log file can be created in log_dir by creating a tmp file.
+ // A freshly generated UUID is appended to the probe file's name.
+ ostringstream ss;
+ random_generator uuid_generator;
+ ss << FLAGS_log_dir << "/" << PROJ_NAME "_test_log." << uuid_generator();
+ const string file_name = ss.str();
+ ofstream test_file(file_name.c_str());
+ if (!test_file.is_open()) {
+ ostringstream error_msg;
+ error_msg << "Could not open file in log_dir " << FLAGS_log_dir;
+ perror(error_msg.str().c_str());
+ // Unlock the mutex before exiting the program to avoid mutex d'tor assert.
+ logging_mutex.Unlock();
+ exit(1);
+ }
+ // The probe file is only needed for the check above; delete it again.
+ remove(file_name.c_str());
+ }
+
+ google::InitGoogleLogging(arg);
+
+ // In coverage builds, we should flush coverage before exiting on crash.
+ // This way, fault injection tests still capture coverage of the daemon
+ // that "crashed".
+ if (IsCoverageBuild()) {
+ // We have to use both the "failure writer" and the "FailureFunction".
+ // This allows us to handle both LOG(FATAL) and unintended crashes like
+ // SEGVs.
+ google::InstallFailureWriter(FailureWriterWithCoverage);
+ google::InstallFailureFunction(FlushCoverageAndAbort);
+ }
+
+ // Needs to be done after InitGoogleLogging
+ if (FLAGS_log_filename.empty()) {
+ CHECK_STRNE(google::ProgramInvocationShortName(), "UNKNOWN")
+ << ": must initialize gflags before glog";
+ FLAGS_log_filename = google::ProgramInvocationShortName();
+ }
+
+ // File logging: on.
+ // Stderr logging threshold: FLAGS_stderrthreshold.
+ // Sink logging: off.
+ initial_stderr_severity = FLAGS_stderrthreshold;
+
+ // Ignore SIGPIPE early in the startup process so that threads writing to TLS
+ // sockets do not crash when writing to a closed socket. See KUDU-1910.
+ IgnoreSigPipe();
+
+ // For minidump support. Must be called before logging threads started.
+ CHECK_OK(BlockSigUSR1());
+
+ if (FLAGS_log_async) {
+ EnableAsyncLogging();
+ }
+
+ logging_initialized = true;
+}
+
+// Stripped-down counterpart of InitGoogleLoggingSafe(): initializes glog (if
+// not already initialized) and routes all logging to stderr. No signal
+// handlers, coverage hooks or asynchronous loggers are set up here.
+//
+// 'arg' is passed through to google::InitGoogleLogging().
+void InitGoogleLoggingSafeBasic(const char* arg) {
+ SpinLockHolder l(&logging_mutex);
+ if (logging_initialized) return;
+
+ google::InitGoogleLogging(arg);
+
+ // This also disables file-based logging.
+ google::LogToStderr();
+
+ // File logging: off.
+ // Stderr logging threshold: INFO.
+ // Sink logging: off.
+ initial_stderr_severity = google::INFO;
+ logging_initialized = true;
+}
+
+// Installs 'cb' as the receiver of all log messages by wrapping it in a
+// SimpleSink and adding that sink to glog, then raises the stderr threshold to
+// ERROR. Logging must already be initialized (CHECKed). If a callback is
+// already registered, a warning is logged and nothing changes.
+void RegisterLoggingCallback(const LoggingCallback& cb) {
+ SpinLockHolder l(&logging_mutex);
+ CHECK(logging_initialized);
+
+ if (registered_sink) {
+ LOG(WARNING) << "Cannot register logging callback: one already registered";
+ return;
+ }
+
+ // AddLogSink() claims to take ownership of the sink, but it doesn't
+ // really; it actually expects it to remain valid until
+ // google::ShutdownGoogleLogging() is called.
+ registered_sink = new SimpleSink(cb);
+ google::AddLogSink(registered_sink);
+
+ // Even when stderr logging is ostensibly off, it's still emitting
+ // ERROR-level stuff. This is the default.
+ google::SetStderrLogging(google::ERROR);
+
+ // File logging: yes, if InitGoogleLoggingSafe() was called earlier.
+ // Stderr logging threshold: ERROR.
+ // Sink logging: on.
+}
+
+// Public, locking wrapper around UnregisterLoggingCallbackUnlocked(). Logging
+// must already be initialized (CHECKed). If no callback is registered, a
+// warning is logged and nothing changes.
+void UnregisterLoggingCallback() {
+ SpinLockHolder l(&logging_mutex);
+ CHECK(logging_initialized);
+
+ if (!registered_sink) {
+ LOG(WARNING) << "Cannot unregister logging callback: none registered";
+ return;
+ }
+
+ UnregisterLoggingCallbackUnlocked();
+ // File logging: yes, if InitGoogleLoggingSafe() was called earlier.
+ // Stderr logging threshold: initial_stderr_severity.
+ // Sink logging: off.
+}
+
+// Stores "<log_dir>/<log_filename>.<SEVERITY NAME>" in '*filename', using the
+// current values of --log_dir and --log_filename.
+void GetFullLogFilename(google::LogSeverity severity, string* filename) {
+  string path = FLAGS_log_dir;
+  path += "/";
+  path += FLAGS_log_filename;
+  path += ".";
+  path += google::GetLogSeverityName(severity);
+  *filename = path;
+}
+
+// Renders 'micros_since_epoch' as "MMDD HH:MM:SS.uuuuuu" in the local time
+// zone.
+std::string FormatTimestampForLog(MicrosecondsInt64 micros_since_epoch) {
+  // Split the timestamp into whole seconds and the sub-second remainder.
+  const MicrosecondsInt64 kMicrosPerSecond = 1000000;
+  time_t whole_secs = micros_since_epoch / kMicrosPerSecond;
+  const int micros_part = micros_since_epoch % kMicrosPerSecond;
+
+  // Break the seconds down into calendar fields.
+  struct tm local;
+  localtime_r(&whole_secs, &local);
+
+  // tm_mon is zero-based, hence the +1.
+  return StringPrintf("%02d%02d %02d:%02d:%02d.%06d",
+                      local.tm_mon + 1,
+                      local.tm_mday,
+                      local.tm_hour,
+                      local.tm_min,
+                      local.tm_sec,
+                      micros_part);
+}
+
+// Shuts glog down if (and only if) it was initialized through this file:
+// unregisters any registered logging callback first, then calls
+// google::ShutdownGoogleLogging() and marks logging as uninitialized so that
+// it can be initialized again later.
+void ShutdownLoggingSafe() {
+ SpinLockHolder l(&logging_mutex);
+ if (!logging_initialized) return;
+
+ // The sink must be removed (and deleted) before glog itself is shut down.
+ if (registered_sink) {
+ UnregisterLoggingCallbackUnlocked();
+ }
+
+ google::ShutdownGoogleLogging();
+
+ logging_initialized = false;
+}
+
+// Trims rotated log files so that at most --max_log_files remain for each
+// severity level. Returns the first error reported by the deletion helper, or
+// OK. A non-positive flag value turns the function into a no-op.
+Status DeleteExcessLogFiles(Env* env) {
+  // Read the flag once so that a single value is used for every severity.
+  const int32_t keep_count = FLAGS_max_log_files;
+  if (keep_count <= 0) {
+    // Ignore bad input or disable log rotation.
+    return Status::OK();
+  }
+
+  int severity = 0;
+  while (severity < google::NUM_SEVERITIES) {
+    // Glob pattern matching this severity's log files,
+    // e.g. /var/log/kudu/kudu-master.*.INFO.*
+    const string glob = strings::Substitute("$0/$1.*.$2.*",
+                                            FLAGS_log_dir,
+                                            FLAGS_log_filename,
+                                            google::GetLogSeverityName(severity));
+
+    // Keep the 'keep_count' most recent files as compared by modification
+    // time. Glog file names embed a second-granularity timestamp, so the
+    // filename order guaranteed by glob could be used instead; this approach
+    // was adapted from Impala, which uses mtime, and has not caused issues in
+    // production settings.
+    RETURN_NOT_OK(env_util::DeleteExcessFilesByPattern(env, glob, keep_count));
+    ++severity;
+  }
+  return Status::OK();
+}
+
+// Support for the special THROTTLE_MSG token in a log message stream.
+//
+// 'os' must be the stream of a glog LogMessage (CHECKed). The stream's counter
+// value is read back; when it is positive, a note of the form
+// " [suppressed <n> similar messages]" is appended. Otherwise nothing is
+// written. Returns 'os' to allow chaining.
+ostream& operator<<(ostream &os, const PRIVATE_ThrottleMsg& /*unused*/) {
+  using google::LogMessage;
+#ifdef DISABLE_RTTI
+  LogMessage::LogStream *log = static_cast<LogMessage::LogStream*>(&os);
+#else
+  LogMessage::LogStream *log = dynamic_cast<LogMessage::LogStream*>(&os);
+#endif
+  // The token handled here is THROTTLE_MSG, so name it in the failure message.
+  CHECK(log && log == log->self())
+      << "You must not use THROTTLE_MSG with non-glog ostream";
+  int ctr = log->ctr();
+  if (ctr > 0) {
+    os << " [suppressed " << ctr << " similar messages]";
+  }
+  return os;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging.h b/be/src/kudu/util/logging.h
new file mode 100644
index 0000000..428dadc
--- /dev/null
+++ b/be/src/kudu/util/logging.h
@@ -0,0 +1,367 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_LOGGING_H
+#define KUDU_UTIL_LOGGING_H
+
+#include <iosfwd>
+#include <string>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/atomicops.h"
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/walltime.h"
+#include "kudu/util/logging_callback.h"
+#include "kudu/util/status.h"
+
+////////////////////////////////////////////////////////////////////////////////
+// Redaction support
+////////////////////////////////////////////////////////////////////////////////
+
+// Disable redaction of user data while evaluating the expression 'expr'.
+// This may be used inline as an expression, such as:
+//
+//   LOG(INFO) << KUDU_DISABLE_REDACTION(schema.DebugRow(my_row));
+//
+// or with a block:
+//
+//   KUDU_DISABLE_REDACTION({
+//     LOG(INFO) << schema.DebugRow(my_row);
+//   });
+//
+// Redaction should be disabled in the following cases:
+//
+// 1) Outputting strings to a "secure" endpoint (for example an authenticated and authorized
+//    web UI)
+//
+// 2) Using methods like schema.DebugRow(...) when the parameter is not in fact a user-provided
+//    row, but instead some piece of metadata such as a partition boundary.
+//
+// Implementation note: the macro expands to an immediately-invoked lambda that
+// captures by reference, holds a kudu::ScopedDisableRedaction for the duration
+// of the evaluation, and returns the value of 'expr'.
+#define KUDU_DISABLE_REDACTION(expr) ([&]() { \
+ kudu::ScopedDisableRedaction s; \
+ return (expr); \
+ })()
+
+// Evaluates to 'true' if the caller should redact any user data in the current scope.
+// Most callers should instead use KUDU_REDACT(...) defined below, but this can be useful
+// to short-circuit expensive logic.
+//
+// Redaction applies when the global context (kudu::g_should_redact) is ALL or LOG
+// and the calling thread has not turned it off via kudu::tls_redact_user_data.
+#define KUDU_SHOULD_REDACT() ((kudu::g_should_redact == kudu::RedactContext::ALL || \
+ kudu::g_should_redact == kudu::RedactContext::LOG) && kudu::tls_redact_user_data)
+
+// Either evaluate and return 'expr', or return the string "<redacted>", depending on whether
+// redaction is enabled in the current scope. 'expr' is not evaluated when the value is redacted.
+//
+// The redaction message is referenced by its fully qualified name so that the macro can be
+// used from any namespace, matching KUDU_MAYBE_REDACT_IF.
+#define KUDU_REDACT(expr) \
+  (KUDU_SHOULD_REDACT() ? kudu::kRedactionMessage : (expr))
+
+// Like the above, but with the additional condition that redaction is
+// performed only if 'cond' is also true.
+#define KUDU_MAYBE_REDACT_IF(cond, expr) \
+ ((KUDU_SHOULD_REDACT() && (cond)) ? kudu::kRedactionMessage : (expr))
+
+////////////////////////////////////////
+// Redaction implementation details follow.
+////////////////////////////////////////
+
+namespace kudu {
+
+// Flag which allows redaction to be enabled or disabled for a thread context.
+// Defaults to enabling redaction, since it's the safer default with respect to
+// leaking user data, and it's easier to identify when data is over-redacted
+// than vice-versa.
+extern __thread bool tls_redact_user_data;
+
+// Redacted log messages are replaced with this constant.
+extern const char* const kRedactionMessage;
+
+// Selects where redaction applies. KUDU_SHOULD_REDACT() treats both ALL and LOG
+// as "redaction enabled" and NONE as "redaction disabled".
+// NOTE(review): what distinguishes ALL from LOG is not visible in this header;
+// presumably ALL also covers non-log output -- confirm where the context is set.
+enum class RedactContext {
+ ALL,
+ LOG,
+ NONE
+};
+
+// Flag to indicate which redaction context is enabled.
+extern kudu::RedactContext g_should_redact;
+
+// Scoped guard that turns redaction off for the current thread on construction
+// and puts back whatever setting was in effect before on destruction, so
+// guards may be nested.
+class ScopedDisableRedaction {
+ public:
+  ScopedDisableRedaction() {
+    prev_redact_ = tls_redact_user_data;
+    tls_redact_user_data = false;
+  }
+
+  ~ScopedDisableRedaction() { tls_redact_user_data = prev_redact_; }
+
+ private:
+  // Value of tls_redact_user_data observed when the guard was created.
+  bool prev_redact_;
+};
+
+} // namespace kudu
+
+////////////////////////////////////////////////////////////////////////////////
+// Throttled logging support
+////////////////////////////////////////////////////////////////////////////////
+
+// Logs a message throttled to appear at most once every 'n_secs' seconds to
+// the given severity.
+//
+// The log message may include the special token 'THROTTLE_MSG' which expands
+// to either an empty string or '[suppressed <n> similar messages]'.
+//
+// Example usage:
+// KLOG_EVERY_N_SECS(WARNING, 1) << "server is low on memory" << THROTTLE_MSG;
+//
+//
+// Advanced per-instance throttling
+// -----------------------------------
+// For cases where the throttling should be scoped to a given class instance,
+// you may define a logging::LogThrottler object and pass it to the
+// KLOG_EVERY_N_SECS_THROTTLER(...) macro. In addition, you must pass a "tag".
+// Only log messages with equal tags (by pointer equality) will be throttled.
+// For example:
+//
+// struct MyThing {
+// string name;
+// LogThrottler throttler;
+// };
+//
+//   if (...) {
+//     KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "coffee") <<
+//         my_thing->name << " needs coffee!";
+//   } else {
+//     KLOG_EVERY_N_SECS_THROTTLER(INFO, 1, my_thing->throttler, "wine") <<
+//         my_thing->name << " needs wine!";
+//   }
+//
+// In this example, the "coffee"-related message will be collapsed into other
+// such messages within the prior one second; however, if the state alternates
+// between the "coffee" message and the "wine" message, then each such alternation
+// will yield a message.
+
+// Expands to a declaration of a suppressed-message counter (named via
+// VARNAME_LINENUM) followed by an 'if' statement that guards the log message.
+// Because the expansion begins with a declaration, the macro must be used where
+// a declaration is allowed -- for example inside braces, not as the unbraced
+// body of an 'if' or a loop. The counter filled in by ShouldLog() is passed to
+// the LogMessage constructor.
+#define KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, throttler, tag) \
+ int VARNAME_LINENUM(num_suppressed) = 0; \
+ if ((throttler).ShouldLog(n_secs, tag, &VARNAME_LINENUM(num_suppressed))) \
+ google::LogMessage( \
+ __FILE__, __LINE__, google::GLOG_ ## severity, VARNAME_LINENUM(num_suppressed), \
+ &google::LogMessage::SendToLog).stream()
+
+// Throttles with a function-local static LogThrottler, so the throttling state
+// is shared by every execution of the enclosing statement. The throttler type
+// is spelled with its full namespace so that the macro also works outside of
+// namespace kudu.
+#define KLOG_EVERY_N_SECS(severity, n_secs) \
+  static kudu::logging::LogThrottler LOG_THROTTLER;  \
+  KLOG_EVERY_N_SECS_THROTTLER(severity, n_secs, LOG_THROTTLER, "no-tag")
+
+
+namespace kudu {
+// Tag type whose only value, THROTTLE_MSG, can be streamed into a throttled
+// log message; the operator<< overload declared later in this header handles it.
+enum PRIVATE_ThrottleMsg {THROTTLE_MSG};
+} // namespace kudu
+
+////////////////////////////////////////////////////////////////////////////////
+// Versions of glog macros for "LOG_EVERY" and "LOG_FIRST" that annotate the
+// benign races on their internal static variables.
+////////////////////////////////////////////////////////////////////////////////
+
+// The "base" macros.
+//
+// Each one declares function-local static counters (annotated as benign races)
+// followed by an 'if' that guards the log statement, so each must be used where
+// a declaration is allowed. 'what_to_do' is the LogMessage member function used
+// to send the message.
+
+// Logs on occurrences 1, n+1, 2n+1, ...: the modulo counter wraps after 'n' and
+// the message is emitted whenever it equals 1.
+#define KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, n, what_to_do) \
+ static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
+ ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
+ ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
+ ++LOG_OCCURRENCES; \
+ if (++LOG_OCCURRENCES_MOD_N > n) LOG_OCCURRENCES_MOD_N -= n; \
+ if (LOG_OCCURRENCES_MOD_N == 1) \
+ google::LogMessage( \
+ __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+ &what_to_do).stream()
+
+// Conditional variant: LOG_OCCURRENCES counts every execution, but the modulo
+// counter only advances when 'condition' is true (the '&&' short-circuits).
+#define KUDU_SOME_KIND_OF_LOG_IF_EVERY_N(severity, condition, n, what_to_do) \
+ static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
+ ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
+ ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
+ ++LOG_OCCURRENCES; \
+ if (condition && \
+ ((LOG_OCCURRENCES_MOD_N=(LOG_OCCURRENCES_MOD_N + 1) % n) == (1 % n))) \
+ google::LogMessage( \
+ __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+ &what_to_do).stream()
+
+// Same counting scheme as KUDU_SOME_KIND_OF_LOG_EVERY_N, but the message is
+// built with google::ErrnoLogMessage instead of google::LogMessage.
+#define KUDU_SOME_KIND_OF_PLOG_EVERY_N(severity, n, what_to_do) \
+ static int LOG_OCCURRENCES = 0, LOG_OCCURRENCES_MOD_N = 0; \
+ ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging every N is approximate"); \
+ ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES_MOD_N, "Logging every N is approximate"); \
+ ++LOG_OCCURRENCES; \
+ if (++LOG_OCCURRENCES_MOD_N > n) LOG_OCCURRENCES_MOD_N -= n; \
+ if (LOG_OCCURRENCES_MOD_N == 1) \
+ google::ErrnoLogMessage( \
+ __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+ &what_to_do).stream()
+
+// Logs while the (post-incremented) occurrence counter is still below 'n'.
+#define KUDU_SOME_KIND_OF_LOG_FIRST_N(severity, n, what_to_do) \
+ static uint64_t LOG_OCCURRENCES = 0; \
+ ANNOTATE_BENIGN_RACE(&LOG_OCCURRENCES, "Logging the first N is approximate"); \
+ if (LOG_OCCURRENCES++ < n) \
+ google::LogMessage( \
+ __FILE__, __LINE__, google::GLOG_ ## severity, LOG_OCCURRENCES, \
+ &what_to_do).stream()
+
+// The direct user-facing macros.
+//
+// Each forwards to one of the base macros above with its arguments
+// parenthesized and the appropriate LogMessage send function. Only
+// KLOG_EVERY_N additionally asserts at compile time that 'severity' is a
+// valid glog severity.
+#define KLOG_EVERY_N(severity, n) \
+ GOOGLE_GLOG_COMPILE_ASSERT(google::GLOG_ ## severity < \
+ google::NUM_SEVERITIES, \
+ INVALID_REQUESTED_LOG_SEVERITY); \
+ KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToLog)
+
+#define KSYSLOG_EVERY_N(severity, n) \
+ KUDU_SOME_KIND_OF_LOG_EVERY_N(severity, (n), google::LogMessage::SendToSyslogAndLog)
+
+#define KPLOG_EVERY_N(severity, n) \
+ KUDU_SOME_KIND_OF_PLOG_EVERY_N(severity, (n), google::LogMessage::SendToLog)
+
+#define KLOG_FIRST_N(severity, n) \
+ KUDU_SOME_KIND_OF_LOG_FIRST_N(severity, (n), google::LogMessage::SendToLog)
+
+#define KLOG_IF_EVERY_N(severity, condition, n) \
+ KUDU_SOME_KIND_OF_LOG_IF_EVERY_N(severity, (condition), (n), google::LogMessage::SendToLog)
+
+// We also disable the un-annotated glog macros for anyone who includes this header.
+// Each glog macro is redefined to a compile-time assertion on 'false', so any
+// remaining use fails to compile; the string names the K-prefixed replacement.
+#undef LOG_EVERY_N
+#define LOG_EVERY_N(severity, n) \
+ GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_EVERY_N is deprecated. Please use KLOG_EVERY_N.")
+
+#undef SYSLOG_EVERY_N
+#define SYSLOG_EVERY_N(severity, n) \
+ GOOGLE_GLOG_COMPILE_ASSERT(false, "SYSLOG_EVERY_N is deprecated. Please use KSYSLOG_EVERY_N.")
+
+#undef PLOG_EVERY_N
+#define PLOG_EVERY_N(severity, n) \
+ GOOGLE_GLOG_COMPILE_ASSERT(false, "PLOG_EVERY_N is deprecated. Please use KPLOG_EVERY_N.")
+
+#undef LOG_FIRST_N
+#define LOG_FIRST_N(severity, n) \
+ GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_FIRST_N is deprecated. Please use KLOG_FIRST_N.")
+
+#undef LOG_IF_EVERY_N
+#define LOG_IF_EVERY_N(severity, condition, n) \
+ GOOGLE_GLOG_COMPILE_ASSERT(false, "LOG_IF_EVERY_N is deprecated. Please use KLOG_IF_EVERY_N.")
+
+namespace kudu {
+
+class Env;
+
+// glog doesn't allow multiple invocations of InitGoogleLogging. This method conditionally
+// calls InitGoogleLogging only if it hasn't been called before.
+//
+// It also takes care of installing the google failure signal handler and
+// setting the signal handler for SIGPIPE to SIG_IGN.
+void InitGoogleLoggingSafe(const char* arg);
+
+// Like InitGoogleLoggingSafe() but stripped down: no signal handlers are
+// installed, regular logging is disabled, and log events of any severity
+// will be written to stderr.
+//
+// These properties make it attractive for use in libraries.
+void InitGoogleLoggingSafeBasic(const char* arg);
+
+// Demotes stderr logging to ERROR or higher and registers 'cb' as the
+// recipient for all log events.
+//
+// Subsequent calls to RegisterLoggingCallback no-op (until the callback
+// is unregistered with UnregisterLoggingCallback()).
+void RegisterLoggingCallback(const LoggingCallback& cb);
+
+// Unregisters a callback previously registered with
+// RegisterLoggingCallback() and restores stderr logging to the severity
+// threshold that was in effect when logging was initialized.
+//
+// If no callback is registered, this is a no-op.
+void UnregisterLoggingCallback();
+
+// Returns the full pathname of the symlink to the most recent log
+// file corresponding to this severity
+void GetFullLogFilename(google::LogSeverity severity, std::string* filename);
+
+// Format a timestamp in the same format as used by GLog.
+std::string FormatTimestampForLog(MicrosecondsInt64 micros_since_epoch);
+
+// Shuts down the google logging library. Call before exit to ensure that log files are
+// flushed.
+void ShutdownLoggingSafe();
+
+// Deletes excess rotated log files.
+//
+// Keeps at most 'FLAGS_max_log_files' of the most recent log files at every
+// severity level, using the file's modified time to determine recency.
+Status DeleteExcessLogFiles(Env* env);
+
+namespace logging {
+
+// A LogThrottler instance tracks the throttling state for a particular
+// log message.
+//
+// This is used internally by KLOG_EVERY_N_SECS, but can also be used
+// explicitly in conjunction with KLOG_EVERY_N_SECS_THROTTLER. See the
+// macro descriptions above for details.
+//
+// The state is deliberately updated without locking; the whole object is
+// annotated as a benign race.
+class LogThrottler {
+ public:
+  LogThrottler() : num_suppressed_(0), last_ts_(0), last_tag_(nullptr) {
+    ANNOTATE_BENIGN_RACE_SIZED(this, sizeof(*this), "OK to be sloppy with log throttling");
+  }
+
+  // Decides whether a message with the given 'tag' should be logged now.
+  //
+  // Returns true if the tag differs (by pointer) from the previous call's tag,
+  // or if at least 'n_secs' seconds have passed since the last logged message.
+  // '*num_suppressed' is set to the number of messages suppressed since the
+  // last one that was logged (0 after a tag switch). When returning false it
+  // is set to the running count of suppressed messages.
+  bool ShouldLog(int n_secs, const char* tag, int* num_suppressed) {
+    MicrosecondsInt64 ts = GetMonoTimeMicros();
+
+    // When we switch tags, we should not show the "suppressed" messages, because
+    // in fact it's a different message that we skipped. So, reset it to zero,
+    // and always log the new message.
+    if (tag != last_tag_) {
+      *num_suppressed = num_suppressed_ = 0;
+      last_tag_ = tag;
+      last_ts_ = ts;
+      return true;
+    }
+
+    // Widen before multiplying: in plain 'int' arithmetic 'n_secs * 1000000'
+    // overflows once n_secs exceeds roughly 2147 seconds (~35 minutes).
+    if (ts - last_ts_ < static_cast<MicrosecondsInt64>(n_secs) * 1000000) {
+      *num_suppressed = base::subtle::NoBarrier_AtomicIncrement(&num_suppressed_, 1);
+      return false;
+    }
+    last_ts_ = ts;
+    *num_suppressed = base::subtle::NoBarrier_AtomicExchange(&num_suppressed_, 0);
+    return true;
+  }
+ private:
+  // Messages suppressed since the last one that was logged.
+  Atomic32 num_suppressed_;
+  // Monotonic timestamp (microseconds) of the last logged message.
+  MicrosecondsInt64 last_ts_;
+  // Tag of the most recent call; compared by pointer, not by content.
+  const char* last_tag_;
+};
+} // namespace logging
+
+std::ostream& operator<<(std::ostream &os, const PRIVATE_ThrottleMsg&);
+
+// Convenience macros to prefix log messages with some prefix, these are the unlocked
+// versions and should not obtain a lock (if one is required to obtain the prefix).
+// There must be a LogPrefixUnlocked()/LogPrefixLocked() method available in the current
+// scope in order to use these macros.
+#define LOG_WITH_PREFIX_UNLOCKED(severity) LOG(severity) << LogPrefixUnlocked()
+#define VLOG_WITH_PREFIX_UNLOCKED(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) \
+ << LogPrefixUnlocked()
+
+// Same as the above, but obtain the lock.
+#define LOG_WITH_PREFIX(severity) LOG(severity) << LogPrefix()
+#define VLOG_WITH_PREFIX(verboselevel) LOG_IF(INFO, VLOG_IS_ON(verboselevel)) \
+ << LogPrefix()
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_LOGGING_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging_callback.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging_callback.h b/be/src/kudu/util/logging_callback.h
new file mode 100644
index 0000000..83fb973
--- /dev/null
+++ b/be/src/kudu/util/logging_callback.h
@@ -0,0 +1,46 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_LOGGING_CALLBACK_H
+#define KUDU_UTIL_LOGGING_CALLBACK_H
+
+#include <ctime>
+#include <string>
+
+#include "kudu/gutil/callback_forward.h"
+
+namespace kudu {
+
+// Severity levels passed to a LoggingCallback, listed from least to most
+// severe.
+enum LogSeverity {
+ SEVERITY_INFO,
+ SEVERITY_WARNING,
+ SEVERITY_ERROR,
+ SEVERITY_FATAL
+};
+
+// Signature of a simple logging callback: it receives the severity, the source
+// file name and line number, the timestamp, and the message text with its
+// length.
+//
+// 'message' is NOT terminated with an endline.
+using LoggingCallback = Callback<void(LogSeverity severity,
+                                      const char* filename,
+                                      int line_number,
+                                      const struct ::tm* time,
+                                      const char* message,
+                                      size_t message_len)>;
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/logging_test_util.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/logging_test_util.h b/be/src/kudu/util/logging_test_util.h
new file mode 100644
index 0000000..8102375
--- /dev/null
+++ b/be/src/kudu/util/logging_test_util.h
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef KUDU_LOGGING_TEST_UTIL_H
+#define KUDU_LOGGING_TEST_UTIL_H
+
+#include <glog/logging.h>
+#include <string>
+#include <vector>
+
+namespace kudu {
+
+// GLog sink that captures every message it receives, formatted via ToString(),
+// in an in-memory vector in the order the messages arrived.
+class StringVectorSink : public google::LogSink {
+ public:
+  // Formats the message ('full_filename' is not used) and appends it to the
+  // buffer.
+  void send(google::LogSeverity severity, const char* full_filename,
+            const char* base_filename, int line,
+            const struct ::tm* tm_time,
+            const char* message, size_t message_len) override {
+    logged_msgs_.emplace_back(
+        ToString(severity, base_filename, line, tm_time, message, message_len));
+  }
+
+  // Mutable access to the captured messages.
+  std::vector<std::string>& logged_msgs() { return logged_msgs_; }
+
+ private:
+  std::vector<std::string> logged_msgs_;
+};
+
+// RAII wrapper around registering a LogSink with GLog: the sink is added on
+// construction and removed on destruction. The sink itself is not owned and
+// must outlive this object.
+struct ScopedRegisterSink {
+  explicit ScopedRegisterSink(google::LogSink* s) : s_(s) {
+    google::AddLogSink(s_);
+  }
+  ~ScopedRegisterSink() {
+    google::RemoveLogSink(s_);
+  }
+
+  // Not copyable: destroying a copy would unregister the sink while the
+  // original guard is still in scope.
+  ScopedRegisterSink(const ScopedRegisterSink&) = delete;
+  ScopedRegisterSink& operator=(const ScopedRegisterSink&) = delete;
+
+  google::LogSink* s_;
+};
+
+} // namespace kudu
+
+#endif
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager-test.cc b/be/src/kudu/util/maintenance_manager-test.cc
new file mode 100644
index 0000000..6777e06
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager-test.cc
@@ -0,0 +1,369 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <atomic>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <boost/bind.hpp> // IWYU pragma: keep
+#include <gflags/gflags_declare.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/maintenance_manager.h"
+#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+#include "kudu/util/thread.h"
+
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+METRIC_DEFINE_entity(test);  // Entity prototype instantiated by each TestMaintenanceOp.
+METRIC_DEFINE_gauge_uint32(test, maintenance_ops_running,  // Exposed via TestMaintenanceOp::RunningGauge().
+ "Number of Maintenance Operations Running",
+ kudu::MetricUnit::kMaintenanceOperations,
+ "The number of background maintenance operations currently running.");
+METRIC_DEFINE_histogram(test, maintenance_op_duration,  // Exposed via TestMaintenanceOp::DurationHistogram().
+ "Maintenance Operation Duration",
+ kudu::MetricUnit::kSeconds, "", 60000000LU, 2);  // NOTE(review): last two args look like max value and significant digits -- confirm in metrics.h.
+
+DECLARE_int64(log_target_replay_size_mb);  // Flag declared here only; TestLogRetentionPrioritization overrides it.
+
+namespace kudu {
+
+static const int kHistorySize = 4;  // Used as Options::history_size in SetUp(); upper bound asserted in TestCompletedOpsHistory.
+static const char kFakeUuid[] = "12345";  // Second constructor argument of the MaintenanceManager built in SetUp().
+
+// Fixture that gives every test a started MaintenanceManager configured with
+// two threads, a 1 ms polling interval and a history of kHistorySize entries.
+// Tests toggle 'indicate_memory_pressure_' to control what the manager's
+// memory-pressure callback returns.
+class MaintenanceManagerTest : public KuduTest {
+ public:
+  void SetUp() override {
+    MaintenanceManager::Options opts;
+    opts.history_size = kHistorySize;
+    opts.polling_interval_ms = 1;
+    opts.num_threads = 2;
+    manager_ = std::make_shared<MaintenanceManager>(opts, kFakeUuid);
+
+    // The callback ignores its out-parameter and simply reports the current
+    // value of the flag below.
+    manager_->set_memory_pressure_func_for_tests(
+        [this](double* /* consumption */) -> bool {
+          return indicate_memory_pressure_.load();
+        });
+    ASSERT_OK(manager_->Start());
+  }
+
+  void TearDown() override {
+    manager_->Shutdown();
+  }
+
+ protected:
+  shared_ptr<MaintenanceManager> manager_;
+  std::atomic<bool> indicate_memory_pressure_ { false };
+};
+
+// Intentionally empty: the fixture's SetUp() starts the MaintenanceManager and
+// TearDown() shuts it down, which is all that is needed to check for races there.
+TEST_F(MaintenanceManagerTest, TestCreateAndShutdown) {
+}
+
+// MaintenanceOp used by the tests in this file. Its reported stats, the
+// number of runs it will still accept and the time each run sleeps are all
+// configured through setters that take 'lock_'.
+class TestMaintenanceOp : public MaintenanceOp {
+ public:
+  // Starts with ram_anchored = 500, no retained log bytes, no perf
+  // improvement, one remaining run and a zero sleep time.
+  TestMaintenanceOp(const std::string& name,
+                    IOUsage io_usage)
+    : MaintenanceOp(name, io_usage),
+      ram_anchored_(500),
+      logs_retained_bytes_(0),
+      perf_improvement_(0),
+      metric_entity_(METRIC_ENTITY_test.Instantiate(&metric_registry_, "test")),
+      maintenance_op_duration_(METRIC_maintenance_op_duration.Instantiate(metric_entity_)),
+      maintenance_ops_running_(METRIC_maintenance_ops_running.Instantiate(metric_entity_, 0)),
+      remaining_runs_(1),
+      prepared_runs_(0),
+      sleep_time_(MonoDelta::FromSeconds(0)) {
+  }
+
+  virtual ~TestMaintenanceOp() = default;
+
+  // Claims one of the remaining runs. Returns false, leaving the counters
+  // untouched, when no runs are left.
+  bool Prepare() override {
+    std::lock_guard<Mutex> guard(lock_);
+    if (remaining_runs_ == 0) {
+      return false;
+    }
+    remaining_runs_--;
+    prepared_runs_++;
+    DLOG(INFO) << "Prepared op " << name();
+    return true;
+  }
+
+  // Consumes one previously prepared run and then sleeps for the configured
+  // sleep time without holding the lock.
+  void Perform() override {
+    // 'sleep_time_' is written under 'lock_' by set_sleep_time(), so take a
+    // snapshot of it while holding the lock rather than reading it unguarded.
+    MonoDelta sleep_time = MonoDelta::FromSeconds(0);
+    {
+      std::lock_guard<Mutex> guard(lock_);
+      DLOG(INFO) << "Performing op " << name();
+
+      // Ensure that we don't call Perform() more times than we returned
+      // success from Prepare().
+      CHECK_GE(prepared_runs_, 1);
+      prepared_runs_--;
+      sleep_time = sleep_time_;
+    }
+
+    SleepFor(sleep_time);
+  }
+
+  // Copies the configured values into 'stats'. The op reports itself as
+  // runnable for as long as it has remaining runs.
+  void UpdateStats(MaintenanceOpStats* stats) override {
+    std::lock_guard<Mutex> guard(lock_);
+    stats->set_runnable(remaining_runs_ > 0);
+    stats->set_ram_anchored(ram_anchored_);
+    stats->set_logs_retained_bytes(logs_retained_bytes_);
+    stats->set_perf_improvement(perf_improvement_);
+  }
+
+  // Sets how many more times Prepare() will succeed.
+  void set_remaining_runs(int runs) {
+    std::lock_guard<Mutex> guard(lock_);
+    remaining_runs_ = runs;
+  }
+
+  // Sets how long each Perform() call sleeps.
+  void set_sleep_time(MonoDelta time) {
+    std::lock_guard<Mutex> guard(lock_);
+    sleep_time_ = time;
+  }
+
+  // Sets the ram_anchored value reported by UpdateStats().
+  void set_ram_anchored(uint64_t ram_anchored) {
+    std::lock_guard<Mutex> guard(lock_);
+    ram_anchored_ = ram_anchored;
+  }
+
+  // Sets the logs_retained_bytes value reported by UpdateStats().
+  void set_logs_retained_bytes(uint64_t logs_retained_bytes) {
+    std::lock_guard<Mutex> guard(lock_);
+    logs_retained_bytes_ = logs_retained_bytes;
+  }
+
+  // Sets the perf_improvement value reported by UpdateStats().
+  void set_perf_improvement(uint64_t perf_improvement) {
+    std::lock_guard<Mutex> guard(lock_);
+    perf_improvement_ = perf_improvement;
+  }
+
+  // Returns this op's own duration histogram.
+  scoped_refptr<Histogram> DurationHistogram() const override {
+    return maintenance_op_duration_;
+  }
+
+  // Returns this op's own running-instances gauge.
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override {
+    return maintenance_ops_running_;
+  }
+
+ private:
+  Mutex lock_;
+
+  uint64_t ram_anchored_;
+  uint64_t logs_retained_bytes_;
+  uint64_t perf_improvement_;
+  // Each op owns a private registry and entity, so metrics are per-instance.
+  MetricRegistry metric_registry_;
+  scoped_refptr<MetricEntity> metric_entity_;
+  scoped_refptr<Histogram> maintenance_op_duration_;
+  scoped_refptr<AtomicGauge<uint32_t> > maintenance_ops_running_;
+
+  // The number of remaining times this operation will run before disabling
+  // itself.
+  int remaining_runs_;
+  // The number of Prepared() operations which have not yet been Perform()ed.
+  int prepared_runs_;
+
+  // The amount of time each op invocation will sleep.
+  MonoDelta sleep_time_;
+};
+
+// Registers an op that initially has no runs available and grants it a single
+// run from a separate thread. Once the op's duration histogram shows a
+// TotalCount() of 1, the op is unregistered and the helper thread is joined.
+TEST_F(MaintenanceManagerTest, TestRegisterUnregister) {
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
+  // Start out with zero remaining runs; the op is only enabled after it has
+  // already been registered.
+  op1.set_remaining_runs(0);
+  manager_->RegisterOp(&op1);
+
+  scoped_refptr<kudu::Thread> thread;
+  CHECK_OK(Thread::Create("TestThread", "TestRegisterUnregister",
+                          [&op1]() { op1.set_remaining_runs(1); },
+                          &thread));
+  ASSERT_EVENTUALLY([&op1]() {
+    ASSERT_EQ(op1.DurationHistogram()->TotalCount(), 1);
+  });
+
+  manager_->UnregisterOp(&op1);
+  ThreadJoiner(thread.get()).Join();
+}
+
+// Regression test for KUDU-1495: once an operation is being unregistered, no
+// further instances of it may be scheduled.
+TEST_F(MaintenanceManagerTest, TestNewOpsDontGetScheduledDuringUnregister) {
+  TestMaintenanceOp op1("1", MaintenanceOp::HIGH_IO_USAGE);
+  op1.set_perf_improvement(10);
+
+  // Allow up to 10 runs, each of which sleeps for one second.
+  op1.set_sleep_time(MonoDelta::FromSeconds(1));
+  op1.set_remaining_runs(10);
+  manager_->RegisterOp(&op1);
+
+  // The manager was configured with two threads, so wait until the running
+  // gauge shows two concurrent instances of the op.
+  ASSERT_EVENTUALLY([&op1]() {
+    ASSERT_EQ(op1.RunningGauge()->value(), 2);
+  });
+
+  // Unregister while those instances are in flight. The call should wait for
+  // them to complete, and nothing new should be scheduled in the meantime.
+  manager_->UnregisterOp(&op1);
+
+  // So at most the two instances observed above can have run.
+  ASSERT_LE(op1.DurationHistogram()->TotalCount(), 2);
+}
+
+// Verifies that an op offering no performance improvement is still run once
+// the memory-pressure callback starts reporting pressure.
+TEST_F(MaintenanceManagerTest, TestMemoryPressure) {
+  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
+  op.set_ram_anchored(100);
+  manager_->RegisterOp(&op);
+
+  // With no perf_improvement and no memory pressure, the op should not have
+  // run after a short wait.
+  SleepFor(MonoDelta::FromMilliseconds(20));
+  ASSERT_EQ(0, op.DurationHistogram()->TotalCount());
+
+  // From now on the fixture's memory-pressure callback returns true.
+  indicate_memory_pressure_.store(true);
+
+  ASSERT_EVENTUALLY([&op]() {
+    ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
+  });
+  manager_->UnregisterOp(&op);
+}
+
+// Checks which op FindBestOp() selects, and the reason string it reports,
+// when the registered ops retain log bytes.
+TEST_F(MaintenanceManagerTest, TestLogRetentionPrioritization) {
+  const int64_t kMB = 1024 * 1024;
+  const int64_t kRetainedBytes = 100 * kMB;
+
+  // Shut the manager down before registering anything; the rest of the test
+  // calls FindBestOp() directly (presumably so that the manager's own
+  // threads do not run the ops).
+  manager_->Shutdown();
+
+  TestMaintenanceOp op1("op1", MaintenanceOp::LOW_IO_USAGE);
+  op1.set_ram_anchored(0);
+  op1.set_logs_retained_bytes(kRetainedBytes);
+
+  TestMaintenanceOp op2("op2", MaintenanceOp::HIGH_IO_USAGE);
+  op2.set_ram_anchored(100);
+  op2.set_logs_retained_bytes(kRetainedBytes);
+
+  TestMaintenanceOp op3("op3", MaintenanceOp::HIGH_IO_USAGE);
+  op3.set_ram_anchored(200);
+  op3.set_logs_retained_bytes(kRetainedBytes);
+
+  manager_->RegisterOp(&op1);
+  manager_->RegisterOp(&op2);
+  manager_->RegisterOp(&op3);
+
+  // The low-IO op is expected first, because it frees retained log space.
+  auto op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(&op1, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "free 104857600 bytes of WAL");
+
+  manager_->UnregisterOp(&op1);
+
+  // Only the high-IO ops remain. With the default settings, ops that retain
+  // just 100MB of logs are not considered worth running, so nothing is picked.
+  op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(nullptr, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "no ops with positive improvement");
+
+  // After lowering the target WAL replay size, those ops become eligible. Of
+  // the two, op3 is expected first.
+  FLAGS_log_target_replay_size_mb = 50;
+  op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(&op3, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+
+  manager_->UnregisterOp(&op3);
+
+  // With op3 gone, op2 is the only candidate left.
+  op_and_why = manager_->FindBestOp();
+  ASSERT_EQ(&op2, op_and_why.first);
+  EXPECT_EQ(op_and_why.second, "104857600 bytes log retention");
+
+  manager_->UnregisterOp(&op2);
+}
+
+// Exercises the running-operations list of the maintenance manager's status
+// dump while an op has two instances in flight, and again after unregistering.
+TEST_F(MaintenanceManagerTest, TestRunningInstances) {
+  TestMaintenanceOp op("op", MaintenanceOp::HIGH_IO_USAGE);
+  op.set_perf_improvement(10);
+  op.set_remaining_runs(2);
+  op.set_sleep_time(MonoDelta::FromSeconds(1));
+  manager_->RegisterOp(&op);
+
+  // Both instances should eventually appear in the status dump, carrying the
+  // op's name and distinct thread ids.
+  ASSERT_EVENTUALLY([&]() {
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    ASSERT_EQ(status_pb.running_operations_size(), 2);
+    const auto& instance1 = status_pb.running_operations(0);
+    const auto& instance2 = status_pb.running_operations(1);
+    ASSERT_EQ(instance1.name(), op.name());
+    ASSERT_NE(instance1.thread_id(), instance2.thread_id());
+  });
+
+  // Wait for instances to complete.
+  manager_->UnregisterOp(&op);
+
+  // After that, the dump must no longer list any running instance.
+  MaintenanceManagerStatusPB status_pb;
+  manager_->GetMaintenanceManagerStatusDump(&status_pb);
+  ASSERT_EQ(status_pb.running_operations_size(), 0);
+}
+
+// Runs one more op than the completed-operations history can hold, one at a
+// time. After each op it checks that the history holds at most kHistorySize
+// entries and that its first entry is the op that just finished.
+TEST_F(MaintenanceManagerTest, TestCompletedOpsHistory) {
+  // Derive the op count from kHistorySize rather than hard-coding it, so the
+  // history is always forced to wrap even if the constant changes.
+  const int kNumOps = kHistorySize + 1;
+  for (int i = 0; i < kNumOps; i++) {
+    string name = Substitute("op$0", i);
+    TestMaintenanceOp op(name, MaintenanceOp::HIGH_IO_USAGE);
+    op.set_perf_improvement(1);
+    op.set_ram_anchored(100);
+    manager_->RegisterOp(&op);
+
+    // Each op has a single remaining run by default; wait for it to finish.
+    ASSERT_EVENTUALLY([&]() {
+      ASSERT_EQ(op.DurationHistogram()->TotalCount(), 1);
+    });
+    manager_->UnregisterOp(&op);
+
+    MaintenanceManagerStatusPB status_pb;
+    manager_->GetMaintenanceManagerStatusDump(&status_pb);
+    // The size should be at most the history_size.
+    ASSERT_GE(kHistorySize, status_pb.completed_operations_size());
+    // The most recently completed op should always be first, even if we wrap
+    // around.
+    ASSERT_EQ(name, status_pb.completed_operations(0).name());
+  }
+}
+
+} // namespace kudu