Posted to commits@kudu.apache.org by al...@apache.org on 2022/12/20 21:41:06 UTC

[kudu] branch master updated: KUDU-3406 memory budgeting for CompactRowSetsOp

This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new 1556a353e KUDU-3406 memory budgeting for CompactRowSetsOp
1556a353e is described below

commit 1556a353e60c5d555996347cbd46d5e5a6661266
Author: Alexey Serbin <al...@apache.org>
AuthorDate: Fri Nov 25 18:20:38 2022 -0800

    KUDU-3406 memory budgeting for CompactRowSetsOp
    
    This patch implements memory budgeting for performing rowset merge
    compactions (i.e. CompactRowSetsOp maintenance operations).
    
    The idea is to check whether enough memory is left before reaching the
    hard memory limit when starting a CompactRowSetsOp.  An estimate for the
    amount of memory necessary to perform the operation is based on the
    total on-disk size of all deltas in rowsets selected for the merge
    compaction and the ratio of memory-to-disk size when loading those
    deltas in memory to perform the merge rowset compaction.  If there is
    enough memory, then a rowset is considered as an input for merge
    compaction, otherwise it's not.  Meanwhile, REDO deltas become UNDO
    deltas after major delta compactions run on the rowset, and UNDO
    deltas eventually become ancient, so UndoDeltaBlockGCOp drops those.
    With that, the amount of memory required to load a rowset's delta data
    into memory shrinks over the long run, and eventually the rowset is
    back among the inputs for one of the future runs of the
    CompactRowSetsOp maintenance operation.
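
The check described above can be sketched as follows. This is a minimal illustration, not the code from the patch: FitsMemoryBudget and its parameters are hypothetical names, and the hard limit and current consumption are passed in as plain numbers instead of being read from the process memory tracker.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical helper (not part of Kudu): returns true if the estimated
// amount of memory needed to load a rowset's deltas fits into what is left
// before the hard memory limit is reached.
bool FitsMemoryBudget(uint64_t deltas_on_disk_bytes,
                      double mem_to_disk_factor,
                      int64_t hard_limit_bytes,
                      int64_t current_consumption_bytes) {
  assert(mem_to_disk_factor > 0);
  // Estimate: on-disk size of the deltas scaled by the memory-to-disk factor.
  const auto estimated_bytes = static_cast<int64_t>(
      mem_to_disk_factor * static_cast<double>(deltas_on_disk_bytes));
  // Headroom: what is left between current usage and the hard limit.
  const int64_t available_bytes = hard_limit_bytes - current_consumption_bytes;
  return estimated_bytes <= available_bytes;
}
```

For instance, with 1 GiB of deltas on disk and a factor of 5.0, the estimate is about 5 GiB, so a process with a 16 GiB hard limit that already uses 12 GiB would skip that rowset.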
    
    Prior to this patch, the root cause of running out of memory when
    performing CompactRowSetsOp was trying to allocate too much memory
    at least due to the following factors:
      * many UNDO deltas might accumulate in rowsets selected for the
        compaction operation because of the relatively high setting for the
        --tablet_history_max_age_sec flag (7 days) and a particular workload
        that issues many updates for rows in the same rowset
      * even if it's a merge-like operation by its nature, the current
        implementation of CompactRowSetsOp allocates all the memory
        necessary to load the UNDO deltas at once, and it keeps all the
        preliminary results in the memory as well before persisting
        the result data to disk
      * the current implementation of CompactRowSetsOp loads all the UNDO
        deltas from the rowsets selected for compaction regardless of
        whether they are ancient or not; it discards the data sourced from
        the ancient deltas at the very end before persisting the result data
    
    Ideally, the current implementation of CompactRowSetsOp should be
    refactored to merge the deltas in participating rowsets sequentially,
    chunk by chunk, persisting the results and allocating memory just for
    a small batch of processed deltas, not loading all the deltas at once.
    A future patch should take care of that, while this patch provides an
    interim approach using memory budgeting on top of the current
    CompactRowSetsOp implementation as-is.
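
To illustrate what "chunk by chunk" could mean, here is a generic sketch of a k-way merge that keeps only one bounded output chunk in memory at a time. It is an assumption-laden illustration, not the planned Kudu refactoring: Delta, MergeInChunks, and the flush callback are hypothetical, and for brevity the inputs are in-memory vectors, whereas a real implementation would also read its inputs incrementally.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <tuple>
#include <vector>

// Hypothetical stand-in for a delta record: (row index, timestamp).
struct Delta {
  int row;
  int ts;
};

// Merges inputs that are each sorted by (row, ts) and hands the result to
// 'flush' in chunks of at most 'chunk_size' records, so the merged output
// is never buffered as a whole.
void MergeInChunks(const std::vector<std::vector<Delta>>& inputs,
                   size_t chunk_size,
                   const std::function<void(const std::vector<Delta>&)>& flush) {
  assert(chunk_size > 0);
  // Heap entry: (row, ts, input index, position within that input).
  using Entry = std::tuple<int, int, size_t, size_t>;
  std::priority_queue<Entry, std::vector<Entry>, std::greater<Entry>> heap;
  for (size_t i = 0; i < inputs.size(); ++i) {
    if (!inputs[i].empty()) {
      heap.emplace(inputs[i][0].row, inputs[i][0].ts, i, size_t{0});
    }
  }
  std::vector<Delta> chunk;
  chunk.reserve(chunk_size);
  while (!heap.empty()) {
    // Take the smallest pending record and advance its input.
    const auto [row, ts, idx, pos] = heap.top();
    heap.pop();
    chunk.push_back({row, ts});
    if (pos + 1 < inputs[idx].size()) {
      const Delta& next = inputs[idx][pos + 1];
      heap.emplace(next.row, next.ts, idx, pos + 1);
    }
    // Persist a full chunk and release its memory before continuing.
    if (chunk.size() == chunk_size) {
      flush(chunk);
      chunk.clear();
    }
  }
  if (!chunk.empty()) {
    flush(chunk);
  }
}
```

With this shape, peak memory for the merged output is bounded by chunk_size records rather than by the total size of all deltas in the participating rowsets.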
    
    The newly introduced behavior is gated by the following two flags:
      * rowset_compaction_memory_estimate_enabled: whether to enable memory
        budgeting for CompactRowSetsOp (default is 'false').
      * rowset_compaction_ancient_delta_threshold_enabled: whether to
        check against the ratio of ancient UNDO deltas across rowsets
        selected for compaction (default is 'true').
    
    In addition, the following flags allow for tweaking the new
    behavior gated by the corresponding flags above:
      * rowset_compaction_delta_memory_factor: the multiplication factor for
        the total size of rowset's deltas to estimate how much memory
        CompactRowSetsOp would consume if operating on those deltas when
        no runtime stats for the compact_rs_mem_usage_to_deltas_size_ratio
        metric are available yet (default is 5.0)
      * rowset_compaction_ancient_delta_max_ratio: the threshold for the
        ratio of the data size in ancient UNDO deltas to the total data size
        of UNDO deltas in the rowsets selected for merge compaction
      * rowset_compaction_estimate_min_deltas_size_mb: the threshold on the
        total size of a rowset's deltas to apply the memory budgeting
    
    To complement the --rowset_compaction_delta_memory_factor flag with more
    tablet-specific stats, two new per-tablet metrics have been introduced:
      * compact_rs_mem_usage is a histogram to gather statistics on how much
        memory rowset merge compaction consumed
      * compact_rs_mem_usage_to_deltas_size_ratio is a histogram to track
        the memory-to-disk size ratio for a tablet's rowsets participating
        in merge compaction; this metric provides the average that is used
        as a more precise factor to estimate the amount of memory a rowset's
        deltas would use when undergoing merge compaction given the total
        on-disk size of the rowset's deltas
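
How the preset factor and the runtime metric complement each other can be sketched like this. RatioStats and PickMemoryFactor are hypothetical names used for illustration; RatioStats stands in for the count and mean that a histogram metric would report.

```cpp
#include <cassert>
#include <cstdint>

// Hypothetical summary of the memory-to-disk ratio histogram.
struct RatioStats {
  uint64_t count;  // number of recorded samples
  double mean;     // average of the recorded ratios
};

// Picks the factor used to scale on-disk delta size into a memory estimate:
// the observed mean when stats exist and the preset is not enforced,
// otherwise the preset factor supplied by the operator.
double PickMemoryFactor(const RatioStats& stats,
                        double preset_factor,
                        bool enforce_preset) {
  assert(preset_factor > 0);
  const bool use_stats = !enforce_preset && stats.count > 0 && stats.mean > 0;
  return use_stats ? stats.mean : preset_factor;
}
```

Right after a tablet server starts there are no samples yet, so the preset (5.0 by default) applies; once compactions have run, the observed mean takes over unless the preset is enforced.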
    
    This patch doesn't add a test, but I verified how the new functionality
    works with real data from a case where merge rowset compaction would
    take about 28 GBytes of memory if not constrained by the memory limit.
    I'm planning
    to add a test in a follow-up changelist based on the following patch
    once the latter appears in the git repository:
      https://gerrit.cloudera.org/#/c/19278
    
    Change-Id: I89c171284944831e95c45a993d85fbefe89048cf
    Reviewed-on: http://gerrit.cloudera.org:8080/19281
    Reviewed-by: Attila Bukor <ab...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/tablet/compaction.cc             |  44 +++++++-
 src/kudu/tablet/compaction.h              |   4 +
 src/kudu/tablet/compaction_policy-test.cc |   7 +-
 src/kudu/tablet/compaction_policy.cc      | 109 +++++++++++++++++-
 src/kudu/tablet/compaction_policy.h       |   5 +-
 src/kudu/tablet/delta_iterator_merger.cc  |   8 ++
 src/kudu/tablet/delta_iterator_merger.h   |   2 +
 src/kudu/tablet/delta_store.h             |   4 +
 src/kudu/tablet/delta_tracker.cc          |   4 +-
 src/kudu/tablet/delta_tracker.h           |   2 +-
 src/kudu/tablet/deltafile.cc              |   7 +-
 src/kudu/tablet/deltafile.h               |  10 ++
 src/kudu/tablet/deltamemstore.h           |   6 +
 src/kudu/tablet/rowset_info.cc            |  26 ++++-
 src/kudu/tablet/rowset_info.h             |  28 ++++-
 src/kudu/tablet/tablet.cc                 | 182 +++++++++++++++++++++++++++---
 src/kudu/tablet/tablet_metrics.cc         |  17 +++
 src/kudu/tablet/tablet_metrics.h          |   4 +
 src/kudu/tablet/tablet_mm_ops-test.cc     |   9 +-
 src/kudu/tablet/tablet_mm_ops.cc          |  20 ++--
 src/kudu/tablet/tablet_mm_ops.h           |   1 +
 21 files changed, 451 insertions(+), 48 deletions(-)

diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index e38bb7479..61c39c59d 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -206,6 +206,12 @@ class MemRowSetCompactionInput : public CompactionInput {
     return iter_->schema();
   }
 
+  size_t memory_footprint() const override {
+    // TODO(aserbin): implement this if it's necessary to track peak memory
+    //                usage for objects of this type during compaction
+    return 0;
+  }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(MemRowSetCompactionInput);
   unique_ptr<RowBlock> row_block_;
@@ -287,6 +293,11 @@ class DiskRowSetCompactionInput : public CompactionInput {
     return base_iter_->schema();
   }
 
+  size_t memory_footprint() const override {
+    return redo_delta_iter_->memory_footprint() +
+        undo_delta_iter_->memory_footprint();
+  }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(DiskRowSetCompactionInput);
   unique_ptr<RowwiseIterator> base_iter_;
@@ -578,8 +589,9 @@ class MergeCompactionInput : public CompactionInput {
  public:
   MergeCompactionInput(const vector<shared_ptr<CompactionInput>>& inputs,
                        const Schema* schema)
-    : schema_(schema),
-      num_dup_rows_(0) {
+      : schema_(schema),
+        num_dup_rows_(0),
+        max_memory_usage_(0) {
     for (const auto& input : inputs) {
       unique_ptr<MergeState> state(new MergeState);
       state->input = input;
@@ -710,13 +722,36 @@ class MergeCompactionInput : public CompactionInput {
   Arena* PreparedBlockArena() override { return prepared_block_arena_; }
 
   Status FinishBlock() override {
-    return ProcessEmptyInputs();
+    auto s = ProcessEmptyInputs();
+
+    // Update the stats on peak memory usage.
+    size_t cur_usage = 0;
+    for (const auto* st : states_) {
+      cur_usage += st->input->memory_footprint();
+      for (const auto* st : st->dominated) {
+        cur_usage += st->input->memory_footprint();
+      }
+    }
+    if (cur_usage > max_memory_usage_) {
+      max_memory_usage_ = cur_usage;
+    }
+    VLOG(2) << Substitute("max memory usage: $0", max_memory_usage_);
+
+    return s;
   }
 
   const Schema& schema() const override {
     return *schema_;
   }
 
+  // Return the peak amount of memory used by this compaction input.
+  // Since 'max_memory_usage_' isn't protected against concurrent access,
+  // this method should be invoked from the same thread that performs merge
+  // compaction.
+  size_t memory_footprint() const override {
+    return max_memory_usage_;
+  }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(MergeCompactionInput);
 
@@ -895,6 +930,9 @@ class MergeCompactionInput : public CompactionInput {
   vector<std::unique_ptr<RowBlock>> duplicated_rows_;
   int num_dup_rows_;
 
+  // An estimate on maximum memory usage by this compaction input.
+  size_t max_memory_usage_;
+
   enum {
     kDuplicatedRowsPerBlock = 10
   };
diff --git a/src/kudu/tablet/compaction.h b/src/kudu/tablet/compaction.h
index f6e7dae4e..4ea0f791a 100644
--- a/src/kudu/tablet/compaction.h
+++ b/src/kudu/tablet/compaction.h
@@ -134,6 +134,10 @@ class CompactionInput {
   virtual bool HasMoreBlocks() = 0;
   virtual const Schema& schema() const = 0;
 
+  // Return an estimate on the maximum amount of memory used by the object
+  // during its lifecycle while initializing, reading and processing data, etc.
+  virtual size_t memory_footprint() const = 0;
+
   virtual ~CompactionInput() {}
 };
 
diff --git a/src/kudu/tablet/compaction_policy-test.cc b/src/kudu/tablet/compaction_policy-test.cc
index fe0958392..2161e2483 100644
--- a/src/kudu/tablet/compaction_policy-test.cc
+++ b/src/kudu/tablet/compaction_policy-test.cc
@@ -21,8 +21,10 @@
 #include <initializer_list>
 #include <limits>
 #include <memory>
+#include <optional>
 #include <ostream>
 #include <string>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
@@ -47,6 +49,7 @@
 #include "kudu/util/test_macros.h"
 #include "kudu/util/test_util.h"
 
+using std::nullopt;
 using std::string;
 using std::vector;
 
@@ -443,8 +446,10 @@ double ComputeAverageRowsetHeight(
   RowSetTree tree;
   CHECK_OK(tree.Reset(rowsets));
 
-  double rowset_total_height, rowset_total_width;
+  double rowset_total_height;
+  double rowset_total_width;
   RowSetInfo::ComputeCdfAndCollectOrdered(tree,
+                                          nullopt,
                                           &rowset_total_height,
                                           &rowset_total_width,
                                           nullptr,
diff --git a/src/kudu/tablet/compaction_policy.cc b/src/kudu/tablet/compaction_policy.cc
index 0b88001cc..3e9a3c935 100644
--- a/src/kudu/tablet/compaction_policy.cc
+++ b/src/kudu/tablet/compaction_policy.cc
@@ -18,9 +18,12 @@
 #include "kudu/tablet/compaction_policy.h"
 
 #include <algorithm>
+#include <functional>
 #include <limits>
+#include <optional>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <unordered_set>
 #include <utility>
 #include <vector>
@@ -28,14 +31,23 @@
 #include <gflags/gflags.h>
 #include <glog/logging.h>
 
+#include "kudu/gutil/casts.h"
 #include "kudu/gutil/map-util.h"
+#include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/diskrowset.h"
+#include "kudu/tablet/rowset.h"
 #include "kudu/tablet/rowset_info.h"
 #include "kudu/tablet/svg_dump.h"
+#include "kudu/tablet/tablet_metrics.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/hdr_histogram.h"
 #include "kudu/util/knapsack_solver.h"
+#include "kudu/util/metrics.h"
+#include "kudu/util/process_memory.h"
 #include "kudu/util/status.h"
 
+using std::make_optional;
 using std::vector;
 using strings::Substitute;
 
@@ -58,6 +70,53 @@ DEFINE_double(compaction_minimum_improvement, 0.01f,
               "compaction will be considered ineligible.");
 TAG_FLAG(compaction_minimum_improvement, advanced);
 
+DEFINE_bool(rowset_compaction_memory_estimate_enabled, false,
+            "Whether to check for available memory necessary to run "
+            "CompactRowSetsOp maintenance operations. If the difference "
+            "between the hard memory limit and current usage is less than the "
+            "estimated amount necessary to perform the operation, postpone "
+            "running the operation until there is enough memory available. "
+            "Use the --rowset_compaction_delta_memory_factor flag to tune the "
+            "initial factor to estimate the amount of required memory based "
+            "on the on-disk size of deltas when relevant statistics have not "
+            "yet accumulated or the --rowset_compaction_enforce_preset_factor "
+            "flag is set to 'true'.");
+TAG_FLAG(rowset_compaction_memory_estimate_enabled, experimental);
+TAG_FLAG(rowset_compaction_memory_estimate_enabled, runtime);
+
+DEFINE_uint32(rowset_compaction_estimate_min_deltas_size_mb, 64,
+              "Minimum size (in MBytes) of on-disk delta sizes to apply memory "
+              "budgeting constraints for rowset merge compaction if "
+              "--rowset_compaction_memory_estimate_enabled set to 'true'. "
+              "This threshold is also used to decide whether to collect "
+              "the stats for the compact_rs_mem_usage_to_deltas_size_ratio "
+              "tablet metric.");
+TAG_FLAG(rowset_compaction_estimate_min_deltas_size_mb, experimental);
+TAG_FLAG(rowset_compaction_estimate_min_deltas_size_mb, runtime);
+
+DEFINE_double(rowset_compaction_delta_memory_factor, 5.0,
+              "The initial memory-to-disk size factor to estimate the amount "
+              "of memory necessary to load a rowset's deltas into memory to "
+              "perform CompactRowSetsOp. The estimate is obtained by "
+              "multiplying this factor by the total size of deltas across all "
+              "rowsets selected for rowset merge compaction. This flag is used "
+              "when there isn't enough statistics accumulated for the tablet "
+              "during the runtime of a tablet server (e.g., upon the very "
+              "first run of CompactRowSetsOp for a tablet after starting the "
+              "tablet server) or --rowset_compaction_enforce_preset_factor "
+              "is set. The factor depends on tablet's column types, encoding, "
+              "compression, workload pattern, etc.");
+TAG_FLAG(rowset_compaction_delta_memory_factor, experimental);
+TAG_FLAG(rowset_compaction_delta_memory_factor, runtime);
+
+DEFINE_bool(rowset_compaction_enforce_preset_factor, false,
+            "Whether to use the preset factor defined by the "
+            "--rowset_compaction_delta_memory_factor flag even when enough "
+            "runtime stats have accumulated by the "
+            "compact_rs_mem_usage_to_deltas_size_ratio metric");
+TAG_FLAG(rowset_compaction_enforce_preset_factor, experimental);
+TAG_FLAG(rowset_compaction_enforce_preset_factor, runtime);
+
 namespace kudu {
 namespace tablet {
 
@@ -91,9 +150,11 @@ static const double kSupportAdjust = 1.003;
 // BudgetedCompactionPolicy
 ////////////////////////////////////////////////////////////
 
-BudgetedCompactionPolicy::BudgetedCompactionPolicy(int budget)
-  : size_budget_mb_(budget) {
-  CHECK_GT(budget, 0);
+BudgetedCompactionPolicy::BudgetedCompactionPolicy(int size_budget_mb,
+                                                   const TabletMetrics* metrics)
+    : size_budget_mb_(size_budget_mb),
+      metrics_(metrics) {
+  CHECK_GT(size_budget_mb, 0);
 }
 
 uint64_t BudgetedCompactionPolicy::target_rowset_size() const {
@@ -105,7 +166,49 @@ void BudgetedCompactionPolicy::SetupKnapsackInput(
     const RowSetTree& tree,
     vector<RowSetInfo>* asc_min_key,
     vector<RowSetInfo>* asc_max_key) const {
+  const auto is_on_memory_budget = [&] (const RowSet* rs) {
+    // The memory budgeting applies only when configured so.
+    if (!FLAGS_rowset_compaction_memory_estimate_enabled) {
+      return true;
+    }
+    const DiskRowSet* drs = down_cast<const DiskRowSet*>(rs);
+    DiskRowSetSpace drss;
+    drs->GetDiskRowSetSpaceUsage(&drss);
+    const uint64_t deltas_on_disk_size = drss.redo_deltas_size + drss.undo_deltas_size;
+
+    if (FLAGS_rowset_compaction_estimate_min_deltas_size_mb * 1024ULL * 1024 < deltas_on_disk_size) {
+      const auto* h = metrics_->compact_rs_mem_usage_to_deltas_size_ratio->histogram();
+      const bool use_metrics = !FLAGS_rowset_compaction_enforce_preset_factor &&
+          h->TotalCount() > 0 && h->MeanValue() > 0;
+      const double mem_size_factor = use_metrics
+          ? h->MeanValue() : FLAGS_rowset_compaction_delta_memory_factor;
+      // An estimate for the amount of memory necessary to load all rowset's
+      // deltas into memory. As of now, compaction operations such as
+      // CompactRowSetsOp (i.e. rowset merge compaction) are implemented in
+      // such a way that they load all the deltas into the memory at once.
+      // With that, let's check if there is enough memory to do so.
+      const int64_t estimated_mem_size = static_cast<int64_t>(
+          mem_size_factor * static_cast<double>(deltas_on_disk_size));
+      const int64_t available_mem_size =
+          process_memory::HardLimit() - process_memory::CurrentConsumption();
+      if (available_mem_size < estimated_mem_size) {
+        VLOG(2) << Substitute(
+            "rowset '$0' is not on memory budget for compaction: "
+            "$1 bytes needed, $2 bytes available",
+            rs->ToString(), estimated_mem_size, available_mem_size);
+        return false;
+      }
+      VLOG(3) << Substitute(
+          "compaction memory budgeting for rowset '$0': "
+          "$1 bytes needed, $2 bytes available",
+          rs->ToString(), estimated_mem_size, available_mem_size);
+    }
+
+    return true;
+  };
+
   RowSetInfo::ComputeCdfAndCollectOrdered(tree,
+                                          make_optional(is_on_memory_budget),
                                           /*rowset_total_height=*/nullptr,
                                           /*rowset_total_width=*/nullptr,
                                           asc_min_key,
diff --git a/src/kudu/tablet/compaction_policy.h b/src/kudu/tablet/compaction_policy.h
index f05f81038..e04e7442c 100644
--- a/src/kudu/tablet/compaction_policy.h
+++ b/src/kudu/tablet/compaction_policy.h
@@ -32,6 +32,7 @@ namespace tablet {
 class RowSet;
 class RowSetInfo;
 class RowSetTree;
+struct TabletMetrics;
 
 // A set of rowsets selected for compaction.
 typedef std::unordered_set<const RowSet*> CompactionSelection;
@@ -77,7 +78,8 @@ class CompactionPolicy {
 // See docs/design-docs/compaction-policy.md for details.
 class BudgetedCompactionPolicy : public CompactionPolicy {
  public:
-  explicit BudgetedCompactionPolicy(int size_budget_mb);
+  explicit BudgetedCompactionPolicy(int size_budget_mb,
+                                    const TabletMetrics* metrics = nullptr);
 
   Status PickRowSets(const RowSetTree &tree,
                      CompactionSelection* picked,
@@ -125,6 +127,7 @@ class BudgetedCompactionPolicy : public CompactionPolicy {
                 SolutionAndValue* best_solution) const;
 
   const size_t size_budget_mb_;
+  const TabletMetrics* metrics_;
 };
 
 } // namespace tablet
diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc
index 1afb2f722..b9968bea6 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -18,6 +18,7 @@
 #include "kudu/tablet/delta_iterator_merger.h"
 
 #include <algorithm>
+#include <type_traits>
 
 #include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/substitute.h"
@@ -148,6 +149,13 @@ string DeltaIteratorMerger::ToString() const {
   return ret;
 }
 
+size_t DeltaIteratorMerger::memory_footprint() {
+  size_t result = 0;
+  for (const auto& it : iters_) {
+    result += it->memory_footprint();
+  }
+  return result;
+}
 
 Status DeltaIteratorMerger::Create(
     const vector<shared_ptr<DeltaStore> > &stores,
diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h
index 13547c8d5..7653a306f 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -95,6 +95,8 @@ class DeltaIteratorMerger : public DeltaIterator {
     LOG(DFATAL) << "Not implemented";
   }
 
+  size_t memory_footprint() override;
+
  private:
   explicit DeltaIteratorMerger(std::vector<std::unique_ptr<DeltaIterator> > iters);
 
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 18f72b46b..9e8ffb8bf 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -384,6 +384,10 @@ class DeltaIterator : public PreparedDeltas {
   // See SelectedDeltas::Delta for more details.
   virtual void set_deltas_selected(int64_t deltas_selected) = 0;
 
+  // Return an estimate on the maximum amount of memory a DeltaIterator object
+  // uses during its lifecycle while initializing, preparing next batch, etc.
+  virtual size_t memory_footprint() = 0;
+
   virtual ~DeltaIterator() {}
 };
 
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 6f88878f8..7f836db95 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -504,8 +504,8 @@ bool DeltaTracker::EstimateAllRedosAreAncient(Timestamp ancient_history_mark) {
       newest_redo->delta_stats().max_timestamp() < ancient_history_mark;
 }
 
-Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
-                                                                 int64_t* bytes) {
+Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(
+    Timestamp ancient_history_mark, int64_t* bytes) const {
   DCHECK_NE(Timestamp::kInvalidTimestamp, ancient_history_mark);
   DCHECK(bytes);
   SharedDeltaStoreVector undos_newest_first;
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index cf5831537..7feadcc64 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -177,7 +177,7 @@ class DeltaTracker {
 
   // See RowSet::EstimateBytesInPotentiallyAncientUndoDeltas().
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
-                                                     int64_t* bytes);
+                                                     int64_t* bytes) const;
 
   // Returns whether all redo (DMS and newest redo delta file) are ancient
   // (i.e. that the redo with the highest timestamp is older than the AHM).
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 6e3ef38a5..fbc91839d 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -424,7 +424,9 @@ DeltaFileIterator<Type>::DeltaFileIterator(shared_ptr<DeltaFileReader> dfr,
       prepared_(false),
       exhausted_(false),
       initted_(false),
-      cache_blocks_(CFileReader::CACHE_BLOCK) {}
+      cache_blocks_(CFileReader::CACHE_BLOCK),
+      delta_blocks_mem_size_(0) {
+}
 
 template<DeltaType Type>
 Status DeltaFileIterator<Type>::Init(ScanSpec* spec) {
@@ -454,6 +456,7 @@ Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
                                     preparer_.opts().snap_to_include)) {
     exhausted_ = true;
     delta_blocks_.clear();
+    delta_blocks_mem_size_ = 0;
     return Status::OK();
   }
 
@@ -482,6 +485,7 @@ Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
   preparer_.Seek(idx);
   prepared_ = false;
   delta_blocks_.clear();
+  delta_blocks_mem_size_ = 0;
   exhausted_ = false;
   return Status::OK();
 }
@@ -515,6 +519,7 @@ Status DeltaFileIterator<Type>::ReadCurrentBlockOntoQueue() {
     pdb.last_updated_idx;
   #endif
 
+  delta_blocks_mem_size_ += pdb.block->data().size();
   delta_blocks_.emplace_back(std::move(pdb));
   return Status::OK();
 }
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index ef97fe591..fb19ce1d5 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -281,6 +281,10 @@ class DeltaFileIterator : public DeltaIterator {
     preparer_.set_deltas_selected(deltas_selected);
   }
 
+  size_t memory_footprint() override {
+    return delta_blocks_mem_size_;
+  }
+
  private:
   friend class DeltaFileReader;
 
@@ -363,6 +367,12 @@ class DeltaFileIterator : public DeltaIterator {
 
   cfile::CFileReader::CacheControl cache_blocks_;
 
+  // The amount of memory allocated for the data stored in delta_blocks_ (in
+  // bytes). That corresponds to the amount of memory necessary to store the
+  // uncompressed (but not yet decoded) deltas in memory. The memory might
+  // be allocated in a block cache, an arena, or heap.
+  size_t delta_blocks_mem_size_;
+
   DISALLOW_COPY_AND_ASSIGN(DeltaFileIterator);
 };
 
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 2fd4da36d..259333937 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -251,6 +251,12 @@ class DMSIterator : public DeltaIterator {
     preparer_.set_deltas_selected(deltas_selected);
   }
 
+  size_t memory_footprint() override {
+    // TODO(aserbin): implement this if it's necessary to track peak memory
+    //                usage for objects of this type during their lifecycle
+    return 0;
+  }
+
  private:
   DISALLOW_COPY_AND_ASSIGN(DMSIterator);
   friend class DeltaMemStore;
diff --git a/src/kudu/tablet/rowset_info.cc b/src/kudu/tablet/rowset_info.cc
index 661de69e7..3abbfc208 100644
--- a/src/kudu/tablet/rowset_info.cc
+++ b/src/kudu/tablet/rowset_info.cc
@@ -21,8 +21,10 @@
 #include <cstdint>
 #include <cstring>
 #include <memory>
+#include <optional>
 #include <ostream>
 #include <string>
+#include <type_traits>
 #include <unordered_map>
 #include <utility>
 
@@ -36,6 +38,7 @@
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/gutil/template_util.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/rowset_tree.h"
 #include "kudu/util/flag_tags.h"
@@ -45,9 +48,11 @@
 #include "kudu/util/status.h"
 
 using std::shared_ptr;
+using std::optional;
 using std::string;
 using std::unordered_map;
 using std::vector;
+using strings::Substitute;
 
 DECLARE_double(compaction_minimum_improvement);
 DECLARE_int64(budgeted_compaction_target_rowset_size);
@@ -87,7 +92,7 @@ bool ValidateSmallRowSetTradeoffVsMinScore() {
   const auto tradeoff = FLAGS_compaction_small_rowset_tradeoff;
   const auto min_score = FLAGS_compaction_minimum_improvement;
   if (tradeoff >= min_score) {
-    LOG(ERROR) << strings::Substitute(
+    LOG(ERROR) << Substitute(
         "-compaction_small_rowset_tradeoff=$0 must be less than "
         "-compaction_minimum_improvement=$1 in order to prevent pointless "
         "compactions; if you know what you are doing, pass "
@@ -251,11 +256,13 @@ void RowSetInfo::Collect(const RowSetTree& tree, vector<RowSetInfo>* rsvec) {
   }
 }
 
-void RowSetInfo::ComputeCdfAndCollectOrdered(const RowSetTree& tree,
-                                             double* rowset_total_height,
-                                             double* rowset_total_width,
-                                             vector<RowSetInfo>* info_by_min_key,
-                                             vector<RowSetInfo>* info_by_max_key) {
+void RowSetInfo::ComputeCdfAndCollectOrdered(
+    const RowSetTree& tree,
+    optional<MemoryBudgetingFunc> is_on_memory_budget,
+    double* rowset_total_height,
+    double* rowset_total_width,
+    vector<RowSetInfo>* info_by_min_key,
+    vector<RowSetInfo>* info_by_max_key) {
   DCHECK((info_by_min_key && info_by_max_key) ||
          (!info_by_min_key && !info_by_max_key))
       << "'info_by_min_key' and 'info_by_max_key' must both be non-null or both be null";
@@ -289,6 +296,13 @@ void RowSetInfo::ComputeCdfAndCollectOrdered(const RowSetTree& tree,
   RowSetVector available_rowsets;
   for (const auto& rs : tree.all_rowsets()) {
     if (rs->IsAvailableForCompaction()) {
+      if (is_on_memory_budget && !(*is_on_memory_budget)(rs.get())) {
+        // Skip rowsets filtered out by the memory budgeting.
+        KLOG_EVERY_N_SECS(INFO, 600) << Substitute(
+            "$0 removed from compaction input due to memory constraints",
+            rs->ToString());
+        continue;
+      }
       available_rowsets.push_back(rs);
     }
   }
diff --git a/src/kudu/tablet/rowset_info.h b/src/kudu/tablet/rowset_info.h
index 585c9cb31..2ec8b4d31 100644
--- a/src/kudu/tablet/rowset_info.h
+++ b/src/kudu/tablet/rowset_info.h
@@ -18,6 +18,8 @@
 #define KUDU_TABLET_ROWSET_INFO_H_
 
 #include <cstdint>
+#include <functional>
+#include <optional>
 #include <string>
 #include <vector>
 
@@ -41,24 +43,28 @@ class RowSetTree;
 // Class is immutable.
 class RowSetInfo {
  public:
+  typedef std::function<bool(const RowSet*)> MemoryBudgetingFunc;
 
   // Appends the rowsets in no order without the cdf values set.
   static void Collect(const RowSetTree& tree, std::vector<RowSetInfo>* rsvec);
 
   // From the rowset tree 'tree', computes the keyspace cdf and collects rowset
   // information in min-key- and max-key-sorted order into 'info_by_min_key'
-  // and 'info_by_max_key', respectively.
+  // and 'info_by_max_key', respectively. The memory budgeting function
+  // 'is_on_memory_budget' is used for OS resource assessment, if present.
   // The total weighted height and the total width of the rowset tree is set into
   // 'rowset_total_height' and 'rowset_total_width', if they are not nullptr.
   // If one of 'info_by_min_key' and 'info_by_max_key' is nullptr, the other
   // must be.
   // Requires holding the compact_select_lock_ for the tablet that the
   // rowsets in 'tree' references.
-  static void ComputeCdfAndCollectOrdered(const RowSetTree& tree,
-                                          double* rowset_total_height,
-                                          double* rowset_total_width,
-                                          std::vector<RowSetInfo>* info_by_min_key,
-                                          std::vector<RowSetInfo>* info_by_max_key);
+  static void ComputeCdfAndCollectOrdered(
+      const RowSetTree& tree,
+      std::optional<MemoryBudgetingFunc> is_on_memory_budget,
+      double* rowset_total_height,
+      double* rowset_total_width,
+      std::vector<RowSetInfo>* info_by_min_key,
+      std::vector<RowSetInfo>* info_by_max_key);
 
   // Split [start_key, stop_key) into primary key ranges by chunk size.
   //
@@ -72,6 +78,16 @@ class RowSetInfo {
                             uint64 target_chunk_size,
                             std::vector<KeyRange>* ranges);
 
+  // Current implementation of CompactRowSetsOp loads all the rowset's delta
+  // data into the memory. Doing so, it unpacks and decodes the data that
+  // is stored on disk. The information on how much memory each delta requires
+  // when unpacked and decoded isn't available beforehand, but it's possible
+  // to provide an estimate based on the size of the delta as stored on disk and
+  // the stats on the memory/disk ratio for rowsets that have gone through
+  // the merge compaction for a particular tablet.
+  static bool FitsIntoMemory(const RowSet& rs,
+                             double mem_to_disk_ratio);
+
   uint64_t size_bytes(const ColumnId& col_id) const;
   uint64_t base_and_redos_size_bytes() const {
     return extra_->base_and_redos_size_bytes;
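
The estimate described in the comment above FitsIntoMemory() can be sketched in isolation. This is a minimal illustration, not the code from the patch: FakeRowSet, the available_bytes parameter, and SelectCandidates() are hypothetical names introduced here, and the real method works with RowSet objects and the process memory limit instead.

```cpp
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical stand-in for a rowset: only the on-disk size of its deltas.
struct FakeRowSet {
  uint64_t deltas_on_disk_bytes;
};

// Same shape as the MemoryBudgetingFunc typedef, but over the stand-in type.
using MemoryBudgetingFunc = std::function<bool(const FakeRowSet&)>;

// Estimate the in-memory size of the deltas as on-disk size times the observed
// memory-to-disk ratio, then compare it with the memory still available.
// 'available_bytes' is an assumption made for this sketch.
bool FitsIntoMemory(const FakeRowSet& rs,
                    double mem_to_disk_ratio,
                    uint64_t available_bytes) {
  const double estimate =
      static_cast<double>(rs.deltas_on_disk_bytes) * mem_to_disk_ratio;
  return estimate <= static_cast<double>(available_bytes);
}

// Keep only the rowsets accepted by the optional predicate. With no predicate
// supplied (std::nullopt), every rowset is kept.
std::vector<FakeRowSet> SelectCandidates(
    const std::vector<FakeRowSet>& all,
    const std::optional<MemoryBudgetingFunc>& is_on_memory_budget) {
  std::vector<FakeRowSet> out;
  for (const auto& rs : all) {
    if (is_on_memory_budget && !(*is_on_memory_budget)(rs)) {
      continue;  // estimated to exceed the budget: skip this rowset for now
    }
    out.push_back(rs);
  }
  return out;
}
```

Passing the predicate as std::optional lets callers that only need layout statistics opt out of budgeting with nullopt.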
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index c5bc838a5..d2afc7a94 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -85,6 +85,7 @@
 #include "kudu/util/faststring.h"
 #include "kudu/util/fault_injection.h"
 #include "kudu/util/flag_tags.h"
+#include "kudu/util/flag_validators.h"
 #include "kudu/util/locks.h"
 #include "kudu/util/logging.h"
 #include "kudu/util/memory/arena.h"
@@ -195,6 +196,33 @@ DEFINE_int32(rows_writed_per_sec_for_hot_tablets, 1000,
 TAG_FLAG(rows_writed_per_sec_for_hot_tablets, experimental);
 TAG_FLAG(rows_writed_per_sec_for_hot_tablets, runtime);
 
+DEFINE_double(rowset_compaction_ancient_delta_max_ratio, 0.2,
+              "The ratio of data in ancient UNDO deltas to the total amount "
+              "of data in all deltas across rowsets picked for rowset merge "
+              "compaction used as a threshold to determine whether to run "
+              "the operation when --rowset_compaction_ancient_delta_threshold_enabled "
+              "is set to 'true'. If the ratio is greater than the threshold "
+              "defined by this flag, CompactRowSetsOp operations are postponed "
+              "until UndoDeltaBlockGCOp purges enough of ancient UNDO deltas.");
+TAG_FLAG(rowset_compaction_ancient_delta_max_ratio, advanced);
+TAG_FLAG(rowset_compaction_ancient_delta_max_ratio, runtime);
+
+DEFINE_bool(rowset_compaction_ancient_delta_threshold_enabled, true,
+            "Whether to check the ratio of data in ancient UNDO deltas against "
+            "the threshold set by --rowset_compaction_ancient_delta_max_ratio "
+            "before running rowset merge compaction. If the ratio of ancient "
+            "data in UNDO deltas is greater than the threshold, postpone "
+            "running CompactRowSetsOp until UndoDeltaBlockGCOp purges ancient "
+            "data and the ratio drops below the threshold (NOTE: regardless of "
+            "the setting, the effective "
+            "value of this flag becomes 'false' if "
+            "--enable_undo_delta_block_gc is set to 'false')");
+TAG_FLAG(rowset_compaction_ancient_delta_threshold_enabled, advanced);
+TAG_FLAG(rowset_compaction_ancient_delta_threshold_enabled, runtime);
+
+DECLARE_bool(enable_undo_delta_block_gc);
+DECLARE_uint32(rowset_compaction_estimate_min_deltas_size_mb);
+
 METRIC_DEFINE_entity(tablet);
 METRIC_DEFINE_gauge_size(tablet, memrowset_size, "MemRowSet Memory Usage",
                          kudu::MetricUnit::kBytes,
@@ -241,6 +269,38 @@ using std::unordered_set;
 using std::vector;
 using strings::Substitute;
 
+
+namespace {
+
+bool ValidateAncientDeltaMaxRatio(const char* flag, double val) {
+  constexpr double kMinVal = 0.0;
+  constexpr double kMaxVal = 1.0;
+  if (val < kMinVal || val > kMaxVal) {
+    LOG(ERROR) << Substitute(
+        "$0: invalid value for --$1 flag, should be between $2 and $3",
+        val, flag, kMinVal, kMaxVal);
+    return false;
+  }
+  return true;
+}
+DEFINE_validator(rowset_compaction_ancient_delta_max_ratio,
+                 &ValidateAncientDeltaMaxRatio);
+
+bool ValidateRowsetCompactionGuard() {
+  if (FLAGS_rowset_compaction_ancient_delta_threshold_enabled &&
+      !FLAGS_enable_undo_delta_block_gc) {
+    LOG(WARNING) << Substitute(
+        "rowset compaction ancient ratio threshold is enabled "
+        "but UNDO delta block GC is disabled: check current settings of "
+        "--rowset_compaction_ancient_delta_threshold_enabled and "
+        "--enable_undo_delta_block_gc flags");
+  }
+  return true;
+}
+GROUP_FLAG_VALIDATOR(rowset_compaction, &ValidateRowsetCompactionGuard);
+
+} // anonymous namespace
+
 namespace kudu {
 
 class RowBlock;
@@ -248,10 +308,6 @@ struct IteratorStats;
 
 namespace tablet {
 
-static CompactionPolicy *CreateCompactionPolicy() {
-  return new BudgetedCompactionPolicy(FLAGS_tablet_compaction_budget_mb);
-}
-
 ////////////////////////////////////////////////////////////
 // TabletComponents
 ////////////////////////////////////////////////////////////
@@ -289,7 +345,6 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
       last_read_score_(0.0),
       last_write_score_(0.0) {
   CHECK(schema()->has_column_ids());
-  compaction_policy_.reset(CreateCompactionPolicy());
 
   if (metric_registry) {
     MetricEntity::AttributeMap attrs;
@@ -318,6 +373,9 @@ Tablet::Tablet(scoped_refptr<TabletMetadata> metadata,
         ->AutoDetach(&metric_detacher_);
   }
 
+  compaction_policy_.reset(new BudgetedCompactionPolicy(
+      FLAGS_tablet_compaction_budget_mb, metrics_.get()));
+
   if (FLAGS_tablet_throttler_rpc_per_sec > 0 || FLAGS_tablet_throttler_bytes_per_sec > 0) {
     throttler_.reset(new Throttler(MonoTime::Now(),
                                    FLAGS_tablet_throttler_rpc_per_sec,
@@ -1895,6 +1953,17 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
                                     "Phase 1 snapshot: $1",
                                     op_name, flush_snap.ToString());
 
+  // Save the stats on the total on-disk size of all deltas in selected rowsets.
+  size_t deltas_on_disk_size = 0;
+  if (mrs_being_flushed == TabletMetadata::kNoMrsFlushed) {
+    for (const auto& rs : input.rowsets()) {
+      DiskRowSetSpace drss;
+      DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
+      drs->GetDiskRowSetSpaceUsage(&drss);
+      deltas_on_disk_size += drss.redo_deltas_size + drss.undo_deltas_size;
+    }
+  }
+
   if (common_hooks_) {
     RETURN_NOT_OK_PREPEND(common_hooks_->PostTakeMvccSnapshot(),
                           "PostTakeMvccSnapshot hook failed");
@@ -2043,6 +2112,9 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
                           "PostSwapInDuplicatingRowSet hook failed");
   }
 
+  // Store the stats on the max memory used for compaction phase 1.
+  const size_t peak_mem_usage_ph1 = merge->memory_footprint();
+
   // Phase 2. Here we re-scan the compaction input, copying those missed updates into the
   // new rowset's DeltaTracker.
   VLOG_WITH_PREFIX(1) << Substitute("$0: Phase 2: carrying over any updates "
@@ -2098,12 +2170,38 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   AtomicSwapRowSets({ inprogress_rowset }, new_disk_rowsets);
   UpdateAverageRowsetHeight();
 
+  const size_t peak_mem_usage = std::max(peak_mem_usage_ph1,
+                                         merge->memory_footprint());
+  // For rowset merge compactions, update the stats on the max peak memory used
+  // and ratio of the amount of memory used to the size of all deltas on disk.
+  if (deltas_on_disk_size > 0) {
+    // Update the peak memory usage metric.
+    metrics_->compact_rs_mem_usage->Increment(peak_mem_usage);
+
+    // Update the ratio of the peak memory usage to the size of deltas on disk.
+    // To keep the stats relevant for larger rowsets, filter out rowsets with
+    // relatively small amount of data in deltas. Update the memory-to-disk size
+    // ratio metric only when the on-disk size of deltas crosses the configured
+    // threshold.
+    const int64_t min_deltas_size_bytes =
+        FLAGS_rowset_compaction_estimate_min_deltas_size_mb * 1024 * 1024;
+    if (deltas_on_disk_size > min_deltas_size_bytes) {
+      // Round up the ratio. Since the ratio is used to estimate the amount of
+      // memory needed to perform merge rowset compaction based on the amount of
+      // data stored in rowsets' deltas, it's safer to provide an upper rather
+      // than a lower bound estimate.
+      metrics_->compact_rs_mem_usage_to_deltas_size_ratio->Increment(
+          (peak_mem_usage + deltas_on_disk_size - 1) / deltas_on_disk_size);
+    }
+  }
+
   const auto rows_written = drsw.rows_written_count();
   const auto drs_written = drsw.drs_written_count();
   const auto bytes_written = drsw.written_size();
   TRACE_COUNTER_INCREMENT("rows_written", rows_written);
   TRACE_COUNTER_INCREMENT("drs_written", drs_written);
   TRACE_COUNTER_INCREMENT("bytes_written", bytes_written);
+  TRACE_COUNTER_INCREMENT("peak_mem_usage", peak_mem_usage);
   VLOG_WITH_PREFIX(1) << Substitute("$0 successful on $1 rows ($2 rowsets, $3 bytes)",
                                     op_name,
                                     rows_written,
@@ -2143,12 +2241,14 @@ void Tablet::UpdateAverageRowsetHeight() {
   scoped_refptr<TabletComponents> comps;
   GetComponents(&comps);
   std::lock_guard<std::mutex> l(compact_select_lock_);
-  double rowset_total_height, rowset_total_width;
+  double rowset_total_height;
+  double rowset_total_width;
   RowSetInfo::ComputeCdfAndCollectOrdered(*comps->rowsets,
+                                          /*is_on_memory_budget=*/nullopt,
                                           &rowset_total_height,
                                           &rowset_total_width,
-                                          nullptr,
-                                          nullptr);
+                                          /*info_by_min_key=*/nullptr,
+                                          /*info_by_max_key=*/nullptr);
   metrics_->average_diskrowset_height->set_value(rowset_total_height, rowset_total_width);
 }
 
@@ -2187,7 +2287,7 @@ void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) {
   }
 
   double quality = 0;
-  unordered_set<const RowSet*> picked_set_ignored;
+  unordered_set<const RowSet*> picked;
 
   shared_ptr<RowSetTree> rowsets_copy;
   {
@@ -2197,13 +2297,64 @@ void Tablet::UpdateCompactionStats(MaintenanceOpStats* stats) {
 
   {
     std::lock_guard<std::mutex> compact_lock(compact_select_lock_);
-    WARN_NOT_OK(compaction_policy_->PickRowSets(*rowsets_copy, &picked_set_ignored, &quality, NULL),
+    WARN_NOT_OK(compaction_policy_->PickRowSets(*rowsets_copy, &picked, &quality, nullptr),
                 Substitute("Couldn't determine compaction quality for $0", tablet_id()));
   }
 
-  VLOG_WITH_PREFIX(1) << "Best compaction for " << tablet_id() << ": " << quality;
+  // An estimate for the total amount of data stored in the UNDO deltas across
+  // all the rowsets selected for this rowset compaction.
+  uint64_t undos_total_size = 0;
+
+  // An estimate for the amount of data stored in ancient UNDO deltas across
+  // all the rowsets picked for this rowset compaction.
+  int64_t ancient_undos_total_size = 0;
+
+  for (const auto* rs : picked) {
+    const auto* drs = down_cast<const DiskRowSet*>(rs);
+    const auto& dt = drs->delta_tracker();
+
+    int64_t size = 0;
+    {
+      Timestamp ancient_history_mark;
+      if (Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
+        WARN_NOT_OK(dt.EstimateBytesInPotentiallyAncientUndoDeltas(
+            ancient_history_mark, &size),
+            "could not estimate size of ancient UNDO deltas");
+      }
+    }
+    ancient_undos_total_size += size;
+    undos_total_size += dt.UndoDeltaOnDiskSize();
+  }
+
+  // Whether there is too much data accumulated in ancient UNDO deltas.
+  bool much_of_ancient_data = false;
+  if (FLAGS_rowset_compaction_ancient_delta_threshold_enabled) {
+    // Check if too much of the UNDO data in the selected rowsets is ancient.
+    // If so, wait while the UNDO delta GC maintenance task does its job, if
+    // the latter is enabled. Don't waste too much IO, memory, and CPU cycles
+    // working with the data that will be discarded later on.
+    //
+    // TODO(aserbin): instead of this workaround, update CompactRowSetsOp
+    //                maintenance operation to avoid reading in, working with,
+    //                and discarding of ancient deltas; right now it's done
+    //                only in the very end before persisting the result
+    const auto ancient_undos_threshold = static_cast<int64_t>(
+        FLAGS_rowset_compaction_ancient_delta_max_ratio *
+        static_cast<double>(undos_total_size));
+    if (ancient_undos_threshold < ancient_undos_total_size) {
+      much_of_ancient_data = true;
+      LOG_WITH_PREFIX(INFO) << Substitute(
+          "compaction isn't runnable because of too much data in "
+          "ancient UNDO deltas: $0 out of $1 total bytes",
+          ancient_undos_total_size, undos_total_size);
+    }
+    VLOG_WITH_PREFIX(2) << Substitute(
+        "UNDO deltas estimated size: $0 ancient; $1 total",
+        ancient_undos_total_size, undos_total_size);
+  }
+  VLOG_WITH_PREFIX(1) << Substitute("compaction quality: $0", quality);
 
-  stats->set_runnable(quality >= 0);
+  stats->set_runnable(!much_of_ancient_data && quality >= 0);
   stats->set_perf_improvement(quality);
 }
 
@@ -2955,9 +3106,12 @@ void Tablet::PrintRSLayout(ostream* o) {
     out << "</p>";
   }
 
-  double rowset_total_height, rowset_total_width;
-  vector<RowSetInfo> min, max;
+  double rowset_total_height;
+  double rowset_total_width;
+  vector<RowSetInfo> min;
+  vector<RowSetInfo> max;
   RowSetInfo::ComputeCdfAndCollectOrdered(*rowsets_copy,
+                                          /*is_on_memory_budget=*/nullopt,
                                           &rowset_total_height,
                                           &rowset_total_width,
                                           &min,
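
Two pieces of arithmetic from the tablet.cc changes above can be checked on their own: the rounded-up memory-to-disk ratio recorded after a merge compaction, and the ancient UNDO threshold that gates whether the operation is runnable. This is a rough sketch with hypothetical free functions; in the patch the logic is inlined in Tablet::DoMergeCompactionOrFlush() and Tablet::UpdateCompactionStats().

```cpp
#include <cstdint>

// Ceiling of peak_mem_bytes / deltas_on_disk_bytes using integer arithmetic.
// The divisor must be non-zero; the patch only computes the ratio when the
// on-disk delta size is above a configured minimum.
uint64_t MemToDiskRatioRoundedUp(uint64_t peak_mem_bytes,
                                 uint64_t deltas_on_disk_bytes) {
  return (peak_mem_bytes + deltas_on_disk_bytes - 1) / deltas_on_disk_bytes;
}

// True if the estimated ancient UNDO data exceeds max_ratio of all UNDO data.
bool TooMuchAncientData(int64_t ancient_undos_bytes,
                        int64_t undos_total_bytes,
                        double max_ratio) {
  const auto threshold = static_cast<int64_t>(
      max_ratio * static_cast<double>(undos_total_bytes));
  return threshold < ancient_undos_bytes;
}

// The operation is runnable only if the ancient-data check passes (or is
// disabled) and the compaction quality is non-negative.
bool IsRunnable(bool threshold_enabled,
                int64_t ancient_undos_bytes,
                int64_t undos_total_bytes,
                double max_ratio,
                double quality) {
  const bool too_much =
      threshold_enabled &&
      TooMuchAncientData(ancient_undos_bytes, undos_total_bytes, max_ratio);
  return !too_much && quality >= 0;
}
```

Rounding the ratio up biases later estimates toward overestimating memory needs, which is the safer direction for a budget check.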
diff --git a/src/kudu/tablet/tablet_metrics.cc b/src/kudu/tablet/tablet_metrics.cc
index b9c1e9c89..ce94985af 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -354,6 +354,21 @@ METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_perform_duration,
   kudu::MetricLevel::kInfo,
   60000LU, 1);
 
+METRIC_DEFINE_histogram(tablet, compact_rs_mem_usage,
+  "Peak Memory Usage for CompactRowSetsOp",
+  kudu::MetricUnit::kBytes,
+  "Peak memory usage of rowset merge compaction operations (CompactRowSetsOp)",
+  kudu::MetricLevel::kInfo,
+  60000LU, 1);
+
+METRIC_DEFINE_histogram(tablet, compact_rs_mem_usage_to_deltas_size_ratio,
+  "Peak Memory Usage to On-Disk Delta Size Ratio for CompactRowSetsOp",
+  kudu::MetricUnit::kUnits,
+  "Ratio of the peak memory usage to the estimated on-disk size of all deltas "
+  "for rowsets involved in rowset merge compaction (CompactRowSetsOp)",
+  kudu::MetricLevel::kInfo,
+  60000LU, 1);
+
 METRIC_DEFINE_histogram(tablet, deleted_rowset_gc_duration,
   "Deleted Rowset GC Duration",
   kudu::MetricUnit::kMilliseconds,
@@ -434,6 +449,8 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
     MINIT(undo_delta_block_gc_init_duration),
     MINIT(undo_delta_block_gc_delete_duration),
     MINIT(undo_delta_block_gc_perform_duration),
+    MINIT(compact_rs_mem_usage),
+    MINIT(compact_rs_mem_usage_to_deltas_size_ratio),
     MINIT(leader_memory_pressure_rejections),
     MEANINIT(average_diskrowset_height),
     HIDEINIT(merged_entities_count_of_tablet, 1) {
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 1ef983605..b277d4284 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -106,6 +106,10 @@ struct TabletMetrics {
   scoped_refptr<Histogram> undo_delta_block_gc_delete_duration;
   scoped_refptr<Histogram> undo_delta_block_gc_perform_duration;
 
+  // Metrics specific to rowset merge compaction.
+  scoped_refptr<Histogram> compact_rs_mem_usage;
+  scoped_refptr<Histogram> compact_rs_mem_usage_to_deltas_size_ratio;
+
   scoped_refptr<Counter> leader_memory_pressure_rejections;
 
   // Compaction metrics.
diff --git a/src/kudu/tablet/tablet_mm_ops-test.cc b/src/kudu/tablet/tablet_mm_ops-test.cc
index 906ba96c9..b7bb778f7 100644
--- a/src/kudu/tablet/tablet_mm_ops-test.cc
+++ b/src/kudu/tablet/tablet_mm_ops-test.cc
@@ -19,6 +19,8 @@
 
 #include <memory>
 #include <optional>
+#include <string>
+#include <type_traits>
 #include <unordered_set>
 #include <vector>
 
@@ -31,7 +33,6 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/tablet/tablet-harness.h"
 #include "kudu/tablet/tablet-test-base.h"
-#include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_metrics.h"
 #include "kudu/util/hdr_histogram.h"
 #include "kudu/util/maintenance_manager.h"
@@ -143,8 +144,10 @@ TEST_F(KuduTabletMmOpsTest, TestCompactRowSetsOpCacheStats) {
   CompactRowSetsOp op(tablet().get());
   ASSERT_FALSE(op.DisableCompaction());
   NO_FATALS(TestFirstCall(&op));
-  NO_FATALS(TestAffectedMetrics(&op, { tablet()->metrics()->flush_mrs_duration,
-                                       tablet()->metrics()->compact_rs_duration }));
+  auto* m = tablet()->metrics();
+  NO_FATALS(TestAffectedMetrics(&op, { m->flush_mrs_duration,
+                                       m->compact_rs_duration,
+                                       m->undo_delta_block_gc_perform_duration }));
 }
 
 TEST_F(KuduTabletMmOpsTest, TestDisableCompactRowSetsOp) {
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index 445b4a5d9..7f33ef3c8 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -122,10 +122,11 @@ bool TabletOpBase::DisableCompaction() const {
 ////////////////////////////////////////////////////////////
 
 CompactRowSetsOp::CompactRowSetsOp(Tablet* tablet)
-  : TabletOpBase(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
-                 MaintenanceOp::HIGH_IO_USAGE, tablet),
-    last_num_mrs_flushed_(0),
-    last_num_rs_compacted_(0) {
+    : TabletOpBase(Substitute("CompactRowSetsOp($0)", tablet->tablet_id()),
+                   MaintenanceOp::HIGH_IO_USAGE, tablet),
+      last_num_mrs_flushed_(0),
+      last_num_rs_compacted_(0),
+      last_num_undo_deltas_gced_(0) {
 }
 
 void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
@@ -142,21 +143,26 @@ void CompactRowSetsOp::UpdateStats(MaintenanceOpStats* stats) {
   double workload_score = FLAGS_enable_workload_score_for_perf_improvement_ops ?
                           tablet_->CollectAndUpdateWorkloadStats(MaintenanceOp::COMPACT_OP) : 0;
 
-  // Any operation that changes the on-disk row layout invalidates the
-  // cached stats.
+  // Any operation that changes the on-disk row layout invalidates the cached
+  // stats. Also, UNDO delta GC does the same since the runnable state of the
+  // CompactRowSetsOp depends on the fraction of ancient deltas in the rowsets.
   TabletMetrics* metrics = tablet_->metrics();
   if (metrics) {
     uint64_t new_num_mrs_flushed = metrics->flush_mrs_duration->TotalCount();
     uint64_t new_num_rs_compacted = metrics->compact_rs_duration->TotalCount();
+    uint64_t new_num_undo_deltas_gced =
+        metrics->undo_delta_block_gc_perform_duration->TotalCount();
     if (prev_stats_.valid() &&
         new_num_mrs_flushed == last_num_mrs_flushed_ &&
-        new_num_rs_compacted == last_num_rs_compacted_) {
+        new_num_rs_compacted == last_num_rs_compacted_ &&
+        new_num_undo_deltas_gced == last_num_undo_deltas_gced_) {
       prev_stats_.set_workload_score(workload_score);
       *stats = prev_stats_;
       return;
     }
     last_num_mrs_flushed_ = new_num_mrs_flushed;
     last_num_rs_compacted_ = new_num_rs_compacted;
+    last_num_undo_deltas_gced_ = new_num_undo_deltas_gced;
   }
 
   tablet_->UpdateCompactionStats(&prev_stats_);
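
The cache invalidation in CompactRowSetsOp::UpdateStats() above boils down to comparing a few monotonic counters with their last seen values. Below is a simplified sketch of that idea; OpCounters and CachedStats are hypothetical names, and unlike the patch, validity here is tracked by the helper itself, not by MaintenanceOpStats.

```cpp
#include <cstdint>

// Hypothetical snapshot of the counters that invalidate the cached stats:
// MRS flushes, rowset compactions, and UNDO delta GC runs.
struct OpCounters {
  uint64_t mrs_flushed;
  uint64_t rs_compacted;
  uint64_t undo_deltas_gced;
};

class CachedStats {
 public:
  // Returns true if the cached stats can be reused, i.e. none of the counters
  // has changed since the previous call. Otherwise remembers the new counter
  // values and returns false, signalling that stats must be recomputed.
  bool CanReuse(const OpCounters& now) {
    if (valid_ &&
        now.mrs_flushed == last_.mrs_flushed &&
        now.rs_compacted == last_.rs_compacted &&
        now.undo_deltas_gced == last_.undo_deltas_gced) {
      return true;
    }
    last_ = now;
    valid_ = true;
    return false;
  }

 private:
  bool valid_ = false;
  OpCounters last_{};
};
```

Adding the UNDO delta GC counter matters because a GC run can shrink the share of ancient deltas and so flip the operation back to runnable without any flush or compaction taking place.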
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index 23e33998d..3c9829d2f 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -76,6 +76,7 @@ class CompactRowSetsOp : public TabletOpBase {
   MaintenanceOpStats prev_stats_;
   uint64_t last_num_mrs_flushed_;
   uint64_t last_num_rs_compacted_;
+  uint64_t last_num_undo_deltas_gced_;
 };
 
 // MaintenanceOp to run minor compaction on delta stores.