You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/02/24 04:07:30 UTC
[3/4] kudu git commit: KUDU-1601. Delete ancient UNDO delta blocks in the background
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_history_gc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index 17de070..b071661 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -25,6 +25,7 @@
#include "kudu/tablet/tablet_metrics.h"
#include "kudu/tablet/tablet-test-base.h"
+DECLARE_bool(enable_maintenance_manager);
DECLARE_int32(tablet_history_max_age_sec);
DECLARE_bool(use_mock_wall_clock);
@@ -65,7 +66,8 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
NO_FLUSH
};
- void InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset);
+ void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset);
+ void UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, int32_t val);
void AddTimeToHybridClock(MonoDelta delta) {
uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
uint64_t new_time = now + delta.ToMicroseconds();
@@ -87,8 +89,7 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
int rows_per_rowset_ = 300;
};
-void TabletHistoryGcTest::InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset) {
- ClampRowCount(num_rowsets * rows_per_rowset);
+void TabletHistoryGcTest::InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) {
for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, 0);
ASSERT_OK(tablet()->Flush());
@@ -96,6 +97,15 @@ void TabletHistoryGcTest::InsertOriginalRows(int num_rowsets, uint64_t rows_per_
ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
}
+// Upserts 'val' into the rows of each of the first 'num_rowsets' rowsets
+// (presumably keys [rowset_id * rows_per_rowset, (rowset_id + 1) * rows_per_rowset),
+// mirroring InsertOriginalRows() -- TODO confirm), flushing all DeltaMemStores
+// after each rowset's batch. Asserts that the rowset count is unchanged, i.e.
+// the updates must not have produced new rowsets.
+void TabletHistoryGcTest::UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset,
+ int32_t val) {
+ for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+ UpsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, val);
+ ASSERT_OK(tablet()->FlushAllDMSForTests());
+ }
+ ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+}
+
// Test that we do not generate undos for redo operations that are older than
// the AHM during major delta compaction.
TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
@@ -510,5 +520,77 @@ TEST_F(TabletHistoryGcTest, TestGcWithConcurrentCompaction) {
}
}
+// A version of the tablet history gc test with the maintenance manager disabled.
+// Tests using this fixture drive undo delta block initialization and deletion
+// by hand (see TestUndoDeltaBlockGc), so no background op should interfere.
+class TabletHistoryGcNoMaintMgrTest : public TabletHistoryGcTest {
+ public:
+ void SetUp() override {
+ // Set before delegating to the base SetUp(), presumably so the flag is
+ // already in effect when the fixture builds its tablet -- verify.
+ FLAGS_enable_maintenance_manager = false;
+ TabletHistoryGcTest::SetUp();
+ }
+};
+
+// Test that basic deletion of undo delta blocks prior to the AHM works.
+// Flow: build several undo delta blocks per rowset, check the GC size estimate
+// before and after initializing them, advance the clock past the history max
+// age, then delete the now-ancient blocks and verify the GC metrics.
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestUndoDeltaBlockGc) {
+ FLAGS_tablet_history_max_age_sec = 1000;
+
+ NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+ // The original inserts leave exactly one undo delta block per rowset.
+ ASSERT_EQ(num_rowsets_, tablet()->CountUndoDeltasForTests());
+
+ // Generate a bunch of redo deltas and then compact them into undo deltas.
+ // Each iteration's major delta compaction adds one undo block per rowset.
+ constexpr int kNumMutationsPerRow = 5;
+ for (int i = 0; i < kNumMutationsPerRow; i++) {
+ SCOPED_TRACE(i);
+ ASSERT_EQ((i + 1) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+ NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(1)));
+ NO_FATALS(UpdateOriginalRows(num_rowsets_, rows_per_rowset_, i));
+ ASSERT_OK(tablet()->MajorCompactAllDeltaStoresForTests());
+ ASSERT_EQ((i + 2) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+ }
+
+ ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
+ // One block from the original inserts plus one per mutation round, per rowset.
+ const int expected_undo_blocks = (kNumMutationsPerRow + 1) * num_rowsets_;
+ ASSERT_EQ(expected_undo_blocks, tablet()->CountUndoDeltasForTests());
+
+ // There will be uninitialized undos so we will estimate that there may be
+ // undos to GC.
+ int64_t bytes;
+ ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
+ ASSERT_GT(bytes, 0);
+
+ // Now initialize the undos. Our estimates should be back to 0.
+ int64_t bytes_in_ancient_undos = 0;
+ const MonoDelta kNoTimeLimit = MonoDelta();
+ ASSERT_OK(tablet()->InitAncientUndoDeltas(kNoTimeLimit, &bytes_in_ancient_undos));
+ ASSERT_EQ(0, bytes_in_ancient_undos);
+ ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
+ ASSERT_EQ(0, bytes);
+
+ // Move the clock so all deltas should be ancient.
+ NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+
+ // Initialize the undos again. Now that the clock is past the history max age
+ // they should be reported as ancient. (Deletion happens further below.)
+ ASSERT_OK(tablet()->InitAncientUndoDeltas(kNoTimeLimit, &bytes_in_ancient_undos));
+ ASSERT_GT(bytes_in_ancient_undos, 0);
+
+ // Check that the estimate and the metrics match.
+ ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
+ ASSERT_EQ(bytes_in_ancient_undos, bytes);
+ ASSERT_EQ(bytes, tablet()->metrics()->undo_delta_block_estimated_retained_bytes->value());
+
+ // Delete the ancient undo blocks; every one of them should be removed.
+ int64_t blocks_deleted;
+ int64_t bytes_deleted;
+ ASSERT_OK(tablet()->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
+ ASSERT_EQ(expected_undo_blocks, blocks_deleted);
+ ASSERT_GT(bytes_deleted, 0);
+ ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
+ VLOG(1) << "Bytes deleted: " << bytes_deleted;
+
+ // Basic sanity check for our per-tablet metrics. Duration is difficult to
+ // verify with a Histogram so simply ensure each Histogram was incremented:
+ // InitAncientUndoDeltas() ran twice above and DeleteAncientUndoDeltas() once.
+ ASSERT_EQ(bytes_deleted, tablet()->metrics()->undo_delta_block_gc_bytes_deleted->value());
+ ASSERT_EQ(2, tablet()->metrics()->undo_delta_block_gc_init_duration->TotalCount());
+ ASSERT_EQ(1, tablet()->metrics()->undo_delta_block_gc_delete_duration->TotalCount());
+}
+
} // namespace tablet
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metrics.cc b/src/kudu/tablet/tablet_metrics.cc
index 7b776a4..c24f96c 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -111,6 +111,13 @@ METRIC_DEFINE_counter(tablet, bytes_flushed, "Bytes Flushed",
kudu::MetricUnit::kBytes,
"Amount of data that has been flushed to disk by this tablet.");
+METRIC_DEFINE_counter(tablet, undo_delta_block_gc_bytes_deleted,
+ "Undo Delta Block GC Bytes Deleted",
+ kudu::MetricUnit::kBytes,
+ "Number of bytes deleted by garbage-collecting old UNDO delta blocks "
+ "on this tablet since this server was restarted. "
+ "Does not include bytes garbage collected during compactions.");
+
METRIC_DEFINE_histogram(tablet, bloom_lookups_per_op, "Bloom Lookups per Operation",
kudu::MetricUnit::kProbes,
"Tracks the number of bloom filter lookups performed by each "
@@ -182,6 +189,17 @@ METRIC_DEFINE_gauge_uint32(tablet, delta_major_compact_rs_running,
kudu::MetricUnit::kMaintenanceOperations,
"Number of delta major compactions currently running.");
+METRIC_DEFINE_gauge_uint32(tablet, undo_delta_block_gc_running,
+ "Undo Delta Block GC Running",
+ kudu::MetricUnit::kMaintenanceOperations,
+ "Number of UNDO delta block GC operations currently running.");
+
+METRIC_DEFINE_gauge_int64(tablet, undo_delta_block_estimated_retained_bytes,
+ "Estimated Deletable Bytes Retained in Undo Delta Blocks",
+ kudu::MetricUnit::kBytes,
+ "Estimated bytes of deletable data in undo delta blocks for this tablet. "
+ "May be an overestimate.");
+
METRIC_DEFINE_histogram(tablet, flush_dms_duration,
"DeltaMemStore Flush Duration",
kudu::MetricUnit::kMilliseconds,
@@ -207,6 +225,21 @@ METRIC_DEFINE_histogram(tablet, delta_major_compact_rs_duration,
kudu::MetricUnit::kSeconds,
"Seconds spent major delta compacting.", 60000000LU, 2);
+METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_init_duration,
+ "Undo Delta Block GC Init Duration",
+ kudu::MetricUnit::kMilliseconds,
+ "Time spent initializing ancient UNDO delta blocks.", 60000LU, 1);
+
+METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_delete_duration,
+ "Undo Delta Block GC Delete Duration",
+ kudu::MetricUnit::kMilliseconds,
+ "Time spent deleting ancient UNDO delta blocks.", 60000LU, 1);
+
+METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_perform_duration,
+ "Undo Delta Block GC Perform Duration",
+ kudu::MetricUnit::kMilliseconds,
+ "Time spent running the maintenance operation to GC ancient UNDO delta blocks.", 60000LU, 1);
+
METRIC_DEFINE_counter(tablet, leader_memory_pressure_rejections,
"Leader Memory Pressure Rejections",
kudu::MetricUnit::kRequests,
@@ -239,6 +272,7 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
MINIT(delta_file_lookups),
MINIT(mrs_lookups),
MINIT(bytes_flushed),
+ MINIT(undo_delta_block_gc_bytes_deleted),
MINIT(bloom_lookups_per_op),
MINIT(key_file_lookups_per_op),
MINIT(delta_file_lookups_per_op),
@@ -251,11 +285,16 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
GINIT(compact_rs_running),
GINIT(delta_minor_compact_rs_running),
GINIT(delta_major_compact_rs_running),
+ GINIT(undo_delta_block_gc_running),
+ GINIT(undo_delta_block_estimated_retained_bytes),
MINIT(flush_dms_duration),
MINIT(flush_mrs_duration),
MINIT(compact_rs_duration),
MINIT(delta_minor_compact_rs_duration),
MINIT(delta_major_compact_rs_duration),
+ MINIT(undo_delta_block_gc_init_duration),
+ MINIT(undo_delta_block_gc_delete_duration),
+ MINIT(undo_delta_block_gc_perform_duration),
MINIT(leader_memory_pressure_rejections) {
}
#undef MINIT
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 84a7185..5fcea64 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -64,7 +64,10 @@ struct TabletMetrics {
scoped_refptr<Counter> key_file_lookups;
scoped_refptr<Counter> delta_file_lookups;
scoped_refptr<Counter> mrs_lookups;
+
+ // Operation stats.
scoped_refptr<Counter> bytes_flushed;
+ scoped_refptr<Counter> undo_delta_block_gc_bytes_deleted;
scoped_refptr<Histogram> bloom_lookups_per_op;
scoped_refptr<Histogram> key_file_lookups_per_op;
@@ -80,12 +83,17 @@ struct TabletMetrics {
scoped_refptr<AtomicGauge<uint32_t> > compact_rs_running;
scoped_refptr<AtomicGauge<uint32_t> > delta_minor_compact_rs_running;
scoped_refptr<AtomicGauge<uint32_t> > delta_major_compact_rs_running;
+ scoped_refptr<AtomicGauge<uint32_t> > undo_delta_block_gc_running;
+ scoped_refptr<AtomicGauge<int64_t> > undo_delta_block_estimated_retained_bytes;
scoped_refptr<Histogram> flush_dms_duration;
scoped_refptr<Histogram> flush_mrs_duration;
scoped_refptr<Histogram> compact_rs_duration;
scoped_refptr<Histogram> delta_minor_compact_rs_duration;
scoped_refptr<Histogram> delta_major_compact_rs_duration;
+ scoped_refptr<Histogram> undo_delta_block_gc_init_duration;
+ scoped_refptr<Histogram> undo_delta_block_gc_delete_duration;
+ scoped_refptr<Histogram> undo_delta_block_gc_perform_duration;
scoped_refptr<Counter> leader_memory_pressure_rejections;
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index 30f5138..12268a0 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -19,9 +19,20 @@
#include <mutex>
-#include "kudu/util/locks.h"
+#include <gflags/gflags.h>
+
#include "kudu/tablet/tablet.h"
#include "kudu/tablet/tablet_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+
+// Read by UndoDeltaBlockGCOp::Perform(), which converts it to a MonoDelta and
+// passes it to Tablet::InitAncientUndoDeltas() as the initialization budget.
+DEFINE_int32(undo_delta_block_gc_init_budget_millis, 1000,
+ "The maximum number of milliseconds we will spend initializing "
+ "UNDO delta blocks per invocation of UndoDeltaBlockGCOp. Existing delta "
+ "blocks must be initialized once per process startup to determine "
+ "when they can be deleted.");
+TAG_FLAG(undo_delta_block_gc_init_budget_millis, evolving);
+TAG_FLAG(undo_delta_block_gc_init_budget_millis, advanced);
using std::string;
using strings::Substitute;
@@ -247,5 +258,55 @@ scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() con
return tablet_->metrics()->delta_major_compact_rs_running;
}
+////////////////////////////////////////////////////////////
+// UndoDeltaBlockGCOp
+////////////////////////////////////////////////////////////
+
+UndoDeltaBlockGCOp::UndoDeltaBlockGCOp(Tablet* tablet)
+ : TabletOpBase(Substitute("UndoDeltaBlockGCOp($0)", tablet->tablet_id()),
+ MaintenanceOp::HIGH_IO_USAGE, tablet) {
+}
+
+// Publishes an estimate (possibly an overestimate) of the bytes held in
+// potentially ancient UNDO delta blocks as this op's data_retained_bytes. The
+// op is runnable only when that estimate is positive. If the estimate cannot be
+// computed, a warning is logged.
+void UndoDeltaBlockGCOp::UpdateStats(MaintenanceOpStats* stats) {
+ int64_t max_estimated_retained_bytes = 0;
+ WARN_NOT_OK(tablet_->EstimateBytesInPotentiallyAncientUndoDeltas(&max_estimated_retained_bytes),
+ "Unable to count bytes in potentially ancient undo deltas");
+ stats->set_data_retained_bytes(max_estimated_retained_bytes);
+ stats->set_runnable(max_estimated_retained_bytes > 0);
+}
+
+// No preparation is needed; the op is always ready to run.
+bool UndoDeltaBlockGCOp::Prepare() {
+ return true;
+}
+
+// Initializes UNDO delta blocks, spending at most
+// --undo_delta_block_gc_init_budget_millis doing so, then deletes those found
+// to be ancient. An initialization error is logged and the run is abandoned
+// before anything is deleted; a deletion error is fatal.
+void UndoDeltaBlockGCOp::Perform() {
+ const MonoDelta time_budget =
+ MonoDelta::FromMilliseconds(FLAGS_undo_delta_block_gc_init_budget_millis);
+ int64_t bytes_in_ancient_undos = 0;
+ Status s = tablet_->InitAncientUndoDeltas(time_budget, &bytes_in_ancient_undos);
+ if (PREDICT_FALSE(!s.ok())) {
+ // A bare status would not say which GC step failed, so add that context.
+ LOG_WITH_PREFIX(WARNING) << "Unable to initialize ancient undo deltas: " << s.ToString();
+ return;
+ }
+
+ // Return early if it turns out that we have nothing to GC.
+ if (bytes_in_ancient_undos == 0) return;
+
+ // Deletion failures are treated as fatal.
+ CHECK_OK_PREPEND(tablet_->DeleteAncientUndoDeltas(),
+ Substitute("$0GC of undo delta blocks failed", LogPrefix()));
+}
+
+scoped_refptr<Histogram> UndoDeltaBlockGCOp::DurationHistogram() const {
+ return tablet_->metrics()->undo_delta_block_gc_perform_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t>> UndoDeltaBlockGCOp::RunningGauge() const {
+ return tablet_->metrics()->undo_delta_block_gc_running;
+}
+
+std::string UndoDeltaBlockGCOp::LogPrefix() const {
+ return tablet_->LogPrefix();
+}
+
} // namespace tablet
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index 98ff281..2f8e18d 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -125,6 +125,36 @@ class MajorDeltaCompactionOp : public TabletOpBase {
uint64_t last_num_rs_major_delta_compacted_;
};
+// MaintenanceOp to garbage-collect undo delta blocks that are older than the
+// ancient history mark.
+class UndoDeltaBlockGCOp : public TabletOpBase {
+ public:
+ // Creates the op for 'tablet', registered as a HIGH_IO_USAGE operation.
+ explicit UndoDeltaBlockGCOp(Tablet* tablet);
+
+ // Estimates the number of bytes that may potentially be in ancient delta
+ // undo blocks. Over time, as Perform() is invoked, this estimate gets more
+ // accurate.
+ void UpdateStats(MaintenanceOpStats* stats) override;
+
+ // Always returns true: this op has no preparation step.
+ bool Prepare() override;
+
+ // Deletes ancient history data from disk. This also initializes undo delta
+ // blocks greedily (in a budgeted manner controlled by the
+ // --undo_delta_block_gc_init_budget_millis gflag) that makes the estimate
+ // performed in UpdateStats() more accurate.
+ void Perform() override;
+
+ // Returns the tablet's undo_delta_block_gc_perform_duration histogram.
+ scoped_refptr<Histogram> DurationHistogram() const override;
+
+ // Returns the tablet's undo_delta_block_gc_running gauge.
+ scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
+
+ private:
+ // Returns the owning tablet's log prefix.
+ std::string LogPrefix() const;
+
+ DISALLOW_COPY_AND_ASSIGN(UndoDeltaBlockGCOp);
+};
+
+
} // namespace tablet
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 0ae70b9..a26c94b 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -124,7 +124,7 @@ Status TabletServer::Start() {
RETURN_NOT_OK(ServerBase::Start());
RETURN_NOT_OK(heartbeater_->Start());
- RETURN_NOT_OK(maintenance_manager_->Init());
+ RETURN_NOT_OK(maintenance_manager_->Init(fs_manager_->uuid()));
google::FlushLogFiles(google::INFO); // Flush the startup messages.
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 282578b..e5dcc96 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -47,11 +47,12 @@ DECLARE_int64(log_target_replay_size_mb);
namespace kudu {
-const int kHistorySize = 4;
+static const int kHistorySize = 4;
+static const char kFakeUuid[] = "12345";
class MaintenanceManagerTest : public KuduTest {
public:
- MaintenanceManagerTest() {
+ void SetUp() override {
test_tracker_ = MemTracker::CreateTracker(1000, "test");
MaintenanceManager::Options options;
options.num_threads = 2;
@@ -59,9 +60,10 @@ class MaintenanceManagerTest : public KuduTest {
options.history_size = kHistorySize;
options.parent_mem_tracker = test_tracker_;
manager_.reset(new MaintenanceManager(options));
- manager_->Init();
+ ASSERT_OK(manager_->Init(kFakeUuid));
}
- ~MaintenanceManagerTest() {
+
+ void TearDown() override {
manager_->Shutdown();
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 6a50f6f..eda71e8 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -31,6 +31,7 @@
#include "kudu/util/logging.h"
#include "kudu/util/mem_tracker.h"
#include "kudu/util/metrics.h"
+#include "kudu/util/random_util.h"
#include "kudu/util/stopwatch.h"
#include "kudu/util/thread.h"
#include "kudu/util/trace.h"
@@ -65,6 +66,18 @@ DEFINE_int64(log_target_replay_size_mb, 1024,
"these operations to disk.");
TAG_FLAG(log_target_replay_size_mb, experimental);
+// FindBestOp() requires an op's data_retained_bytes to be strictly greater
+// than this value (converted to bytes), hence "exclusive".
+DEFINE_int64(data_gc_min_size_mb, 0,
+ "The (exclusive) minimum number of megabytes of ancient data on "
+ "disk, per tablet, needed to prioritize deletion of that data.");
+TAG_FLAG(data_gc_min_size_mb, experimental);
+
+// Only consulted by FindBestOp() when a perf-improvement op with a positive
+// score also exists: data GC wins when rand_.NextDoubleFraction() <= this value.
+DEFINE_double(data_gc_prioritization_prob, 0.5,
+ "The probability that we will prioritize data GC over performance "
+ "improvement operations. If set to 1.0, we will always prefer to "
+ "delete old data before running performance improvement operations "
+ "such as delta compaction.");
+TAG_FLAG(data_gc_prioritization_prob, experimental);
+
namespace kudu {
MaintenanceOpStats::MaintenanceOpStats() {
@@ -76,7 +89,9 @@ void MaintenanceOpStats::Clear() {
runnable_ = false;
ram_anchored_ = 0;
logs_retained_bytes_ = 0;
+ data_retained_bytes_ = 0;
perf_improvement_ = 0;
+ last_modified_ = MonoTime();
}
MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
@@ -97,10 +112,10 @@ void MaintenanceOp::Unregister() {
}
const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
- 0,
- 0,
- 0,
- shared_ptr<MemTracker>(),
+ .num_threads = 0,
+ .polling_interval_ms = 0,
+ .history_size = 0,
+ .parent_mem_tracker = shared_ptr<MemTracker>(),
};
MaintenanceManager::MaintenanceManager(const Options& options)
@@ -114,7 +129,8 @@ MaintenanceManager::MaintenanceManager(const Options& options)
options.polling_interval_ms),
completed_ops_count_(0),
parent_mem_tracker_(!options.parent_mem_tracker ?
- MemTracker::GetRootTracker() : options.parent_mem_tracker) {
+ MemTracker::GetRootTracker() : options.parent_mem_tracker),
+ rand_(GetRandomSeed32()) {
CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
.set_max_threads(num_threads_).Build(&thread_pool_));
uint32_t history_size = options.history_size == 0 ?
@@ -127,7 +143,8 @@ MaintenanceManager::~MaintenanceManager() {
Shutdown();
}
-Status MaintenanceManager::Init() {
+Status MaintenanceManager::Init(string server_uuid) {
+ server_uuid_ = std::move(server_uuid);
RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
boost::bind(&MaintenanceManager::RunSchedulerThread, this),
&monitor_thread_));
@@ -151,8 +168,9 @@ void MaintenanceManager::Shutdown() {
}
void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
+ CHECK(op);
std::lock_guard<Mutex> guard(lock_);
- CHECK(!op->manager_.get()) << "Tried to register " << op->name()
+ CHECK(!op->manager_) << "Tried to register " << op->name()
<< ", but it was already registered.";
pair<OpMapTy::iterator, bool> val
(ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
@@ -161,7 +179,7 @@ void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
<< ", but it already exists in ops_.";
op->manager_ = shared_from_this();
op->cond_.reset(new ConditionVariable(&lock_));
- VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Registered " << op->name();
}
void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
@@ -174,8 +192,8 @@ void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
<< ", but it was never registered";
// While the op is running, wait for it to be finished.
if (iter->first->running_ > 0) {
- VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so "
- << "we can unregister it.";
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Waiting for op " << op->name()
+ << " to finish so we can unregister it.";
}
op->CancelAndDisable();
while (iter->first->running_ > 0) {
@@ -187,7 +205,7 @@ void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
}
ops_.erase(iter);
}
- LOG(INFO) << "Unregistered op " << op->name();
+ LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name();
op->cond_.reset();
// Remove the op's shared_ptr reference to us. This might 'delete this'.
op->manager_.reset();
@@ -201,7 +219,7 @@ void MaintenanceManager::RunSchedulerThread() {
// Loop until we are shutting down or it is time to run another op.
cond_.TimedWait(polling_interval);
if (shutdown_) {
- VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager.";
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Shutting down maintenance manager.";
return;
}
@@ -213,7 +231,8 @@ void MaintenanceManager::RunSchedulerThread() {
// Find the best op.
MaintenanceOp* op = FindBestOp();
if (!op) {
- VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing.";
+ VLOG_AND_TRACE("maintenance", 2) << LogPrefix()
+ << "No maintenance operations look worth doing.";
continue;
}
@@ -224,8 +243,8 @@ void MaintenanceManager::RunSchedulerThread() {
bool ready = op->Prepare();
guard.lock();
if (!ready) {
- LOG(INFO) << "Prepare failed for " << op->name()
- << ". Re-running scheduler.";
+ LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+ << ". Re-running scheduler.";
op->running_--;
op->cond_->Signal();
continue;
@@ -263,7 +282,8 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
size_t free_threads = num_threads_ - running_ops_;
if (free_threads == 0) {
- VLOG_AND_TRACE("maintenance", 1) << "there are no free threads, so we can't run anything.";
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+ << "There are no free threads, so we can't run anything.";
return nullptr;
}
@@ -277,11 +297,15 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
int64_t most_logs_retained_bytes_ram_anchored = 0;
MaintenanceOp* most_logs_retained_bytes_op = nullptr;
+ int64_t most_data_retained_bytes = 0;
+ MaintenanceOp* most_data_retained_bytes_op = nullptr;
+
double best_perf_improvement = 0;
MaintenanceOp* best_perf_improvement_op = nullptr;
for (OpMapTy::value_type &val : ops_) {
MaintenanceOp* op(val.first);
MaintenanceOpStats& stats(val.second);
+ VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name();
// Update op stats.
stats.Clear();
op->UpdateStats(&stats);
@@ -289,9 +313,11 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
continue;
}
if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
- op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
+ op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
low_io_most_logs_retained_bytes_op = op;
low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
+ VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+ << stats.logs_retained_bytes() << " bytes of logs";
}
if (stats.ram_anchored() > most_mem_anchored) {
@@ -308,6 +334,14 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
most_logs_retained_bytes = stats.logs_retained_bytes();
most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
}
+
+ if (stats.data_retained_bytes() > most_data_retained_bytes) {
+ most_data_retained_bytes_op = op;
+ most_data_retained_bytes = stats.data_retained_bytes();
+ VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+ << stats.data_retained_bytes() << " bytes of data";
+ }
+
if ((!best_perf_improvement_op) ||
(stats.perf_improvement() > best_perf_improvement)) {
best_perf_improvement_op = op;
@@ -318,7 +352,7 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
// Look at ops that we can run quickly that free up log retention.
if (low_io_most_logs_retained_bytes_op) {
if (low_io_most_logs_retained_bytes > 0) {
- VLOG_AND_TRACE("maintenance", 1)
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
<< "Performing " << low_io_most_logs_retained_bytes_op->name() << ", "
<< "because it can free up more logs "
<< "at " << low_io_most_logs_retained_bytes
@@ -335,10 +369,10 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
string msg = StringPrintf("we have exceeded our soft memory limit "
"(current capacity is %.2f%%). However, there are no ops currently "
"runnable which would free memory.", capacity_pct);
- LOG(INFO) << msg;
+ LOG_WITH_PREFIX(INFO) << msg;
return nullptr;
}
- VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit "
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "We have exceeded our soft memory limit "
<< "(current capacity is " << capacity_pct << "%). Running the op "
<< "which anchors the most memory: " << most_mem_anchored_op->name();
return most_mem_anchored_op;
@@ -346,26 +380,39 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
if (most_logs_retained_bytes_op &&
most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) {
- VLOG_AND_TRACE("maintenance", 1)
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
<< "Performing " << most_logs_retained_bytes_op->name() << ", "
<< "because it can free up more logs (" << most_logs_retained_bytes
<< " bytes)";
return most_logs_retained_bytes_op;
}
- if (best_perf_improvement_op) {
- if (best_perf_improvement > 0) {
- VLOG_AND_TRACE("maintenance", 1) << "Performing " << best_perf_improvement_op->name() << ", "
- << "because it had the best perf_improvement score, "
- << "at " << best_perf_improvement;
- return best_perf_improvement_op;
+ // Look at ops that we can run quickly that free up data on disk.
+ if (most_data_retained_bytes_op &&
+ most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) {
+ if (!best_perf_improvement_op || best_perf_improvement <= 0 ||
+ rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) {
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+ << "Performing " << most_data_retained_bytes_op->name() << ", "
+ << "because it can free up more data "
+ << "at " << most_data_retained_bytes << " bytes";
+ return most_data_retained_bytes_op;
}
+ VLOG(1) << "Skipping data GC due to prioritizing perf improvement";
+ }
+
+ if (best_perf_improvement_op && best_perf_improvement > 0) {
+ VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Performing "
+ << best_perf_improvement_op->name() << ", "
+ << "because it had the best perf_improvement score, "
+ << "at " << best_perf_improvement;
+ return best_perf_improvement_op;
}
return nullptr;
}
void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
- MonoTime start_time(MonoTime::Now());
+ MonoTime start_time = MonoTime::Now();
op->RunningGauge()->Increment();
scoped_refptr<Trace> trace(new Trace);
@@ -375,13 +422,12 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
"name", op->name());
op->Perform();
}
- LOG(INFO) << op->name() << " metrics: " << trace->MetricsAsJSON();
+ LOG_WITH_PREFIX(INFO) << op->name() << " metrics: " << trace->MetricsAsJSON();
op->RunningGauge()->Decrement();
- MonoTime end_time(MonoTime::Now());
- MonoDelta delta(end_time.GetDeltaSince(start_time));
- std::lock_guard<Mutex> guard(lock_);
+ MonoDelta delta = MonoTime::Now() - start_time;
+ std::lock_guard<Mutex> l(lock_);
CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()];
completed_op.name = op->name();
completed_op.duration = delta;
@@ -438,4 +484,8 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
}
}
+// Prefix for this manager's log lines: "P <server uuid>: ".
+std::string MaintenanceManager::LogPrefix() const {
+ return "P " + server_uuid_ + ": ";
+}
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 2ae4ccb..5c432a7 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -28,10 +28,11 @@
#include "kudu/gutil/macros.h"
#include "kudu/util/atomic.h"
#include "kudu/util/condition_variable.h"
+#include "kudu/util/countdown_latch.h"
#include "kudu/util/maintenance_manager.pb.h"
#include "kudu/util/monotime.h"
#include "kudu/util/mutex.h"
-#include "kudu/util/countdown_latch.h"
+#include "kudu/util/random.h"
#include "kudu/util/thread.h"
#include "kudu/util/threadpool.h"
@@ -80,6 +81,16 @@ class MaintenanceOpStats {
logs_retained_bytes_ = logs_retained_bytes;
}
+ // Approximate bytes of on-disk data blocks that running this op would free.
+ // DCHECKs that the stats are valid.
+ int64_t data_retained_bytes() const {
+ DCHECK(valid_);
+ return data_retained_bytes_;
+ }
+
+ // Records the data-retention estimate and bumps the last-modified time.
+ void set_data_retained_bytes(int64_t data_retained_bytes) {
+ UpdateLastModified();
+ data_retained_bytes_ = data_retained_bytes;
+ }
+
double perf_improvement() const {
DCHECK(valid_);
return perf_improvement_;
@@ -105,6 +116,8 @@ class MaintenanceOpStats {
last_modified_ = MonoTime::Now();
}
+ // Important: Update Clear() when adding fields to this class.
+
// True if these stats are valid.
bool valid_;
@@ -116,10 +129,14 @@ class MaintenanceOpStats {
// should be fairly accurate. May be 0.
uint64_t ram_anchored_;
- // The approximate amount of disk space that not doing this operation keeps us from GCing from
- // the logs. May be 0.
+ // Approximate amount of disk space in WAL files that would be freed if this
+ // operation ran. May be 0.
int64_t logs_retained_bytes_;
+ // Approximate amount of disk space in data blocks that would be freed if
+ // this operation ran. May be 0.
+ int64_t data_retained_bytes_;
+
// The estimated performance improvement-- how good it is to do this on some
// absolute scale (yet TBD).
double perf_improvement_;
@@ -248,7 +265,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
explicit MaintenanceManager(const Options& options);
~MaintenanceManager();
- Status Init();
+ Status Init(std::string server_uuid);
void Shutdown();
// Register an op with the manager.
@@ -275,6 +292,8 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
void LaunchOp(MaintenanceOp* op);
+ std::string LogPrefix() const;
+
const int32_t num_threads_;
OpMapTy ops_; // registered operations
Mutex lock_;
@@ -289,6 +308,8 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
std::vector<CompletedOp> completed_ops_;
int64_t completed_ops_count_;
std::shared_ptr<MemTracker> parent_mem_tracker_;
+ std::string server_uuid_;
+ Random rand_;
DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
};