You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by zh...@apache.org on 2020/08/03 10:19:05 UTC

[kudu] branch master updated: [maintenance] use workload statistics to scale perf score of flushes/compactions

This is an automated email from the ASF dual-hosted git repository.

zhangyifan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a9ea0f  [maintenance] use workload statistics to scale perf score of flushes/compactions
9a9ea0f is described below

commit 9a9ea0f2f8acb39b06cc49f0a3a53cba26f4aae9
Author: zhangyifan27 <ch...@163.com>
AuthorDate: Mon Jun 15 17:05:22 2020 +0800

    [maintenance] use workload statistics to scale perf score of flushes/compactions
    
    When we consider the performance improvement brought by maintenance
    operations, we could use workload statistics to find how 'hot' the
    tablet has been in the last few minutes and perform maintenance ops
    for 'hot' tablets in priority. This patch uses the recent read/write
    rate of a tablet as a workload score, and calculates a final perf score
    based on an op's raw perf_improvement, the tablet's workload score and the
    table's priority, so maintenance ops for a 'hot' tablet are more likely to launch.
    
    In our use case, there is insert/update/delete traffic all the time,
    but some tables may have more read traffic at some time, so we want to
    dynamically adjust priorities of compaction/flush ops for different tables.
    
    We tested this on a 6-node cluster and set tservers with configs:
    -maintenance_manager_num_threads=1,
    -workload_score_upper_bound=10,
    and we ran the workloads with enable_workload_score_for_perf_improvement_ops
    set first to false and then to true, to see whether the change improves performance.
    
    We first insert 5,000,000,000 rows into table-C(256 tablets), and then
    insert 200,000,000 rows into table-A(8 tablets) and table-B(8 tablets)
    at the same time. Next we run different YCSB workloads on table-A and table-B,
    all the tablets have some uncompacted rowsets at this time, but there are
    no on-going workloads on table-C.
    
    workload for table_A: Update heavy workload, scan/update ratio is 50/50
      operationcount=10,000,000
      requestdistribution=zipfian
      maxscanlength=10
    
    workload for table_B: Scan mostly workload, scan/insert ratio is 80/20.
      operationcount=10,000,000
      requestdistribution=zipfian
      maxscanlength=10000
    
    result:
    measurement                                 Before change     After change
    [table-A:UPDATE]AverageLatency(us)          9.46              3.84
    [table-A:UPDATE]95thPercentileLatency(us)   12                7
    [table-A:UPDATE]99thPercentileLatency(us)   19                13
    [table-A:SCAN]AverageLatency(us)            2317              1419
    [table-A:SCAN]95thPercentileLatency(us)     4847              2939
    [table-A:SCAN]99thPercentileLatency(us)     10815             5703
    [table-B:INSERT]AverageLatency(us)          16.11             16.54
    [table-B:INSERT]95thPercentileLatency(us)   35                35
    [table-B:INSERT]99thPercentileLatency(us)   58                56
    [table-B:SCAN]AverageLatency(us)            6417              5545
    [table-B:SCAN]95thPercentileLatency(us)     12463             10063
    [table-B:SCAN]99thPercentileLatency(us)     18095             13511
    
    We run these workloads 5 times, we can see 10%-30% reduction in scan
    latency of table-B, and 38%-60% reduction in scan latency of table-A.
    
    This patch also adds a 'Workload score' column to the tserver /maintenance-manager
    page[1] so that we can adjust runtime flags based on the current state.
    
    [1] http://ww1.sinaimg.cn/large/9b7ebaddly1gh83qh2bkfj21eb0d3goy.jpg
    
    Change-Id: Ie3afcc359002d1392164ba2fda885f8930ef8696
    Reviewed-on: http://gerrit.cloudera.org:8080/15995
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 src/kudu/tablet/tablet.cc                   | 81 +++++++++++++++++++++++++++--
 src/kudu/tablet/tablet.h                    | 16 ++++--
 src/kudu/tablet/tablet_mm_ops.cc            | 46 +++++++++++-----
 src/kudu/tablet/tablet_replica_mm_ops.cc    | 15 +++++-
 src/kudu/tserver/tablet_server-test-base.cc |  8 ++-
 src/kudu/tserver/tablet_server-test-base.h  |  4 +-
 src/kudu/tserver/tablet_server-test.cc      | 71 +++++++++++++++++++++++++
 src/kudu/tserver/tserver_path_handlers.cc   |  1 +
 src/kudu/util/maintenance_manager-test.cc   | 53 ++++++++++++++-----
 src/kudu/util/maintenance_manager.cc        | 30 +++++++++--
 src/kudu/util/maintenance_manager.h         | 24 ++++++++-
 src/kudu/util/maintenance_manager.proto     |  1 +
 www/maintenance-manager.mustache            |  2 +
 13 files changed, 308 insertions(+), 44 deletions(-)

diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 176d8f4..90f744d 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -81,7 +81,6 @@
 #include "kudu/util/flag_tags.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
-#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
@@ -151,6 +150,34 @@ DEFINE_int32(max_encoded_key_size_bytes, 16 * 1024,
              "result in an error.");
 TAG_FLAG(max_encoded_key_size_bytes, unsafe);
 
+DEFINE_int32(workload_stats_rate_collection_min_interval_ms, 60 * 1000,
+             "The minimal interval in milliseconds at which we collect read/write rates.");
+TAG_FLAG(workload_stats_rate_collection_min_interval_ms, experimental);
+TAG_FLAG(workload_stats_rate_collection_min_interval_ms, runtime);
+
+DEFINE_int32(workload_stats_metric_collection_interval_ms, 5 * 60 * 1000,
+             "The interval in milliseconds at which we collect workload metrics.");
+TAG_FLAG(workload_stats_metric_collection_interval_ms, experimental);
+TAG_FLAG(workload_stats_metric_collection_interval_ms, runtime);
+
+DEFINE_double(workload_score_upper_bound, 1.0, "Upper bound for workload score.");
+TAG_FLAG(workload_score_upper_bound, experimental);
+TAG_FLAG(workload_score_upper_bound, runtime);
+
+DEFINE_int32(scans_started_per_sec_for_hot_tablets, 1,
+    "Minimum read rate for tablets to be considered 'hot' (scans/sec). If a tablet's "
+    "read rate exceeds this value, flush/compaction ops for it will be assigned the highest "
+    "possible workload score, which is defined by --workload_score_upper_bound.");
+TAG_FLAG(scans_started_per_sec_for_hot_tablets, experimental);
+TAG_FLAG(scans_started_per_sec_for_hot_tablets, runtime);
+
+DEFINE_int32(rows_writed_per_sec_for_hot_tablets, 1000,
+    "Minimum write rate for tablets to be considered 'hot' (rows/sec). If a tablet's "
+    "write rate exceeds this value, compaction ops for it will be assigned the highest "
+    "possible workload score, which is defined by --workload_score_upper_bound.");
+TAG_FLAG(rows_writed_per_sec_for_hot_tablets, experimental);
+TAG_FLAG(rows_writed_per_sec_for_hot_tablets, runtime);
+
 METRIC_DEFINE_entity(tablet);
 METRIC_DEFINE_gauge_size(tablet, memrowset_size, "MemRowSet Memory Usage",
                          kudu::MetricUnit::kBytes,
@@ -229,7 +256,12 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
     rowsets_flush_sem_(1),
     state_(kInitialized),
     last_write_time_(MonoTime::Now()),
-    last_read_time_(MonoTime::Now()) {
+    last_read_time_(MonoTime::Now()),
+    last_update_workload_stats_time_(MonoTime::Now()),
+    last_scans_started_(0),
+    last_rows_mutated_(0),
+    last_read_score_(0.0),
+    last_write_score_(0.0) {
       CHECK(schema()->has_column_ids());
   compaction_policy_.reset(CreateCompactionPolicy());
 
@@ -1835,9 +1867,6 @@ void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) {
     return;
   }
 
-  // TODO: use workload statistics here to find out how "hot" the tablet has
-  // been in the last 5 minutes, and somehow scale the compaction quality
-  // based on that, so we favor hot tablets.
   double quality = 0;
   unordered_set<const RowSet*> picked_set_ignored;
 
@@ -2039,6 +2068,48 @@ uint64_t Tablet::LastWriteElapsedSeconds() const {
   return static_cast<uint64_t>((MonoTime::Now() - last_write_time_).ToSeconds());
 }
 
+double Tablet::CollectAndUpdateWorkloadStats(MaintenanceOp::PerfImprovementOpType type) {
+  DCHECK(last_update_workload_stats_time_.Initialized());
+  double workload_score = 0;
+  MonoDelta elapse = MonoTime::Now() - last_update_workload_stats_time_;
+  if (metrics_) {
+    int64_t scans_started = metrics_->scans_started->value();
+    int64_t rows_mutated = metrics_->rows_inserted->value() +
+                           metrics_->rows_upserted->value() +
+                           metrics_->rows_updated->value() +
+                           metrics_->rows_deleted->value();
+    if (elapse.ToMilliseconds() > FLAGS_workload_stats_rate_collection_min_interval_ms) {
+      double last_read_rate =
+          static_cast<double>(scans_started - last_scans_started_) / elapse.ToSeconds();
+      last_read_score_ =
+          std::min(1.0, last_read_rate / FLAGS_scans_started_per_sec_for_hot_tablets) *
+          FLAGS_workload_score_upper_bound;
+      double last_write_rate =
+          static_cast<double>(rows_mutated - last_rows_mutated_) / elapse.ToSeconds();
+      last_write_score_ =
+          std::min(1.0, last_write_rate / FLAGS_rows_writed_per_sec_for_hot_tablets) *
+          FLAGS_workload_score_upper_bound;
+    }
+    if (elapse.ToMilliseconds() > FLAGS_workload_stats_metric_collection_interval_ms) {
+      last_update_workload_stats_time_ = MonoTime::Now();
+      last_scans_started_ = metrics_->scans_started->value();
+      last_rows_mutated_ = rows_mutated;
+    }
+  }
+  if (type == MaintenanceOp::FLUSH_OP) {
+    // Flush ops are already scored based on how hot the tablet is
+    // for writes, so we'll only adjust the workload score based on
+    // how hot the tablet is for reads.
+    workload_score = last_read_score_;
+  } else if (type == MaintenanceOp::COMPACT_OP) {
+    // Since compactions may improve both read and write performance, increase
+    // the workload score based on the read and write rate to the tablet.
+    workload_score = std::min(FLAGS_workload_score_upper_bound,
+                              last_read_score_ + last_write_score_);
+  }
+  return workload_score;
+}
+
 size_t Tablet::DeltaMemStoresSize() const {
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index d0cdfb1..ab220bb 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -45,6 +45,7 @@
 #include "kudu/tablet/tablet_metadata.h"
 #include "kudu/util/bloom_filter.h"
 #include "kudu/util/locks.h"
+#include "kudu/util/maintenance_manager.h"
 #include "kudu/util/metrics.h"
 #include "kudu/util/monotime.h"
 #include "kudu/util/rw_semaphore.h"
@@ -56,9 +57,6 @@ class AlterTableTest;
 class ConstContiguousRow;
 class EncodedKey;
 class KeyRange;
-class MaintenanceManager;
-class MaintenanceOp;
-class MaintenanceOpStats;
 class MemTracker;
 class RowBlock;
 class ScanSpec;
@@ -468,6 +466,12 @@ class Tablet {
   // variable there.
   void UpdateLastReadTime() const;
 
+  // Collect and update recent workload statistics for the tablet.
+  // Return the current workload score of the tablet.
+  //
+  // This method is not thread safe and should only be called from a single thread at once.
+  double CollectAndUpdateWorkloadStats(MaintenanceOp::PerfImprovementOpType type);
+
  private:
   friend class kudu::AlterTableTest;
   friend class Iterator;
@@ -773,6 +777,12 @@ class Tablet {
   // ensures we do not attempt to collect metrics while calling the destructor.
   FunctionGaugeDetacher metric_detacher_;
 
+  MonoTime last_update_workload_stats_time_;
+  int64_t last_scans_started_;
+  int64_t last_rows_mutated_;
+  double last_read_score_;
+  double last_write_score_;
+
   DISALLOW_COPY_AND_ASSIGN(Tablet);
 };
 
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index c0f3caf..d5c83f5 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -83,6 +83,12 @@ DEFINE_bool(enable_deleted_rowset_gc, true,
     "considered ancient history (see --tablet_history_max_age_sec) are deleted.");
 TAG_FLAG(enable_deleted_rowset_gc, runtime);
 
+DEFINE_bool(enable_workload_score_for_perf_improvement_ops, false,
+            "Whether to enable prioritization of maintenance operations based on "
+            "whether there are on-going workloads, favoring ops of 'hot' tablets.");
+TAG_FLAG(enable_workload_score_for_perf_improvement_ops, experimental);
+TAG_FLAG(enable_workload_score_for_perf_improvement_ops, runtime);
+
 using std::string;
 using strings::Substitute;
 
@@ -128,6 +134,9 @@ void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
 
   std::lock_guard<simple_spinlock> l(lock_);
 
+  double workload_score = FLAGS_enable_workload_score_for_perf_improvement_ops ?
+                          tablet_->CollectAndUpdateWorkloadStats(MaintenanceOp::COMPACT_OP) : 0;
+
   // Any operation that changes the on-disk row layout invalidates the
   // cached stats.
   TabletMetrics* metrics = tablet_->metrics();
@@ -137,15 +146,16 @@ void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
     if (prev_stats_.valid() &&
         new_num_mrs_flushed == last_num_mrs_flushed_ &&
         new_num_rs_compacted == last_num_rs_compacted_) {
+      prev_stats_.set_workload_score(workload_score);
       *stats = prev_stats_;
       return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
     }
+    last_num_mrs_flushed_ = new_num_mrs_flushed;
+    last_num_rs_compacted_ = new_num_rs_compacted;
   }
 
   tablet_->UpdateCompactionStats(&prev_stats_);
+  prev_stats_.set_workload_score(workload_score);
   *stats = prev_stats_;
 }
 
@@ -199,6 +209,9 @@ void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
 
   std::lock_guard<simple_spinlock> l(lock_);
 
+  double workload_score = FLAGS_enable_workload_score_for_perf_improvement_ops ?
+                          tablet_->CollectAndUpdateWorkloadStats(MaintenanceOp::COMPACT_OP) : 0;
+
   // Any operation that changes the number of REDO files invalidates the
   // cached stats.
   TabletMetrics* metrics = tablet_->metrics();
@@ -213,20 +226,21 @@ void MinorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
         new_num_dms_flushed == last_num_dms_flushed_ &&
         new_num_rs_compacted == last_num_rs_compacted_ &&
         new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_) {
+      prev_stats_.set_workload_score(workload_score);
       *stats = prev_stats_;
       return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_dms_flushed_ = new_num_dms_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
     }
+    last_num_mrs_flushed_ = new_num_mrs_flushed;
+    last_num_dms_flushed_ = new_num_dms_flushed;
+    last_num_rs_compacted_ = new_num_rs_compacted;
+    last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
   }
 
   double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
       RowSet::MINOR_DELTA_COMPACTION, nullptr);
   prev_stats_.set_perf_improvement(perf_improv);
   prev_stats_.set_runnable(perf_improv > 0);
+  prev_stats_.set_workload_score(workload_score);
   *stats = prev_stats_;
 }
 
@@ -278,6 +292,9 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
 
   std::lock_guard<simple_spinlock> l(lock_);
 
+  double workload_score = FLAGS_enable_workload_score_for_perf_improvement_ops ?
+                          tablet_->CollectAndUpdateWorkloadStats(MaintenanceOp::COMPACT_OP) : 0;
+
   // Any operation that changes the size of the on-disk data invalidates the
   // cached stats.
   TabletMetrics* metrics = tablet_->metrics();
@@ -295,21 +312,22 @@ void MajorDeltaCompactionOp::UpdateStats(MaintenanceOpStats* stats) {
         new_num_rs_compacted == last_num_rs_compacted_ &&
         new_num_rs_minor_delta_compacted == last_num_rs_minor_delta_compacted_ &&
         new_num_rs_major_delta_compacted == last_num_rs_major_delta_compacted_) {
+      prev_stats_.set_workload_score(workload_score);
       *stats = prev_stats_;
       return;
-    } else {
-      last_num_mrs_flushed_ = new_num_mrs_flushed;
-      last_num_dms_flushed_ = new_num_dms_flushed;
-      last_num_rs_compacted_ = new_num_rs_compacted;
-      last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
-      last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
     }
+    last_num_mrs_flushed_ = new_num_mrs_flushed;
+    last_num_dms_flushed_ = new_num_dms_flushed;
+    last_num_rs_compacted_ = new_num_rs_compacted;
+    last_num_rs_minor_delta_compacted_ = new_num_rs_minor_delta_compacted;
+    last_num_rs_major_delta_compacted_ = new_num_rs_major_delta_compacted;
   }
 
   double perf_improv = tablet_->GetPerfImprovementForBestDeltaCompact(
       RowSet::MAJOR_DELTA_COMPACTION, nullptr);
   prev_stats_.set_perf_improvement(perf_improv);
   prev_stats_.set_runnable(perf_improv > 0);
+  prev_stats_.set_workload_score(workload_score);
   *stats = prev_stats_;
 }
 
diff --git a/src/kudu/tablet/tablet_replica_mm_ops.cc b/src/kudu/tablet/tablet_replica_mm_ops.cc
index dafe746..61243a7 100644
--- a/src/kudu/tablet/tablet_replica_mm_ops.cc
+++ b/src/kudu/tablet/tablet_replica_mm_ops.cc
@@ -75,6 +75,7 @@ DEFINE_int32(flush_threshold_secs, 2 * 60,
 TAG_FLAG(flush_threshold_secs, experimental);
 TAG_FLAG(flush_threshold_secs, runtime);
 
+DECLARE_bool(enable_workload_score_for_perf_improvement_ops);
 
 METRIC_DEFINE_gauge_uint32(tablet, log_gc_running,
                            "Log GCs Running",
@@ -178,8 +179,12 @@ void FlushMRSOp::UpdateStats(MaintenanceOpStats* stats) {
   stats->set_logs_retained_bytes(
       tablet_replica_->tablet()->MemRowSetLogReplaySize(replay_size_map));
 
-  // TODO(todd): use workload statistics here to find out how "hot" the tablet has
-  // been in the last 5 minutes.
+  if (FLAGS_enable_workload_score_for_perf_improvement_ops) {
+    double workload_score =
+        tablet_replica_->tablet()->CollectAndUpdateWorkloadStats(MaintenanceOp::FLUSH_OP);
+    stats->set_workload_score(workload_score);
+  }
+
   FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
       stats,
       time_since_flush_.elapsed().wall_millis());
@@ -248,6 +253,12 @@ void FlushDeltaMemStoresOp::UpdateStats(MaintenanceOpStats* stats) {
   stats->set_runnable(true);
   stats->set_logs_retained_bytes(retention_size);
 
+  if (FLAGS_enable_workload_score_for_perf_improvement_ops) {
+    double workload_score =
+        tablet_replica_->tablet()->CollectAndUpdateWorkloadStats(MaintenanceOp::FLUSH_OP);
+    stats->set_workload_score(workload_score);
+  }
+
   FlushOpPerfImprovementPolicy::SetPerfImprovementForFlush(
       stats,
       time_since_flush_.elapsed().wall_millis());
diff --git a/src/kudu/tserver/tablet_server-test-base.cc b/src/kudu/tserver/tablet_server-test-base.cc
index a172a1f..9cbbeeb 100644
--- a/src/kudu/tserver/tablet_server-test-base.cc
+++ b/src/kudu/tserver/tablet_server-test-base.cc
@@ -191,8 +191,12 @@ void TabletServerTestBase::ResetClientProxies() {
 
 // Inserts 'num_rows' test rows directly into the tablet (i.e not via RPC)
 void TabletServerTestBase::InsertTestRowsDirect(int32_t start_row,
-                                                int32_t num_rows) {
-  tablet::LocalTabletWriter writer(tablet_replica_->tablet(), &schema_);
+                                                int32_t num_rows,
+                                                const string& tablet_id) {
+  scoped_refptr<tablet::TabletReplica> tablet_replica;
+  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(tablet_id,
+                                                                     &tablet_replica));
+  tablet::LocalTabletWriter writer(tablet_replica->tablet(), &schema_);
   KuduPartialRow row(&schema_);
   for (int32_t i = 0; i < num_rows; i++) {
     BuildTestRow(start_row + i, &row);
diff --git a/src/kudu/tserver/tablet_server-test-base.h b/src/kudu/tserver/tablet_server-test-base.h
index b10e3b5..8304202 100644
--- a/src/kudu/tserver/tablet_server-test-base.h
+++ b/src/kudu/tserver/tablet_server-test-base.h
@@ -71,7 +71,9 @@ class TabletServerTestBase : public KuduTest {
   void ResetClientProxies();
 
   // Inserts 'num_rows' test rows directly into the tablet (i.e not via RPC)
-  void InsertTestRowsDirect(int32_t start_row, int32_t num_rows);
+  void InsertTestRowsDirect(int32_t start_row,
+                            int32_t num_rows,
+                            const std::string& tablet_id = kTabletId);
 
   // Inserts 'num_rows' test rows remotely into the tablet (i.e via RPC)
   // Rows are grouped in batches of 'count'/'num_batches' size.
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 3e0c5c7..62de1d8 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -84,6 +84,7 @@
 #include "kudu/tablet/metadata.pb.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metadata.h"
+#include "kudu/tablet/tablet_metrics.h"
 #include "kudu/tablet/tablet_replica.h"
 #include "kudu/tserver/heartbeater.h"
 #include "kudu/tserver/mini_tablet_server.h"
@@ -166,16 +167,19 @@ DECLARE_bool(enable_flush_deltamemstores);
 DECLARE_bool(enable_flush_memrowset);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(enable_rowset_compaction);
+DECLARE_bool(enable_workload_score_for_perf_improvement_ops);
 DECLARE_bool(fail_dns_resolution);
 DECLARE_bool(rowset_metadata_store_keys);
 DECLARE_bool(scanner_unregister_on_invalid_seq_id);
 DECLARE_double(cfile_inject_corruption);
 DECLARE_double(env_inject_eio);
 DECLARE_double(env_inject_full);
+DECLARE_double(workload_score_upper_bound);
 DECLARE_int32(flush_threshold_mb);
 DECLARE_int32(flush_threshold_secs);
 DECLARE_int32(fs_data_dirs_available_space_cache_seconds);
 DECLARE_int32(fs_target_data_dirs_per_tablet);
+DECLARE_int32(maintenance_manager_inject_latency_ms);
 DECLARE_int32(maintenance_manager_num_threads);
 DECLARE_int32(maintenance_manager_polling_interval_ms);
 DECLARE_int32(memory_pressure_percentage);
@@ -183,6 +187,8 @@ DECLARE_int32(metrics_retirement_age_ms);
 DECLARE_int32(scanner_batch_size_rows);
 DECLARE_int32(scanner_gc_check_interval_us);
 DECLARE_int32(scanner_ttl_ms);
+DECLARE_int32(workload_stats_rate_collection_min_interval_ms);
+DECLARE_int32(workload_stats_metric_collection_interval_ms);
 DECLARE_string(block_manager);
 DECLARE_string(env_inject_eio_globs);
 DECLARE_string(env_inject_full_globs);
@@ -4280,5 +4286,70 @@ TEST_F(TabletServerTest, TestScannerCheckMatchingUser) {
   }
 }
 
+TEST_F(TabletServerTest, TestStarvePerfImprovementOpsInColdTablet) {
+  SKIP_IF_SLOW_NOT_ALLOWED();
+  FLAGS_enable_maintenance_manager = true;
+  NO_FATALS(ShutdownAndRebuildTablet());
+
+  // Disable flushing and compactions to create overlapping rowsets.
+  FLAGS_enable_flush_memrowset = false;
+  FLAGS_enable_rowset_compaction = false;
+
+  // Create a cold tablet and insert sets of overlapping rows to it.
+  const char* kColdTablet = "cold_tablet";
+  ASSERT_OK(mini_server_->AddTestTablet(kTableId, kColdTablet, schema_));
+  ASSERT_OK(WaitForTabletRunning(kColdTablet));
+  scoped_refptr<TabletReplica> cold_replica;
+  ASSERT_TRUE(mini_server_->server()->tablet_manager()->LookupTablet(kColdTablet, &cold_replica));
+  NO_FATALS(InsertTestRowsDirect(0, 1, kColdTablet));
+  NO_FATALS(InsertTestRowsDirect(2, 1, kColdTablet));
+  ASSERT_OK(cold_replica->tablet()->Flush());
+  NO_FATALS(InsertTestRowsDirect(1, 1, kColdTablet));
+  ASSERT_OK(cold_replica->tablet()->Flush());
+
+  // Make MRS flush op starts out with the highest adjusted perf score.
+  FLAGS_enable_flush_memrowset = true;
+  FLAGS_enable_workload_score_for_perf_improvement_ops = true;
+  FLAGS_workload_stats_rate_collection_min_interval_ms = 10;
+  FLAGS_workload_score_upper_bound = 10;
+  // Make maintenance manager find best op more slowly than a new MRS could flush,
+  // so there are always something to do in the hot tablet.
+  FLAGS_flush_threshold_secs = 1;
+  FLAGS_maintenance_manager_inject_latency_ms = 1000;
+
+  // Make the default tablet 'hot'.
+  std::atomic<bool> keep_inserting_and_scanning(true);
+  thread insert_thread([&] {
+    int cur_row = 0;
+    while (keep_inserting_and_scanning) {
+      NO_FATALS(InsertTestRowsDirect(cur_row, 1000));
+      cur_row += 1000;
+      ScanResponsePB resp;
+      NO_FATALS(OpenScannerWithAllColumns(&resp));
+      ASSERT_TRUE(!resp.scanner_id().empty());
+    }
+  });
+  SCOPED_CLEANUP({
+    keep_inserting_and_scanning = false;
+    insert_thread.join();
+  });
+
+  // Wait a bit to allow workload stats collection to begin.
+  SleepFor(MonoDelta::FromSeconds(1));
+  FLAGS_enable_rowset_compaction = true;
+  // Wait a couple seconds to test if we can starve compaction in the cold tablet even
+  // if the compaction op has a higher raw perf improvement score than that of time-based
+  // flushes in the hot tablet.
+  SleepFor(MonoDelta::FromSeconds(5));
+  ASSERT_EQ(0, cold_replica->tablet()->metrics()->compact_rs_duration->TotalCount());
+
+  // Disable workload score, now we can launch compaction in the cold tablet.
+  FLAGS_enable_workload_score_for_perf_improvement_ops = false;
+  // Compaction in the cold tablet will eventually complete.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(1, cold_replica->tablet()->metrics()->compact_rs_duration->TotalCount());
+  });
+}
+
 } // namespace tserver
 } // namespace kudu
diff --git a/src/kudu/tserver/tserver_path_handlers.cc b/src/kudu/tserver/tserver_path_handlers.cc
index 0551337..76e457b 100644
--- a/src/kudu/tserver/tserver_path_handlers.cc
+++ b/src/kudu/tserver/tserver_path_handlers.cc
@@ -640,6 +640,7 @@ void TabletServerPathHandlers::HandleMaintenanceManagerPage(const Webserver::Web
     registered_op["ram_anchored"] = HumanReadableNumBytes::ToString(op_pb.ram_anchored_bytes());
     registered_op["logs_retained"] = HumanReadableNumBytes::ToString(op_pb.logs_retained_bytes());
     registered_op["perf"] = op_pb.perf_improvement();
+    registered_op["workload_score"] = op_pb.workload_score();
   }
 }
 
diff --git a/src/kudu/util/maintenance_manager-test.cc b/src/kudu/util/maintenance_manager-test.cc
index 3a0954d..84bcc7a 100644
--- a/src/kudu/util/maintenance_manager-test.cc
+++ b/src/kudu/util/maintenance_manager-test.cc
@@ -66,10 +66,10 @@ DECLARE_bool(enable_maintenance_manager);
 DECLARE_int64(log_target_replay_size_mb);
 DECLARE_double(maintenance_op_multiplier);
 DECLARE_int32(max_priority_range);
-
 namespace kudu {
 
-static const int kHistorySize = 7;
+// Set this a bit bigger so that the manager could keep track of all possible completed ops.
+static const int kHistorySize = 10;
 static const char kFakeUuid[] = "12345";
 
 class MaintenanceManagerTest : public KuduTest {
@@ -124,7 +124,8 @@ class TestMaintenanceOp : public MaintenanceOp {
       remaining_runs_(1),
       prepared_runs_(0),
       sleep_time_(MonoDelta::FromSeconds(0)),
-      priority_(priority) {
+      priority_(priority),
+      workload_score_(0) {
   }
 
   ~TestMaintenanceOp() override = default;
@@ -160,6 +161,7 @@ class TestMaintenanceOp : public MaintenanceOp {
     stats->set_ram_anchored(ram_anchored_);
     stats->set_logs_retained_bytes(logs_retained_bytes_);
     stats->set_perf_improvement(perf_improvement_);
+    stats->set_workload_score(workload_score_);
   }
 
   void set_remaining_runs(int runs) {
@@ -187,6 +189,11 @@ class TestMaintenanceOp : public MaintenanceOp {
     perf_improvement_ = perf_improvement;
   }
 
+  void set_workload_score(uint64_t workload_score) {
+    std::lock_guard<Mutex> guard(lock_);
+    workload_score_ = workload_score;
+  }
+
   scoped_refptr<Histogram> DurationHistogram() const override {
     return maintenance_op_duration_;
   }
@@ -226,6 +233,8 @@ class TestMaintenanceOp : public MaintenanceOp {
 
   // Maintenance priority.
   int32_t priority_;
+
+  double workload_score_;
 };
 
 // Create an op and wait for it to start running.  Unregister it while it is
@@ -475,13 +484,14 @@ TEST_F(MaintenanceManagerTest, TestOpFactors) {
   TestMaintenanceOp op5("op5", MaintenanceOp::HIGH_IO_USAGE, FLAGS_max_priority_range + 1);
 
   ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -FLAGS_max_priority_range),
-                   manager_->PerfImprovement(1, op1.priority()));
+                   manager_->AdjustedPerfScore(1, 0, op1.priority()));
   ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, -1),
-                   manager_->PerfImprovement(1, op2.priority()));
-  ASSERT_DOUBLE_EQ(1, manager_->PerfImprovement(1, op3.priority()));
-  ASSERT_DOUBLE_EQ(FLAGS_maintenance_op_multiplier, manager_->PerfImprovement(1, op4.priority()));
+                   manager_->AdjustedPerfScore(1, 0, op2.priority()));
+  ASSERT_DOUBLE_EQ(1, manager_->AdjustedPerfScore(1, 0, op3.priority()));
+  ASSERT_DOUBLE_EQ(FLAGS_maintenance_op_multiplier,
+                   manager_->AdjustedPerfScore(1, 0, op4.priority()));
   ASSERT_DOUBLE_EQ(pow(FLAGS_maintenance_op_multiplier, FLAGS_max_priority_range),
-                   manager_->PerfImprovement(1, op5.priority()));
+                   manager_->AdjustedPerfScore(1, 0, op5.priority()));
 }
 
 // Test priority OP launching.
@@ -513,8 +523,16 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
   // FLAGS_enable_maintenance_manager = false, which would cause the thread
   // to exit entirely instead of sleeping.
 
-  // Ops are listed here in perf improvement order, which is a function of the
-  // op's raw perf improvement as well as its priority.
+  // Ops are listed here in final perf score order, which is a function of the
+  // op's raw perf improvement, workload score and its priority.
+  // The 'op0' would never launch because it has a raw perf improvement 0, even if
+  // it has a high workload_score and a high priority.
+  TestMaintenanceOp op0("op0", MaintenanceOp::HIGH_IO_USAGE, FLAGS_max_priority_range + 1);
+  op0.set_perf_improvement(0);
+  op0.set_workload_score(10);
+  op0.set_remaining_runs(1);
+  op0.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
   TestMaintenanceOp op1("op1", MaintenanceOp::HIGH_IO_USAGE, -FLAGS_max_priority_range - 1);
   op1.set_perf_improvement(10);
   op1.set_remaining_runs(1);
@@ -545,30 +563,40 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
   op6.set_remaining_runs(1);
   op6.set_sleep_time(MonoDelta::FromMilliseconds(1));
 
+  TestMaintenanceOp op7("op7", MaintenanceOp::HIGH_IO_USAGE, 0);
+  op7.set_perf_improvement(9);
+  op7.set_workload_score(10);
+  op7.set_remaining_runs(1);
+  op7.set_sleep_time(MonoDelta::FromMilliseconds(1));
+
   FLAGS_enable_maintenance_manager = false;
+  manager_->RegisterOp(&op0);
   manager_->RegisterOp(&op1);
   manager_->RegisterOp(&op2);
   manager_->RegisterOp(&op3);
   manager_->RegisterOp(&op4);
   manager_->RegisterOp(&op5);
   manager_->RegisterOp(&op6);
+  manager_->RegisterOp(&op7);
   FLAGS_enable_maintenance_manager = true;
 
   // From this point forward if an ASSERT fires, we'll hit a CHECK failure if
   // we don't unregister an op before it goes out of scope.
   SCOPED_CLEANUP({
+    manager_->UnregisterOp(&op0);
     manager_->UnregisterOp(&op1);
     manager_->UnregisterOp(&op2);
     manager_->UnregisterOp(&op3);
     manager_->UnregisterOp(&op4);
     manager_->UnregisterOp(&op5);
     manager_->UnregisterOp(&op6);
+    manager_->UnregisterOp(&op7);
   });
 
   ASSERT_EVENTUALLY([&]() {
     MaintenanceManagerStatusPB status_pb;
     manager_->GetMaintenanceManagerStatusDump(&status_pb);
-    ASSERT_EQ(status_pb.completed_operations_size(), 7);
+    ASSERT_EQ(8, status_pb.completed_operations_size());
   });
 
   // Wait for instances to complete by shutting down the maintenance manager.
@@ -578,7 +606,7 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
   // Check that running instances are removed from collection after completion.
   MaintenanceManagerStatusPB status_pb;
   manager_->GetMaintenanceManagerStatusDump(&status_pb);
-  ASSERT_EQ(status_pb.running_operations_size(), 0);
+  ASSERT_EQ(0, status_pb.running_operations_size());
 
   // Check that ops were executed in perf improvement order (from greatest to
   // least improvement). Note that completed ops are listed in _reverse_ execution order.
@@ -588,6 +616,7 @@ TEST_F(MaintenanceManagerTest, TestPriorityOpLaunch) {
                             "op4",
                             "op5",
                             "op6",
+                            "op7",
                             "early"});
   ASSERT_EQ(ordered_ops.size(), status_pb.completed_operations().size());
   for (const auto& instance : status_pb.completed_operations()) {
diff --git a/src/kudu/util/maintenance_manager.cc b/src/kudu/util/maintenance_manager.cc
index 6327048..033045f 100644
--- a/src/kudu/util/maintenance_manager.cc
+++ b/src/kudu/util/maintenance_manager.cc
@@ -33,6 +33,7 @@
 
 #include "kudu/gutil/dynamic_annotations.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/split.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -110,6 +111,11 @@ TAG_FLAG(max_priority_range, advanced);
 TAG_FLAG(max_priority_range, experimental);
 TAG_FLAG(max_priority_range, runtime);
 
+DEFINE_int32(maintenance_manager_inject_latency_ms, 0,
+             "Injects latency into maintenance thread. For use in tests only.");
+TAG_FLAG(maintenance_manager_inject_latency_ms, runtime);
+TAG_FLAG(maintenance_manager_inject_latency_ms, unsafe);
+
 namespace kudu {
 
 MaintenanceOpStats::MaintenanceOpStats() {
@@ -123,6 +129,7 @@ void MaintenanceOpStats::Clear() {
   logs_retained_bytes_ = 0;
   data_retained_bytes_ = 0;
   perf_improvement_ = 0;
+  workload_score_ = 0;
   last_modified_ = MonoTime();
 }
 
@@ -292,6 +299,12 @@ void MaintenanceManager::RunSchedulerThread() {
       return;
     }
 
+    if (PREDICT_FALSE(FLAGS_maintenance_manager_inject_latency_ms > 0)) {
+      LOG(WARNING) << "Injecting " << FLAGS_maintenance_manager_inject_latency_ms
+                   << "ms of latency into maintenance thread";
+      SleepFor(MonoDelta::FromMilliseconds(FLAGS_maintenance_manager_inject_latency_ms));
+    }
+
     // If we found no work to do, then we should sleep before trying again to schedule.
     // Otherwise, we can go right into trying to find the next op.
     prev_iter_found_no_work = !FindAndLaunchOp(&guard);
@@ -408,7 +421,8 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
                         op->name(), data_retained_bytes);
     }
 
-    const auto perf_improvement = PerfImprovement(stats.perf_improvement(), op->priority());
+    const auto perf_improvement =
+        AdjustedPerfScore(stats.perf_improvement(), stats.workload_score(), op->priority());
     if ((!best_perf_improvement_op) ||
         (perf_improvement > best_perf_improvement)) {
       best_perf_improvement_op = op;
@@ -472,14 +486,20 @@ pair<MaintenanceOp*, string> MaintenanceManager::FindBestOp() {
   return {nullptr, "no ops with positive improvement"};
 }
 
-double MaintenanceManager::PerfImprovement(double perf_improvement, int32_t priority) const {
+double MaintenanceManager::AdjustedPerfScore(double perf_improvement,
+                                             double workload_score,
+                                             int32_t priority) {
+  if (perf_improvement == 0) {
+    return 0;
+  }
+  double perf_score = perf_improvement + workload_score;
   if (priority == 0) {
-    return perf_improvement;
+    return perf_score;
   }
 
   priority = std::max(priority, -FLAGS_max_priority_range);
   priority = std::min(priority, FLAGS_max_priority_range);
-  return perf_improvement * std::pow(FLAGS_maintenance_op_multiplier, priority);
+  return perf_score * std::pow(FLAGS_maintenance_op_multiplier, priority);
 }
 
 void MaintenanceManager::LaunchOp(MaintenanceOp* op) {
@@ -542,11 +562,13 @@ void MaintenanceManager::GetMaintenanceManagerStatusDump(MaintenanceManagerStatu
       op_pb->set_ram_anchored_bytes(stats.ram_anchored());
       op_pb->set_logs_retained_bytes(stats.logs_retained_bytes());
       op_pb->set_perf_improvement(stats.perf_improvement());
+      op_pb->set_workload_score(stats.workload_score());
     } else {
       op_pb->set_runnable(false);
       op_pb->set_ram_anchored_bytes(0);
       op_pb->set_logs_retained_bytes(0);
       op_pb->set_perf_improvement(0.0);
+      op_pb->set_workload_score(0.0);
     }
   }
 
diff --git a/src/kudu/util/maintenance_manager.h b/src/kudu/util/maintenance_manager.h
index 6dad1b6..9bccc9e 100644
--- a/src/kudu/util/maintenance_manager.h
+++ b/src/kudu/util/maintenance_manager.h
@@ -106,6 +106,19 @@ class MaintenanceOpStats {
     perf_improvement_ = perf_improvement;
   }
 
+  double workload_score() const {
+    DCHECK(valid_);
+    return workload_score_;
+  }
+
+  void set_workload_score(double workload_score) {
+    if (workload_score == workload_score_) {
+      return;
+    }
+    UpdateLastModified();
+    workload_score_ = workload_score;
+  }
+
   const MonoTime& last_modified() const {
     DCHECK(valid_);
     return last_modified_;
@@ -146,6 +159,8 @@ class MaintenanceOpStats {
   // absolute scale (yet TBD).
   double perf_improvement_;
 
+  double workload_score_;
+
   // The last time that the stats were modified.
   MonoTime last_modified_;
 };
@@ -179,6 +194,11 @@ class MaintenanceOp {
     HIGH_IO_USAGE // Everything else.
   };
 
+  enum PerfImprovementOpType {
+    FLUSH_OP,
+    COMPACT_OP
+  };
+
   explicit MaintenanceOp(std::string name, IOUsage io_usage);
   virtual ~MaintenanceOp();
 
@@ -325,7 +345,9 @@ class MaintenanceManager : public std::enable_shared_from_this<MaintenanceManage
   // suitable for logging.
   std::pair<MaintenanceOp*, std::string> FindBestOp();
 
-  double PerfImprovement(double perf_improvement, int32_t priority) const;
+  // Compute the adjusted perf score from the op's raw perf_improvement, the
+  // tablet's workload_score, and the table's priority.
+  static double AdjustedPerfScore(double perf_improvement, double workload_score, int32_t priority);
 
   void LaunchOp(MaintenanceOp* op);
 
diff --git a/src/kudu/util/maintenance_manager.proto b/src/kudu/util/maintenance_manager.proto
index 77c8cf8..54f2280 100644
--- a/src/kudu/util/maintenance_manager.proto
+++ b/src/kudu/util/maintenance_manager.proto
@@ -29,6 +29,7 @@ message MaintenanceManagerStatusPB {
     required int64 ram_anchored_bytes = 4;
     required int64 logs_retained_bytes = 5;
     required double perf_improvement = 6;
+    required double workload_score = 7;
   }
 
   message OpInstancePB {
diff --git a/www/maintenance-manager.mustache b/www/maintenance-manager.mustache
index 81d5247..60151e2 100644
--- a/www/maintenance-manager.mustache
+++ b/www/maintenance-manager.mustache
@@ -66,6 +66,7 @@ under the License.
       <th data-sorter="bytesSorter" data-sortable="true">RAM anchored</th>
       <th data-sorter="bytesSorter" data-sortable="true">Logs retained</th>
       <th data-sortable="true">Perf</th>
+      <th data-sortable="true">Workload score</th>
     </tr>
   </thead>
   <tbody>
@@ -76,6 +77,7 @@ under the License.
       <td>{{ram_anchored}}</td>
       <td>{{logs_retained}}</td>
       <td>{{perf}}</td>
+      <td>{{workload_score}}</td>
     </tr>
    {{/registered_operations}}
   </tbody>