You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/02/24 04:07:30 UTC

[3/4] kudu git commit: KUDU-1601. Delete ancient UNDO delta blocks in the background

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_history_gc-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index 17de070..b071661 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -25,6 +25,7 @@
 #include "kudu/tablet/tablet_metrics.h"
 #include "kudu/tablet/tablet-test-base.h"
 
+DECLARE_bool(enable_maintenance_manager);
 DECLARE_int32(tablet_history_max_age_sec);
 DECLARE_bool(use_mock_wall_clock);
 
@@ -65,7 +66,8 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
     NO_FLUSH
   };
 
-  void InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset);
+  void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset);
+  void UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, int32_t val);
   void AddTimeToHybridClock(MonoDelta delta) {
     uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
     uint64_t new_time = now + delta.ToMicroseconds();
@@ -87,8 +89,7 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
   int rows_per_rowset_ = 300;
 };
 
-void TabletHistoryGcTest::InsertOriginalRows(int num_rowsets, uint64_t rows_per_rowset) {
-  ClampRowCount(num_rowsets * rows_per_rowset);
+void TabletHistoryGcTest::InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) {
   for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
     InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, 0);
     ASSERT_OK(tablet()->Flush());
@@ -96,6 +97,15 @@ void TabletHistoryGcTest::InsertOriginalRows(int num_rowsets, uint64_t rows_per_
   ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
 }
 
+void TabletHistoryGcTest::UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset,
+                                             int32_t val) {
+  for (int rs_idx = 0; rs_idx < num_rowsets; ++rs_idx) {  // One upsert batch per rowset.
+    UpsertTestRows(rs_idx * rows_per_rowset, rows_per_rowset, val);
+    ASSERT_OK(tablet()->FlushAllDMSForTests());  // Flush after each batch of upserts.
+  }
+  ASSERT_EQ(num_rowsets, tablet()->num_rowsets());  // Same check as in InsertOriginalRows().
+}
+
 // Test that we do not generate undos for redo operations that are older than
 // the AHM during major delta compaction.
 TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
@@ -510,5 +520,77 @@ TEST_F(TabletHistoryGcTest, TestGcWithConcurrentCompaction) {
   }
 }
 
+// Variant of TabletHistoryGcTest that runs with --enable_maintenance_manager turned off.
+class TabletHistoryGcNoMaintMgrTest : public TabletHistoryGcTest {
+ public:
+  void SetUp() override {
+    FLAGS_enable_maintenance_manager = false;  // Cleared before the base fixture's SetUp() runs.
+    TabletHistoryGcTest::SetUp();
+  }
+};
+
+// Test that basic deletion of undo delta blocks prior to the AHM (ancient history mark) works.
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestUndoDeltaBlockGc) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+
+  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  ASSERT_EQ(num_rowsets_, tablet()->CountUndoDeltasForTests());
+
+  // Generate redo deltas and major-compact them: each round adds one undo delta per rowset.
+  constexpr int kNumMutationsPerRow = 5;
+  for (int i = 0; i < kNumMutationsPerRow; i++) {
+    SCOPED_TRACE(i);
+    ASSERT_EQ((i + 1) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+    NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(1)));
+    NO_FATALS(UpdateOriginalRows(num_rowsets_, rows_per_rowset_, i));
+    ASSERT_OK(tablet()->MajorCompactAllDeltaStoresForTests());
+    ASSERT_EQ((i + 2) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+  }
+
+  ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
+  const int expected_undo_blocks = (kNumMutationsPerRow + 1) * num_rowsets_;
+  ASSERT_EQ(expected_undo_blocks, tablet()->CountUndoDeltasForTests());
+
+  // There will be uninitialized undos so we will estimate that there may be
+  // undos to GC.
+  int64_t bytes;
+  ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
+  ASSERT_GT(bytes, 0);
+
+  // Now initialize the undos. Nothing is ancient yet, so our estimates should be back to 0.
+  int64_t bytes_in_ancient_undos = 0;
+  const MonoDelta kNoTimeLimit = MonoDelta();
+  ASSERT_OK(tablet()->InitAncientUndoDeltas(kNoTimeLimit, &bytes_in_ancient_undos));
+  ASSERT_EQ(0, bytes_in_ancient_undos);
+  ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
+  ASSERT_EQ(0, bytes);
+
+  // Move the clock so all deltas should be ancient.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+
+  // Re-run initialization; the undos should now be counted as ancient.
+  ASSERT_OK(tablet()->InitAncientUndoDeltas(kNoTimeLimit, &bytes_in_ancient_undos));
+  ASSERT_GT(bytes_in_ancient_undos, 0);
+
+  // Check that the estimate and the metrics match.
+  ASSERT_OK(tablet()->EstimateBytesInPotentiallyAncientUndoDeltas(&bytes));
+  ASSERT_EQ(bytes_in_ancient_undos, bytes);
+  ASSERT_EQ(bytes, tablet()->metrics()->undo_delta_block_estimated_retained_bytes->value());
+
+  int64_t blocks_deleted;  // Now delete the ancient undos; every undo block should go.
+  int64_t bytes_deleted;
+  ASSERT_OK(tablet()->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
+  ASSERT_EQ(expected_undo_blocks, blocks_deleted);
+  ASSERT_GT(bytes_deleted, 0);
+  ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
+  VLOG(1) << "Bytes deleted: " << bytes_deleted;
+
+  // Sanity-check the per-tablet metrics. Durations are hard to verify, so only check the
+  // Histogram sample counts: InitAncientUndoDeltas() ran twice above and the delete once.
+  ASSERT_EQ(bytes_deleted, tablet()->metrics()->undo_delta_block_gc_bytes_deleted->value());
+  ASSERT_EQ(2, tablet()->metrics()->undo_delta_block_gc_init_duration->TotalCount());
+  ASSERT_EQ(1, tablet()->metrics()->undo_delta_block_gc_delete_duration->TotalCount());
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_metrics.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metrics.cc b/src/kudu/tablet/tablet_metrics.cc
index 7b776a4..c24f96c 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -111,6 +111,13 @@ METRIC_DEFINE_counter(tablet, bytes_flushed, "Bytes Flushed",
                       kudu::MetricUnit::kBytes,
                       "Amount of data that has been flushed to disk by this tablet.");
 
+METRIC_DEFINE_counter(tablet, undo_delta_block_gc_bytes_deleted,
+                      "Undo Delta Block GC Bytes Deleted",
+                      kudu::MetricUnit::kBytes,
+                      "Number of bytes deleted by garbage-collecting old UNDO delta blocks "
+                      "on this tablet since this server was restarted. "
+                      "Does not include bytes garbage collected during compactions.");
+
 METRIC_DEFINE_histogram(tablet, bloom_lookups_per_op, "Bloom Lookups per Operation",
                         kudu::MetricUnit::kProbes,
                         "Tracks the number of bloom filter lookups performed by each "
@@ -182,6 +189,17 @@ METRIC_DEFINE_gauge_uint32(tablet, delta_major_compact_rs_running,
   kudu::MetricUnit::kMaintenanceOperations,
   "Number of delta major compactions currently running.");
 
+METRIC_DEFINE_gauge_uint32(tablet, undo_delta_block_gc_running,
+  "Undo Delta Block GC Running",
+  kudu::MetricUnit::kMaintenanceOperations,
+  "Number of UNDO delta block GC operations currently running.");
+
+METRIC_DEFINE_gauge_int64(tablet, undo_delta_block_estimated_retained_bytes,
+  "Estimated Deletable Bytes Retained in Undo Delta Blocks",
+  kudu::MetricUnit::kBytes,
+  "Estimated bytes of deletable data in undo delta blocks for this tablet. "
+  "May be an overestimate.");
+
 METRIC_DEFINE_histogram(tablet, flush_dms_duration,
   "DeltaMemStore Flush Duration",
   kudu::MetricUnit::kMilliseconds,
@@ -207,6 +225,21 @@ METRIC_DEFINE_histogram(tablet, delta_major_compact_rs_duration,
   kudu::MetricUnit::kSeconds,
   "Seconds spent major delta compacting.", 60000000LU, 2);
 
+METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_init_duration,
+  "Undo Delta Block GC Init Duration",
+  kudu::MetricUnit::kMilliseconds,
+  "Time spent initializing ancient UNDO delta blocks.", 60000LU, 1);
+
+METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_delete_duration,
+  "Undo Delta Block GC Delete Duration",
+  kudu::MetricUnit::kMilliseconds,
+  "Time spent deleting ancient UNDO delta blocks.", 60000LU, 1);
+
+METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_perform_duration,
+  "Undo Delta Block GC Perform Duration",
+  kudu::MetricUnit::kMilliseconds,
+  "Time spent running the maintenance operation to GC ancient UNDO delta blocks.", 60000LU, 1);
+
 METRIC_DEFINE_counter(tablet, leader_memory_pressure_rejections,
   "Leader Memory Pressure Rejections",
   kudu::MetricUnit::kRequests,
@@ -239,6 +272,7 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
     MINIT(delta_file_lookups),
     MINIT(mrs_lookups),
     MINIT(bytes_flushed),
+    MINIT(undo_delta_block_gc_bytes_deleted),
     MINIT(bloom_lookups_per_op),
     MINIT(key_file_lookups_per_op),
     MINIT(delta_file_lookups_per_op),
@@ -251,11 +285,16 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
     GINIT(compact_rs_running),
     GINIT(delta_minor_compact_rs_running),
     GINIT(delta_major_compact_rs_running),
+    GINIT(undo_delta_block_gc_running),
+    GINIT(undo_delta_block_estimated_retained_bytes),
     MINIT(flush_dms_duration),
     MINIT(flush_mrs_duration),
     MINIT(compact_rs_duration),
     MINIT(delta_minor_compact_rs_duration),
     MINIT(delta_major_compact_rs_duration),
+    MINIT(undo_delta_block_gc_init_duration),
+    MINIT(undo_delta_block_gc_delete_duration),
+    MINIT(undo_delta_block_gc_perform_duration),
     MINIT(leader_memory_pressure_rejections) {
 }
 #undef MINIT

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_metrics.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 84a7185..5fcea64 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -64,7 +64,10 @@ struct TabletMetrics {
   scoped_refptr<Counter> key_file_lookups;
   scoped_refptr<Counter> delta_file_lookups;
   scoped_refptr<Counter> mrs_lookups;
+
+  // Operation stats.
   scoped_refptr<Counter> bytes_flushed;
+  scoped_refptr<Counter> undo_delta_block_gc_bytes_deleted;
 
   scoped_refptr<Histogram> bloom_lookups_per_op;
   scoped_refptr<Histogram> key_file_lookups_per_op;
@@ -80,12 +83,17 @@ struct TabletMetrics {
   scoped_refptr<AtomicGauge<uint32_t> > compact_rs_running;
   scoped_refptr<AtomicGauge<uint32_t> > delta_minor_compact_rs_running;
   scoped_refptr<AtomicGauge<uint32_t> > delta_major_compact_rs_running;
+  scoped_refptr<AtomicGauge<uint32_t> > undo_delta_block_gc_running;
+  scoped_refptr<AtomicGauge<int64_t> > undo_delta_block_estimated_retained_bytes;
 
   scoped_refptr<Histogram> flush_dms_duration;
   scoped_refptr<Histogram> flush_mrs_duration;
   scoped_refptr<Histogram> compact_rs_duration;
   scoped_refptr<Histogram> delta_minor_compact_rs_duration;
   scoped_refptr<Histogram> delta_major_compact_rs_duration;
+  scoped_refptr<Histogram> undo_delta_block_gc_init_duration;
+  scoped_refptr<Histogram> undo_delta_block_gc_delete_duration;
+  scoped_refptr<Histogram> undo_delta_block_gc_perform_duration;
 
   scoped_refptr<Counter> leader_memory_pressure_rejections;
 };

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_mm_ops.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index 30f5138..12268a0 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -19,9 +19,20 @@
 
 #include <mutex>
 
-#include "kudu/util/locks.h"
+#include <gflags/gflags.h>
+
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metrics.h"
+#include "kudu/util/flag_tags.h"
+#include "kudu/util/locks.h"
+
+DEFINE_int32(undo_delta_block_gc_init_budget_millis, 1000,
+    "The maximum number of milliseconds we will spend initializing "
+    "UNDO delta blocks per invocation of UndoDeltaBlockGCOp. Existing delta "
+    "blocks must be initialized once per process startup to determine "
+    "when they can be deleted.");
+TAG_FLAG(undo_delta_block_gc_init_budget_millis, evolving);
+TAG_FLAG(undo_delta_block_gc_init_budget_millis, advanced);
 
 using std::string;
 using strings::Substitute;
@@ -247,5 +258,55 @@ scoped_refptr<AtomicGauge<uint32_t> > MajorDeltaCompactionOp::RunningGauge() con
   return tablet_->metrics()->delta_major_compact_rs_running;
 }
 
+////////////////////////////////////////////////////////////
+// UndoDeltaBlockGCOp
+////////////////////////////////////////////////////////////
+
+UndoDeltaBlockGCOp::UndoDeltaBlockGCOp(Tablet* tablet)
+  : TabletOpBase(Substitute("UndoDeltaBlockGCOp($0)", tablet->tablet_id()),
+                 MaintenanceOp::HIGH_IO_USAGE, tablet) {
+}
+
+void UndoDeltaBlockGCOp::UpdateStats(MaintenanceOpStats* stats) {
+  int64_t retained_estimate = 0;  // Estimate of GC-able undo bytes; starts at zero.
+  WARN_NOT_OK(tablet_->EstimateBytesInPotentiallyAncientUndoDeltas(&retained_estimate),
+              "Unable to count bytes in potentially ancient undo deltas");
+  stats->set_data_retained_bytes(retained_estimate);
+  stats->set_runnable(retained_estimate > 0);  // Only runnable when there may be data to free.
+}
+
+bool UndoDeltaBlockGCOp::Prepare() {
+  // Nothing for us to do.
+  return true;
+}
+
+void UndoDeltaBlockGCOp::Perform() {
+  // Initialize undo deltas within the configured time budget, then delete the ancient ones.
+  MonoDelta time_budget = MonoDelta::FromMilliseconds(FLAGS_undo_delta_block_gc_init_budget_millis);
+  int64_t bytes_in_ancient_undos = 0;
+  Status s = tablet_->InitAncientUndoDeltas(time_budget, &bytes_in_ancient_undos);
+  if (PREDICT_FALSE(!s.ok())) {
+    LOG_WITH_PREFIX(WARNING) << "Unable to initialize ancient undo deltas: " << s.ToString();
+    return;
+  }
+  // Return early if it turns out that we have nothing to GC.
+  if (bytes_in_ancient_undos == 0) return;
+
+  CHECK_OK_PREPEND(tablet_->DeleteAncientUndoDeltas(),
+                   Substitute("$0GC of undo delta blocks failed", LogPrefix()));
+}
+
+scoped_refptr<Histogram> UndoDeltaBlockGCOp::DurationHistogram() const {
+  return tablet_->metrics()->undo_delta_block_gc_perform_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t>> UndoDeltaBlockGCOp::RunningGauge() const {
+  return tablet_->metrics()->undo_delta_block_gc_running;
+}
+
+std::string UndoDeltaBlockGCOp::LogPrefix() const {
+  return tablet_->LogPrefix();
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet_mm_ops.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index 98ff281..2f8e18d 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -125,6 +125,36 @@ class MajorDeltaCompactionOp : public TabletOpBase {
   uint64_t last_num_rs_major_delta_compacted_;
 };
 
+// MaintenanceOp to garbage-collect undo delta blocks that are older than the
+// ancient history mark.
+class UndoDeltaBlockGCOp : public TabletOpBase {
+ public:
+  explicit UndoDeltaBlockGCOp(Tablet* tablet);
+
+  // Estimates the number of bytes that may potentially be in ancient undo
+  // delta blocks. Over time, as Perform() is invoked, this estimate gets more
+  // accurate.
+  void UpdateStats(MaintenanceOpStats* stats) override;
+
+  bool Prepare() override;  // Always returns true; there is nothing to prepare.
+
+  // Deletes ancient history data from disk. This also initializes undo delta
+  // blocks greedily (in a budgeted manner controlled by the
+  // --undo_delta_block_gc_init_budget_millis gflag), which makes the estimate
+  // performed in UpdateStats() more accurate.
+  void Perform() override;
+
+  scoped_refptr<Histogram> DurationHistogram() const override;
+
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
+
+ private:
+  std::string LogPrefix() const;  // Returns the owning tablet's log prefix.
+
+  DISALLOW_COPY_AND_ASSIGN(UndoDeltaBlockGCOp);
+};
+
+
 } // namespace tablet
 } // namespace kudu
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tserver/tablet_server.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server.cc b/src/kudu/tserver/tablet_server.cc
index 0ae70b9..a26c94b 100644
--- a/src/kudu/tserver/tablet_server.cc
+++ b/src/kudu/tserver/tablet_server.cc
@@ -124,7 +124,7 @@ Status TabletServer::Start() {
   RETURN_NOT_OK(ServerBase::Start());
 
   RETURN_NOT_OK(heartbeater_->Start());
-  RETURN_NOT_OK(maintenance_manager_->Init());
+  RETURN_NOT_OK(maintenance_manager_->Init(fs_manager_->uuid()));
 
   google::FlushLogFiles(google::INFO); // Flush the startup messages.
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/util/maintenance_manager-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 282578b..e5dcc96 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -47,11 +47,12 @@ DECLARE_int64(log_target_replay_size_mb);
 
 namespace kudu {
 
-const int kHistorySize = 4;
+static const int kHistorySize = 4;
+static const char kFakeUuid[] = "12345";
 
 class MaintenanceManagerTest : public KuduTest {
  public:
-  MaintenanceManagerTest() {
+  void SetUp() override {
     test_tracker_ = MemTracker::CreateTracker(1000, "test");
     MaintenanceManager::Options options;
     options.num_threads = 2;
@@ -59,9 +60,10 @@ class MaintenanceManagerTest : public KuduTest {
     options.history_size = kHistorySize;
     options.parent_mem_tracker = test_tracker_;
     manager_.reset(new MaintenanceManager(options));
-    manager_->Init();
+    ASSERT_OK(manager_->Init(kFakeUuid));
   }
-  ~MaintenanceManagerTest() {
+
+  void TearDown() override {
     manager_->Shutdown();
   }
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/util/maintenance_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 6a50f6f..eda71e8 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -31,6 +31,7 @@
 #include "kudu/util/logging.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/random_util.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/trace.h"
@@ -65,6 +66,18 @@ DEFINE_int64(log_target_replay_size_mb, 1024,
              "these operations to disk.");
 TAG_FLAG(log_target_replay_size_mb, experimental);
 
+DEFINE_int64(data_gc_min_size_mb, 0,
+             "The (exclusive) minimum number of megabytes of ancient data on "
+             "disk, per tablet, needed to prioritize deletion of that data.");
+TAG_FLAG(data_gc_min_size_mb, experimental);
+
+DEFINE_double(data_gc_prioritization_prob, 0.5,
+             "The probability that we will prioritize data GC over performance "
+             "improvement operations. If set to 1.0, we will always prefer to "
+             "delete old data before running performance improvement operations "
+             "such as delta compaction.");
+TAG_FLAG(data_gc_prioritization_prob, experimental);
+
 namespace kudu {
 
 MaintenanceOpStats::MaintenanceOpStats() {
@@ -76,7 +89,9 @@ void MaintenanceOpStats::Clear() {
   runnable_ = false;
   ram_anchored_ = 0;
   logs_retained_bytes_ = 0;
+  data_retained_bytes_ = 0;
   perf_improvement_ = 0;
+  last_modified_ = MonoTime();
 }
 
 MaintenanceOp::MaintenanceOp(std::string name, IOUsage io_usage)
@@ -97,10 +112,10 @@ void MaintenanceOp::Unregister() {
 }
 
 const MaintenanceManager::Options MaintenanceManager::DEFAULT_OPTIONS = {
-  0,
-  0,
-  0,
-  shared_ptr<MemTracker>(),
+  .num_threads = 0,
+  .polling_interval_ms = 0,
+  .history_size = 0,
+  .parent_mem_tracker = shared_ptr<MemTracker>(),
 };
 
 MaintenanceManager::MaintenanceManager(const Options& options)
@@ -114,7 +129,8 @@ MaintenanceManager::MaintenanceManager(const Options& options)
           options.polling_interval_ms),
     completed_ops_count_(0),
     parent_mem_tracker_(!options.parent_mem_tracker ?
-        MemTracker::GetRootTracker() : options.parent_mem_tracker) {
+        MemTracker::GetRootTracker() : options.parent_mem_tracker),
+    rand_(GetRandomSeed32()) {
   CHECK_OK(ThreadPoolBuilder("MaintenanceMgr").set_min_threads(num_threads_)
                .set_max_threads(num_threads_).Build(&thread_pool_));
   uint32_t history_size = options.history_size == 0 ?
@@ -127,7 +143,8 @@ MaintenanceManager::~MaintenanceManager() {
   Shutdown();
 }
 
-Status MaintenanceManager::Init() {
+Status MaintenanceManager::Init(string server_uuid) {
+  server_uuid_ = std::move(server_uuid);
   RETURN_NOT_OK(Thread::Create("maintenance", "maintenance_scheduler",
       boost::bind(&MaintenanceManager::RunSchedulerThread, this),
       &monitor_thread_));
@@ -151,8 +168,9 @@ void MaintenanceManager::Shutdown() {
 }
 
 void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
+  CHECK(op);
   std::lock_guard<Mutex> guard(lock_);
-  CHECK(!op->manager_.get()) << "Tried to register " << op->name()
+  CHECK(!op->manager_) << "Tried to register " << op->name()
           << ", but it was already registered.";
   pair<OpMapTy::iterator, bool> val
     (ops_.insert(OpMapTy::value_type(op, MaintenanceOpStats())));
@@ -161,7 +179,7 @@ void MaintenanceManager::RegisterOp(MaintenanceOp* op) {
       << ", but it already exists in ops_.";
   op->manager_ = shared_from_this();
   op->cond_.reset(new ConditionVariable(&lock_));
-  VLOG_AND_TRACE("maintenance", 1) << "Registered " << op->name();
+  VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Registered " << op->name();
 }
 
 void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
@@ -174,8 +192,8 @@ void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
         << ", but it was never registered";
     // While the op is running, wait for it to be finished.
     if (iter->first->running_ > 0) {
-      VLOG_AND_TRACE("maintenance", 1) << "Waiting for op " << op->name() << " to finish so "
-            << "we can unregister it.";
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Waiting for op " << op->name()
+                                       << " to finish so we can unregister it.";
     }
     op->CancelAndDisable();
     while (iter->first->running_ > 0) {
@@ -187,7 +205,7 @@ void MaintenanceManager::UnregisterOp(MaintenanceOp* op) {
     }
     ops_.erase(iter);
   }
-  LOG(INFO) << "Unregistered op " << op->name();
+  LOG_WITH_PREFIX(INFO) << "Unregistered op " << op->name();
   op->cond_.reset();
   // Remove the op's shared_ptr reference to us.  This might 'delete this'.
   op->manager_.reset();
@@ -201,7 +219,7 @@ void MaintenanceManager::RunSchedulerThread() {
     // Loop until we are shutting down or it is time to run another op.
     cond_.TimedWait(polling_interval);
     if (shutdown_) {
-      VLOG_AND_TRACE("maintenance", 1) << "Shutting down maintenance manager.";
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Shutting down maintenance manager.";
       return;
     }
 
@@ -213,7 +231,8 @@ void MaintenanceManager::RunSchedulerThread() {
     // Find the best op.
     MaintenanceOp* op = FindBestOp();
     if (!op) {
-      VLOG_AND_TRACE("maintenance", 2) << "No maintenance operations look worth doing.";
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix()
+                                       << "No maintenance operations look worth doing.";
       continue;
     }
 
@@ -224,8 +243,8 @@ void MaintenanceManager::RunSchedulerThread() {
     bool ready = op->Prepare();
     guard.lock();
     if (!ready) {
-      LOG(INFO) << "Prepare failed for " << op->name()
-                << ".  Re-running scheduler.";
+      LOG_WITH_PREFIX(INFO) << "Prepare failed for " << op->name()
+                            << ".  Re-running scheduler.";
       op->running_--;
       op->cond_->Signal();
       continue;
@@ -263,7 +282,8 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
 
   size_t free_threads = num_threads_ - running_ops_;
   if (free_threads == 0) {
-    VLOG_AND_TRACE("maintenance", 1) << "there are no free threads, so we can't run anything.";
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+                                     << "There are no free threads, so we can't run anything.";
     return nullptr;
   }
 
@@ -277,11 +297,15 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
   int64_t most_logs_retained_bytes_ram_anchored = 0;
   MaintenanceOp* most_logs_retained_bytes_op = nullptr;
 
+  int64_t most_data_retained_bytes = 0;
+  MaintenanceOp* most_data_retained_bytes_op = nullptr;
+
   double best_perf_improvement = 0;
   MaintenanceOp* best_perf_improvement_op = nullptr;
   for (OpMapTy::value_type &val : ops_) {
     MaintenanceOp* op(val.first);
     MaintenanceOpStats& stats(val.second);
+    VLOG_WITH_PREFIX(3) << "Considering MM op " << op->name();
     // Update op stats.
     stats.Clear();
     op->UpdateStats(&stats);
@@ -289,9 +313,11 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
       continue;
     }
     if (stats.logs_retained_bytes() > low_io_most_logs_retained_bytes &&
-        op->io_usage_ == MaintenanceOp::LOW_IO_USAGE) {
+        op->io_usage() == MaintenanceOp::LOW_IO_USAGE) {
       low_io_most_logs_retained_bytes_op = op;
       low_io_most_logs_retained_bytes = stats.logs_retained_bytes();
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+                                       << stats.logs_retained_bytes() << " bytes of logs";
     }
 
     if (stats.ram_anchored() > most_mem_anchored) {
@@ -308,6 +334,14 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
       most_logs_retained_bytes = stats.logs_retained_bytes();
       most_logs_retained_bytes_ram_anchored = stats.ram_anchored();
     }
+
+    if (stats.data_retained_bytes() > most_data_retained_bytes) {
+      most_data_retained_bytes_op = op;
+      most_data_retained_bytes = stats.data_retained_bytes();
+      VLOG_AND_TRACE("maintenance", 2) << LogPrefix() << "Op " << op->name() << " can free "
+                                       << stats.data_retained_bytes() << " bytes of data";
+    }
+
     if ((!best_perf_improvement_op) ||
         (stats.perf_improvement() > best_perf_improvement)) {
       best_perf_improvement_op = op;
@@ -318,7 +352,7 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
   // Look at ops that we can run quickly that free up log retention.
   if (low_io_most_logs_retained_bytes_op) {
     if (low_io_most_logs_retained_bytes > 0) {
-      VLOG_AND_TRACE("maintenance", 1)
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
                     << "Performing " << low_io_most_logs_retained_bytes_op->name() << ", "
                     << "because it can free up more logs "
                     << "at " << low_io_most_logs_retained_bytes
@@ -335,10 +369,10 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
       string msg = StringPrintf("we have exceeded our soft memory limit "
           "(current capacity is %.2f%%).  However, there are no ops currently "
           "runnable which would free memory.", capacity_pct);
-      LOG(INFO) << msg;
+      LOG_WITH_PREFIX(INFO) << msg;
       return nullptr;
     }
-    VLOG_AND_TRACE("maintenance", 1) << "we have exceeded our soft memory limit "
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "We have exceeded our soft memory limit "
             << "(current capacity is " << capacity_pct << "%).  Running the op "
             << "which anchors the most memory: " << most_mem_anchored_op->name();
     return most_mem_anchored_op;
@@ -346,26 +380,39 @@ MaintenanceOp* MaintenanceManager::FindBestOp() {
 
   if (most_logs_retained_bytes_op &&
       most_logs_retained_bytes / 1024 / 1024 >= FLAGS_log_target_replay_size_mb) {
-    VLOG_AND_TRACE("maintenance", 1)
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
             << "Performing " << most_logs_retained_bytes_op->name() << ", "
             << "because it can free up more logs (" << most_logs_retained_bytes
             << " bytes)";
     return most_logs_retained_bytes_op;
   }
 
-  if (best_perf_improvement_op) {
-    if (best_perf_improvement > 0) {
-      VLOG_AND_TRACE("maintenance", 1) << "Performing " << best_perf_improvement_op->name() << ", "
-                 << "because it had the best perf_improvement score, "
-                 << "at " << best_perf_improvement;
-      return best_perf_improvement_op;
+  // Look at ops that free up data on disk. Unlike the low-IO log GC check, IO usage is ignored.
+  if (most_data_retained_bytes_op &&
+      most_data_retained_bytes > FLAGS_data_gc_min_size_mb * 1024 * 1024) {
+    if (!best_perf_improvement_op || best_perf_improvement <= 0 ||
+        rand_.NextDoubleFraction() <= FLAGS_data_gc_prioritization_prob) {
+      VLOG_AND_TRACE("maintenance", 1) << LogPrefix()
+                    << "Performing " << most_data_retained_bytes_op->name() << ", "
+                    << "because it can free up more data "
+                    << "at " << most_data_retained_bytes << " bytes";
+      return most_data_retained_bytes_op;
+    }
+    VLOG_WITH_PREFIX(1) << "Skipping data GC due to prioritizing perf improvement";
+  }
+
+  if (best_perf_improvement_op && best_perf_improvement > 0) {
+    VLOG_AND_TRACE("maintenance", 1) << LogPrefix() << "Performing "
+                << best_perf_improvement_op->name() << ", "
+                << "because it had the best perf_improvement score, "
+                << "at " << best_perf_improvement;
+    return best_perf_improvement_op;
   }
   return nullptr;
 }
 
 void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
-  MonoTime start_time(MonoTime::Now());
+  MonoTime start_time = MonoTime::Now();
   op->RunningGauge()->Increment();
 
   scoped_refptr<Trace> trace(new Trace);
@@ -375,13 +422,12 @@ void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
                  "name", op->name());
     op->Perform();
   }
-  LOG(INFO) << op->name() << " metrics: " << trace->MetricsAsJSON();
+  LOG_WITH_PREFIX(INFO) << op->name() << " metrics: " << trace->MetricsAsJSON();
 
   op->RunningGauge()->Decrement();
-  MonoTime end_time(MonoTime::Now());
-  MonoDelta delta(end_time.GetDeltaSince(start_time));
-  std::lock_guard<Mutex> guard(lock_);
+  MonoDelta delta = MonoTime::Now() - start_time;
 
+  std::lock_guard<Mutex> l(lock_);
   CompletedOp& completed_op = completed_ops_[completed_ops_count_ % completed_ops_.size()];
   completed_op.name = op->name();
   completed_op.duration = delta;
@@ -438,4 +484,8 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
   }
 }
 
+std::string MaintenanceManager::LogPrefix() const {
+  return Substitute("P $0: ", server_uuid_);
+}
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/util/maintenance_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 2ae4ccb..5c432a7 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -28,10 +28,11 @@
 #include "kudu/gutil/macros.h"
 #include "kudu/util/atomic.h"
 #include "kudu/util/condition_variable.h"
+#include "kudu/util/countdown_latch.h"
 #include "kudu/util/maintenance_manager.pb.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/mutex.h"
-#include "kudu/util/countdown_latch.h"
+#include "kudu/util/random.h"
 #include "kudu/util/thread.h"
 #include "kudu/util/threadpool.h"
 
@@ -80,6 +81,16 @@ class MaintenanceOpStats {
     logs_retained_bytes_ = logs_retained_bytes;
   }
 
+  int64_t data_retained_bytes() const {
+    DCHECK(valid_);
+    return data_retained_bytes_;
+  }
+
+  void set_data_retained_bytes(int64_t data_retained_bytes) {
+    UpdateLastModified();
+    data_retained_bytes_ = data_retained_bytes;
+  }
+
   double perf_improvement() const {
     DCHECK(valid_);
     return perf_improvement_;
@@ -105,6 +116,8 @@ class MaintenanceOpStats {
     last_modified_ = MonoTime::Now();
   }
 
+  // Important: Update Clear() when adding fields to this class.
+
   // True if these stats are valid.
   bool valid_;
 
@@ -116,10 +129,14 @@ class MaintenanceOpStats {
   // should be fairly accurate.  May be 0.
   uint64_t ram_anchored_;
 
-  // The approximate amount of disk space that not doing this operation keeps us from GCing from
-  // the logs. May be 0.
+  // Approximate amount of disk space in WAL files that would be freed if this
+  // operation ran. May be 0.
   int64_t logs_retained_bytes_;
 
+  // Approximate amount of disk space in data blocks that would be freed if
+  // this operation ran. May be 0.
+  int64_t data_retained_bytes_;
+
   // The estimated performance improvement-- how good it is to do this on some
   // absolute scale (yet TBD).
   double perf_improvement_;
@@ -248,7 +265,7 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   explicit MaintenanceManager(const Options& options);
   ~MaintenanceManager();
 
-  Status Init();
+  Status Init(std::string server_uuid);
   void Shutdown();
 
   // Register an op with the manager.
@@ -275,6 +292,8 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
 
   void LaunchOp(MaintenanceOp* op);
 
+  std::string LogPrefix() const;
+
   const int32_t num_threads_;
   OpMapTy ops_; // registered operations
   Mutex lock_;
@@ -289,6 +308,8 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   std::vector<CompletedOp> completed_ops_;
   int64_t completed_ops_count_;
   std::shared_ptr<MemTracker> parent_mem_tracker_;
+  std::string server_uuid_;
+  Random rand_;
 
   DISALLOW_COPY_AND_ASSIGN(MaintenanceManager);
 };