You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@impala.apache.org by ta...@apache.org on 2018/07/13 06:03:28 UTC
[16/51] [abbrv] impala git commit: IMPALA-7006: Add KRPC folders from
kudu@334ecafd
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.cc b/be/src/kudu/util/maintenance_manager.cc
new file mode 100644
index 0000000..9a42464
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.cc
@@ -0,0 +1,550 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/maintenance_manager.h"
+
+#include <cinttypes>
+#include <cstddef>
+#include <cstdint>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <type_traits>
+#include <utility>
+
+#include <boost/bind.hpp>
+#include <gflags/gflags.h>
+
+#include "kudu/gutil/dynamic_annotations.h"
+#include "kudu/gutil/map-util.h"
+#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/debug/trace_event.h"
+#include "kudu/util/debug/trace_logging.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/logging.h"
+#include "kudu/util/maintenance_manager.pb.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/process_memory.h"
+#include "kudu/util/random_util.h"
+#include "kudu/util/scoped_cleanup.h"
+#include "kudu/util/stopwatch.h"
+#include "kudu/util/thread.h"
+#include "kudu/util/threadpool.h"
+#include "kudu/util/trace.h"
+
+using std::pair;
+using std::string;
+using strings::Substitute;
+
+DEFINE_int32(maintenance_manager_num_threads, 1,
+ "Size of the maintenance manager thread pool. "
+ "For spinning disks, the number of threads should "
+ "not be above the number of devices.");
+TAG_FLAG(maintenance_manager_num_threads, stable);
+
+DEFINE_int32(maintenance_manager_polling_interval_ms, 250,
+ "Polling interval for the maintenance manager scheduler, "
+ "in milliseconds.");
+TAG_FLAG(maintenance_manager_polling_interval_ms, hidden);
+
+DEFINE_int32(maintenance_manager_history_size, 8,
+ "Number of completed operations the manager is keeping track of.");
+TAG_FLAG(maintenance_manager_history_size, hidden);
+
+DEFINE_bool(enable_maintenance_manager, true,
+ "Enable the maintenance manager, runs compaction and tablet cleaning tasks.");
+TAG_FLAG(enable_maintenance_manager, unsafe);
+
+DEFINE_int64(log_target_replay_size_mb, 1024,
+ "The target maximum size of logs to be replayed at startup. If a tablet "
+ "has in-memory operations that are causing more than this size of logs "
+ "to be retained, then the maintenance manager will prioritize flushing "
+ "these operations to disk.");
+TAG_FLAG(log_target_replay_size_mb, experimental);
+
+DEFINE_int64(data_gc_min_size_mb, 0,
+ "The (exclusive) minimum number of megabytes of ancient data on "
+ "disk, per tablet, needed to prioritize deletion of that data.");
+TAG_FLAG(data_gc_min_size_mb, experimental);
+
+DEFINE_double(data_gc_prioritization_prob, 0.5,
+ "The probability that we will prioritize data GC over performance "
+ "improvement operations. If set to 1.0, we will always prefer to "
+ "delete old data before running performance improvement operations "
+ "such as delta compaction.");
+TAG_FLAG(data_gc_prioritization_prob, experimental);
+
+namespace kudu {
+
+MaintenanceOpStats::MaintenanceOpStats() {
+ Clear();
+}
+
+void MaintenanceOpStats::Clear() {
+ valid_ = false;
+ runnable_ = false;
+ ram_anchored_ = 0;
+ logs_retained_bytes_ = 0;
+ data_retained_bytes_ = 0;
+ perf_improvement_ = 0;
+ last_modified_ = MonoTime();
+}
+
+MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
+ : name_(std::move(name)),
+ running_(0),
+ cancel_(false),
+ io_usage_(io_usage) {
+}
+
+MaintenanceOp::~MaintenanceOp() {
+ CHECK(!manager_.get()) << "You must unregister the " << name_
+ << " Op before destroying it.";
+}
+
+void MaintenanceOp::Unregister() {
+ CHECK(manager_.get()) << "Op " << name_ << " was never registered.";
+ manager_->UnregisterOp(this);
+}
+
+MaintenanceManagerStatusPB_OpInstancePB OpInstance::DumpToPB() const {
+ MaintenanceManagerStatusPB_OpInstancePB pb;
+ pb.set_thread_id(thread_id);
+ pb.set_name(name);
+ if (duration.Initialized()) {
+ pb.set_duration_millis(duration.ToMilliseconds());
+ }
+ MonoDelta delta(MonoTime::Now() - start_mono_time);
+ pb.set_millis_since_start(delta.ToMilliseconds());
+ return pb;
+}
+
+const MaintenanceManager::Options MaintenanceManager::kDefaultOptions = {
+ .num_threads = 0,
+ .polling_interval_ms = 0,
+ .history_size = 0,
+};
+
+MaintenanceManager::MaintenanceManager(const Options& options,
+ std::string server_uuid)
+ : server_uuid_(std::move(server_uuid)),
+ num_threads_(options.num_threads <= 0 ?
+ FLAGS_maintenance_manager_num_threads : options.num_threads),
+ cond_(&lock_),
+ shutdown_(false),
+ polling_interval_ms_(options.polling_interval_ms <= 0 ?
+ FLAGS_maintenance_manager_polling_interval_ms :
+ options.polling_interval_ms),
+ running_ops_(0),
+ completed_ops_count_(0),
+ rand_(GetRandomSeed32()),
+ memory_pressure_func_(&process_memory::UnderMemoryPressure) {
+ CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
+ .set_max_threads(num_threads_).Build(&thread_pool_));
+ uint32_t history_size = options.history_size == 0 ?
+ FLAGS_maintenance_manager_history_size :
+ options.history_size;
+ completed_ops_.resize(history_size);
+}
+
+MaintenanceManager::~MaintenanceManager() {
+ Shutdown();
+}
+
+Status MaintenanceManager::Start() {
+ CHECK(!monitor_thread_);
+ RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
+ boost::bind(&MaintenanceManager::RunSchedulerThread, this),
+ &monitor_thread_));
+ return Status::OK();
+}
+
+void MaintenanceManager::Shutdown() {
+ {
+ std::lock_guard<Mutex> guard(lock_);
+ if (shutdown_) {
+ return;
+ }
+ shutdown_ = true;
+ cond_.Broadcast();
+ }
+ if (monitor_thread_.get()) {
+ CHECK_OK(ThreadJoiner(monitor_thread_.get()).Join());
+ monitor_thread_.reset();
+ // Wait for all the running and queued tasks before shutting down. Otherwise,
+ // Shutdown() can remove a queued task silently. We count on eventually running the
+ // queued tasks to decrement their "running" count, which is incremented at the time
+ // they are enqueued.
+ thread_pool_->Wait();
+ thread_pool_->Shutdown();
+ }
+}
+
+void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
+ CHECK(op);
+ std::lock_guard<Mutex> guard(lock_);
+ CHECK(!op->manager_) << "Tried to register " << op->name()
+ << ", but it was already registered.";
+ pair<OpMapTy::iterator, bool> val
+ (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
+ CHECK(val.second)
+ << "Tried to register " << op->name()
+ << ", but it already exists in ops_.";
+ op->manager_ = shared_from_this();
+ op->cond_.reset(new ConditionVariable(&lock_));
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Registered " << op->name();
+}
+
+void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
+ {
+ std::lock_guard<Mutex> guard(lock_);
+ CHECK(op->manager_.get() == this) << "Tried to unregister " << op->name()
+ << ", but it is not currently registered with this maintenance manager.";
+ auto iter = ops_.find(op);
+ CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+ << ", but it was never registered";
+ // While the op is running, wait for it to be finished.
+ if (iter->first->running_ > 0) {
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Waiting for op " << op->name()
+ << " to finish so we can unregister it.";
+ }
+ op->CancelAndDisable();
+ while (iter->first->running_ > 0) {
+ op->cond_->Wait();
+ iter = ops_.find(op);
+ CHECK(iter != ops_.end()) << "Tried to unregister " << op->name()
+ << ", but another thread unregistered it while we were "
+ << "waiting for it to complete";
+ }
+ ops_.erase(iter);
+ }
+ LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name();
+ op->cond_.reset();
+ // Remove the op's shared_ptr reference to us. This might 'delete this'.
+ op->manager_.reset();
+}
+
+bool MaintenanceManager::disabled_for_tests() const {
+ return !ANNOTATE_UNPROTECTED_READ(FLAGS_enable_maintenance_manager);
+}
+
+void MaintenanceManager::RunSchedulerThread() {
+ if (!FLAGS_enable_maintenance_manager) {
+ LOG(INFO) << "Maintenance manager is disabled. Stopping thread.";
+ return;
+ }
+
+ MonoDelta polling_interval = MonoDelta::FromMilliseconds(polling_interval_ms_);
+
+ std::unique_lock<Mutex> guard(lock_);
+
+ // Set to true if the scheduler runs and finds that there is no work to do.
+ bool prev_iter_found_no_work = false;
+
+ while (true) {
+ // We'll keep sleeping if:
+ // 1) there are no free threads available to perform a maintenance op.
+ // or 2) we just tried to schedule an op but found nothing to run.
+ // However, if it's time to shut down, we want to do so immediately.
+ while ((running_ops_ >= num_threads_ || prev_iter_found_no_work || disabled_for_tests()) &&
+ !shutdown_) {
+ cond_.WaitFor(polling_interval);
+ prev_iter_found_no_work = false;
+ }
+ if (shutdown_) {
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Shutting down maintenance manager.";
+ return;
+ }
+
+ // Find the best op.
+ pair<MaintenanceOp*, string> op_and_note = FindBestOp();
+ auto* op = op_and_note.first;
+ const auto& note = op_and_note.second;
+
+ // If we found no work to do, then we should sleep before trying again to schedule.
+ // Otherwise, we can go right into trying to find the next op.
+ prev_iter_found_no_work = (op == nullptr);
+ if (!op) {
+ VLOG_AND_TRACE("maintenance", 2) << LogPrefix()
+ << "No maintenance operations look worth doing.";
+ continue;
+ }
+
+ // Prepare the maintenance operation.
+ op->running_++;
+ running_ops_++;
+ guard.unlock();
+ bool ready = op->Prepare();
+ guard.lock();
+ if (!ready) {
+ LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+ << ". Re-running scheduler.";
+ op->running_--;
+ running_ops_--;
+ op->cond_->Signal();
+ continue;
+ }
+
+ LOG_AND_TRACE("maintenance", INFO) << LogPrefix() << "Scheduling "
+ << op->name() << ": " << note;
+ // Run the maintenance operation.
+ Status s = thread_pool_->SubmitFunc(boost::bind(
+ &MaintenanceManager::LaunchOp, this, op));
+ CHECK(s.ok());
+ }
+}
+
+// Finding the best operation goes through four filters:
+// - If there's an Op that we can run quickly that frees log retention, we run it.
+// - If we've hit the overall process memory limit (note: this includes memory that the Ops cannot
+// free), we run the Op with the highest RAM usage.
+// - If there are Ops that are retaining logs past our target replay size, we run the one that has
+// the highest retention (and if many qualify, then we run the one that also frees up the
+// most RAM).
+// - Finally, if there's nothing else that we really need to do, we run the Op that will improve
+// performance the most.
+//
+// The reason it's done this way is that we want to prioritize limiting the amount of resources we
+// hold on to. Low IO Ops go first since we can quickly run them, then we can look at memory usage.
+// Reversing those can starve the low IO Ops when the system is under intense memory pressure.
+//
+// In the third priority we're at a point where nothing's urgent and there's nothing we can run
+// quickly.
+// TODO We currently optimize for freeing log retention but we could consider having some sort of
+// sliding priority between log retention and RAM usage. For example, is an Op that frees
+// 128MB of log retention and 12MB of RAM always better than an op that frees 12MB of log retention
+// and 128MB of RAM? Maybe a more holistic approach would be better.
+pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
+ TRACE_EVENT0("maintenance", "MaintenanceManager::FindBestOp");
+
+ size_t free_threads = num_threads_ - running_ops_;
+ if (free_threads == 0) {
+ return {nullptr, "no free threads"};
+ }
+
+ int64_t low_io_most_logs_retained_bytes = 0;
+ MaintenanceOp* low_io_most_logs_retained_bytes_op = nullptr;
+
+ uint64_t most_mem_anchored = 0;
+ MaintenanceOp* most_mem_anchored_op = nullptr;
+
+ int64_t most_logs_retained_bytes = 0;
+ int64_t most_logs_retained_bytes_ram_anchored = 0;
+ MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+
+ int64_t most_data_retained_bytes = 0;
+ MaintenanceOp* most_data_retained_bytes_op = nullptr;
+
+ double best_perf_improvement = 0;
+ MaintenanceOp* best_perf_improvement_op = nullptr;
+ for (OpMapTy::value_type &val : ops_) {
+ MaintenanceOp* op(val.first);
+ MaintenanceOpStats& stats(val.second);
+ VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name();
+ // Update op stats.
+ stats.Clear();
+ op->UpdateStats(&stats);
+ if (op->cancelled() || !stats.valid() || !stats.runnable()) {
+ continue;
+ }
+ if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
+ op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
+ low_io_most_logs_retained_bytes_op = op;
+ low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
+ VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+ << stats.logs_retained_bytes() << " bytes of logs";
+ }
+
+ if (stats.ram_anchored() > most_mem_anchored) {
+ most_mem_anchored_op = op;
+ most_mem_anchored = stats.ram_anchored();
+ }
+ // We prioritize ops that can free more logs, but when it's the same we pick the one that
+ // also frees up the most memory.
+ if (stats.logs_retained_bytes() > 0 &&
+ (stats.logs_retained_bytes() > most_logs_retained_bytes ||
+ (stats.logs_retained_bytes() == most_logs_retained_bytes &&
+ stats.ram_anchored() > most_logs_retained_bytes_ram_anchored))) {
+ most_logs_retained_bytes_op = op;
+ most_logs_retained_bytes = stats.logs_retained_bytes();
+ most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
+ }
+
+ if (stats.data_retained_bytes() > most_data_retained_bytes) {
+ most_data_retained_bytes_op = op;
+ most_data_retained_bytes = stats.data_retained_bytes();
+ VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+ << stats.data_retained_bytes() << " bytes of data";
+ }
+
+ if ((!best_perf_improvement_op) ||
+ (stats.perf_improvement() > best_perf_improvement)) {
+ best_perf_improvement_op = op;
+ best_perf_improvement = stats.perf_improvement();
+ }
+ }
+
+ // Look at ops that we can run quickly that free up log retention.
+ if (low_io_most_logs_retained_bytes_op) {
+ if (low_io_most_logs_retained_bytes > 0) {
+ string notes = Substitute("free $0 bytes of WAL", low_io_most_logs_retained_bytes);
+ return {low_io_most_logs_retained_bytes_op, std::move(notes)};
+ }
+ }
+
+ // Look at free memory. If it is dangerously low, we must select something
+ // that frees memory-- the op with the most anchored memory.
+ double capacity_pct;
+ if (memory_pressure_func_(&capacity_pct)) {
+ if (!most_mem_anchored_op) {
+ std::string msg = StringPrintf("System under memory pressure "
+ "(%.2f%% of limit used). However, there are no ops currently "
+ "runnable which would free memory.", capacity_pct);
+ LOG_WITH_PREFIX(INFO) << msg;
+ return {nullptr, msg};
+ }
+ string note = StringPrintf("under memory pressure (%.2f%% used, "
+ "can flush %" PRIu64 " bytes)",
+ capacity_pct, most_mem_anchored);
+ return {most_mem_anchored_op, std::move(note)};
+ }
+
+ if (most_logs_retained_bytes_op &&
+ most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) {
+ string note = Substitute("$0 bytes log retention", most_logs_retained_bytes);
+ return {most_logs_retained_bytes_op, std::move(note)};
+ }
+
+ // Look at ops that we can run quickly that free up data on disk.
+ if (most_data_retained_bytes_op &&
+ most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) {
+ if (!best_perf_improvement_op || best_perf_improvement <= 0 ||
+ rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) {
+ string note = Substitute("$0 bytes on disk", most_data_retained_bytes);
+ return {most_data_retained_bytes_op, std::move(note)};
+ }
+ VLOG(1) << "Skipping data GC due to prioritizing perf improvement";
+ }
+
+ if (best_perf_improvement_op && best_perf_improvement > 0) {
+ string note = StringPrintf("perf score=%.6f", best_perf_improvement);
+ return {best_perf_improvement_op, std::move(note)};
+ }
+ return {nullptr, "no ops with positive improvement"};
+}
+
+void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
+ int64_t thread_id = Thread::CurrentThreadId();
+ OpInstance op_instance;
+ op_instance.thread_id = thread_id;
+ op_instance.name = op->name();
+ op_instance.start_mono_time = MonoTime::Now();
+ op->RunningGauge()->Increment();
+ {
+ std::lock_guard<Mutex> lock(running_instances_lock_);
+ InsertOrDie(&running_instances_, thread_id, &op_instance);
+ }
+
+ SCOPED_CLEANUP({
+ op->RunningGauge()->Decrement();
+
+ std::lock_guard<Mutex> l(lock_);
+ {
+ std::lock_guard<Mutex> lock(running_instances_lock_);
+ running_instances_.erase(thread_id);
+ }
+ op_instance.duration = MonoTime::Now() - op_instance.start_mono_time;
+ completed_ops_[completed_ops_count_ % completed_ops_.size()] = op_instance;
+ completed_ops_count_++;
+
+ op->DurationHistogram()->Increment(op_instance.duration.ToMilliseconds());
+
+ running_ops_--;
+ op->running_--;
+ op->cond_->Signal();
+ cond_.Signal(); // wake up scheduler
+ });
+
+ scoped_refptr<Trace> trace(new Trace);
+ Stopwatch sw;
+ sw.start();
+ {
+ ADOPT_TRACE(trace.get());
+ TRACE_EVENT1("maintenance", "MaintenanceManager::LaunchOp",
+ "name", op->name());
+ op->Perform();
+ sw.stop();
+ }
+ LOG_WITH_PREFIX(INFO) << op->name() << " complete. "
+ << "Timing: " << sw.elapsed().ToString()
+ << " Metrics: " << trace->MetricsAsJSON();
+}
+
+void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb) {
+ DCHECK(out_pb != nullptr);
+ std::lock_guard<Mutex> guard(lock_);
+ pair<MaintenanceOp*, string> best_op_and_why = FindBestOp();
+ auto* best_op = best_op_and_why.first;
+
+ for (MaintenanceManager::OpMapTy::value_type& val : ops_) {
+ MaintenanceManagerStatusPB_MaintenanceOpPB* op_pb = out_pb->add_registered_operations();
+ MaintenanceOp* op(val.first);
+ MaintenanceOpStats& stat(val.second);
+ op_pb->set_name(op->name());
+ op_pb->set_running(op->running());
+ if (stat.valid()) {
+ op_pb->set_runnable(stat.runnable());
+ op_pb->set_ram_anchored_bytes(stat.ram_anchored());
+ op_pb->set_logs_retained_bytes(stat.logs_retained_bytes());
+ op_pb->set_perf_improvement(stat.perf_improvement());
+ } else {
+ op_pb->set_runnable(false);
+ op_pb->set_ram_anchored_bytes(0);
+ op_pb->set_logs_retained_bytes(0);
+ op_pb->set_perf_improvement(0.0);
+ }
+
+ if (best_op == op) {
+ out_pb->mutable_best_op()->CopyFrom(*op_pb);
+ }
+ }
+
+ {
+ std::lock_guard<Mutex> lock(running_instances_lock_);
+ for (const auto& running_instance : running_instances_) {
+ *out_pb->add_running_operations() = running_instance.second->DumpToPB();
+ }
+ }
+
+ for (int n = 1; n <= completed_ops_.size(); n++) {
+ int i = completed_ops_count_ - n;
+ if (i < 0) break;
+ const auto& completed_op = completed_ops_[i % completed_ops_.size()];
+
+ if (!completed_op.name.empty()) {
+ *out_pb->add_completed_operations() = completed_op.DumpToPB();
+ }
+ }
+}
+
+std::string MaintenanceManager::LogPrefix() const {
+ return Substitute("P $0: ", server_uuid_);
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.h b/be/src/kudu/util/maintenance_manager.h
new file mode 100644
index 0000000..7d20c8a
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.h
@@ -0,0 +1,361 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <functional>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest_prod.h>
+
+#include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/ref_counted.h"
+#include "kudu/util/atomic.h"
+#include "kudu/util/condition_variable.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/random.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+template<class T>
+class AtomicGauge;
+class Histogram;
+class MaintenanceManager;
+class MaintenanceManagerStatusPB;
+class MaintenanceManagerStatusPB_OpInstancePB;
+class Thread;
+class ThreadPool;
+
+class MaintenanceOpStats {
+ public:
+ MaintenanceOpStats();
+
+ // Zero all stats. They are invalid until the first setter is called.
+ void Clear();
+
+ bool runnable() const {
+ DCHECK(valid_);
+ return runnable_;
+ }
+
+ void set_runnable(bool runnable) {
+ UpdateLastModified();
+ runnable_ = runnable;
+ }
+
+ uint64_t ram_anchored() const {
+ DCHECK(valid_);
+ return ram_anchored_;
+ }
+
+ void set_ram_anchored(uint64_t ram_anchored) {
+ UpdateLastModified();
+ ram_anchored_ = ram_anchored;
+ }
+
+ int64_t logs_retained_bytes() const {
+ DCHECK(valid_);
+ return logs_retained_bytes_;
+ }
+
+ void set_logs_retained_bytes(int64_t logs_retained_bytes) {
+ UpdateLastModified();
+ logs_retained_bytes_ = logs_retained_bytes;
+ }
+
+ int64_t data_retained_bytes() const {
+ DCHECK(valid_);
+ return data_retained_bytes_;
+ }
+
+ void set_data_retained_bytes(int64_t data_retained_bytes) {
+ UpdateLastModified();
+ data_retained_bytes_ = data_retained_bytes;
+ }
+
+ double perf_improvement() const {
+ DCHECK(valid_);
+ return perf_improvement_;
+ }
+
+ void set_perf_improvement(double perf_improvement) {
+ UpdateLastModified();
+ perf_improvement_ = perf_improvement;
+ }
+
+ const MonoTime& last_modified() const {
+ DCHECK(valid_);
+ return last_modified_;
+ }
+
+ bool valid() const {
+ return valid_;
+ }
+
+ private:
+ void UpdateLastModified() {
+ valid_ = true;
+ last_modified_ = MonoTime::Now();
+ }
+
+ // Important: Update Clear() when adding fields to this class.
+
+ // True if these stats are valid.
+ bool valid_;
+
+ // True if this op can be run now.
+ bool runnable_;
+
+ // The approximate amount of memory that not doing this operation keeps
+ // around. This number is used to decide when to start freeing memory, so it
+ // should be fairly accurate. May be 0.
+ uint64_t ram_anchored_;
+
+ // Approximate amount of disk space in WAL files that would be freed if this
+ // operation ran. May be 0.
+ int64_t logs_retained_bytes_;
+
+ // Approximate amount of disk space in data blocks that would be freed if
+ // this operation ran. May be 0.
+ int64_t data_retained_bytes_;
+
+ // The estimated performance improvement-- how good it is to do this on some
+ // absolute scale (yet TBD).
+ double perf_improvement_;
+
+ // The last time that the stats were modified.
+ MonoTime last_modified_;
+};
+
+// Represents an instance of a maintenance operation.
+struct OpInstance {
+ // Id of thread the instance ran on.
+ int64_t thread_id;
+ // Name of operation.
+ std::string name;
+ // Time the operation took to run. Value is unitialized if instance is still running.
+ MonoDelta duration;
+ MonoTime start_mono_time;
+
+ MaintenanceManagerStatusPB_OpInstancePB DumpToPB() const;
+};
+
+// MaintenanceOp objects represent background operations that the
+// MaintenanceManager can schedule. Once a MaintenanceOp is registered, the
+// manager will periodically poll it for statistics. The registrant is
+// responsible for managing the memory associated with the MaintenanceOp object.
+// Op objects should be unregistered before being de-allocated.
+class MaintenanceOp {
+ public:
+ friend class MaintenanceManager;
+
+ // General indicator of how much IO the Op will use.
+ enum IOUsage {
+ LOW_IO_USAGE, // Low impact operations like removing a file, updating metadata.
+ HIGH_IO_USAGE // Everything else.
+ };
+
+ explicit MaintenanceOp(std::string name, IOUsage io_usage);
+ virtual ~MaintenanceOp();
+
+ // Unregister this op, if it is currently registered.
+ void Unregister();
+
+ // Update the op statistics. This will be called every scheduling period
+ // (about a few times a second), so it should not be too expensive. It's
+ // possible for the returned statistics to be invalid; the caller should
+ // call MaintenanceOpStats::valid() before using them. This will be run
+ // under the MaintenanceManager lock.
+ virtual void UpdateStats(MaintenanceOpStats* stats) = 0;
+
+ // Prepare to perform the operation. This will be run without holding the
+ // maintenance manager lock. It should be short, since it is run from the
+ // context of the maintenance op scheduler thread rather than a worker thread.
+ // If this returns false, we will abort the operation.
+ virtual bool Prepare() = 0;
+
+ // Perform the operation. This will be run without holding the maintenance
+ // manager lock, and may take a long time.
+ virtual void Perform() = 0;
+
+ // Returns the histogram for this op that tracks duration. Cannot be NULL.
+ virtual scoped_refptr<Histogram> DurationHistogram() const = 0;
+
+ // Returns the gauge for this op that tracks when this op is running. Cannot be NULL.
+ virtual scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const = 0;
+
+ uint32_t running() { return running_; }
+
+ std::string name() const { return name_; }
+
+ IOUsage io_usage() const { return io_usage_; }
+
+ // Return true if the operation has been cancelled due to Unregister() pending.
+ bool cancelled() const {
+ return cancel_.Load();
+ }
+
+ // Cancel this operation, which prevents new instances of it from being scheduled
+ // regardless of whether the statistics indicate it is runnable. Instances may also
+ // optionally poll 'cancelled()' on a periodic basis to know if they should abort a
+ // lengthy operation in the middle of Perform().
+ void CancelAndDisable() {
+ cancel_.Store(true);
+ }
+
+ private:
+ DISALLOW_COPY_AND_ASSIGN(MaintenanceOp);
+
+ // The name of the operation. Op names must be unique.
+ const std::string name_;
+
+ // The number of times that this op is currently running.
+ uint32_t running_;
+
+ // Set when we are trying to unregister the maintenance operation.
+ // Ongoing operations could read this boolean and cancel themselves.
+ // New operations will not be scheduled when this boolean is set.
+ AtomicBool cancel_;
+
+ // Condition variable which the UnregisterOp function can wait on.
+ //
+ // Note: 'cond_' is used with the MaintenanceManager's mutex. As such,
+ // it only exists when the op is registered.
+ gscoped_ptr<ConditionVariable> cond_;
+
+ // The MaintenanceManager with which this op is registered, or null
+ // if it is not registered.
+ std::shared_ptr<MaintenanceManager> manager_;
+
+ IOUsage io_usage_;
+};
+
+struct MaintenanceOpComparator {
+ bool operator() (const MaintenanceOp* lhs,
+ const MaintenanceOp* rhs) const {
+ return lhs->name().compare(rhs->name()) < 0;
+ }
+};
+
+// The MaintenanceManager manages the scheduling of background operations such
+// as flushes or compactions. It runs these operations in the background, in a
+// thread pool. It uses information provided in MaintenanceOpStats objects to
+// decide which operations, if any, to run.
+class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManager> {
+ public:
+ struct Options {
+ int32_t num_threads;
+ int32_t polling_interval_ms;
+ uint32_t history_size;
+ };
+
+ MaintenanceManager(const Options& options, std::string server_uuid);
+ ~MaintenanceManager();
+
+ // Start running the maintenance manager.
+ // Must be called at most once.
+ Status Start();
+ void Shutdown();
+
+ // Register an op with the manager.
+ void RegisterOp(MaintenanceOp* op);
+
+ // Unregister an op with the manager.
+ // If the Op is currently running, it will not be interrupted. However, this
+ // function will block until the Op is finished.
+ void UnregisterOp(MaintenanceOp* op);
+
+ void GetMaintenanceManagerStatusDump(MaintenanceManagerStatusPB* out_pb);
+
+ void set_memory_pressure_func_for_tests(std::function<bool(double*)> f) {
+ std::lock_guard<Mutex> guard(lock_);
+ memory_pressure_func_ = std::move(f);
+ }
+
+ static const Options kDefaultOptions;
+
+ private:
+ FRIEND_TEST(MaintenanceManagerTest, TestLogRetentionPrioritization);
+ typedef std::map<MaintenanceOp*, MaintenanceOpStats,
+ MaintenanceOpComparator> OpMapTy;
+
+ // Return true if tests have currently disabled the maintenance
+ // manager by way of changing the gflags at runtime.
+ bool disabled_for_tests() const;
+
+ void RunSchedulerThread();
+
+ // Find the best op, or null if there is nothing we want to run.
+ //
+ // Returns the op, as well as a string explanation of why that op was chosen,
+ // suitable for logging.
+ std::pair<MaintenanceOp*, std::string> FindBestOp();
+
+ void LaunchOp(MaintenanceOp* op);
+
+ std::string LogPrefix() const;
+
+ const std::string server_uuid_;
+ const int32_t num_threads_;
+ OpMapTy ops_; // registered operations
+ Mutex lock_;
+ scoped_refptr<kudu::Thread> monitor_thread_;
+ gscoped_ptr<ThreadPool> thread_pool_;
+ ConditionVariable cond_;
+ bool shutdown_;
+ int32_t polling_interval_ms_;
+ uint64_t running_ops_;
+ // Vector used as a circular buffer for recently completed ops. Elements need to be added at
+ // the completed_ops_count_ % the vector's size and then the count needs to be incremented.
+ std::vector<OpInstance> completed_ops_;
+ int64_t completed_ops_count_;
+ Random rand_;
+
+ // Function which should return true if the server is under global memory pressure.
+ // This is indirected for testing purposes.
+ std::function<bool(double*)> memory_pressure_func_;
+
+ // Running instances lock.
+ //
+ // This is separate of lock_ so that worker threads don't need to take the
+ // global MM lock when beginning operations. When taking both
+ // running_instances_lock_ and lock_, lock_ must be acquired first.
+ Mutex running_instances_lock_;
+
+ // Maps thread ids to instances of an op that they're running. Instances should be added
+ // right before MaintenanceOp::Perform() is called, and should be removed right after
+ // MaintenanceOp::Perform() completes. Any thread that adds an instance to this map
+ // owns that instance, and the instance should exist until the same thread removes it.
+ //
+ // Protected by running_instances_lock_;
+ std::unordered_map<int64_t, OpInstance*> running_instances_;
+
+ DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
+};
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/maintenance_manager.proto
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/maintenance_manager.proto b/be/src/kudu/util/maintenance_manager.proto
new file mode 100644
index 0000000..b6b1203
--- /dev/null
+++ b/be/src/kudu/util/maintenance_manager.proto
@@ -0,0 +1,54 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+syntax = "proto2";
+package kudu;
+
+option java_package = "org.apache.kudu";
+
+// Used to present the maintenance manager's internal state.
+message MaintenanceManagerStatusPB {
+ message MaintenanceOpPB {
+ required string name = 1;
+ // Number of times this operation is currently running.
+ required uint32 running = 2;
+ required bool runnable = 3;
+ required uint64 ram_anchored_bytes = 4;
+ required int64 logs_retained_bytes = 5;
+ required double perf_improvement = 6;
+ }
+
+ message OpInstancePB {
+ required int64 thread_id = 1;
+ required string name = 2;
+ // How long the op took to run. Only present if the instance completed.
+ optional int32 duration_millis = 3;
+ // Number of milliseconds since this operation started.
+ required int32 millis_since_start = 4;
+ }
+
+ // The next operation that would run.
+ optional MaintenanceOpPB best_op = 1;
+
+ // List of all the operations.
+ repeated MaintenanceOpPB registered_operations = 2;
+
+ // This list isn't in order of anything. Can contain the same operation multiple times.
+ repeated OpInstancePB running_operations = 3;
+
+ // This list isn't in order of anything. Can contain the same operation multiple times.
+ repeated OpInstancePB completed_operations = 4;
+}
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/make_shared.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/make_shared.h b/be/src/kudu/util/make_shared.h
new file mode 100644
index 0000000..649cae7
--- /dev/null
+++ b/be/src/kudu/util/make_shared.h
@@ -0,0 +1,64 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+#include <utility>
+
+// It isn't possible to use 'std::make_shared' on a class with private or protected
+// constructors. Using friends as a workaround worked in some earlier libc++/libstdcxx
+// versions, but in the latest versions there are some static_asserts that seem to defeat
+// this trickery. So, instead, we rely on the "curiously recurring template pattern" (CRTP)
+// to inject a static 'make_shared' function inside the class.
+//
+// See https://stackoverflow.com/questions/8147027/how-do-i-call-stdmake-shared-on-a-class-with-only-protected-or-private-const
+// for some details.
+//
+// Usage:
+//
+// class MyClass : public enable_make_shared<MyClass> {
+// public:
+// ...
+//
+// protected:
+// // The constructor must be protected rather than private.
+// MyClass(Foo arg1, Bar arg2) {
+// }
+//
+// }
+//
+// shared_ptr<MyClass> foo = MyClass::make_shared(arg1, arg2);
+template<class T>
+class enable_make_shared { // NOLINT
+ public:
+
+ // Define a static make_shared member which constructs the public subclass
+ // and casts it back to the desired class.
+ template<typename... Arg>
+ static std::shared_ptr<T> make_shared(Arg&&... args) {
+ // Define a struct subclass with a public constructor which will be accessible
+ // from make_shared.
+ struct make_shared_enabler : public T { // NOLINT
+ explicit make_shared_enabler(Arg&&... args) : T(std::forward<Arg>(args)...) {
+ }
+ };
+
+ return ::std::make_shared<make_shared_enabler>(
+ ::std::forward<Arg>(args)...);
+ }
+};
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/malloc.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/malloc.cc b/be/src/kudu/util/malloc.cc
new file mode 100644
index 0000000..3fec2db
--- /dev/null
+++ b/be/src/kudu/util/malloc.cc
@@ -0,0 +1,35 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#include "kudu/util/malloc.h"
+
+#if defined(__linux__)
+#include <malloc.h>
+#else
+#include <malloc/malloc.h>
+#endif // defined(__linux__)
+
+namespace kudu {
+
+int64_t kudu_malloc_usable_size(const void* obj) {
+#if defined(__linux__)
+ return malloc_usable_size(const_cast<void*>(obj));
+#else
+ return malloc_size(obj);
+#endif // defined(__linux__)
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/malloc.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/malloc.h b/be/src/kudu/util/malloc.h
new file mode 100644
index 0000000..e8a27c5
--- /dev/null
+++ b/be/src/kudu/util/malloc.h
@@ -0,0 +1,32 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MALLOC_H
+#define KUDU_UTIL_MALLOC_H
+
+#include <stdint.h>
+
+namespace kudu {
+
+// Simple wrapper for malloc_usable_size().
+//
+// Really just centralizes the const_cast, as this function is often called
+// on const pointers (i.e. "this" in a const method).
+int64_t kudu_malloc_usable_size(const void* obj);
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_MALLOC_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/map-util-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/map-util-test.cc b/be/src/kudu/util/map-util-test.cc
new file mode 100644
index 0000000..3aa9448
--- /dev/null
+++ b/be/src/kudu/util/map-util-test.cc
@@ -0,0 +1,116 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This unit test belongs in gutil, but it depends on test_main which is
+// part of util.
+#include "kudu/gutil/map-util.h"
+
+#include <map>
+#include <memory>
+#include <string>
+#include <utility>
+
+#include <gtest/gtest.h>
+
+using std::map;
+using std::string;
+using std::shared_ptr;
+using std::unique_ptr;
+
+namespace kudu {
+
+TEST(FloorTest, TestMapUtil) {
+ map<int, int> my_map;
+
+ ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 5));
+
+ my_map[5] = 5;
+ ASSERT_EQ(5, *FindFloorOrNull(my_map, 6));
+ ASSERT_EQ(5, *FindFloorOrNull(my_map, 5));
+ ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 4));
+
+ my_map[1] = 1;
+ ASSERT_EQ(5, *FindFloorOrNull(my_map, 6));
+ ASSERT_EQ(5, *FindFloorOrNull(my_map, 5));
+ ASSERT_EQ(1, *FindFloorOrNull(my_map, 4));
+ ASSERT_EQ(1, *FindFloorOrNull(my_map, 1));
+ ASSERT_EQ(nullptr, FindFloorOrNull(my_map, 0));
+}
+
+TEST(ComputeIfAbsentTest, TestComputeIfAbsent) {
+ map<string, string> my_map;
+ auto result = ComputeIfAbsent(&my_map, "key", []{ return "hello_world"; });
+ ASSERT_EQ(*result, "hello_world");
+ auto result2 = ComputeIfAbsent(&my_map, "key", [] { return "hello_world2"; });
+ ASSERT_EQ(*result2, "hello_world");
+}
+
+TEST(ComputeIfAbsentTest, TestComputeIfAbsentAndReturnAbsense) {
+ map<string, string> my_map;
+ auto result = ComputeIfAbsentReturnAbsense(&my_map, "key", []{ return "hello_world"; });
+ ASSERT_TRUE(result.second);
+ ASSERT_EQ(*result.first, "hello_world");
+ auto result2 = ComputeIfAbsentReturnAbsense(&my_map, "key", [] { return "hello_world2"; });
+ ASSERT_FALSE(result2.second);
+ ASSERT_EQ(*result2.first, "hello_world");
+}
+
+TEST(FindPointeeOrNullTest, TestFindPointeeOrNull) {
+ map<string, unique_ptr<string>> my_map;
+ auto iter = my_map.emplace("key", unique_ptr<string>(new string("hello_world")));
+ ASSERT_TRUE(iter.second);
+ string* value = FindPointeeOrNull(my_map, "key");
+ ASSERT_TRUE(value != nullptr);
+ ASSERT_EQ(*value, "hello_world");
+ my_map.erase(iter.first);
+ value = FindPointeeOrNull(my_map, "key");
+ ASSERT_TRUE(value == nullptr);
+}
+
+TEST(EraseKeyReturnValuePtrTest, TestRawAndSmartSmartPointers) {
+ map<string, unique_ptr<string>> my_map;
+ unique_ptr<string> value = EraseKeyReturnValuePtr(&my_map, "key");
+ ASSERT_TRUE(value.get() == nullptr);
+ my_map.emplace("key", unique_ptr<string>(new string("hello_world")));
+ value = EraseKeyReturnValuePtr(&my_map, "key");
+ ASSERT_EQ(*value, "hello_world");
+ value.reset();
+ value = EraseKeyReturnValuePtr(&my_map, "key");
+ ASSERT_TRUE(value.get() == nullptr);
+ map<string, shared_ptr<string>> my_map2;
+ shared_ptr<string> value2 = EraseKeyReturnValuePtr(&my_map2, "key");
+ ASSERT_TRUE(value2.get() == nullptr);
+ my_map2.emplace("key", std::make_shared<string>("hello_world"));
+ value2 = EraseKeyReturnValuePtr(&my_map2, "key");
+ ASSERT_EQ(*value2, "hello_world");
+ map<string, string*> my_map_raw;
+ my_map_raw.emplace("key", new string("hello_world"));
+ value.reset(EraseKeyReturnValuePtr(&my_map_raw, "key"));
+ ASSERT_EQ(*value, "hello_world");
+}
+
+TEST(EmplaceTest, TestEmplace) {
+ // Map with move-only value type.
+ map<string, unique_ptr<string>> my_map;
+ unique_ptr<string> val(new string("foo"));
+ ASSERT_TRUE(EmplaceIfNotPresent(&my_map, "k", std::move(val)));
+ ASSERT_TRUE(ContainsKey(my_map, "k"));
+ ASSERT_FALSE(EmplaceIfNotPresent(&my_map, "k", nullptr))
+ << "Should return false for already-present";
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mem_tracker-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker-test.cc b/be/src/kudu/util/mem_tracker-test.cc
new file mode 100644
index 0000000..dbadd09
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker-test.cc
@@ -0,0 +1,285 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/mem_tracker.h"
+
+#include <atomic>
+#include <functional>
+#include <memory>
+#include <string>
+#include <system_error>
+#include <thread>
+#include <unordered_map>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/monotime.h"
+#include "kudu/util/test_util.h"
+
+namespace kudu {
+
+using std::equal_to;
+using std::hash;
+using std::pair;
+using std::shared_ptr;
+using std::string;
+using std::unordered_map;
+using std::vector;
+using strings::Substitute;
+
+TEST(MemTrackerTest, SingleTrackerNoLimit) {
+ shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t");
+ EXPECT_FALSE(t->has_limit());
+ t->Consume(10);
+ EXPECT_EQ(t->consumption(), 10);
+ t->Consume(10);
+ EXPECT_EQ(t->consumption(), 20);
+ t->Release(15);
+ EXPECT_EQ(t->consumption(), 5);
+ EXPECT_FALSE(t->LimitExceeded());
+ t->Release(5);
+ EXPECT_EQ(t->consumption(), 0);
+}
+
+TEST(MemTrackerTest, SingleTrackerWithLimit) {
+ shared_ptr<MemTracker> t = MemTracker::CreateTracker(11, "t");
+ EXPECT_TRUE(t->has_limit());
+ t->Consume(10);
+ EXPECT_EQ(t->consumption(), 10);
+ EXPECT_FALSE(t->LimitExceeded());
+ t->Consume(10);
+ EXPECT_EQ(t->consumption(), 20);
+ EXPECT_TRUE(t->LimitExceeded());
+ t->Release(15);
+ EXPECT_EQ(t->consumption(), 5);
+ EXPECT_FALSE(t->LimitExceeded());
+ t->Release(5);
+}
+
+TEST(MemTrackerTest, TrackerHierarchy) {
+ shared_ptr<MemTracker> p = MemTracker::CreateTracker(100, "p");
+ shared_ptr<MemTracker> c1 = MemTracker::CreateTracker(80, "c1", p);
+ shared_ptr<MemTracker> c2 = MemTracker::CreateTracker(50, "c2", p);
+
+ // everything below limits
+ c1->Consume(60);
+ EXPECT_EQ(c1->consumption(), 60);
+ EXPECT_FALSE(c1->LimitExceeded());
+ EXPECT_FALSE(c1->AnyLimitExceeded());
+ EXPECT_EQ(c2->consumption(), 0);
+ EXPECT_FALSE(c2->LimitExceeded());
+ EXPECT_FALSE(c2->AnyLimitExceeded());
+ EXPECT_EQ(p->consumption(), 60);
+ EXPECT_FALSE(p->LimitExceeded());
+ EXPECT_FALSE(p->AnyLimitExceeded());
+
+ // p goes over limit
+ c2->Consume(50);
+ EXPECT_EQ(c1->consumption(), 60);
+ EXPECT_FALSE(c1->LimitExceeded());
+ EXPECT_TRUE(c1->AnyLimitExceeded());
+ EXPECT_EQ(c2->consumption(), 50);
+ EXPECT_FALSE(c2->LimitExceeded());
+ EXPECT_TRUE(c2->AnyLimitExceeded());
+ EXPECT_EQ(p->consumption(), 110);
+ EXPECT_TRUE(p->LimitExceeded());
+
+ // c2 goes over limit, p drops below limit
+ c1->Release(20);
+ c2->Consume(10);
+ EXPECT_EQ(c1->consumption(), 40);
+ EXPECT_FALSE(c1->LimitExceeded());
+ EXPECT_FALSE(c1->AnyLimitExceeded());
+ EXPECT_EQ(c2->consumption(), 60);
+ EXPECT_TRUE(c2->LimitExceeded());
+ EXPECT_TRUE(c2->AnyLimitExceeded());
+ EXPECT_EQ(p->consumption(), 100);
+ EXPECT_FALSE(p->LimitExceeded());
+ c1->Release(40);
+ c2->Release(60);
+}
+
+class GcFunctionHelper {
+ public:
+ static const int kNumReleaseBytes = 1;
+
+ explicit GcFunctionHelper(MemTracker* tracker) : tracker_(tracker) { }
+
+ void GcFunc() { tracker_->Release(kNumReleaseBytes); }
+
+ private:
+ MemTracker* tracker_;
+};
+
+TEST(MemTrackerTest, STLContainerAllocator) {
+ shared_ptr<MemTracker> t = MemTracker::CreateTracker(-1, "t");
+ MemTrackerAllocator<int> vec_alloc(t);
+ MemTrackerAllocator<pair<const int, int>> map_alloc(t);
+
+ // Simple test: use the allocator in a vector.
+ {
+ vector<int, MemTrackerAllocator<int> > v(vec_alloc);
+ ASSERT_EQ(0, t->consumption());
+ v.reserve(5);
+ ASSERT_EQ(5 * sizeof(int), t->consumption());
+ v.reserve(10);
+ ASSERT_EQ(10 * sizeof(int), t->consumption());
+ }
+ ASSERT_EQ(0, t->consumption());
+
+ // Complex test: use it in an unordered_map, where it must be rebound in
+ // order to allocate the map's buckets.
+ {
+ unordered_map<int, int, hash<int>, equal_to<int>, MemTrackerAllocator<pair<const int, int>>> um(
+ 10,
+ hash<int>(),
+ equal_to<int>(),
+ map_alloc);
+
+ // Don't care about the value (it depends on map internals).
+ ASSERT_GT(t->consumption(), 0);
+ }
+ ASSERT_EQ(0, t->consumption());
+}
+
+TEST(MemTrackerTest, FindFunctionsTakeOwnership) {
+ // In each test, ToString() would crash if the MemTracker is destroyed when
+ // 'm' goes out of scope.
+
+ shared_ptr<MemTracker> ref;
+ {
+ shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+ ASSERT_TRUE(MemTracker::FindTracker(m->id(), &ref));
+ }
+ LOG(INFO) << ref->ToString();
+ ref.reset();
+
+ {
+ shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+ ref = MemTracker::FindOrCreateGlobalTracker(-1, m->id());
+ }
+ LOG(INFO) << ref->ToString();
+ ref.reset();
+
+ vector<shared_ptr<MemTracker> > refs;
+ {
+ shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+ MemTracker::ListTrackers(&refs);
+ }
+ for (const shared_ptr<MemTracker>& r : refs) {
+ LOG(INFO) << r->ToString();
+ }
+ refs.clear();
+}
+
+TEST(MemTrackerTest, ScopedTrackedConsumption) {
+ shared_ptr<MemTracker> m = MemTracker::CreateTracker(-1, "test");
+ ASSERT_EQ(0, m->consumption());
+ {
+ ScopedTrackedConsumption consumption(m, 1);
+ ASSERT_EQ(1, m->consumption());
+
+ consumption.Reset(3);
+ ASSERT_EQ(3, m->consumption());
+ }
+ ASSERT_EQ(0, m->consumption());
+}
+
+TEST(MemTrackerTest, CollisionDetection) {
+ shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "parent");
+ shared_ptr<MemTracker> c = MemTracker::CreateTracker(-1, "child", p);
+ vector<shared_ptr<MemTracker>> all;
+
+ // Three trackers: root, parent, and child.
+ MemTracker::ListTrackers(&all);
+ ASSERT_EQ(3, all.size());
+
+ // Now only two because the child has been destroyed.
+ c.reset();
+ MemTracker::ListTrackers(&all);
+ ASSERT_EQ(2, all.size());
+ shared_ptr<MemTracker> not_found;
+ ASSERT_FALSE(MemTracker::FindTracker("child", ¬_found, p));
+
+ // Let's duplicate the parent. It's not recommended, but it's allowed.
+ shared_ptr<MemTracker> p2 = MemTracker::CreateTracker(-1, "parent");
+ ASSERT_EQ(p->ToString(), p2->ToString());
+
+ // Only when we do a Find() operation do we crash.
+#ifndef NDEBUG
+ const string kDeathMsg = "Multiple memtrackers with same id";
+ EXPECT_DEATH({
+ shared_ptr<MemTracker> found;
+ MemTracker::FindTracker("parent", &found);
+ }, kDeathMsg);
+ EXPECT_DEATH({
+ MemTracker::FindOrCreateGlobalTracker(-1, "parent");
+ }, kDeathMsg);
+#endif
+}
+
+TEST(MemTrackerTest, TestMultiThreadedRegisterAndDestroy) {
+ std::atomic<bool> done(false);
+ vector<std::thread> threads;
+ for (int i = 0; i < 10; i++) {
+ threads.emplace_back([&done]{
+ while (!done.load()) {
+ shared_ptr<MemTracker> t = MemTracker::FindOrCreateGlobalTracker(
+ 1000, "foo");
+ }
+ });
+ }
+
+ SleepFor(MonoDelta::FromSeconds(AllowSlowTests() ? 5 : 1));
+ done.store(true);
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+TEST(MemTrackerTest, TestMultiThreadedCreateFind) {
+ shared_ptr<MemTracker> p = MemTracker::CreateTracker(-1, "p");
+ shared_ptr<MemTracker> c1 = MemTracker::CreateTracker(-1, "c1", p);
+ std::atomic<bool> done(false);
+ vector<std::thread> threads;
+ threads.emplace_back([&]{
+ while (!done.load()) {
+ shared_ptr<MemTracker> c1_copy;
+ CHECK(MemTracker::FindTracker(c1->id(), &c1_copy, p));
+ }
+ });
+ for (int i = 0; i < 5; i++) {
+ threads.emplace_back([&, i]{
+ while (!done.load()) {
+ shared_ptr<MemTracker> c2 =
+ MemTracker::CreateTracker(-1, Substitute("ci-$0", i), p);
+ }
+ });
+ }
+
+ SleepFor(MonoDelta::FromMilliseconds(500));
+ done.store(true);
+ for (auto& t : threads) {
+ t.join();
+ }
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mem_tracker.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker.cc b/be/src/kudu/util/mem_tracker.cc
new file mode 100644
index 0000000..f7294d1
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker.cc
@@ -0,0 +1,296 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/mem_tracker.h"
+
+#include <algorithm>
+#include <cstddef>
+#include <deque>
+#include <limits>
+#include <list>
+#include <memory>
+#include <ostream>
+
+#include "kudu/gutil/once.h"
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/mutex.h"
+#include "kudu/util/process_memory.h"
+
+namespace kudu {
+
+// NOTE: this class has been adapted from Impala, so the code style varies
+// somewhat from kudu.
+
+using std::deque;
+using std::list;
+using std::shared_ptr;
+using std::string;
+using std::vector;
+using std::weak_ptr;
+
+using strings::Substitute;
+
+// The ancestor for all trackers. Every tracker is visible from the root down.
+static shared_ptr<MemTracker> root_tracker;
+static GoogleOnceType root_tracker_once = GOOGLE_ONCE_INIT;
+
+void MemTracker::CreateRootTracker() {
+ root_tracker.reset(new MemTracker(-1, "root", shared_ptr<MemTracker>()));
+ root_tracker->Init();
+}
+
+shared_ptr<MemTracker> MemTracker::CreateTracker(int64_t byte_limit,
+ const string& id,
+ shared_ptr<MemTracker> parent) {
+ shared_ptr<MemTracker> real_parent;
+ if (parent) {
+ real_parent = std::move(parent);
+ } else {
+ real_parent = GetRootTracker();
+ }
+ shared_ptr<MemTracker> tracker(new MemTracker(byte_limit, id, real_parent));
+ real_parent->AddChildTracker(tracker);
+ tracker->Init();
+
+ return tracker;
+}
+
+MemTracker::MemTracker(int64_t byte_limit, const string& id, shared_ptr<MemTracker> parent)
+ : limit_(byte_limit),
+ id_(id),
+ descr_(Substitute("memory consumption for $0", id)),
+ parent_(std::move(parent)),
+ consumption_(0) {
+ VLOG(1) << "Creating tracker " << ToString();
+}
+
+MemTracker::~MemTracker() {
+ VLOG(1) << "Destroying tracker " << ToString();
+ if (parent_) {
+ DCHECK(consumption() == 0) << "Memory tracker " << ToString()
+ << " has unreleased consumption " << consumption();
+ parent_->Release(consumption());
+
+ MutexLock l(parent_->child_trackers_lock_);
+ if (child_tracker_it_ != parent_->child_trackers_.end()) {
+ parent_->child_trackers_.erase(child_tracker_it_);
+ child_tracker_it_ = parent_->child_trackers_.end();
+ }
+ }
+}
+
+string MemTracker::ToString() const {
+ string s;
+ const MemTracker* tracker = this;
+ while (tracker) {
+ if (s != "") {
+ s += "->";
+ }
+ s += tracker->id();
+ tracker = tracker->parent_.get();
+ }
+ return s;
+}
+
+bool MemTracker::FindTracker(const string& id,
+ shared_ptr<MemTracker>* tracker,
+ const shared_ptr<MemTracker>& parent) {
+ return FindTrackerInternal(id, tracker, parent ? parent : GetRootTracker());
+}
+
+bool MemTracker::FindTrackerInternal(const string& id,
+ shared_ptr<MemTracker>* tracker,
+ const shared_ptr<MemTracker>& parent) {
+ DCHECK(parent != NULL);
+
+ list<weak_ptr<MemTracker>> children;
+ {
+ MutexLock l(parent->child_trackers_lock_);
+ children = parent->child_trackers_;
+ }
+
+ // Search for the matching child without holding the parent's lock.
+ //
+ // If the lock were held while searching, it'd be possible for 'child' to be
+ // the last live ref to a tracker, which would lead to a recursive
+ // acquisition of the parent lock during the 'child' destructor call.
+ vector<shared_ptr<MemTracker>> found;
+ for (const auto& child_weak : children) {
+ shared_ptr<MemTracker> child = child_weak.lock();
+ if (child && child->id() == id) {
+ found.emplace_back(std::move(child));
+ }
+ }
+ if (PREDICT_TRUE(found.size() == 1)) {
+ *tracker = found[0];
+ return true;
+ } else if (found.size() > 1) {
+ LOG(DFATAL) <<
+ Substitute("Multiple memtrackers with same id ($0) found on parent $1",
+ id, parent->ToString());
+ *tracker = found[0];
+ return true;
+ }
+ return false;
+}
+
+shared_ptr<MemTracker> MemTracker::FindOrCreateGlobalTracker(
+ int64_t byte_limit,
+ const string& id) {
+ // The calls below comprise a critical section, but we can't use the root
+ // tracker's child_trackers_lock_ to synchronize it as the lock must be
+ // released during FindTrackerInternal(). Since this function creates
+ // globally-visible MemTrackers which are the exception rather than the rule,
+ // it's reasonable to synchronize their creation on a singleton lock.
+ static Mutex find_or_create_lock;
+ MutexLock l(find_or_create_lock);
+
+ shared_ptr<MemTracker> found;
+ if (FindTrackerInternal(id, &found, GetRootTracker())) {
+ return found;
+ }
+ return CreateTracker(byte_limit, id, GetRootTracker());
+}
+
+void MemTracker::ListTrackers(vector<shared_ptr<MemTracker>>* trackers) {
+ trackers->clear();
+ deque<shared_ptr<MemTracker> > to_process;
+ to_process.push_front(GetRootTracker());
+ while (!to_process.empty()) {
+ shared_ptr<MemTracker> t = to_process.back();
+ to_process.pop_back();
+
+ trackers->push_back(t);
+ {
+ MutexLock l(t->child_trackers_lock_);
+ for (const auto& child_weak : t->child_trackers_) {
+ shared_ptr<MemTracker> child = child_weak.lock();
+ if (child) {
+ to_process.emplace_back(std::move(child));
+ }
+ }
+ }
+ }
+}
+
+void MemTracker::Consume(int64_t bytes) {
+ if (bytes < 0) {
+ Release(-bytes);
+ return;
+ }
+
+ if (bytes == 0) {
+ return;
+ }
+ for (auto& tracker : all_trackers_) {
+ tracker->consumption_.IncrementBy(bytes);
+ }
+}
+
+bool MemTracker::TryConsume(int64_t bytes) {
+ if (bytes <= 0) {
+ Release(-bytes);
+ return true;
+ }
+
+ int i = 0;
+ // Walk the tracker tree top-down, consuming memory from each in turn.
+ for (i = all_trackers_.size() - 1; i >= 0; --i) {
+ MemTracker *tracker = all_trackers_[i];
+ if (tracker->limit_ < 0) {
+ tracker->consumption_.IncrementBy(bytes);
+ } else {
+ if (!tracker->consumption_.TryIncrementBy(bytes, tracker->limit_)) {
+ break;
+ }
+ }
+ }
+ // Everyone succeeded, return.
+ if (i == -1) {
+ return true;
+ }
+
+ // Someone failed, roll back the ones that succeeded.
+ // TODO(todd): this doesn't roll it back completely since the max values for
+ // the updated trackers aren't decremented. The max values are only used
+ // for error reporting so this is probably okay. Rolling those back is
+ // pretty hard; we'd need something like 2PC.
+ for (int j = all_trackers_.size() - 1; j > i; --j) {
+ all_trackers_[j]->consumption_.IncrementBy(-bytes);
+ }
+ return false;
+}
+
+void MemTracker::Release(int64_t bytes) {
+ if (bytes < 0) {
+ Consume(-bytes);
+ return;
+ }
+
+ if (bytes == 0) {
+ return;
+ }
+
+ for (auto& tracker : all_trackers_) {
+ tracker->consumption_.IncrementBy(-bytes);
+ }
+ process_memory::MaybeGCAfterRelease(bytes);
+}
+
+bool MemTracker::AnyLimitExceeded() {
+ for (const auto& tracker : limit_trackers_) {
+ if (tracker->LimitExceeded()) {
+ return true;
+ }
+ }
+ return false;
+}
+
+int64_t MemTracker::SpareCapacity() const {
+ int64_t result = std::numeric_limits<int64_t>::max();
+ for (const auto& tracker : limit_trackers_) {
+ int64_t mem_left = tracker->limit() - tracker->consumption();
+ result = std::min(result, mem_left);
+ }
+ return result;
+}
+
+
+void MemTracker::Init() {
+ // populate all_trackers_ and limit_trackers_
+ MemTracker* tracker = this;
+ while (tracker) {
+ all_trackers_.push_back(tracker);
+ if (tracker->has_limit()) limit_trackers_.push_back(tracker);
+ tracker = tracker->parent_.get();
+ }
+ DCHECK_GT(all_trackers_.size(), 0);
+ DCHECK_EQ(all_trackers_[0], this);
+}
+
+void MemTracker::AddChildTracker(const shared_ptr<MemTracker>& tracker) {
+ MutexLock l(child_trackers_lock_);
+ tracker->child_tracker_it_ = child_trackers_.insert(child_trackers_.end(), tracker);
+}
+
+shared_ptr<MemTracker> MemTracker::GetRootTracker() {
+ GoogleOnceInit(&root_tracker_once, &MemTracker::CreateRootTracker);
+ return root_tracker;
+}
+
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/mem_tracker.h
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/mem_tracker.h b/be/src/kudu/util/mem_tracker.h
new file mode 100644
index 0000000..14db374
--- /dev/null
+++ b/be/src/kudu/util/mem_tracker.h
@@ -0,0 +1,272 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+#ifndef KUDU_UTIL_MEM_TRACKER_H
+#define KUDU_UTIL_MEM_TRACKER_H
+
+#include <cstdint>
+#include <list>
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/util/high_water_mark.h"
+#include "kudu/util/mutex.h"
+
+namespace kudu {
+
+// A MemTracker tracks memory consumption; it contains an optional limit and is
+// arranged into a tree structure such that the consumption tracked by a
+// MemTracker is also tracked by its ancestors.
+//
+// The MemTracker hierarchy is rooted in a single static MemTracker.
+// The root MemTracker always exists, and it is the common
+// ancestor to all MemTrackers. All operations that discover MemTrackers begin
+// at the root and work their way down the tree, while operations that deal
+// with adjusting memory consumption begin at a particular MemTracker and work
+// their way up the tree to the root. All MemTrackers (except the root) must
+// have a parent. As a rule, all children belonging to a parent should have
+// unique ids, but this is only enforced during a Find() operation to allow for
+// transient duplicates (e.g. the web UI grabbing very short-lived references
+// to all MemTrackers while rendering a web page). This also means id
+// uniqueness only exists where it's actually needed.
+//
+// When a MemTracker begins its life, it has a strong reference to its parent
+// and the parent has a weak reference to it. Both remain for the lifetime of
+// the MemTracker.
+//
+// Memory consumption is tracked via calls to Consume()/Release(), either to
+// the tracker itself or to one of its descendants.
+//
+// This class is thread-safe.
+class MemTracker : public std::enable_shared_from_this<MemTracker> {
+ public:
+ ~MemTracker();
+
+ // Creates and adds the tracker to the tree so that it can be retrieved with
+ // FindTracker/FindOrCreateTracker.
+ //
+ // byte_limit < 0 means no limit; 'id' is a used as a label to uniquely identify
+ // the MemTracker for the below Find...() calls as well as the web UI.
+ //
+ // Use the two-argument form if there is no parent.
+ static std::shared_ptr<MemTracker> CreateTracker(
+ int64_t byte_limit,
+ const std::string& id,
+ std::shared_ptr<MemTracker> parent = std::shared_ptr<MemTracker>());
+
+ // If a tracker with the specified 'id' and 'parent' exists in the tree, sets
+ // 'tracker' to reference that instance. Returns false if no such tracker
+ // exists.
+ //
+ // Use the two-argument form if there is no parent.
+ //
+ // Note: this function will enforce that 'id' is unique amongst the children
+ // of 'parent'.
+ static bool FindTracker(
+ const std::string& id,
+ std::shared_ptr<MemTracker>* tracker,
+ const std::shared_ptr<MemTracker>& parent = std::shared_ptr<MemTracker>());
+
+ // If a global tracker with the specified 'id' exists in the tree, returns a
+ // shared_ptr to that instance. Otherwise, creates a new MemTracker with the
+ // specified byte_limit and id, parented to the root MemTracker.
+ //
+ // Note: this function will enforce that 'id' is unique amongst the children
+ // of the root MemTracker.
+ static std::shared_ptr<MemTracker> FindOrCreateGlobalTracker(
+ int64_t byte_limit, const std::string& id);
+
+ // Returns a list of all the valid trackers.
+ static void ListTrackers(std::vector<std::shared_ptr<MemTracker> >* trackers);
+
+ // Gets a shared_ptr to the "root" tracker, creating it if necessary.
+ static std::shared_ptr<MemTracker> GetRootTracker();
+
+ // Increases consumption of this tracker and its ancestors by 'bytes'.
+ void Consume(int64_t bytes);
+
+ // Increases consumption of this tracker and its ancestors by 'bytes' only if
+ // they can all consume 'bytes'. If this brings any of them over, none of them
+ // are updated.
+ // Returns true if the try succeeded.
+ bool TryConsume(int64_t bytes);
+
+ // Decreases consumption of this tracker and its ancestors by 'bytes'.
+ //
+ // This will also cause the process to periodically trigger tcmalloc "ReleaseMemory"
+ // to ensure that memory is released to the OS.
+ void Release(int64_t bytes);
+
+ // Returns true if a valid limit of this tracker or one of its ancestors is
+ // exceeded.
+ bool AnyLimitExceeded();
+
+ // If this tracker has a limit, checks the limit and attempts to free up some memory if
+ // the limit is exceeded by calling any added GC functions. Returns true if the limit is
+ // exceeded after calling the GC functions. Returns false if there is no limit.
+ bool LimitExceeded() {
+ return limit_ >= 0 && limit_ < consumption();
+ }
+
+ // Returns the maximum consumption that can be made without exceeding the limit on
+ // this tracker or any of its parents. Returns int64_t::max() if there are no
+ // limits and a negative value if any limit is already exceeded.
+ int64_t SpareCapacity() const;
+
+
+ int64_t limit() const { return limit_; }
+ bool has_limit() const { return limit_ >= 0; }
+ const std::string& id() const { return id_; }
+
+ // Returns the memory consumed in bytes.
+ int64_t consumption() const {
+ return consumption_.current_value();
+ }
+
+ int64_t peak_consumption() const { return consumption_.max_value(); }
+
+ // Retrieve the parent tracker, or NULL If one is not set.
+ std::shared_ptr<MemTracker> parent() const { return parent_; }
+
+ // Returns a textual representation of the tracker that is likely (but not
+ // guaranteed) to be globally unique.
+ std::string ToString() const;
+
+ private:
+ // byte_limit < 0 means no limit
+ // 'id' is the label for LogUsage() and web UI.
+ MemTracker(int64_t byte_limit, const std::string& id, std::shared_ptr<MemTracker> parent);
+
+ // Further initializes the tracker.
+ void Init();
+
+ // Adds tracker to child_trackers_.
+ void AddChildTracker(const std::shared_ptr<MemTracker>& tracker);
+
+ // Variant of FindTracker() that must be called with a non-NULL parent.
+ static bool FindTrackerInternal(
+ const std::string& id,
+ std::shared_ptr<MemTracker>* tracker,
+ const std::shared_ptr<MemTracker>& parent);
+
+ // Creates the root tracker.
+ static void CreateRootTracker();
+
+ int64_t limit_;
+ const std::string id_;
+ const std::string descr_;
+ std::shared_ptr<MemTracker> parent_;
+
+ HighWaterMark consumption_;
+
+ // this tracker plus all of its ancestors
+ std::vector<MemTracker*> all_trackers_;
+ // all_trackers_ with valid limits
+ std::vector<MemTracker*> limit_trackers_;
+
+ // All the child trackers of this tracker. Used for error reporting and
+ // listing only (i.e. updating the consumption of a parent tracker does not
+ // update that of its children).
+ mutable Mutex child_trackers_lock_;
+ std::list<std::weak_ptr<MemTracker>> child_trackers_;
+
+ // Iterator into parent_->child_trackers_ for this object. Stored to have O(1)
+ // remove.
+ std::list<std::weak_ptr<MemTracker>>::iterator child_tracker_it_;
+};
+
+// An std::allocator that manipulates a MemTracker during allocation
+// and deallocation.
+template<typename T, typename Alloc = std::allocator<T> >
+class MemTrackerAllocator : public Alloc {
+ public:
+ typedef typename Alloc::pointer pointer;
+ typedef typename Alloc::const_pointer const_pointer;
+ typedef typename Alloc::size_type size_type;
+
+ explicit MemTrackerAllocator(std::shared_ptr<MemTracker> mem_tracker)
+ : mem_tracker_(std::move(mem_tracker)) {}
+
+ // This constructor is used for rebinding.
+ template <typename U>
+ MemTrackerAllocator(const MemTrackerAllocator<U>& allocator)
+ : Alloc(allocator),
+ mem_tracker_(allocator.mem_tracker()) {
+ }
+
+ ~MemTrackerAllocator() {
+ }
+
+ pointer allocate(size_type n, const_pointer hint = 0) {
+ // Ideally we'd use TryConsume() here to enforce the tracker's limit.
+ // However, that means throwing bad_alloc if the limit is exceeded, and
+ // it's not clear that the rest of Kudu can handle that.
+ mem_tracker_->Consume(n * sizeof(T));
+ return Alloc::allocate(n, hint);
+ }
+
+ void deallocate(pointer p, size_type n) {
+ Alloc::deallocate(p, n);
+ mem_tracker_->Release(n * sizeof(T));
+ }
+
+ // This allows an allocator<T> to be used for a different type.
+ template <class U>
+ struct rebind {
+ typedef MemTrackerAllocator<U, typename Alloc::template rebind<U>::other> other;
+ };
+
+ const std::shared_ptr<MemTracker>& mem_tracker() const { return mem_tracker_; }
+
+ private:
+ std::shared_ptr<MemTracker> mem_tracker_;
+};
+
+// Convenience class that adds memory consumption to a tracker when declared,
+// releasing it when the end of scope is reached.
+class ScopedTrackedConsumption {
+ public:
+ ScopedTrackedConsumption(std::shared_ptr<MemTracker> tracker,
+ int64_t to_consume)
+ : tracker_(std::move(tracker)), consumption_(to_consume) {
+ DCHECK(tracker_);
+ tracker_->Consume(consumption_);
+ }
+
+ void Reset(int64_t new_consumption) {
+ // Consume(-x) is the same as Release(x).
+ tracker_->Consume(new_consumption - consumption_);
+ consumption_ = new_consumption;
+ }
+
+ ~ScopedTrackedConsumption() {
+ tracker_->Release(consumption_);
+ }
+
+ int64_t consumption() const { return consumption_; }
+
+ private:
+ std::shared_ptr<MemTracker> tracker_;
+ int64_t consumption_;
+};
+
+} // namespace kudu
+
+#endif // KUDU_UTIL_MEM_TRACKER_H
http://git-wip-us.apache.org/repos/asf/impala/blob/fcf190c4/be/src/kudu/util/memcmpable_varint-test.cc
----------------------------------------------------------------------
diff --git a/be/src/kudu/util/memcmpable_varint-test.cc b/be/src/kudu/util/memcmpable_varint-test.cc
new file mode 100644
index 0000000..fcbe25d
--- /dev/null
+++ b/be/src/kudu/util/memcmpable_varint-test.cc
@@ -0,0 +1,220 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/faststring.h"
+#include "kudu/util/hexdump.h"
+#include "kudu/util/memcmpable_varint.h"
+#include "kudu/util/random.h"
+#include "kudu/util/slice.h"
+#include "kudu/util/stopwatch.h" // IWYU pragma: keep
+#include "kudu/util/test_util.h"
+
+// Add operator<< to print pairs, used in a test below.
+// This has to be done in the 'std' namespace due to the way that
+// template resolution works.
+namespace std {
+template<typename T1, typename T2>
+ostream &operator <<(ostream &os, const pair<T1, T2> &pair) {
+ return os << "(" << pair.first << ", " << pair.second << ")";
+}
+}
+
+using std::make_pair;
+using std::pair;
+using std::vector;
+
+namespace kudu {
+
+class TestMemcmpableVarint : public KuduTest {
+ protected:
+ TestMemcmpableVarint() : random_(SeedRandom()) {}
+
+ // Random number generator that generates different length integers
+ // with equal probability -- i.e it is equally as likely to generate
+ // a number with 8 bits as it is to generate one with 64 bits.
+ // This is useful for testing varint implementations, where a uniform
+ // random is skewed towards generating longer integers.
+ uint64_t Rand64WithRandomBitLength() {
+ return random_.Next64() >> random_.Uniform(64);
+ }
+
+ Random random_;
+};
+
+static void DoRoundTripTest(uint64_t to_encode) {
+ static faststring buf;
+ buf.clear();
+ PutMemcmpableVarint64(&buf, to_encode);
+
+ uint64_t decoded;
+ Slice slice(buf);
+ bool success = GetMemcmpableVarint64(&slice, &decoded);
+ ASSERT_TRUE(success);
+ ASSERT_EQ(to_encode, decoded);
+ ASSERT_TRUE(slice.empty());
+}
+
+
+TEST_F(TestMemcmpableVarint, TestRoundTrip) {
+ // Test the first 100K integers
+ // (exercises the special cases for <= 67823 in the code)
+ for (int i = 0; i < 100000; i++) {
+ DoRoundTripTest(i);
+ }
+
+ // Test a bunch of random integers (which are likely to be many bytes)
+ for (int i = 0; i < 100000; i++) {
+ DoRoundTripTest(random_.Next64());
+ }
+}
+
+
+// Test that a composite key can be made up of multiple memcmpable
+// varints strung together, and that the resulting key compares the
+// same as the original pair of integers (i.e left-to-right).
+TEST_F(TestMemcmpableVarint, TestCompositeKeys) {
+ faststring buf1;
+ faststring buf2;
+
+ const int n_trials = 1000;
+
+ for (int i = 0; i < n_trials; i++) {
+ buf1.clear();
+ buf2.clear();
+
+ pair<uint64_t, uint64_t> p1 =
+ make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength());
+ PutMemcmpableVarint64(&buf1, p1.first);
+ PutMemcmpableVarint64(&buf1, p1.second);
+
+ pair<uint64_t, uint64_t> p2 =
+ make_pair(Rand64WithRandomBitLength(), Rand64WithRandomBitLength());
+ PutMemcmpableVarint64(&buf2, p2.first);
+ PutMemcmpableVarint64(&buf2, p2.second);
+
+ SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1))
+ << " vs\n" << p2 << "\n" << HexDump(Slice(buf2)));
+ if (p1 < p2) {
+ ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0);
+ } else if (p1 > p2) {
+ ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0);
+ } else {
+ ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0);
+ }
+ }
+}
+
+// Similar to the above test, but instead of being randomized, specifically
+// tests "interesting" values -- i.e values around the boundaries of where
+// the encoding changes its number of bytes.
+TEST_F(TestMemcmpableVarint, TestInterestingCompositeKeys) {
+ const vector<uint64_t> interesting_values = {
+ 0, 1, 240, // 1 byte
+ 241, 2000, 2287, // 2 bytes
+ 2288, 40000, 67823, // 3 bytes
+ 67824, 1ULL << 23, (1ULL << 24) - 1, // 4 bytes
+ 1ULL << 24, 1ULL << 30, (1ULL << 32) - 1, // 5 bytes
+ };
+
+ faststring buf1;
+ faststring buf2;
+
+ for (uint64_t v1 : interesting_values) {
+ for (uint64_t v2 : interesting_values) {
+ buf1.clear();
+ pair<uint64_t, uint64_t> p1 = make_pair(v1, v2);
+ PutMemcmpableVarint64(&buf1, p1.first);
+ PutMemcmpableVarint64(&buf1, p1.second);
+
+ for (uint64_t v3 : interesting_values) {
+ for (uint64_t v4 : interesting_values) {
+ buf2.clear();
+ pair<uint64_t, uint64_t> p2 = make_pair(v3, v4);
+ PutMemcmpableVarint64(&buf2, p2.first);
+ PutMemcmpableVarint64(&buf2, p2.second);
+
+ SCOPED_TRACE(testing::Message() << p1 << "\n" << HexDump(Slice(buf1))
+ << " vs\n" << p2 << "\n" << HexDump(Slice(buf2)));
+ if (p1 < p2) {
+ ASSERT_LT(Slice(buf1).compare(Slice(buf2)), 0);
+ } else if (p1 > p2) {
+ ASSERT_GT(Slice(buf1).compare(Slice(buf2)), 0);
+ } else {
+ ASSERT_EQ(Slice(buf1).compare(Slice(buf2)), 0);
+ }
+ }
+ }
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////
+// Benchmarks
+////////////////////////////////////////////////////////////
+
+#ifdef NDEBUG
+TEST_F(TestMemcmpableVarint, BenchmarkEncode) {
+ faststring buf;
+
+ int sum_sizes = 0; // need to do something with results to force evaluation
+
+ LOG_TIMING(INFO, "Encoding integers") {
+ for (int trial = 0; trial < 100; trial++) {
+ for (uint64_t i = 0; i < 1000000; i++) {
+ buf.clear();
+ PutMemcmpableVarint64(&buf, i);
+ sum_sizes += buf.size();
+ }
+ }
+ }
+ ASSERT_GT(sum_sizes, 1); // use 'sum_sizes' to avoid optimizing it out.
+}
+
+TEST_F(TestMemcmpableVarint, BenchmarkDecode) {
+ faststring buf;
+
+ // Encode 1M integers into the buffer
+ for (uint64_t i = 0; i < 1000000; i++) {
+ PutMemcmpableVarint64(&buf, i);
+ }
+
+ // Decode the whole buffer 100 times.
+ LOG_TIMING(INFO, "Decoding integers") {
+ uint64_t sum_vals = 0;
+ for (int trial = 0; trial < 100; trial++) {
+ Slice s(buf);
+ while (!s.empty()) {
+ uint64_t decoded;
+ CHECK(GetMemcmpableVarint64(&s, &decoded));
+ sum_vals += decoded;
+ }
+ }
+ ASSERT_GT(sum_vals, 1); // use 'sum_vals' to avoid optimizing it out.
+ }
+}
+
+#endif
+
+} // namespace kudu