You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/21 05:16:51 UTC

[kudu] branch master updated (4672c75 -> 58f7a30)

This is an automated email from the ASF dual-hosted git repository.

awong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from 4672c75  [mini_kdc] MiniKdc::Kinit() atomically replaces ccache
     new 7dff767  test: add some buffer in timing metrics
     new 7059548  KUDU-1625: background op to GC ancient, fully deleted rowsets
     new f4508ff  tablet: cache delta stats when flushing a DMS
     new 58f7a30  tablet: plumb delta stats into delta compaction outputs

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../kudu/subprocess/echo/TestEchoSubprocess.java   |   7 +-
 .../integration-tests/tablet_history_gc-itest.cc   | 109 ++++++++++++--
 src/kudu/tablet/delta_compaction-test.cc           |   8 +-
 src/kudu/tablet/delta_compaction.cc                |  28 ++--
 src/kudu/tablet/delta_store.cc                     |   6 +-
 src/kudu/tablet/delta_store.h                      |  19 ++-
 src/kudu/tablet/delta_tracker.cc                   |  69 ++++++---
 src/kudu/tablet/delta_tracker.h                    |  28 ++--
 src/kudu/tablet/deltafile-test.cc                  |   8 +-
 src/kudu/tablet/deltafile.cc                       |  20 ++-
 src/kudu/tablet/deltafile.h                        |  43 ++++--
 src/kudu/tablet/deltamemstore-test.cc              |   4 +-
 src/kudu/tablet/deltamemstore.cc                   |  12 +-
 src/kudu/tablet/deltamemstore.h                    |  23 ++-
 src/kudu/tablet/diskrowset.cc                      |  24 +++-
 src/kudu/tablet/diskrowset.h                       |   3 +
 src/kudu/tablet/memrowset.h                        |   7 +
 src/kudu/tablet/mock-rowsets.h                     |  78 ++++++-----
 src/kudu/tablet/mt-tablet-test.cc                  |  62 +++++---
 src/kudu/tablet/rowset.h                           |  14 ++
 src/kudu/tablet/tablet-test-base.h                 |   8 ++
 src/kudu/tablet/tablet-test-util.h                 |   6 +-
 src/kudu/tablet/tablet.cc                          |  91 ++++++++++++
 src/kudu/tablet/tablet.h                           |  23 ++-
 src/kudu/tablet/tablet_bootstrap.cc                |  16 ++-
 src/kudu/tablet/tablet_history_gc-test.cc          | 156 +++++++++++++++++----
 src/kudu/tablet/tablet_metrics.cc                  |  29 ++++
 src/kudu/tablet/tablet_metrics.h                   |   4 +
 src/kudu/tablet/tablet_mm_ops.cc                   |  48 +++++++
 src/kudu/tablet/tablet_mm_ops.h                    |  38 ++++-
 src/kudu/tablet/tablet_replica-test.cc             |  75 ++++++++--
 31 files changed, 861 insertions(+), 205 deletions(-)


[kudu] 03/04: tablet: cache delta stats when flushing a DMS

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit f4508ff2ceea073197eb07b85c1c4bd28a9c7b3d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Tue Mar 17 02:19:02 2020 -0700

    tablet: cache delta stats when flushing a DMS
    
    This allows us to GC ancient rowsets that have had their DMS flushed,
    which is likely to be the case, given the default ancient history
    period is 7 days.
    
    Change-Id: I26e74467ed180cebbd8da360763ba9498661e5f3
    Reviewed-on: http://gerrit.cloudera.org:8080/15460
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 src/kudu/tablet/delta_store.h             | 19 +++++++++++-------
 src/kudu/tablet/delta_tracker.cc          | 14 ++++++-------
 src/kudu/tablet/deltafile-test.cc         |  2 +-
 src/kudu/tablet/deltafile.cc              | 15 +++++++++++---
 src/kudu/tablet/deltafile.h               | 33 ++++++++++++++++++++-----------
 src/kudu/tablet/deltamemstore.h           |  2 +-
 src/kudu/tablet/mt-tablet-test.cc         |  4 ++--
 src/kudu/tablet/tablet_history_gc-test.cc |  4 ----
 src/kudu/tablet/tablet_replica-test.cc    | 22 ++++++++++++++++++---
 9 files changed, 76 insertions(+), 39 deletions(-)

diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 069600e..7859e9f 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -14,8 +14,7 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
-#ifndef KUDU_TABLET_DELTA_STORE_H
-#define KUDU_TABLET_DELTA_STORE_H
+#pragma once
 
 #include <cstddef>
 #include <cstdint>
@@ -179,7 +178,7 @@ class DeltaStore {
   virtual Status Init(const fs::IOContext* io_context) = 0;
 
   // Whether this delta store was initialized or not.
-  virtual bool Initted() = 0;
+  virtual bool Initted() const = 0;
 
   // Create a DeltaIterator for the given projection.
   //
@@ -204,11 +203,18 @@ class DeltaStore {
 
   virtual std::string ToString() const = 0;
 
-  // TODO remove this once we don't need to have delta_stats for both DMS and DFR. Currently
-  // DeltaTracker#GetColumnsIdxWithUpdates() needs to filter out DMS from the redo list but it
-  // can't without RTTI.
+  // TODO(jdcryans): remove this once we don't need to have delta_stats for
+  // both DMS and DFR. Currently DeltaTracker::GetColumnIdsWithUpdates() needs
+  // to filter out DMS from the redo list but it can't without RTTI.
   virtual const DeltaStats& delta_stats() const = 0;
 
+  // Returns whether callers can use 'delta_stats()', either because they've
+  // been read from disk, or because the store has been initialized with the
+  // stats cached.
+  virtual bool has_delta_stats() const  {
+    return Initted();
+  }
+
   virtual ~DeltaStore() {}
 };
 
@@ -558,4 +564,3 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter,
 } // namespace tablet
 } // namespace kudu
 
-#endif
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 7b53fc8..2e8c2ea 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -122,6 +122,7 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
     s = DeltaFileReader::OpenNoInit(std::move(block),
                                     type,
                                     std::move(options),
+                                    /*delta_stats*/nullptr,
                                     &dfr);
     if (!s.ok()) {
       LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type)
@@ -193,7 +194,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(const IOContext* io_context
     ignore_result(down_cast<DeltaFileReader*>(delta_store.get()));
     shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store);
 
-    if (dfr->Initted()) {
+    if (dfr->has_delta_stats()) {
       delete_count += dfr->delta_stats().delete_count();
       reinsert_count += dfr->delta_stats().reinsert_count();
       update_count += dfr->delta_stats().UpdateCount();
@@ -461,9 +462,7 @@ bool DeltaTracker::EstimateAllRedosAreAncient(Timestamp ancient_history_mark) {
   if (!redo_delta_stores_.empty()) {
     newest_redo = redo_delta_stores_.back();
   }
-  // TODO(awong): keep the delta stats cached after flushing a DMS so a flush
-  // doesn't invalidate this rowset.
-  return newest_redo && newest_redo->Initted() &&
+  return newest_redo && newest_redo->has_delta_stats() &&
       newest_redo->delta_stats().max_timestamp() < ancient_history_mark;
 }
 
@@ -477,7 +476,7 @@ Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancie
   int64_t tmp_bytes = 0;
   for (const auto& undo : boost::adaptors::reverse(undos_newest_first)) {
     // Short-circuit once we hit a delta block with stats whose 'max_timestamp' >= AHM.
-    if (undo->Initted() &&
+    if (undo->has_delta_stats() &&
         undo->delta_stats().max_timestamp() >= ancient_history_mark) {
       break;
     }
@@ -552,7 +551,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
 
   // Traverse oldest-first.
   for (auto& undo : boost::adaptors::reverse(undos_newest_first)) {
-    if (!undo->Initted()) break; // Never initialize the deltas in this code path (it's slow).
+    if (!undo->has_delta_stats()) break;
     if (undo->delta_stats().max_timestamp() >= ancient_history_mark) break;
     tmp_blocks_deleted++;
     tmp_bytes_deleted += undo->EstimateSize();
@@ -762,6 +761,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
   RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(readable_block),
                                             REDO,
                                             std::move(options),
+                                            std::move(stats),
                                             dfr));
   VLOG_WITH_PREFIX(1) << "Opened new delta block " << block_id.ToString() << " for read";
 
@@ -890,7 +890,7 @@ void DeltaTracker::GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const
   set<ColumnId> column_ids_with_updates;
   for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
     // We won't force open files just to read their stats.
-    if (!ds->Initted()) {
+    if (!ds->has_delta_stats()) {
       continue;
     }
 
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index 207b46c..c88396f 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -374,7 +374,7 @@ TEST_F(TestDeltaFile, TestLazyInit) {
   // Lazily opening the delta file should not trigger any reads.
   shared_ptr<DeltaFileReader> reader;
   ASSERT_OK(DeltaFileReader::OpenNoInit(
-      std::move(count_block), REDO, ReaderOptions(), &reader));
+      std::move(count_block), REDO, ReaderOptions(), /*delta_stats*/nullptr, &reader));
   ASSERT_EQ(0, bytes_read);
 
   // But initializing it should (only the first time).
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 227429a..5e83342 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -40,6 +40,7 @@
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/fs/fs_manager.h"
+#include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_relevancy.h"
@@ -217,6 +218,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
   RETURN_NOT_OK(DeltaFileReader::OpenNoInit(std::move(block),
                                             delta_type,
                                             std::move(options),
+                                            /*delta_stats*/nullptr,
                                             &df_reader));
   RETURN_NOT_OK(df_reader->Init(io_context));
 
@@ -227,6 +229,7 @@ Status DeltaFileReader::Open(unique_ptr<ReadableBlock> block,
 Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
                                    DeltaType delta_type,
                                    ReaderOptions options,
+                                   unique_ptr<DeltaStats> delta_stats,
                                    shared_ptr<DeltaFileReader>* reader_out) {
   unique_ptr<CFileReader> cf_reader;
   const IOContext* io_context = options.io_context;
@@ -234,7 +237,7 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
                                         std::move(options),
                                         &cf_reader));
   unique_ptr<DeltaFileReader> df_reader(
-      new DeltaFileReader(std::move(cf_reader), delta_type));
+      new DeltaFileReader(std::move(cf_reader), std::move(delta_stats), delta_type));
   if (!FLAGS_cfile_lazy_open) {
     RETURN_NOT_OK(df_reader->Init(io_context));
   }
@@ -245,8 +248,10 @@ Status DeltaFileReader::OpenNoInit(unique_ptr<ReadableBlock> block,
 }
 
 DeltaFileReader::DeltaFileReader(unique_ptr<CFileReader> cf_reader,
+                                 unique_ptr<DeltaStats> delta_stats,
                                  DeltaType delta_type)
     : reader_(cf_reader.release()),
+      delta_stats_(std::move(delta_stats)),
       delta_type_(delta_type) {}
 
 Status DeltaFileReader::Init(const IOContext* io_context) {
@@ -264,7 +269,9 @@ Status DeltaFileReader::InitOnce(const IOContext* io_context) {
   }
 
   // Initialize delta file stats
-  RETURN_NOT_OK(ReadDeltaStats());
+  if (!has_delta_stats()) {
+    RETURN_NOT_OK(ReadDeltaStats());
+  }
   return Status::OK();
 }
 
@@ -280,6 +287,7 @@ Status DeltaFileReader::ReadDeltaStats() {
   }
   unique_ptr<DeltaStats> stats(new DeltaStats());
   RETURN_NOT_OK(stats->InitFromPB(deltastats_pb));
+  std::lock_guard<simple_spinlock> l(stats_lock_);
   delta_stats_ = std::move(stats);
   return Status::OK();
 }
@@ -317,7 +325,8 @@ Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
   RETURN_NOT_OK(fs_manager->OpenBlock(reader_->block_id(), &block));
   ReaderOptions options;
   options.parent_mem_tracker = parent_mem_tracker;
-  return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options, out);
+  return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options,
+                                     /*delta_stats*/nullptr, out);
 }
 
 Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 68222be..1850d1a 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <deque>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
@@ -32,12 +33,12 @@
 #include "kudu/cfile/cfile_writer.h"
 #include "kudu/common/rowid.h"
 #include "kudu/gutil/macros.h"
-#include "kudu/gutil/port.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/util/faststring.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/once.h"
 #include "kudu/util/slice.h"
 #include "kudu/util/status.h"
@@ -147,33 +148,40 @@ class DeltaFileReader : public DeltaStore,
   static Status OpenNoInit(std::unique_ptr<fs::ReadableBlock> block,
                            DeltaType delta_type,
                            cfile::ReaderOptions options,
+                           std::unique_ptr<DeltaStats> delta_stats,
                            std::shared_ptr<DeltaFileReader>* reader_out);
 
-  virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
+  Status Init(const fs::IOContext* io_context) override;
 
-  virtual bool Initted() OVERRIDE {
+  bool Initted() const override {
     return init_once_.init_succeeded();
   }
 
   // See DeltaStore::NewDeltaIterator(...)
   Status NewDeltaIterator(const RowIteratorOptions& opts,
-                          std::unique_ptr<DeltaIterator>* iterator) const OVERRIDE;
+                          std::unique_ptr<DeltaIterator>* iterator) const override;
 
   // See DeltaStore::CheckRowDeleted
-  virtual Status CheckRowDeleted(rowid_t row_idx,
-                                 const fs::IOContext* io_context,
-                                 bool *deleted) const OVERRIDE;
+  Status CheckRowDeleted(rowid_t row_idx,
+                         const fs::IOContext* io_context,
+                         bool *deleted) const override;
 
-  virtual uint64_t EstimateSize() const OVERRIDE;
+  uint64_t EstimateSize() const override;
 
   const BlockId& block_id() const { return reader_->block_id(); }
 
-  virtual const DeltaStats& delta_stats() const OVERRIDE {
-    DCHECK(init_once_.init_succeeded());
+  const DeltaStats& delta_stats() const override {
+    std::lock_guard<simple_spinlock> l(stats_lock_);
+    DCHECK(delta_stats_);
     return *delta_stats_;
   }
 
-  virtual std::string ToString() const OVERRIDE {
+  bool has_delta_stats() const override {
+    std::lock_guard<simple_spinlock> l(stats_lock_);
+    return delta_stats_ != nullptr;
+  }
+
+  std::string ToString() const override {
     if (!init_once_.init_succeeded()) return reader_->ToString();
     return strings::Substitute("$0 ($1)", reader_->ToString(), delta_stats_->ToString());
   }
@@ -201,6 +209,7 @@ class DeltaFileReader : public DeltaStore,
   }
 
   DeltaFileReader(std::unique_ptr<cfile::CFileReader> cf_reader,
+                  std::unique_ptr<DeltaStats> delta_stats,
                   DeltaType delta_type);
 
   // Callback used in 'init_once_' to initialize this delta file.
@@ -209,6 +218,8 @@ class DeltaFileReader : public DeltaStore,
   Status ReadDeltaStats();
 
   std::shared_ptr<cfile::CFileReader> reader_;
+
+  mutable simple_spinlock stats_lock_;
   std::unique_ptr<DeltaStats> delta_stats_;
 
   // The type of this delta, i.e. UNDO or REDO.
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 4a42551..7603ebc 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -81,7 +81,7 @@ class DeltaMemStore : public DeltaStore,
 
   virtual Status Init(const fs::IOContext* io_context) OVERRIDE;
 
-  virtual bool Initted() OVERRIDE {
+  virtual bool Initted() const OVERRIDE {
     return true;
   }
 
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index f40b579..8cf354b 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -323,11 +323,11 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     }
   }
 
-  void MinorCompactDeltasThread(int tid) {
+  void MinorCompactDeltasThread(int /*tid*/) {
     CompactDeltas(RowSet::MINOR_DELTA_COMPACTION);
   }
 
-  void MajorCompactDeltasThread(int tid) {
+  void MajorCompactDeltasThread(int /*tid*/) {
     CompactDeltas(RowSet::MAJOR_DELTA_COMPACTION);
   }
 
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index b5fa40b..a7e4ea0 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -668,10 +668,6 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
   NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/true));
   ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
 
-  // TODO(awong): keep the delta stats cached after flushing a DMS so we don't
-  // have to scan to read stats.
-  NO_FATALS(VerifyTestRows(0, 0));
-
   // We shouldn't have any ancient rowsets since we haven't passed the AHM.
   int64_t bytes = 0;
   ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index c4b88d3..baad47c 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -810,7 +810,8 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
   auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
       tablet->GetMetricEntity(), [] () { return 0; });
 
-  // Insert some rows and flush so we get a DRS.
+  // Insert some rows and flush so we get a DRS, and then delete them so we
+  // have an ancient, fully deleted DRS.
   ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
   ASSERT_OK(tablet->Flush());
   ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
@@ -828,14 +829,29 @@ TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
   ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
   ASSERT_EQ(1, tablet->num_rowsets());
   ASSERT_EQ(kNumRows, live_row_count->value());
+  ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
+  ASSERT_EQ(0, live_row_count->value());
 
-  // Restart and ensure we can get online okay.
+  // Restart and ensure we can rebuild our DMS okay.
   NO_FATALS(RestartReplica());
   tablet = tablet_replica_->tablet();
   ASSERT_EQ(1, tablet->num_rowsets());
   live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
       tablet->GetMetricEntity(), [] () { return 0; });
-  ASSERT_EQ(kNumRows, live_row_count->value());
+  ASSERT_EQ(0, live_row_count->value());
+
+  // Now do that again but with deltafiles.
+  ASSERT_OK(tablet->FlushBiggestDMS());
+  NO_FATALS(RestartReplica());
+  tablet = tablet_replica_->tablet();
+  ASSERT_EQ(1, tablet->num_rowsets());
+
+  // Wait for our deleted rowset to become ancient. Since we just started up,
+  // we shouldn't have read any delta stats, so running the GC won't pick up
+  // our deleted DRS.
+  SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+  ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(1, tablet->num_rowsets());
 }
 
 } // namespace tablet


[kudu] 04/04: tablet: plumb delta stats into delta compaction outputs

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 58f7a3015c967e7590e91d729cea5327d253d74d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Mar 19 02:30:08 2020 -0700

    tablet: plumb delta stats into delta compaction outputs
    
    This plumbs delta stats from a delta compaction into the delta tracker's
    call to open the output delta readers. This means that in-memory delta
    stats aren't "lost" when doing a delta compaction, and that we can GC
    ancient deleted rowsets after a delta compaction without reading the
    stats from disk.
    
    I considered also plumbing stats into merge compaction outputs, but
    opted not to. The plumbing for compactions seems more significant, and
    chances are, large bulk deletes will come after data has had time to be
    compacted (e.g. deleting old ranges of keyspace).
    
    Change-Id: Iea2f28fb2905ddcc007c88ab80ae2185587400f0
    Reviewed-on: http://gerrit.cloudera.org:8080/15506
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../integration-tests/tablet_history_gc-itest.cc   |  1 -
 src/kudu/tablet/delta_compaction-test.cc           |  8 +--
 src/kudu/tablet/delta_compaction.cc                | 28 +++++----
 src/kudu/tablet/delta_store.cc                     |  6 +-
 src/kudu/tablet/delta_tracker.cc                   | 42 +++++++++----
 src/kudu/tablet/delta_tracker.h                    | 22 ++++---
 src/kudu/tablet/deltafile-test.cc                  |  6 +-
 src/kudu/tablet/deltafile.cc                       |  5 +-
 src/kudu/tablet/deltafile.h                        | 10 ++-
 src/kudu/tablet/deltamemstore-test.cc              |  4 +-
 src/kudu/tablet/deltamemstore.cc                   |  7 +--
 src/kudu/tablet/deltamemstore.h                    |  4 +-
 src/kudu/tablet/diskrowset.cc                      |  8 +--
 src/kudu/tablet/tablet-test-util.h                 |  6 +-
 src/kudu/tablet/tablet.cc                          |  2 +
 src/kudu/tablet/tablet_history_gc-test.cc          | 73 ++++++++++++++++------
 16 files changed, 147 insertions(+), 85 deletions(-)

diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index 02928cc..205d793 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -284,7 +284,6 @@ TEST_F(TabletHistoryGcITest, TestUndoDeltaBlockGc) {
 TEST_F(TabletHistoryGcITest, TestDeletedRowsetGc) {
   // Disable merge compactions, since they may also cull deleted rowsets.
   FLAGS_enable_rowset_compaction = false;
-  FLAGS_enable_flush_deltamemstores = false; // Don't flush DMS so we don't have to init deltafiles.
   FLAGS_flush_threshold_secs = 0; // Flush as aggressively as possible.
   FLAGS_maintenance_manager_num_threads = 4; // Encourage concurrency.
   FLAGS_maintenance_manager_polling_interval_ms = 1; // Spin on MM for a quick test.
diff --git a/src/kudu/tablet/delta_compaction-test.cc b/src/kudu/tablet/delta_compaction-test.cc
index a602b11..77f9f20 100644
--- a/src/kudu/tablet/delta_compaction-test.cc
+++ b/src/kudu/tablet/delta_compaction-test.cc
@@ -143,7 +143,7 @@ TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) {
     // and update number (see update_value assignment).
     size_t kNumUpdates = 10;
     size_t kNumMultipleUpdates = kNumUpdates / 2;
-    DeltaStats stats;
+    unique_ptr<DeltaStats> stats(new DeltaStats);
     for (size_t i = 0; i < kNumUpdates; ++i) {
       buf.clear();
       RowChangeListEncoder update(&buf);
@@ -151,7 +151,7 @@ TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) {
         ColumnId col_id = schema.column_id(col_idx);
         DCHECK_GE(col_id, 0);
 
-        stats.IncrUpdateCount(col_id, 1);
+        stats->IncrUpdateCount(col_id, 1);
         const ColumnSchema& col_schema = schema.column(col_idx);
         int update_value = deltafile_idx * 100 + i;
         switch (col_schema.type_info()->physical_type()) {
@@ -180,12 +180,12 @@ TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) {
       DeltaKey key((i < kNumMultipleUpdates) ? i : row_id, Timestamp(curr_timestamp));
       RowChangeList row_changes = update.as_changelist();
       ASSERT_OK(dfw->AppendDelta<REDO>(key, row_changes));
-      ASSERT_OK(stats.UpdateStats(key.timestamp(), row_changes));
+      ASSERT_OK(stats->UpdateStats(key.timestamp(), row_changes));
       curr_timestamp++;
       row_id++;
     }
 
-    dfw->WriteDeltaStats(stats);
+    dfw->WriteDeltaStats(std::move(stats));
     ASSERT_OK(dfw->Finish());
     shared_ptr<DeltaFileReader> dfr;
     ASSERT_OK(GetDeltaFileReader(block_id, &dfr));
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index 21eefdd..1e9dda0 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -132,8 +132,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   RowBlock block(&partial_schema_, kRowsPerBlock, &arena);
 
   DVLOG(1) << "Applying deltas and rewriting columns (" << partial_schema_.ToString() << ")";
-  DeltaStats redo_stats;
-  DeltaStats undo_stats;
+  unique_ptr<DeltaStats> redo_stats(new DeltaStats);
+  unique_ptr<DeltaStats> undo_stats(new DeltaStats);
   size_t nrows = 0;
   // We know that we're reading everything from disk so we're including all transactions.
   MvccSnapshot snap = MvccSnapshot::CreateSnapshotIncludingAllTransactions();
@@ -199,7 +199,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
       for (const Mutation *mut = new_undos_head; mut != nullptr; mut = mut->next()) {
         DeltaKey undo_key(nrows + dst_row.row_index(), mut->timestamp());
         RETURN_NOT_OK(new_undo_delta_writer_->AppendDelta<UNDO>(undo_key, mut->changelist()));
-        undo_stats.UpdateStats(mut->timestamp(), mut->changelist());
+        undo_stats->UpdateStats(mut->timestamp(), mut->changelist());
         undo_delta_mutations_written_++;
       }
     }
@@ -227,7 +227,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
                << key_and_update.Stringify(DeltaType::REDO, base_schema_);
       RETURN_NOT_OK_PREPEND(new_redo_delta_writer_->AppendDelta<REDO>(key_and_update.key, update),
                             "Failed to append a delta");
-      WARN_NOT_OK(redo_stats.UpdateStats(key_and_update.key.timestamp(), update),
+      WARN_NOT_OK(redo_stats->UpdateStats(key_and_update.key.timestamp(), update),
                   "Failed to update stats");
     }
     redo_delta_mutations_written_ += out.size();
@@ -239,12 +239,12 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   RETURN_NOT_OK(base_data_writer_->FinishAndReleaseBlocks(transaction.get()));
 
   if (redo_delta_mutations_written_ > 0) {
-    new_redo_delta_writer_->WriteDeltaStats(redo_stats);
+    new_redo_delta_writer_->WriteDeltaStats(std::move(redo_stats));
     RETURN_NOT_OK(new_redo_delta_writer_->FinishAndReleaseBlock(transaction.get()));
   }
 
   if (undo_delta_mutations_written_ > 0) {
-    new_undo_delta_writer_->WriteDeltaStats(undo_stats);
+    new_undo_delta_writer_->WriteDeltaStats(std::move(undo_stats));
     RETURN_NOT_OK(new_undo_delta_writer_->FinishAndReleaseBlock(transaction.get()));
   }
   transaction->CommitCreatedBlocks();
@@ -417,19 +417,23 @@ Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker,
   // diskrowset operations.
 
   // Create blocks for the new redo deltas.
-  vector<BlockId> new_redo_blocks;
+  vector<DeltaBlockIdAndStats> new_redo_blocks;
   if (redo_delta_mutations_written_ > 0) {
-    new_redo_blocks.push_back(new_redo_delta_block_);
+    new_redo_blocks.emplace_back(std::make_pair(new_redo_delta_block_,
+        new_redo_delta_writer_->release_delta_stats()));
   }
   SharedDeltaStoreVector new_redo_stores;
-  RETURN_NOT_OK(tracker->OpenDeltaReaders(new_redo_blocks, io_context, &new_redo_stores, REDO));
+  RETURN_NOT_OK(tracker->OpenDeltaReaders(std::move(new_redo_blocks), io_context,
+                                          &new_redo_stores, REDO));
 
   // Create blocks for the new undo deltas.
   SharedDeltaStoreVector new_undo_stores;
   if (undo_delta_mutations_written_ > 0) {
-    vector<BlockId> new_undo_blocks;
-    new_undo_blocks.push_back(new_undo_delta_block_);
-    RETURN_NOT_OK(tracker->OpenDeltaReaders(new_undo_blocks, io_context, &new_undo_stores, UNDO));
+    vector<DeltaBlockIdAndStats> new_undo_blocks;
+    new_undo_blocks.emplace_back(std::make_pair(new_undo_delta_block_,
+        new_undo_delta_writer_->release_delta_stats()));
+    RETURN_NOT_OK(tracker->OpenDeltaReaders(std::move(new_undo_blocks), io_context,
+                                            &new_undo_stores, UNDO));
   }
 
   // 2. Only now that we cannot fail do we update the in-memory state.
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 4ddfc9c..5938a34 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -623,7 +623,7 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter,
   RETURN_NOT_OK(iter->SeekToOrdinal(0));
 
   const size_t kRowsPerBlock = 100;
-  DeltaStats stats;
+  std::unique_ptr<DeltaStats> stats(new DeltaStats);
   Arena arena(32 * 1024);
   for (size_t i = 0; iter->HasNext(); ) {
     size_t n;
@@ -646,12 +646,12 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter,
     for (const DeltaKeyAndUpdate& cell : cells) {
       RowChangeList rcl(cell.cell);
       RETURN_NOT_OK(out->AppendDelta<Type>(cell.key, rcl));
-      RETURN_NOT_OK(stats.UpdateStats(cell.key.timestamp(), rcl));
+      RETURN_NOT_OK(stats->UpdateStats(cell.key.timestamp(), rcl));
     }
 
     i += n;
   }
-  out->WriteDeltaStats(stats);
+  out->WriteDeltaStats(std::move(stats));
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 2e8c2ea..8b975a5 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -100,12 +100,14 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
       dms_exists_(false),
       deleted_row_count_(0) {}
 
-Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
+Status DeltaTracker::OpenDeltaReaders(vector<DeltaBlockIdAndStats> blocks,
                                       const IOContext* io_context,
                                       vector<shared_ptr<DeltaStore> >* stores,
                                       DeltaType type) {
   FsManager* fs = rowset_metadata_->fs_manager();
-  for (const BlockId& block_id : blocks) {
+  for (auto& block_and_stats : blocks) {
+    const auto& block_id = block_and_stats.first;
+    unique_ptr<DeltaStats> stats = std::move(block_and_stats.second);
     unique_ptr<ReadableBlock> block;
     Status s = fs->OpenBlock(block_id, &block);
     if (!s.ok()) {
@@ -122,7 +124,7 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
     s = DeltaFileReader::OpenNoInit(std::move(block),
                                     type,
                                     std::move(options),
-                                    /*delta_stats*/nullptr,
+                                    std::move(stats),
                                     &dfr);
     if (!s.ok()) {
       LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type)
@@ -145,11 +147,19 @@ Status DeltaTracker::DoOpen(const IOContext* io_context) {
   CHECK(undo_delta_stores_.empty()) << "should call before opening any readers";
   CHECK(!open_);
 
-  RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->redo_delta_blocks(),
+  vector<DeltaBlockIdAndStats> redos;
+  for (auto block_id : rowset_metadata_->redo_delta_blocks()) {
+    redos.emplace_back(std::make_pair(block_id, nullptr));
+  }
+  RETURN_NOT_OK(OpenDeltaReaders(std::move(redos),
                                  io_context,
                                  &redo_delta_stores_,
                                  REDO));
-  RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->undo_delta_blocks(),
+  vector<DeltaBlockIdAndStats> undos;
+  for (auto block_id : rowset_metadata_->undo_delta_blocks()) {
+    undos.emplace_back(std::make_pair(block_id, nullptr));
+  }
+  RETURN_NOT_OK(OpenDeltaReaders(std::move(undos),
                                  io_context,
                                  &undo_delta_stores_,
                                  UNDO));
@@ -353,7 +363,7 @@ Status DeltaTracker::Compact(const IOContext* io_context) {
 
 Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
                                                     const SharedDeltaStoreVector& to_remove,
-                                                    const vector<BlockId>& new_delta_blocks,
+                                                    vector<DeltaBlockIdAndStats> new_delta_blocks,
                                                     const IOContext* io_context,
                                                     DeltaType type,
                                                     MetadataFlushType flush_type) {
@@ -365,7 +375,7 @@ Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate&
   DCHECK(!to_remove.empty());
 
   SharedDeltaStoreVector new_stores;
-  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(new_delta_blocks, io_context,
+  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(std::move(new_delta_blocks), io_context,
                                          &new_stores, type),
                         "Unable to open delta blocks");
 
@@ -428,12 +438,13 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
   // Merge and compact the stores.
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
-  RETURN_NOT_OK(DoCompactStores(io_context, start_idx, end_idx, std::move(block),
+  unique_ptr<DeltaStats> stats;
+  RETURN_NOT_OK(DoCompactStores(io_context, start_idx, end_idx, std::move(block), &stats,
                                 &compacted_stores, &compacted_blocks));
 
-  vector<BlockId> new_blocks = { new_block_id };
+  vector<BlockId> new_block = { new_block_id };
   RowSetMetadataUpdate update;
-  update.ReplaceRedoDeltaBlocks(compacted_blocks, new_blocks);
+  update.ReplaceRedoDeltaBlocks(compacted_blocks, new_block);
 
   const auto num_blocks_compacted = compacted_blocks.size();
   TRACE_COUNTER_INCREMENT("delta_blocks_compacted", num_blocks_compacted);
@@ -442,7 +453,10 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
                                     num_blocks_compacted,
                                     BlockId::JoinStrings(compacted_blocks),
                                     new_block_id.ToString());
-  RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores, new_blocks,
+  vector<DeltaBlockIdAndStats> new_block_and_stats;
+  new_block_and_stats.emplace_back(std::make_pair(new_block_id, std::move(stats)));
+  RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores,
+                                                       std::move(new_block_and_stats),
                                                        io_context, REDO, FLUSH_METADATA),
                         "DeltaTracker: CompactStores: Unable to commit delta update");
   return Status::OK();
@@ -580,6 +594,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
 Status DeltaTracker::DoCompactStores(const IOContext* io_context,
                                      size_t start_idx, size_t end_idx,
                                      unique_ptr<WritableBlock> block,
+                                     unique_ptr<DeltaStats>* output_stats,
                                      SharedDeltaStoreVector* compacted_stores,
                                      vector<BlockId> *compacted_blocks) {
   unique_ptr<DeltaIterator> inputs_merge;
@@ -598,6 +613,7 @@ Status DeltaTracker::DoCompactStores(const IOContext* io_context,
                                                ITERATE_OVER_ALL_ROWS,
                                                &dfw));
   RETURN_NOT_OK(dfw.Finish());
+  *output_stats = dfw.release_delta_stats();
   return Status::OK();
 }
 
@@ -738,9 +754,9 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
                         Substitute("Unable to start writing to delta block $0",
                                    block_id.ToString()));
 
-  unique_ptr<DeltaStats> stats;
-  RETURN_NOT_OK(dms->FlushToFile(&dfw, &stats));
+  RETURN_NOT_OK(dms->FlushToFile(&dfw));
   RETURN_NOT_OK(dfw.Finish());
+  unique_ptr<DeltaStats> stats = dfw.release_delta_stats();
   const auto bytes_written = dfw.written_size();
   TRACE_COUNTER_INCREMENT("bytes_written", bytes_written);
   TRACE_COUNTER_INCREMENT("delete_count", stats->delete_count());
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index b317788..dfec6a3 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -20,14 +20,17 @@
 #include <cstdint>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <gtest/gtest_prod.h>
 
 #include "kudu/common/rowid.h"
+#include "kudu/fs/block_id.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/tablet/cfile_set.h"
 #include "kudu/tablet/delta_key.h"
+#include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/tablet/tablet_mem_trackers.h"
 #include "kudu/util/atomic.h"
@@ -37,7 +40,6 @@
 
 namespace kudu {
 
-class BlockId;
 class ColumnwiseIterator;
 class MonoTime;
 class RowChangeList;
@@ -68,6 +70,8 @@ class RowSetMetadataUpdate;
 struct ProbeStats;
 struct RowIteratorOptions;
 
+typedef std::pair<BlockId, std::unique_ptr<DeltaStats>> DeltaBlockIdAndStats;
+
 // The DeltaTracker is the part of a DiskRowSet which is responsible for
 // tracking modifications against the base data. It consists of a set of
 // DeltaStores which each contain a set of mutations against the base data.
@@ -160,7 +164,7 @@ class DeltaTracker {
   // The 'compact_flush_lock_' should be acquired before calling this method.
   Status CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
                                         const SharedDeltaStoreVector& to_remove,
-                                        const std::vector<BlockId>& new_delta_blocks,
+                                        std::vector<DeltaBlockIdAndStats> new_delta_blocks,
                                         const fs::IOContext* io_context,
                                         DeltaType type,
                                         MetadataFlushType flush_type);
@@ -194,7 +198,7 @@ class DeltaTracker {
 
   // Opens the input 'blocks' of type 'type' and returns the opened delta file
   // readers in 'stores'.
-  Status OpenDeltaReaders(const std::vector<BlockId>& blocks,
+  Status OpenDeltaReaders(std::vector<DeltaBlockIdAndStats> blocks,
                           const fs::IOContext* io_context,
                           std::vector<std::shared_ptr<DeltaStore>>* stores,
                           DeltaType type);
@@ -296,16 +300,18 @@ class DeltaTracker {
   void CollectStores(std::vector<std::shared_ptr<DeltaStore>>* stores,
                      WhichStores which) const;
 
-  // Performs the actual compaction. Results of compaction are written to "block",
-  // while delta stores that underwent compaction are appended to "compacted_stores", while
-  // their corresponding block ids are appended to "compacted_blocks".
+  // Performs the actual compaction. Results of compaction are written with
+  // 'block' and its stats are populated in 'output_stats'. Delta stores that
+  // underwent compaction are appended to 'compacted_stores', and their
+  // corresponding block ids are appended to 'compacted_blocks'.
   //
   // NOTE: the caller of this method should acquire or already hold an
-  // exclusive lock on 'compact_flush_lock_' before calling this
-  // method in order to protect 'redo_delta_stores_'.
+  // exclusive lock on 'compact_flush_lock_' before calling this method in
+  // order to protect 'redo_delta_stores_'.
   Status DoCompactStores(const fs::IOContext* io_context,
                          size_t start_idx, size_t end_idx,
                          std::unique_ptr<fs::WritableBlock> block,
+                         std::unique_ptr<DeltaStats>* output_stats,
                          std::vector<std::shared_ptr<DeltaStore>>* compacted_stores,
                          std::vector<BlockId>* compacted_blocks);
 
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index c88396f..1acdb1a 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -110,7 +110,7 @@ class TestDeltaFile : public KuduTest {
     // Update even numbered rows.
     faststring buf;
 
-    DeltaStats stats;
+    unique_ptr<DeltaStats> stats(new DeltaStats);
     for (int i = FLAGS_first_row_to_update; i <= FLAGS_last_row_to_update; i += 2) {
       for (int timestamp = min_timestamp; timestamp <= max_timestamp; timestamp++) {
         buf.clear();
@@ -120,10 +120,10 @@ class TestDeltaFile : public KuduTest {
         DeltaKey key(i, Timestamp(timestamp));
         RowChangeList rcl(buf);
         ASSERT_OK_FAST(dfw.AppendDelta<REDO>(key, rcl));
-        ASSERT_OK_FAST(stats.UpdateStats(key.timestamp(), rcl));
+        ASSERT_OK_FAST(stats->UpdateStats(key.timestamp(), rcl));
       }
     }
-    dfw.WriteDeltaStats(stats);
+    dfw.WriteDeltaStats(std::move(stats));
     ASSERT_OK(dfw.Finish());
   }
 
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 5e83342..864c20c 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -195,13 +195,14 @@ Status DeltaFileWriter::AppendDelta<UNDO>(
   return DoAppendDelta(key, delta);
 }
 
-void DeltaFileWriter::WriteDeltaStats(const DeltaStats& stats) {
+void DeltaFileWriter::WriteDeltaStats(std::unique_ptr<DeltaStats> stats) {
   DeltaStatsPB delta_stats_pb;
-  stats.ToPB(&delta_stats_pb);
+  stats->ToPB(&delta_stats_pb);
 
   faststring buf;
   pb_util::SerializeToString(delta_stats_pb, &buf);
   writer_->AddMetadataPair(DeltaFileReader::kDeltaStatsEntryName, buf.ToString());
+  delta_stats_ = std::move(stats);
 }
 
 
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 1850d1a..777069e 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -101,7 +102,11 @@ class DeltaFileWriter {
   template<DeltaType Type>
   Status AppendDelta(const DeltaKey &key, const RowChangeList &delta);
 
-  void WriteDeltaStats(const DeltaStats& stats);
+  void WriteDeltaStats(std::unique_ptr<DeltaStats> stats);
+
+  std::unique_ptr<DeltaStats> release_delta_stats() {
+    return std::move(delta_stats_);
+  }
 
   size_t written_size() const {
     return writer_->written_size();
@@ -110,6 +115,7 @@ class DeltaFileWriter {
  private:
   Status DoAppendDelta(const DeltaKey &key, const RowChangeList &delta);
 
+  std::unique_ptr<DeltaStats> delta_stats_;
   std::unique_ptr<cfile::CFileWriter> writer_;
 
   // Buffer used as a temporary for storing the serialized form
@@ -219,6 +225,8 @@ class DeltaFileReader : public DeltaStore,
 
   std::shared_ptr<cfile::CFileReader> reader_;
 
+  // TODO(awong): it'd be nice to not heap-allocate this and other usages of
+  // delta stats.
   mutable simple_spinlock stats_lock_;
   std::unique_ptr<DeltaStats> delta_stats_;
 
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index 51099e5..60d04c9 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -211,8 +211,8 @@ TEST_F(TestDeltaMemStore, TestUpdateCount) {
   ASSERT_OK(fs.CreateNewBlock({}, &block));
   DeltaFileWriter dfw(std::move(block));
   ASSERT_OK(dfw.Start());
-  unique_ptr<DeltaStats> stats;
-  dms_->FlushToFile(&dfw, &stats);
+  dms_->FlushToFile(&dfw);
+  unique_ptr<DeltaStats> stats = dfw.release_delta_stats();
 
   ASSERT_EQ(n_rows / 2, stats->update_count_for_col_id(schema_.column_id(kIntColumn)));
   ASSERT_EQ(n_rows / 4, stats->update_count_for_col_id(schema_.column_id(kStringColumn)));
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index d6aaddb..0309ca6 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -129,8 +129,7 @@ Status DeltaMemStore::Update(Timestamp timestamp,
   return Status::OK();
 }
 
-Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw,
-                                  unique_ptr<DeltaStats>* stats_ret) {
+Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw) {
   unique_ptr<DeltaStats> stats(new DeltaStats());
 
   unique_ptr<DMSTreeIter> iter(tree_.NewIterator());
@@ -146,9 +145,7 @@ Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw,
     stats->UpdateStats(key.timestamp(), rcl);
     iter->Next();
   }
-  dfw->WriteDeltaStats(*stats);
-
-  stats_ret->swap(stats);
+  dfw->WriteDeltaStats(std::move(stats));
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 7603ebc..49dfaa1 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -105,9 +105,7 @@ class DeltaMemStore : public DeltaStore,
   void DebugPrint() const;
 
   // Flush the DMS to the given file writer.
-  // Returns statistics in *stats.
-  Status FlushToFile(DeltaFileWriter *dfw,
-                     std::unique_ptr<DeltaStats>* stats);
+  Status FlushToFile(DeltaFileWriter* dfw);
 
   // Create an iterator for applying deltas from this DMS.
   //
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 8a939fb..caef925 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -449,8 +449,8 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     RETURN_NOT_OK(writer_status);
     CHECK_GT(cur_writer_->written_count(), 0);
 
-    cur_undo_writer_->WriteDeltaStats(*cur_undo_delta_stats_);
-    cur_redo_writer_->WriteDeltaStats(*cur_redo_delta_stats_);
+    cur_undo_writer_->WriteDeltaStats(std::move(cur_undo_delta_stats_));
+    cur_redo_writer_->WriteDeltaStats(std::move(cur_redo_delta_stats_));
 
     // Commit the UNDO block. Status::Aborted() indicates that there
     // were no UNDOs written.
@@ -458,8 +458,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
       cur_drs_metadata_->CommitUndoDeltaDataBlock(cur_undo_ds_block_id_);
-    } else {
-      DCHECK_EQ(cur_undo_delta_stats_->min_timestamp(), Timestamp::kMax);
     }
 
     // Same for the REDO block.
@@ -467,8 +465,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
       cur_drs_metadata_->CommitRedoDeltaDataBlock(0, 0, cur_redo_ds_block_id_);
-    } else {
-      DCHECK_EQ(cur_redo_delta_stats_->min_timestamp(), Timestamp::kMax);
     }
 
     written_size_ += cur_writer_->written_size();
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 937221b..8734a98 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -870,16 +870,16 @@ Status CreateRandomDeltaFile(const Schema& schema,
   BlockId block_id = wb->id();
   std::unique_ptr<DeltaFileWriter> writer(new DeltaFileWriter(std::move(wb)));
   RETURN_NOT_OK(writer->Start());
-  DeltaStats stats;
+  std::unique_ptr<DeltaStats> stats(new DeltaStats);
   for (const auto& e1 : mirror->all_deltas()) {
     for (const auto& e2 : e1.second) {
       DeltaKey k(e1.first, e2.first);
       RowChangeList changes(e2.second);
       RETURN_NOT_OK(writer->AppendDelta<T::kTag>(k, changes));
-      RETURN_NOT_OK(stats.UpdateStats(k.timestamp(), changes));
+      RETURN_NOT_OK(stats->UpdateStats(k.timestamp(), changes));
     }
   }
-  writer->WriteDeltaStats(stats);
+  writer->WriteDeltaStats(std::move(stats));
   RETURN_NOT_OK(writer->Finish());
 
   // Open a reader for this newly written delta file.
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 36ac2ff..d638284 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1588,6 +1588,8 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   {
     TRACE_EVENT0("tablet", "Opening compaction results");
     for (const shared_ptr<RowSetMetadata>& meta : new_drs_metas) {
+      // TODO(awong): it'd be nice to plumb delta stats from the rowset writer
+      // into the new deltafile readers opened here.
       shared_ptr<DiskRowSet> new_rowset;
       Status s = DiskRowSet::Open(meta,
                                   log_anchor_registry_.get(),
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index a7e4ea0..857ea89 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -98,6 +98,20 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
     NO_FLUSH
   };
 
+  // Attempt to run the deleted rowset GC, triggering an assertion failure if
+  // it failed or if our metrics don't make sense.
+  void TryRunningDeletedRowsetGC() {
+    const auto& metrics = tablet()->metrics();
+    int orig_bytes = metrics->deleted_rowset_gc_bytes_deleted->value();
+    int orig_count = metrics->deleted_rowset_gc_duration->TotalCount();
+    int64_t bytes = 0;
+    ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+    ASSERT_LT(0, bytes);
+    ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+    ASSERT_EQ(bytes + orig_bytes, metrics->deleted_rowset_gc_bytes_deleted->value());
+    ASSERT_GT(metrics->deleted_rowset_gc_duration->TotalCount(), orig_count);
+  }
+
   // Helper functions that mutate rows in batches of keys:
   //   [0, rows_per_rowset)
   //   [rows_per_rowset, 2*rows_per_rowset)
@@ -107,7 +121,7 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
   // ...flushing MRS or DMS (depending on the workload) in between batches.
   void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset);
   void UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, int32_t val);
-  void DeleteOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, bool flush_dms);
+  void DeleteOriginalRows(int64_t num_batches, int64_t rows_per_batch, bool flush_dms);
 
   void AddTimeToHybridClock(MonoDelta delta) {
     uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
@@ -142,15 +156,14 @@ void TabletHistoryGcTest::InsertOriginalRows(int64_t num_rowsets, int64_t rows_p
   ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
 }
 
-void TabletHistoryGcTest::DeleteOriginalRows(int64_t num_rowsets,
-    int64_t rows_per_rowset, bool flush_dms) {
-  for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
-    NO_FATALS(DeleteTestRows(rowset_id * rows_per_rowset, rows_per_rowset));
+void TabletHistoryGcTest::DeleteOriginalRows(int64_t num_batches,
+    int64_t rows_per_batch, bool flush_dms) {
+  for (int rowset_id = 0; rowset_id < num_batches; rowset_id++) {
+    NO_FATALS(DeleteTestRows(rowset_id * rows_per_batch, rows_per_batch));
     if (flush_dms) {
       ASSERT_OK(tablet()->FlushAllDMSForTests());
     }
   }
-  ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
 }
 
 void TabletHistoryGcTest::UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset,
@@ -686,15 +699,9 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
   // Move the clock so all rowsets are ancient. Our GC should succeed and we
   // should be left with no rowsets.
   NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
-  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
-  ASSERT_LT(0, bytes);
-  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  NO_FATALS(TryRunningDeletedRowsetGC());
   ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
   ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
-
-  // Check that these show up.
-  ASSERT_EQ(bytes, metrics->deleted_rowset_gc_bytes_deleted->value());
-  ASSERT_EQ(1, metrics->deleted_rowset_gc_duration->TotalCount());
 }
 
 TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithDMS) {
@@ -702,12 +709,40 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithDMS) {
   NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/false));
   NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
-  int64_t bytes = 0;
-  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
-  ASSERT_LT(0, bytes);
-  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
-  ASSERT_EQ(bytes, tablet()->metrics()->deleted_rowset_gc_bytes_deleted->value());
-  ASSERT_EQ(1, tablet()->metrics()->deleted_rowset_gc_duration->TotalCount());
+  NO_FATALS(TryRunningDeletedRowsetGC());
+}
+
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsAfterMinorCompaction) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  // Flush twice as frequently when deleting so we end up with multiple delta
+  // files per DRS.
+  NO_FATALS(DeleteOriginalRows(kNumRowsets * 2, rows_per_rowset_ / 2, /*flush_dms*/true));
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+  int num_redos_before_compaction = tablet()->CountRedoDeltasForTests();
+  for (int i = 0; i < kNumRowsets; i++) {
+    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
+  }
+  ASSERT_GT(num_redos_before_compaction, tablet()->CountRedoDeltasForTests());
+  NO_FATALS(TryRunningDeletedRowsetGC());
+}
+
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsAfterMajorCompaction) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  // Major delta compaction is a no-op if we only have deletes, so trick the
+  // rowsets into major compacting by throwing in some updates.
+  NO_FATALS(UpdateOriginalRows(kNumRowsets, rows_per_rowset_, 5));
+  // Flush twice as frequently when deleting so we end up with multiple delta
+  // files per DRS.
+  NO_FATALS(DeleteOriginalRows(kNumRowsets * 2, rows_per_rowset_ / 2, /*flush_dms*/true));
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+  int num_redos_before_compaction = tablet()->CountRedoDeltasForTests();
+  for (int i = 0; i < kNumRowsets; i++) {
+    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
+  }
+  ASSERT_GT(num_redos_before_compaction, tablet()->CountRedoDeltasForTests());
+  NO_FATALS(TryRunningDeletedRowsetGC());
 }
 
 } // namespace tablet


[kudu] 02/04: KUDU-1625: background op to GC ancient, fully deleted rowsets

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 705954872dc86238556456abed0a879bb1462e51
Author: Andrew Wong <aw...@apache.org>
AuthorDate: Thu Jan 30 22:03:07 2020 -0800

    KUDU-1625: background op to GC ancient, fully deleted rowsets
    
    This adds a background op that deletes disk rowsets that have had all of
    their rows deleted. If the most recent update to a rowset is older than
    the ancient history mark, and the rowset contains no live rows, that
    rowset will be deleted.
    
    It'd be nice if we could have the policy work for rowsets that are
    mostly deleted, but such a solution would come with difficult questions
    around write amplification and compatibility with the existing
    compaction strategies. For instance, a more complete solution would
    need to consider whether to rewrite a rowset if it had 25%, 50%, or 75%
    deleted rows: some operators wouldn't mind the write amplification to
    save space. However, picking a good heuristic (or exposing some knobs to
    turn) makes this tricky.
    
    The benefit of the approach in this patch is that no such tradeoff needs
    to be made: the "write amplification" is minimal here because no new
    data blocks are written in performing the operation -- the tablet
    metadata is rewritten to exclude the blocks, and the underlying blocks
    are deleted, which isn't IO intensive either.
    
    There's still room for improvement in this implementation in that,
    currently, a DMS flush will write stats to disk and we'll only read the
    stats if we Init() the DeltaFileReader (e.g. on scan). I'll address this
    in a follow-up patch.
    
    Since the op GCs all viable rowsets in the tablet, a tablet should only
    schedule one deleted rowset GC op at a time. This isn't necessary for
    correctness, but avoids wasting some MM thread cycles.
    
    I ran this on a real cluster, deleting large chunks of keyspace with 4
    MM threads to confirm that space is actually freed, concurrent ops for
    the same tablet aren't scheduled, and the op runs relatively quickly (in
    the tens of ms, compared to hundreds to thousands of ms for other ops).
    
    Change-Id: I696e2a29ea52ad4e54801b495c322bc371787124
    Reviewed-on: http://gerrit.cloudera.org:8080/15145
    Tested-by: Kudu Jenkins
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
---
 .../integration-tests/tablet_history_gc-itest.cc   | 110 ++++++++++++++++--
 src/kudu/tablet/delta_tracker.cc                   |  21 ++++
 src/kudu/tablet/delta_tracker.h                    |   6 +
 src/kudu/tablet/deltamemstore.cc                   |   5 +-
 src/kudu/tablet/deltamemstore.h                    |  17 ++-
 src/kudu/tablet/diskrowset.cc                      |  16 +++
 src/kudu/tablet/diskrowset.h                       |   3 +
 src/kudu/tablet/memrowset.h                        |   7 ++
 src/kudu/tablet/mock-rowsets.h                     |  78 +++++++------
 src/kudu/tablet/mt-tablet-test.cc                  |  58 +++++++---
 src/kudu/tablet/rowset.h                           |  14 +++
 src/kudu/tablet/tablet-test-base.h                 |   8 ++
 src/kudu/tablet/tablet.cc                          |  89 +++++++++++++++
 src/kudu/tablet/tablet.h                           |  23 +++-
 src/kudu/tablet/tablet_bootstrap.cc                |  16 ++-
 src/kudu/tablet/tablet_history_gc-test.cc          | 125 ++++++++++++++++-----
 src/kudu/tablet/tablet_metrics.cc                  |  29 +++++
 src/kudu/tablet/tablet_metrics.h                   |   4 +
 src/kudu/tablet/tablet_mm_ops.cc                   |  48 ++++++++
 src/kudu/tablet/tablet_mm_ops.h                    |  38 ++++++-
 src/kudu/tablet/tablet_replica-test.cc             |  59 ++++++++--
 21 files changed, 664 insertions(+), 110 deletions(-)

diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index 53a7aae..02928cc 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -71,6 +71,7 @@
 #include "kudu/util/test_util.h"
 
 using kudu::client::KuduClient;
+using kudu::client::KuduInsert;
 using kudu::client::KuduScanner;
 using kudu::client::KuduSession;
 using kudu::client::KuduTable;
@@ -87,6 +88,7 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_bool(enable_flush_deltamemstores);
 DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(enable_rowset_compaction);
 DECLARE_string(time_source);
@@ -112,6 +114,20 @@ class TabletHistoryGcITest : public MiniClusterITestBase {
     auto* ntp = down_cast<clock::MockNtp*>(clock->time_service());
     ntp->SetMockClockWallTimeForTests(new_time);
   }
+
+  // Inserts 'num_rows' rows, with keys starting at 'start_key', into the given table.
+  static Status InsertRowsToTable(KuduTable* table, KuduSession* session,
+                           int start_key, int num_rows) {
+    for (int32_t row_idx = 0; row_idx < num_rows; row_idx++) {
+      unique_ptr<KuduInsert> insert(table->NewInsert());
+      KuduPartialRow* row = insert->mutable_row();
+      RETURN_NOT_OK(row->SetInt32(0, start_key + row_idx));
+      RETURN_NOT_OK(row->SetInt32(1, 0));
+      RETURN_NOT_OK(row->SetString(2, ""));
+      RETURN_NOT_OK(session->Apply(insert.release()));
+    }
+    return session->Flush();
+  }
 };
 
 // Check that attempts to scan prior to the ancient history mark fail.
@@ -183,18 +199,8 @@ TEST_F(TabletHistoryGcITest, TestUndoDeltaBlockGc) {
   ASSERT_EQ(1, tablet_replicas.size());
   std::shared_ptr<Tablet> tablet = tablet_replicas[0]->shared_tablet();
 
-  const int32_t kNumRows = AllowSlowTests() ? 100 : 10;
-
-  // Insert a few rows.
-  for (int32_t row_key = 0; row_key < kNumRows; row_key++) {
-    unique_ptr<client::KuduInsert> insert(table->NewInsert());
-    KuduPartialRow* row = insert->mutable_row();
-    ASSERT_OK_FAST(row->SetInt32(0, row_key));
-    ASSERT_OK_FAST(row->SetInt32(1, 0));
-    ASSERT_OK_FAST(row->SetString(2, ""));
-    ASSERT_OK_FAST(session->Apply(insert.release()));
-  }
-  ASSERT_OK_FAST(session->Flush());
+  const int32_t kNumRows = 100;
+  ASSERT_OK(InsertRowsToTable(table.get(), session.get(), /*start_key*/0, kNumRows));
 
   // Update rows in a loop; wait until some undo deltas are generated.
   int32_t row_value = 0;
@@ -275,6 +281,76 @@ TEST_F(TabletHistoryGcITest, TestUndoDeltaBlockGc) {
   });
 }
 
+TEST_F(TabletHistoryGcITest, TestDeletedRowsetGc) {
+  // Disable merge compactions, since they may also cull deleted rowsets.
+  FLAGS_enable_rowset_compaction = false;
+  FLAGS_enable_flush_deltamemstores = false; // Don't flush DMS so we don't have to init deltafiles.
+  FLAGS_flush_threshold_secs = 0; // Flush as aggressively as possible.
+  FLAGS_maintenance_manager_num_threads = 4; // Encourage concurrency.
+  FLAGS_maintenance_manager_polling_interval_ms = 1; // Spin on MM for a quick test.
+  FLAGS_tablet_history_max_age_sec = 1000;  // The clock is advanced by this much below.
+  FLAGS_time_source = "mock"; // Allow moving the clock.
+  NO_FATALS(StartCluster(1)); // Single-node cluster.
+
+  // Create a tablet.
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  workload.Setup();
+  MiniTabletServer* mts = cluster_->mini_tablet_server(0);
+  vector<scoped_refptr<TabletReplica>> tablet_replicas;
+  mts->server()->tablet_manager()->GetTabletReplicas(&tablet_replicas);
+  ASSERT_EQ(1, tablet_replicas.size());
+  std::shared_ptr<Tablet> tablet = tablet_replicas[0]->shared_tablet();
+
+  // Insert some rows.
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table));
+  shared_ptr<KuduSession> session = client_->NewSession();
+  const int32_t kNumRows = 100;
+  ASSERT_OK(InsertRowsToTable(table.get(), session.get(), /*start_key*/0, kNumRows));
+
+  // Flush what's in memory to ensure we have at least one DRS.
+  ASSERT_OK(tablet->Flush());
+  ASSERT_GT(tablet->num_rowsets(), 0);
+
+  // Now delete every row. DMS flushing is disabled above, so only the session is flushed.
+  for (int32_t row_key = 0; row_key < kNumRows; row_key++) {
+    unique_ptr<client::KuduDelete> del(table->NewDelete());
+    KuduPartialRow* row = del->mutable_row();
+    ASSERT_OK(row->SetInt32(0, row_key));
+    ASSERT_OK(session->Apply(del.release()));
+  }
+  ASSERT_OK(session->Flush());
+  uint64_t measured_size_before_gc;
+  ASSERT_OK(Env::Default()->GetFileSizeOnDiskRecursively(cluster_->GetTabletServerFsRoot(0),
+                                                         &measured_size_before_gc));
+  // Move forward the clock so our rowsets are all considered ancient.
+  HybridClock* c = down_cast<HybridClock*>(tablet->clock());
+  AddTimeToHybridClock(c, MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+
+  // Eventually a deleted rowset GC op will run in the background.
+  ASSERT_EVENTUALLY([&] {
+    ASSERT_EQ(0, tablet->num_rowsets());
+  });
+  // We should see a reduction in space.
+  // NOTE: we ASSERT_EVENTUALLY because hole punching is asynchronous and so we
+  // might not see an immediate decrease in file size.
+  ASSERT_EVENTUALLY([&] {
+    uint64_t measured_size_after_gc;
+    ASSERT_OK(Env::Default()->GetFileSizeOnDiskRecursively(cluster_->GetTabletServerFsRoot(0),
+                                                           &measured_size_after_gc));
+    ASSERT_LT(measured_size_after_gc, measured_size_before_gc);
+  });
+
+  // With no DRSs left, nothing should be retained or running; GC work should be recorded.
+  ASSERT_EQ(0, tablet->metrics()->deleted_rowset_estimated_retained_bytes->value());
+  ASSERT_EQ(0, tablet->metrics()->deleted_rowset_gc_running->value());
+  ASSERT_GT(tablet->metrics()->deleted_rowset_gc_duration->TotalCount(), 0);
+  // NOTE: we could try checking this matches the before/after sizes, but
+  // there's a chance the delete raced with other delta background ops.
+  ASSERT_GT(tablet->metrics()->deleted_rowset_gc_bytes_deleted->value(), 0);
+}
+
 // Whether a MaterializedTestRow is deleted or not.
 enum IsDeleted {
   NOT_DELETED,
@@ -314,6 +390,7 @@ class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest {
     kMergeCompaction,
     kRedoDeltaCompaction,
     kUndoDeltaBlockGc,
+    kDeletedRowsetGc,
     kMoveTimeForward,
     kStartScan,
     kNumActions, // Count of items in this enum. Keep as last entry.
@@ -850,6 +927,15 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
                 << " blocks and " << bytes_deleted << " bytes";
         break;
       }
+      case kDeletedRowsetGc: {
+        VLOG(1) << "Running deleted rowset GC";
+        int64_t bytes_to_delete = 0;
+        ASSERT_OK(tablet->GetBytesInAncientDeletedRowsets(&bytes_to_delete));
+        VLOG(1) << Substitute("Found $0 bytes in ancient, deleted rowsets",
+                              bytes_to_delete);
+        ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
+        break;
+      }
       case kMoveTimeForward: {
         VLOG(1) << "Moving clock forward";
         AddTimeToHybridClock(clock_, MonoDelta::FromSeconds(200));
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 00d47d7..7b53fc8 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -24,6 +24,7 @@
 #include <string>
 #include <utility>
 
+#include <boost/optional/optional.hpp>
 #include <boost/range/adaptor/reversed.hpp>
 #include <glog/logging.h>
 
@@ -446,6 +447,26 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
   return Status::OK();
 }
 
+bool DeltaTracker::EstimateAllRedosAreAncient(Timestamp ancient_history_mark) {
+  shared_ptr<DeltaStore> newest_redo;
+  std::lock_guard<rw_spinlock> lock(component_lock_);  // Held until this function returns.
+  const boost::optional<Timestamp> dms_highest_timestamp =
+      dms_ ? dms_->highest_timestamp() : boost::none;
+  if (dms_highest_timestamp) {
+    return *dms_highest_timestamp < ancient_history_mark;  // A written-to DMS decides alone.
+  }
+
+  // If we don't have a DMS or our DMS hasn't been written to at all, look at
+  // the newest redo store.
+  if (!redo_delta_stores_.empty()) {
+    newest_redo = redo_delta_stores_.back();  // The last entry is treated as the newest.
+  }
+  // TODO(awong): keep the delta stats cached after flushing a DMS so a flush
+  // doesn't invalidate this rowset.
+  return newest_redo && newest_redo->Initted() &&  // No store, or not initted => false.
+      newest_redo->delta_stats().max_timestamp() < ancient_history_mark;
+}
+
 Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
                                                                  int64_t* bytes) {
   DCHECK_NE(Timestamp::kInvalidTimestamp, ancient_history_mark);
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index 5ae97a2..b317788 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -175,6 +175,12 @@ class DeltaTracker {
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
                                                      int64_t* bytes);
 
+  // Returns whether all redos (DMS and newest redo delta file) are ancient
+  // (i.e. that the redo with the highest timestamp is older than the AHM).
+  // This is an estimate, since if the newest redo file has not yet been
+  // initted, this will return a false negative.
+  bool EstimateAllRedosAreAncient(Timestamp ancient_history_mark);
+
   // See RowSet::InitUndoDeltas().
   Status InitUndoDeltas(Timestamp ancient_history_mark,
                         MonoTime deadline,
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index b78a108..d6aaddb 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -17,9 +17,9 @@
 
 #include "kudu/tablet/deltamemstore.h"
 
+#include <algorithm>
 #include <memory>
 #include <ostream>
-#include <utility>
 
 #include <boost/optional/optional.hpp>
 #include <glog/logging.h>
@@ -74,6 +74,7 @@ DeltaMemStore::DeltaMemStore(int64_t id,
                              shared_ptr<MemTracker> parent_tracker)
   : id_(id),
     rs_id_(rs_id),
+    highest_timestamp_(Timestamp::kMin),
     allocator_(new MemoryTrackingBufferAllocator(
         HeapBufferAllocator::Get(), std::move(parent_tracker))),
     arena_(new ThreadSafeMemoryTrackingArena(kInitialArenaSize, allocator_)),
@@ -123,6 +124,8 @@ Status DeltaMemStore::Update(Timestamp timestamp,
     deleted_row_count_.Increment();
   }
 
+  std::lock_guard<simple_spinlock> l(ts_lock_);
+  highest_timestamp_ = std::max(highest_timestamp_, timestamp);
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index e287b68..4a42551 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -19,10 +19,14 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <mutex>
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
+
 #include "kudu/common/rowid.h"
+#include "kudu/common/timestamp.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/macros.h"
@@ -31,6 +35,7 @@
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/util/atomic.h"
+#include "kudu/util/locks.h"
 #include "kudu/util/memory/arena.h"
 #include "kudu/util/status.h"
 
@@ -42,7 +47,6 @@ class MemoryTrackingBufferAllocator;
 class RowChangeList;
 class ScanSpec;
 class SelectionVector;
-class Timestamp;
 struct ColumnId;
 
 namespace consensus {
@@ -148,6 +152,14 @@ class DeltaMemStore : public DeltaStore,
   // Returns the number of deleted rows in this DMS.
   int64_t deleted_row_count() const;
 
+  // Returns the highest timestamp of any update applied to this DMS, or 'none'
+  // while it still holds Timestamp::kMin (the value set at construction).
+  boost::optional<Timestamp> highest_timestamp() const {
+    std::lock_guard<simple_spinlock> l(ts_lock_);  // Update() writes under the same lock.
+    return highest_timestamp_ == Timestamp::kMin ?
+        boost::none : boost::make_optional(highest_timestamp_);
+  }
+
  private:
   friend class DMSIterator;
 
@@ -163,6 +175,9 @@ class DeltaMemStore : public DeltaStore,
   const int64_t id_;    // DeltaMemStore ID.
   const int64_t rs_id_; // Rowset ID.
 
+  mutable simple_spinlock ts_lock_;
+  Timestamp highest_timestamp_;
+
   std::shared_ptr<MemoryTrackingBufferAllocator> allocator_;
 
   std::shared_ptr<ThreadSafeMemoryTrackingArena> arena_;
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index e7c3d59..8a939fb 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -873,6 +873,22 @@ Status DiskRowSet::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient
   return delta_tracker_->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark, bytes);
 }
 
+Status DiskRowSet::IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
+                                            bool* deleted_and_ancient) {
+  uint64_t live_row_count = 0;  // NOTE(review): no DCHECK on the out-param, unlike MemRowSet.
+  RETURN_NOT_OK(CountLiveRows(&live_row_count));
+  if (live_row_count > 0) {  // Any live row means the rowset isn't fully deleted.
+    *deleted_and_ancient = false;
+    return Status::OK();
+  }
+  // NOTE: this estimate might not read from disk and may thus return false
+  // despite having ancient on-disk data. That's sufficient because false
+  // negatives are OK for the purposes of GCing deleted rowsets -- we just
+  // won't delete them.
+  *deleted_and_ancient = delta_tracker_->EstimateAllRedosAreAncient(ancient_history_mark);
+  return Status::OK();
+}
+
 Status DiskRowSet::InitUndoDeltas(Timestamp ancient_history_mark,
                                   MonoTime deadline,
                                   const IOContext* io_context,
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 045826b..5e27adf 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -398,6 +398,9 @@ class DiskRowSet : public RowSet {
   Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
                                                      int64_t* bytes) override;
 
+  Status IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
+                                  bool* deleted_and_ancient) override;
+
   Status InitUndoDeltas(Timestamp ancient_history_mark,
                         MonoTime deadline,
                         const fs::IOContext* io_context,
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index ecd8e03..261f411 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -393,6 +393,13 @@ class MemRowSet : public RowSet,
     return Status::OK();
   }
 
+  Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+                                  bool* deleted_and_ancient) override {
+    DCHECK(deleted_and_ancient);
+    *deleted_and_ancient = false;  // Unconditionally reports false; the AHM is ignored.
+    return Status::OK();
+  }
+
   Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                         MonoTime /*deadline*/,
                         const fs::IOContext* /*io_context*/,
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index b86153f..b3b8930 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -39,7 +39,7 @@ class MockRowSet : public RowSet {
  public:
   virtual Status CheckRowPresent(const RowSetKeyProbe& /*probe*/,
                                  const fs::IOContext* /*io_context*/,
-                                 bool* /*present*/, ProbeStats* /*stats*/) const OVERRIDE {
+                                 bool* /*present*/, ProbeStats* /*stats*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -49,35 +49,35 @@ class MockRowSet : public RowSet {
                            const consensus::OpId& /*op_id_*/,
                            const fs::IOContext* /*io_context*/,
                            ProbeStats* /*stats*/,
-                           OperationResultPB* /*result*/) OVERRIDE {
+                           OperationResultPB* /*result*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
   virtual Status NewRowIterator(const RowIteratorOptions& /*opts*/,
-                                std::unique_ptr<RowwiseIterator>* /*out*/) const OVERRIDE {
+                                std::unique_ptr<RowwiseIterator>* /*out*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
   virtual Status NewCompactionInput(const Schema* /*projection*/,
                                     const MvccSnapshot& /*snap*/,
                                     const fs::IOContext* /*io_context*/,
-                                    std::unique_ptr<CompactionInput>* /*out*/) const OVERRIDE {
+                                    std::unique_ptr<CompactionInput>* /*out*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* /*count*/) const OVERRIDE {
+  virtual Status CountRows(const fs::IOContext* /*io_context*/, rowid_t* /*count*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual Status CountLiveRows(uint64_t* /*count*/) const OVERRIDE {
+  virtual Status CountLiveRows(uint64_t* /*count*/) const override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual std::string ToString() const OVERRIDE {
+  virtual std::string ToString() const override {
     LOG(FATAL) << "Unimplemented";
     return "";
   }
-  virtual Status DebugDump(std::vector<std::string>* /*lines*/) OVERRIDE {
+  virtual Status DebugDump(std::vector<std::string>* /*lines*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -85,70 +85,76 @@ class MockRowSet : public RowSet {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
-  virtual uint64_t OnDiskSize() const OVERRIDE {
+  virtual uint64_t OnDiskSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataSize() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
-  virtual std::mutex *compact_flush_lock() OVERRIDE {
+  virtual std::mutex *compact_flush_lock() override {
     LOG(FATAL) << "Unimplemented";
-    return NULL;
+    return nullptr;
   }
-  virtual bool has_been_compacted() const OVERRIDE {
+  virtual bool has_been_compacted() const override {
     LOG(FATAL) << "Unimplemented";
     return false;
   }
-  virtual void set_has_been_compacted() OVERRIDE {
+  virtual void set_has_been_compacted() override {
     LOG(FATAL) << "Unimplemented";
   }
-  virtual std::shared_ptr<RowSetMetadata> metadata() OVERRIDE {
-    return NULL;
+  virtual std::shared_ptr<RowSetMetadata> metadata() override {
+    return nullptr;
   }
 
-  virtual size_t DeltaMemStoreSize() const OVERRIDE {
+  virtual size_t DeltaMemStoreSize() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual bool DeltaMemStoreEmpty() const OVERRIDE {
+  virtual bool DeltaMemStoreEmpty() const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual int64_t MinUnflushedLogIndex() const OVERRIDE {
+  virtual int64_t MinUnflushedLogIndex() const override {
     LOG(FATAL) << "Unimplemented";
     return -1;
   }
 
   virtual double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType /*type*/)
-      const OVERRIDE {
+      const override {
     LOG(FATAL) << "Unimplemented";
     return 0;
   }
 
-  virtual Status FlushDeltas(const fs::IOContext* /*io_context*/) OVERRIDE {
+  virtual Status FlushDeltas(const fs::IOContext* /*io_context*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) OVERRIDE {
+  virtual Status MinorCompactDeltaStores(const fs::IOContext* /*io_context*/) override {
+    LOG(FATAL) << "Unimplemented";
+    return Status::OK();
+  }
+
+  virtual Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+                                          bool* /*deleted_and_ancient*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
   virtual Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
-                                                             int64_t* /*bytes*/) OVERRIDE {
+                                                             int64_t* /*bytes*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -157,7 +163,7 @@ class MockRowSet : public RowSet {
                                 MonoTime /*deadline*/,
                                 const fs::IOContext* /*io_context*/,
                                 int64_t* /*delta_blocks_initialized*/,
-                                int64_t* /*bytes_in_ancient_undos*/) OVERRIDE {
+                                int64_t* /*bytes_in_ancient_undos*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
@@ -165,12 +171,12 @@ class MockRowSet : public RowSet {
   virtual Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
                                          const fs::IOContext* /*io_context*/,
                                          int64_t* /*blocks_deleted*/,
-                                         int64_t* /*bytes_deleted*/) OVERRIDE {
+                                         int64_t* /*bytes_deleted*/) override {
     LOG(FATAL) << "Unimplemented";
     return Status::OK();
   }
 
-  virtual bool IsAvailableForCompaction() OVERRIDE {
+  virtual bool IsAvailableForCompaction() override {
     return true;
   }
 };
@@ -186,29 +192,29 @@ class MockDiskRowSet : public MockRowSet {
         column_size_(column_size) {}
 
   virtual Status GetBounds(std::string* min_encoded_key,
-                           std::string* max_encoded_key) const OVERRIDE {
+                           std::string* max_encoded_key) const override {
     *min_encoded_key = first_key_;
     *max_encoded_key = last_key_;
     return Status::OK();
   }
 
-  virtual uint64_t OnDiskSize() const OVERRIDE {
+  virtual uint64_t OnDiskSize() const override {
     return size_;
   }
 
-  virtual uint64_t OnDiskBaseDataSize() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSize() const override {
     return size_;
   }
 
-  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& col_id) const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataColumnSize(const ColumnId& /*col_id*/) const override {
     return column_size_;
   }
 
-  virtual uint64_t OnDiskBaseDataSizeWithRedos() const OVERRIDE {
+  virtual uint64_t OnDiskBaseDataSizeWithRedos() const override {
     return size_;
   }
 
-  virtual std::string ToString() const OVERRIDE {
+  virtual std::string ToString() const override {
     return strings::Substitute("mock[$0, $1]",
                                Slice(first_key_).ToDebugString(),
                                Slice(last_key_).ToDebugString());
@@ -224,8 +230,8 @@ class MockDiskRowSet : public MockRowSet {
 // Mock which acts like a MemRowSet and has no known bounds.
 class MockMemRowSet : public MockRowSet {
  public:
-  virtual Status GetBounds(std::string* min_encoded_key,
-                           std::string* max_encoded_key) const OVERRIDE {
+  virtual Status GetBounds(std::string* /*min_encoded_key*/,
+                           std::string* /*max_encoded_key*/) const override {
     return Status::NotSupported("");
   }
 
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 2321efe..f40b579 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -37,10 +37,12 @@
 #include "kudu/common/rowid.h"
 #include "kudu/common/schema.h"
 #include "kudu/gutil/basictypes.h"
+#include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/local_tablet_writer.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet-harness.h"
 #include "kudu/tablet/tablet-test-base.h"
+#include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/util/countdown_latch.h"
 #include "kudu/util/faststring.h"
@@ -61,6 +63,7 @@ DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' reader threads to laun
 DEFINE_int32(num_flush_threads, 1, "Number of flusher reader threads to launch");
 DEFINE_int32(num_compact_threads, 1, "Number of compactor threads to launch");
 DEFINE_int32(num_undo_delta_gc_threads, 1, "Number of undo delta gc threads to launch");
+DEFINE_int32(num_deleted_rowset_gc_threads, 1, "Number of deleted rowset gc threads to launch");
 DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
 DEFINE_int32(num_flush_delta_threads, 1, "Number of delta flusher reader threads to launch");
 DEFINE_int32(num_minor_compact_deltas_threads, 1,
@@ -82,6 +85,10 @@ using std::vector;
 namespace kudu {
 namespace tablet {
 
+namespace {
+const MonoDelta kBackgroundOpInterval = MonoDelta::FromMilliseconds(100);
+} // anonymous namespace
+
 template<class SETUP>
 class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
   // Import some names from superclass, since C++ is stingy about
@@ -113,8 +120,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
                                    TabletHarness::Options::ClockType::LOGICAL_CLOCK)
     : TabletTestBase<SETUP>(clock_type),
       running_insert_count_(FLAGS_num_insert_threads),
-      ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
-  }
+      ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {}
 
   void InsertThread(int tid) {
     CountDownOnScopeExit dec_count(&running_insert_count_);
@@ -308,13 +314,12 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     }
   }
 
-  void FlushDeltasThread(int tid) {
-    int wait_time = 100;
+  void FlushDeltasThread(int /*tid*/) {
     while (running_insert_count_.count() > 0) {
       CHECK_OK(tablet()->FlushBiggestDMS());
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
@@ -327,30 +332,27 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
   }
 
   void CompactDeltas(RowSet::DeltaCompactionType type) {
-    int wait_time = 100;
     while (running_insert_count_.count() > 0) {
       VLOG(1) << "Compacting worst deltas";
       CHECK_OK(tablet()->CompactWorstDeltas(type));
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
-  void CompactThread(int tid) {
-    int wait_time = 100;
+  void CompactThread(int /*tid*/) {
     while (running_insert_count_.count() > 0) {
       CHECK_OK(tablet()->Compact(Tablet::COMPACT_NO_FLAGS));
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
   void DeleteAncientUndoDeltasThread(int /*tid*/) {
-    int wait_time = 100;
     while (running_insert_count_.count() > 0) {
-      MonoDelta time_budget = MonoDelta::FromMilliseconds(wait_time);
+      MonoDelta time_budget = kBackgroundOpInterval;
       int64_t bytes_in_ancient_undos = 0;
       CHECK_OK(tablet()->InitAncientUndoDeltas(time_budget, &bytes_in_ancient_undos));
       VLOG(1) << "Found " << bytes_in_ancient_undos << " bytes of ancient delta undos";
@@ -364,10 +366,26 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
       }
 
       // Wait, unless the inserters are all done.
-      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+      running_insert_count_.WaitFor(kBackgroundOpInterval);
     }
   }
 
+  // Thread that looks for rowsets that are ancient and fully deleted, GCing
+  // those that are.
+  void DeleteAncientDeletedRowsetsThreads(int /*tid*/) {
+    do {  // Always runs at least one pass.
+      int64_t bytes_in_ancient_deleted_rowsets = 0;
+      CHECK_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes_in_ancient_deleted_rowsets));
+      VLOG(1) << Substitute("Found $0 bytes in ancient, fully deleted rowsets",
+                            bytes_in_ancient_deleted_rowsets);
+      if (bytes_in_ancient_deleted_rowsets > 0) {  // Only GC when the estimate found something.
+        CHECK_OK(tablet()->DeleteAncientDeletedRowsets());
+        LOG(INFO) << Substitute("Deleted $0 bytes found in ancient, fully deleted rowsets",
+                                bytes_in_ancient_deleted_rowsets);
+      }  // NOTE(review): the INFO log reports the estimate above, not a measured amount.
+    } while (!running_insert_count_.WaitFor(kBackgroundOpInterval));  // Stop once WaitFor is true.
+  }
+
   // Thread which cycles between inserting and deleting a test row, each time
   // with a different value.
   void DeleteAndReinsertCycleThread(int tid) {
@@ -556,18 +574,28 @@ TYPED_TEST(MultiThreadedHybridClockTabletTest, UpdateNoMergeCompaction) {
   FLAGS_flusher_initial_frequency_ms = 1;
   FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01F;
   FLAGS_tablet_delta_store_minor_compact_max = 10;
+
+  // Start up our background op threads, targeting the creation of delta files.
   this->StartThreads(FLAGS_num_flush_threads,
                      [this](int i) { this->FlushThread(i); });
   this->StartThreads(FLAGS_num_flush_delta_threads,
                      [this](int i) { this->FlushDeltasThread(i); });
   this->StartThreads(FLAGS_num_major_compact_deltas_threads,
                      [this](int i) { this->MajorCompactDeltasThread(i); });
+  this->StartThreads(FLAGS_num_undo_delta_gc_threads,
+                     [this](int i) { this->DeleteAncientUndoDeltasThread(i); });
+  this->StartThreads(FLAGS_num_deleted_rowset_gc_threads,
+                     [this](int i) { this->DeleteAncientDeletedRowsetsThreads(i); });
+  // Start our workload threads, targeting the creation of deltas that we can
+  // eventually GC.
   this->StartThreads(10,
                      [this](int i) { this->DeleteAndReinsertCycleThread(i); });
   this->StartThreads(10,
                      [this](int i) { this->StubbornlyUpdateSameRowThread(i); });
-  this->StartThreads(FLAGS_num_undo_delta_gc_threads,
-                     [this](int i) { this->DeleteAncientUndoDeltasThread(i); });
+
+  // For good measure, we'll also start a thread that scans.
+  this->StartThreads(FLAGS_num_summer_threads,
+                     [this](int i) { this->SummerThread(i); });
 
   // Run very quickly in dev builds, longer in slow builds.
   float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 524572a..85c8c99 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -219,6 +219,13 @@ class RowSet {
   // Compact delta stores if more than one.
   virtual Status MinorCompactDeltaStores(const fs::IOContext* io_context) = 0;
 
+  // Returns whether the rowset contains no live rows and is fully ancient (its
+  // newest update is older than 'ancient_history_mark').
+  //
+  // This may return false negatives, but should not return false positives.
+  virtual Status IsDeletedAndFullyAncient(Timestamp ancient_history_mark,
+                                          bool* deleted_and_ancient) = 0;
+
   // Estimate the number of bytes in ancient undo delta stores. This may be an
   // overestimate. The argument 'ancient_history_mark' must be valid (it may
   // not be equal to Timestamp::kInvalidTimestamp).
@@ -474,6 +481,13 @@ class DuplicatingRowSet : public RowSet {
     return Status::OK();
   }
 
+  Status IsDeletedAndFullyAncient(Timestamp /*ancient_history_mark*/,
+                                  bool* deleted_and_ancient) override {
+    DCHECK(deleted_and_ancient);
+    *deleted_and_ancient = false;  // Unconditionally reports false; the AHM is ignored.
+    return Status::OK();
+  }
+
   Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
                         MonoTime /*deadline*/,
                         const fs::IOContext* /*io_context*/,
diff --git a/src/kudu/tablet/tablet-test-base.h b/src/kudu/tablet/tablet-test-base.h
index 763b694..90f544e 100644
--- a/src/kudu/tablet/tablet-test-base.h
+++ b/src/kudu/tablet/tablet-test-base.h
@@ -339,6 +339,14 @@ class TabletTestBase : public KuduTabletTest {
     InsertOrUpsertTestRows(RowOperationsPB::UPSERT, first_row, count, val, ts);
   }
 
+  // Deletes rows [first_row, first_row + count); CHECK-fails if any delete fails.
+  void DeleteTestRows(int64_t first_row, int64_t count) {
+    LocalTabletWriter writer(tablet().get(), &client_schema_);  // One writer for all deletes.
+    for (auto i = first_row; i < first_row + count; i++) {
+      CHECK_OK(DeleteTestRow(&writer, i));
+    }
+  }
+
   void InsertOrUpsertTestRows(RowOperationsPB::Type type,
                               int64_t first_row,
                               int64_t count,
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 06661f8..36ac2ff 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1476,6 +1476,14 @@ void Tablet::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
   maint_mgr->RegisterOp(undo_delta_block_gc_op.get());
   maintenance_ops.push_back(undo_delta_block_gc_op.release());
 
+  // The deleted rowset GC operation relies on live row counting. If this
+  // tablet doesn't support such counting, do not register the op.
+  if (metadata_->supports_live_row_count()) {
+    unique_ptr<MaintenanceOp> deleted_rowset_gc_op(new DeletedRowsetGCOp(this));
+    maint_mgr->RegisterOp(deleted_rowset_gc_op.get());
+    maintenance_ops.push_back(deleted_rowset_gc_op.release());
+  }
+
   std::lock_guard<simple_spinlock> l(state_lock_);
   maintenance_ops_.swap(maintenance_ops);
 }
@@ -2308,6 +2316,87 @@ Status Tablet::InitAncientUndoDeltas(MonoDelta time_budget, int64_t* bytes_in_an
   return Status::OK();
 }
 
+Status Tablet::GetBytesInAncientDeletedRowsets(int64_t* bytes_in_ancient_deleted_rowsets) {
+  Timestamp ancient_history_mark;
+  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
+    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
+                           "The clock is likely not a hybrid clock";
+    *bytes_in_ancient_deleted_rowsets = 0;
+    return Status::OK();
+  }
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+  int64_t bytes = 0;
+  {
+    std::lock_guard<std::mutex> csl(compact_select_lock_);
+    for (const auto& rowset : comps->rowsets->all_rowsets()) {
+      if (!rowset->IsAvailableForCompaction()) {
+        continue;
+      }
+      bool deleted_and_ancient = false;
+      RETURN_NOT_OK(rowset->IsDeletedAndFullyAncient(ancient_history_mark, &deleted_and_ancient));
+      if (deleted_and_ancient) {
+        bytes += rowset->OnDiskSize();
+      }
+    }
+  }
+  metrics_->deleted_rowset_estimated_retained_bytes->set_value(bytes);
+  *bytes_in_ancient_deleted_rowsets = bytes;
+  return Status::OK();
+}
+
+Status Tablet::DeleteAncientDeletedRowsets() {
+  RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
+  const MonoTime start_time = MonoTime::Now();
+  Timestamp ancient_history_mark;
+  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
+    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
+                           "The clock is likely not a hybrid clock";
+    return Status::OK();
+  }
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+
+  // We'll take the rowsets' locks to ensure we don't GC the rowsets while
+  // they're being compacted.
+  RowSetVector to_delete;
+  int num_unavailable_for_delete = 0;
+  vector<std::unique_lock<std::mutex>> rowset_locks;
+  int64_t bytes_deleted = 0;
+  {
+    std::lock_guard<std::mutex> csl(compact_select_lock_);
+    for (const auto& rowset : comps->rowsets->all_rowsets()) {
+      // Check if this rowset has been locked by a compaction. If so, we
+      // shouldn't attempt to delete it.
+      if (!rowset->IsAvailableForCompaction()) {
+        num_unavailable_for_delete++;
+        continue;
+      }
+      bool deleted_and_empty = false;
+      RETURN_NOT_OK(rowset->IsDeletedAndFullyAncient(ancient_history_mark, &deleted_and_empty));
+      if (deleted_and_empty) {
+        // If we intend on deleting the rowset, take its lock so concurrent
+        // compactions don't try to select it for compactions.
+        std::unique_lock<std::mutex> l(*rowset->compact_flush_lock(), std::try_to_lock);
+        CHECK(l.owns_lock());
+        to_delete.emplace_back(rowset);
+        rowset_locks.emplace_back(std::move(l));
+        bytes_deleted += rowset->OnDiskSize();
+      }
+    }
+  }
+  if (to_delete.empty()) {
+    return Status::OK();
+  }
+  RETURN_NOT_OK(HandleEmptyCompactionOrFlush(
+      to_delete, TabletMetadata::kNoMrsFlushed));
+  metrics_->deleted_rowset_gc_bytes_deleted->IncrementBy(bytes_deleted);
+  metrics_->deleted_rowset_gc_duration->Increment((MonoTime::Now() - start_time).ToMilliseconds());
+  return Status::OK();
+}
+
 Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_deleted) {
   RETURN_IF_STOPPED_OR_CHECK_STATE(kOpen);
   MonoTime tablet_delete_start = MonoTime::Now();
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 6950cc0..6177420 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -333,10 +333,31 @@ class Tablet {
   // Find and delete all undo delta blocks that have a maximum op timestamp
   // prior to the current ancient history mark. If this method returns OK, the
   // number of blocks and bytes deleted are returned in the out-parameters.
+  //
+  // Returns an error if the metadata update fails. Upon failure, no in-memory
+  // state is changed.
   Status DeleteAncientUndoDeltas(int64_t* blocks_deleted = nullptr,
                                  int64_t* bytes_deleted = nullptr);
 
-  // Count the number of deltas in the tablet. Only used for tests.
+  // Returns the number of bytes potentially used by rowsets that have no live
+  // rows and are entirely ancient.
+  //
+  // These checks may not touch on-disk block data if we can determine from the
+  // live row count that the rowsets aren't fully deleted, or from the DMS that
+  // the latest update is not considered ancient. If there is no DMS, looks at
+  // the newest redo but doesn't initialize it. As such, since we may miss out
+  // on counting rowsets we haven't initialized yet, this may be an
+  // underestimate.
+  Status GetBytesInAncientDeletedRowsets(int64_t* bytes_in_ancient_deleted_rowsets);
+
+  // Finds and GCs all fully deleted rowsets that have a maximum op timestamp
+  // prior to the current ancient history mark.
+  //
+  // Returns an error if the metadata update fails. Upon failure, no in-memory
+  // state is changed.
+  Status DeleteAncientDeletedRowsets();
+
+  // Counts the number of deltas in the tablet. Only used for tests.
   int64_t CountUndoDeltasForTests() const;
   int64_t CountRedoDeltasForTests() const;
 
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index b4d980f..32dedbe 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1706,13 +1706,17 @@ bool FlushedStoresSnapshot::IsMemStoreActive(const MemStoreTargetPB& target) con
       // If we have no data about this DRS, then there are two cases:
       //
       // 1) The DRS has already been flushed, but then later got removed because
-      // it got compacted away. Since it was flushed, we don't need to replay it.
+      //    it got compacted away or culled because it was empty. In the former
+      //    case, the deltas should have been reflected in the new compaction
+      //    output, and in the latter, all the rows in the rowset have been
+      //    deleted and there's nothing to replay.
       //
-      // 2) The DRS was in the process of being written, but haven't yet flushed the
-      // TabletMetadata update that includes it. We only write to an in-progress DRS like
-      // this when we are in the 'duplicating' phase of a compaction. In that case,
-      // the other duplicated 'target' should still be present in the metadata, and we
-      // can base our decision based on that one.
+      // 2) The DRS was in the process of being written, but we haven't yet
+      //    flushed the TabletMetadata update that includes it. We only write
+      //    to an in-progress DRS like this when we are in the 'duplicating'
+      //    phase of a compaction. In that case, the other duplicated 'target'
+      //    should still be present in the metadata, and we can base our
+      //    decision on that one.
       return false;
     }
 
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index 4969900..b5fa40b 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <ostream>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -97,8 +98,17 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
     NO_FLUSH
   };
 
+  // Helper functions that mutate rows in batches of keys:
+  //   [0, rows_per_rowset)
+  //   [rows_per_rowset, 2*rows_per_rowset)
+  //   ...
+  //   [(num_rowsets - 1)*rows_per_rowset, num_rowsets*rows_per_rowset)
+  //
+  // ...flushing MRS or DMS (depending on the workload) in between batches.
   void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset);
   void UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, int32_t val);
+  void DeleteOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, bool flush_dms);
+
   void AddTimeToHybridClock(MonoDelta delta) {
     uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
     uint64_t new_time = now + delta.ToMicroseconds();
@@ -107,29 +117,42 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
   // Specify row regex to match on. Empty string means don't match anything.
   void VerifyDebugDumpRowsMatch(const string& pattern) const;
 
-  int64_t TotalNumRows() const { return num_rowsets_ * rows_per_rowset_; }
+  // Returns the total number of rows there should be after inserting rows.
+  int64_t TotalNumRows() const { return kNumRowsets * rows_per_rowset_; }
 
-  TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) {
-    return [=](int32_t key, int32_t val) -> bool { return val == expected_val; };
+  // Returns a functor that returns whether all rows have 'expected_val' for
+  // their values.
+  static TestRowVerifier GenRowsEqualVerifier(int32_t expected_val) {
+    return [=](int32_t /*key*/, int32_t val) -> bool { return val == expected_val; };
   }
-
   const TestRowVerifier kRowsEqual0 = GenRowsEqualVerifier(0);
   const TestRowVerifier kRowsEqual1 = GenRowsEqualVerifier(1);
   const TestRowVerifier kRowsEqual2 = GenRowsEqualVerifier(2);
 
   const int kStartRow = 0;
-  int num_rowsets_ = 3;
+  const int kNumRowsets = 3;
   int64_t rows_per_rowset_ = 300;
 };
 
 void TabletHistoryGcTest::InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset) {
   for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
-    InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, 0);
+    InsertTestRows(rowset_id * rows_per_rowset, rows_per_rowset, /*val*/0);
     ASSERT_OK(tablet()->Flush());
   }
   ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
 }
 
+void TabletHistoryGcTest::DeleteOriginalRows(int64_t num_rowsets,
+    int64_t rows_per_rowset, bool flush_dms) {
+  for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
+    NO_FATALS(DeleteTestRows(rowset_id * rows_per_rowset, rows_per_rowset));
+    if (flush_dms) {
+      ASSERT_OK(tablet()->FlushAllDMSForTests());
+    }
+  }
+  ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
+}
+
 void TabletHistoryGcTest::UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset,
                                              int32_t val) {
   for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
@@ -155,7 +178,7 @@ void TabletHistoryGcTest::VerifyDebugDumpRowsMatch(const string& pattern) const
 TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
   FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
   Timestamp time_after_insert = clock()->Now();
 
@@ -169,7 +192,7 @@ TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
       ASSERT_OK(UpdateTestRow(&writer, row_idx, val));
     }
     // We must flush the DMS before major compaction can operate on these REDOs.
-    for (int i = 0; i < num_rowsets_; i++) {
+    for (int i = 0; i < kNumRowsets; i++) {
       tablet()->FlushBiggestDMS();
     }
     post_update_ts[val - 1] = clock()->Now();
@@ -189,7 +212,7 @@ TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual2));
 
   // Run major delta compaction.
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
   }
 
@@ -209,10 +232,9 @@ TEST_F(TabletHistoryGcTest, TestNoGenerateUndoOnMajorDeltaCompaction) {
 TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) {
   FLAGS_tablet_history_max_age_sec = 100;
 
-  num_rowsets_ = 3;
   rows_per_rowset_ = 20;
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
 
   LocalTabletWriter writer(tablet().get(), &client_schema_);
@@ -223,7 +245,7 @@ TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) {
     ASSERT_OK_FAST(row.SetInt32(2, 2));
     ASSERT_OK_FAST(writer.Update(row));
   }
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     tablet()->FlushBiggestDMS();
   }
 
@@ -231,7 +253,7 @@ TEST_F(TabletHistoryGcTest, TestMajorDeltaCompactionOnSubsetOfColumns) {
 
   vector<std::shared_ptr<RowSet>> rowsets;
   tablet()->GetRowSetsForTests(&rowsets);
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     DiskRowSet* drs = down_cast<DiskRowSet*>(rowsets[i].get());
     vector<ColumnId> col_ids_to_compact = { schema_.column_id(2) };
     ASSERT_OK(drs->MajorCompactDeltaStoresWithColumnIds(col_ids_to_compact, nullptr,
@@ -306,7 +328,7 @@ TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) {
   FLAGS_tablet_history_max_age_sec = 1; // Keep history for 1 second.
 
   Timestamp time_before_insert = clock()->Now();
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
 
   // The earliest thing we can see is an empty tablet.
@@ -327,7 +349,7 @@ TEST_F(TabletHistoryGcTest, TestUndoGCOnMergeCompaction) {
 TEST_F(TabletHistoryGcTest, TestRowRemovalGCOnMergeCompaction) {
   FLAGS_tablet_history_max_age_sec = 100; // Keep history for 100 seconds.
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(VerifyTestRowsWithVerifier(kStartRow, TotalNumRows(), kRowsEqual0));
 
   Timestamp prev_time = clock()->Now();
@@ -366,7 +388,7 @@ TEST_F(TabletHistoryGcTest, TestRowRemovalGCOnMergeCompaction) {
 TEST_F(TabletHistoryGcTest, TestNoUndoGCUntilAncientHistoryMark) {
   FLAGS_tablet_history_max_age_sec = 1000; // 1000 seconds before we GC history.
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
 
   Timestamp prev_time = clock()->Now();
   NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(2)));
@@ -383,7 +405,7 @@ TEST_F(TabletHistoryGcTest, TestNoUndoGCUntilAncientHistoryMark) {
   NO_FATALS(VerifyTestRowsWithTimestampAndVerifier(kStartRow, TotalNumRows(), prev_time,
                                                    kRowsEqual0));
 
-  for (int i = 0; i < num_rowsets_; i++) {
+  for (int i = 0; i < kNumRowsets; i++) {
     ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
     ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
   }
@@ -429,11 +451,10 @@ TEST_F(TabletHistoryGcTest, TestGhostRowsNotRevived) {
 // non-GCed rows in each rowset.
 TEST_F(TabletHistoryGcTest, TestGcOnAlternatingRows) {
   FLAGS_tablet_history_max_age_sec = 100;
-  num_rowsets_ = 3;
   rows_per_rowset_ = 5;
 
   LocalTabletWriter writer(tablet().get(), &client_schema_);
-  for (int rowset_id = 0; rowset_id < num_rowsets_; rowset_id++) {
+  for (int rowset_id = 0; rowset_id < kNumRowsets; rowset_id++) {
     for (int i = 0; i < rows_per_rowset_; i++) {
       int32_t row_key = rowset_id * rows_per_rowset_ + i;
       ASSERT_OK(InsertTestRow(&writer, row_key, 0));
@@ -580,22 +601,22 @@ class TabletHistoryGcNoMaintMgrTest : public TabletHistoryGcTest {
 TEST_F(TabletHistoryGcNoMaintMgrTest, TestUndoDeltaBlockGc) {
   FLAGS_tablet_history_max_age_sec = 1000;
 
-  NO_FATALS(InsertOriginalRows(num_rowsets_, rows_per_rowset_));
-  ASSERT_EQ(num_rowsets_, tablet()->CountUndoDeltasForTests());
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());
 
   // Generate a bunch of redo deltas and then compact them into undo deltas.
   constexpr int kNumMutationsPerRow = 5;
   for (int i = 0; i < kNumMutationsPerRow; i++) {
     SCOPED_TRACE(i);
-    ASSERT_EQ((i + 1) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+    ASSERT_EQ((i + 1) * kNumRowsets, tablet()->CountUndoDeltasForTests());
     NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(1)));
-    NO_FATALS(UpdateOriginalRows(num_rowsets_, rows_per_rowset_, i));
+    NO_FATALS(UpdateOriginalRows(kNumRowsets, rows_per_rowset_, i));
     ASSERT_OK(tablet()->MajorCompactAllDeltaStoresForTests());
-    ASSERT_EQ((i + 2) * num_rowsets_, tablet()->CountUndoDeltasForTests());
+    ASSERT_EQ((i + 2) * kNumRowsets, tablet()->CountUndoDeltasForTests());
   }
 
   ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
-  const int expected_undo_blocks = (kNumMutationsPerRow + 1) * num_rowsets_;
+  const int expected_undo_blocks = (kNumMutationsPerRow + 1) * kNumRowsets;
   ASSERT_EQ(expected_undo_blocks, tablet()->CountUndoDeltasForTests());
 
   // There will be uninitialized undos so we will estimate that there may be
@@ -639,5 +660,59 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestUndoDeltaBlockGc) {
   ASSERT_EQ(1, tablet()->metrics()->undo_delta_block_gc_delete_duration->TotalCount());
 }
 
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());
+
+  NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/true));
+  ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
+
+  // TODO(awong): keep the delta stats cached after flushing a DMS so we don't
+  // have to scan to read stats.
+  NO_FATALS(VerifyTestRows(0, 0));
+
+  // We shouldn't have any ancient rowsets since we haven't passed the AHM.
+  int64_t bytes = 0;
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_EQ(0, bytes);
+
+  // Try to delete ancient deleted rowsets. This should effectively no-op.
+  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(kNumRowsets, tablet()->CountUndoDeltasForTests());
+  ASSERT_EQ(kNumRowsets, tablet()->CountRedoDeltasForTests());
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_EQ(0, bytes);
+  const auto* metrics = tablet()->metrics();
+  ASSERT_EQ(0, metrics->deleted_rowset_gc_bytes_deleted->value());
+  ASSERT_EQ(0, metrics->deleted_rowset_gc_duration->TotalCount());
+
+  // Move the clock so all rowsets are ancient. Our GC should succeed and we
+  // should be left with no rowsets.
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_LT(0, bytes);
+  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
+  ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
+
+  // Check that these show up.
+  ASSERT_EQ(bytes, metrics->deleted_rowset_gc_bytes_deleted->value());
+  ASSERT_EQ(1, metrics->deleted_rowset_gc_duration->TotalCount());
+}
+
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithDMS) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/false));
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+  int64_t bytes = 0;
+  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+  ASSERT_LT(0, bytes);
+  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(bytes, tablet()->metrics()->deleted_rowset_gc_bytes_deleted->value());
+  ASSERT_EQ(1, tablet()->metrics()->deleted_rowset_gc_duration->TotalCount());
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/tablet_metrics.cc b/src/kudu/tablet/tablet_metrics.cc
index a135f8d..a8cccbc 100644
--- a/src/kudu/tablet/tablet_metrics.cc
+++ b/src/kudu/tablet/tablet_metrics.cc
@@ -151,6 +151,12 @@ METRIC_DEFINE_counter(tablet, undo_delta_block_gc_bytes_deleted,
                       "Does not include bytes garbage collected during compactions.",
                       kudu::MetricLevel::kDebug);
 
+METRIC_DEFINE_counter(tablet, deleted_rowset_gc_bytes_deleted,
+                      "Deleted Rowsets GC Bytes Deleted",
+                      kudu::MetricUnit::kBytes,
+                      "Number of bytes deleted by garbage-collecting deleted rowsets.",
+                      kudu::MetricLevel::kDebug);
+
 METRIC_DEFINE_histogram(tablet, bloom_lookups_per_op, "Bloom Lookups per Operation",
                         kudu::MetricUnit::kProbes,
                         "Tracks the number of bloom filter lookups performed by each "
@@ -249,6 +255,18 @@ METRIC_DEFINE_gauge_int64(tablet, undo_delta_block_estimated_retained_bytes,
   "May be an overestimate.",
   kudu::MetricLevel::kDebug);
 
+METRIC_DEFINE_gauge_uint32(tablet, deleted_rowset_gc_running,
+  "Deleted Rowset GC Running",
+  kudu::MetricUnit::kMaintenanceOperations,
+  "Number of deleted rowset GC operations currently running.",
+  kudu::MetricLevel::kDebug);
+
+METRIC_DEFINE_gauge_int64(tablet, deleted_rowset_estimated_retained_bytes,
+  "Estimated Deletable Bytes Retained in Deleted Rowsets",
+  kudu::MetricUnit::kBytes,
+  "Estimated bytes of deletable data in deleted rowsets for this tablet.",
+  kudu::MetricLevel::kDebug);
+
 METRIC_DEFINE_histogram(tablet, flush_dms_duration,
   "DeltaMemStore Flush Duration",
   kudu::MetricUnit::kMilliseconds,
@@ -305,6 +323,13 @@ METRIC_DEFINE_histogram(tablet, undo_delta_block_gc_perform_duration,
   kudu::MetricLevel::kInfo,
   60000LU, 1);
 
+METRIC_DEFINE_histogram(tablet, deleted_rowset_gc_duration,
+  "Deleted Rowset GC Duration",
+  kudu::MetricUnit::kMilliseconds,
+  "Time spent running the maintenance operation to GC deleted rowsets.",
+  kudu::MetricLevel::kInfo,
+  60000LU, 1);
+
 METRIC_DEFINE_counter(tablet, leader_memory_pressure_rejections,
   "Leader Memory Pressure Rejections",
   kudu::MetricUnit::kRequests,
@@ -347,6 +372,7 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
     MINIT(delta_file_lookups),
     MINIT(mrs_lookups),
     MINIT(bytes_flushed),
+    MINIT(deleted_rowset_gc_bytes_deleted),
     MINIT(undo_delta_block_gc_bytes_deleted),
     MINIT(bloom_lookups_per_op),
     MINIT(key_file_lookups_per_op),
@@ -358,6 +384,8 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
     GINIT(flush_dms_running),
     GINIT(flush_mrs_running),
     GINIT(compact_rs_running),
+    GINIT(deleted_rowset_estimated_retained_bytes),
+    GINIT(deleted_rowset_gc_running),
     GINIT(delta_minor_compact_rs_running),
     GINIT(delta_major_compact_rs_running),
     GINIT(undo_delta_block_gc_running),
@@ -365,6 +393,7 @@ TabletMetrics::TabletMetrics(const scoped_refptr<MetricEntity>& entity)
     MINIT(flush_dms_duration),
     MINIT(flush_mrs_duration),
     MINIT(compact_rs_duration),
+    MINIT(deleted_rowset_gc_duration),
     MINIT(delta_minor_compact_rs_duration),
     MINIT(delta_major_compact_rs_duration),
     MINIT(undo_delta_block_gc_init_duration),
diff --git a/src/kudu/tablet/tablet_metrics.h b/src/kudu/tablet/tablet_metrics.h
index 7dab1ea..1975779 100644
--- a/src/kudu/tablet/tablet_metrics.h
+++ b/src/kudu/tablet/tablet_metrics.h
@@ -70,6 +70,7 @@ struct TabletMetrics {
 
   // Operation stats.
   scoped_refptr<Counter> bytes_flushed;
+  scoped_refptr<Counter> deleted_rowset_gc_bytes_deleted;
   scoped_refptr<Counter> undo_delta_block_gc_bytes_deleted;
 
   scoped_refptr<Histogram> bloom_lookups_per_op;
@@ -84,6 +85,8 @@ struct TabletMetrics {
   scoped_refptr<AtomicGauge<uint32_t> > flush_dms_running;
   scoped_refptr<AtomicGauge<uint32_t> > flush_mrs_running;
   scoped_refptr<AtomicGauge<uint32_t> > compact_rs_running;
+  scoped_refptr<AtomicGauge<int64_t> > deleted_rowset_estimated_retained_bytes;
+  scoped_refptr<AtomicGauge<uint32_t> > deleted_rowset_gc_running;
   scoped_refptr<AtomicGauge<uint32_t> > delta_minor_compact_rs_running;
   scoped_refptr<AtomicGauge<uint32_t> > delta_major_compact_rs_running;
   scoped_refptr<AtomicGauge<uint32_t> > undo_delta_block_gc_running;
@@ -92,6 +95,7 @@ struct TabletMetrics {
   scoped_refptr<Histogram> flush_dms_duration;
   scoped_refptr<Histogram> flush_mrs_duration;
   scoped_refptr<Histogram> compact_rs_duration;
+  scoped_refptr<Histogram> deleted_rowset_gc_duration;
   scoped_refptr<Histogram> delta_minor_compact_rs_duration;
   scoped_refptr<Histogram> delta_major_compact_rs_duration;
   scoped_refptr<Histogram> undo_delta_block_gc_init_duration;
diff --git a/src/kudu/tablet/tablet_mm_ops.cc b/src/kudu/tablet/tablet_mm_ops.cc
index 849d74a..c0f3caf 100644
--- a/src/kudu/tablet/tablet_mm_ops.cc
+++ b/src/kudu/tablet/tablet_mm_ops.cc
@@ -76,6 +76,13 @@ DEFINE_bool(enable_undo_delta_block_gc, true,
 TAG_FLAG(enable_undo_delta_block_gc, runtime);
 TAG_FLAG(enable_undo_delta_block_gc, unsafe);
 
+DEFINE_bool(enable_deleted_rowset_gc, true,
+    "Whether to enable garbage collection of fully deleted rowsets. Disabling "
+    "deleted rowset garbage collection may increase disk space usage for workloads "
+    "that involve a high number of deletes. Only deleted rowsets that are entirely "
+    "considered ancient history (see --tablet_history_max_age_sec) are deleted.");
+TAG_FLAG(enable_deleted_rowset_gc, runtime);
+
 using std::string;
 using strings::Substitute;
 
@@ -387,5 +394,46 @@ std::string UndoDeltaBlockGCOp::LogPrefix() const {
   return tablet_->LogPrefix();
 }
 
+DeletedRowsetGCOp::DeletedRowsetGCOp(Tablet* tablet)
+    : TabletOpBase(Substitute("DeletedRowSetGCOp($0)", tablet->tablet_id()),
+                   MaintenanceOp::HIGH_IO_USAGE, tablet),
+      running_(false) {
+}
+
+void DeletedRowsetGCOp::UpdateStats(MaintenanceOpStats* stats) {
+  if (!FLAGS_enable_deleted_rowset_gc) {
+    stats->set_runnable(false);
+    return;
+  }
+  if (running_.load()) {
+    VLOG(1) << LogPrefix() << " not updating stats: already running";
+    stats->set_runnable(false);
+    return;
+  }
+  int64_t estimated_retained_bytes = 0;
+  WARN_NOT_OK(tablet_->GetBytesInAncientDeletedRowsets(&estimated_retained_bytes),
+              "Unable to count bytes in ancient, deleted rowsets");
+  stats->set_data_retained_bytes(estimated_retained_bytes);
+  stats->set_runnable(estimated_retained_bytes > 0);
+}
+
+void DeletedRowsetGCOp::Perform() {
+  WARN_NOT_OK(tablet_->DeleteAncientDeletedRowsets(),
+      Substitute("$0GC of deleted rowsets failed", LogPrefix()));
+  running_.store(false);
+}
+
+scoped_refptr<Histogram> DeletedRowsetGCOp::DurationHistogram() const {
+  return tablet_->metrics()->deleted_rowset_gc_duration;
+}
+
+scoped_refptr<AtomicGauge<uint32_t>> DeletedRowsetGCOp::RunningGauge() const {
+  return tablet_->metrics()->deleted_rowset_gc_running;
+}
+
+std::string DeletedRowsetGCOp::LogPrefix() const {
+  return tablet_->LogPrefix();
+}
+
 } // namespace tablet
 } // namespace kudu
diff --git a/src/kudu/tablet/tablet_mm_ops.h b/src/kudu/tablet/tablet_mm_ops.h
index e85dcf0..114e551 100644
--- a/src/kudu/tablet/tablet_mm_ops.h
+++ b/src/kudu/tablet/tablet_mm_ops.h
@@ -14,10 +14,9 @@
 // KIND, either express or implied.  See the License for the
 // specific language governing permissions and limitations
 // under the License.
+#pragma once
 
-#ifndef KUDU_TABLET_TABLET_MM_OPS_H_
-#define KUDU_TABLET_TABLET_MM_OPS_H_
-
+#include <atomic>
 #include <cstdint>
 #include <string>
 
@@ -159,8 +158,39 @@ class UndoDeltaBlockGCOp : public TabletOpBase {
   DISALLOW_COPY_AND_ASSIGN(UndoDeltaBlockGCOp);
 };
 
+// MaintenanceOp to garbage-collect entire rowsets that are fully deleted and
+// older than the ancient history mark.
+class DeletedRowsetGCOp : public TabletOpBase {
+ public:
+  explicit DeletedRowsetGCOp(Tablet* tablet);
+
+  // Estimate the number of bytes from rowsets that have been fully deleted and
+  // exist entirely before the AHM (i.e. their most recent update happened
+  // before the AHM).
+  void UpdateStats(MaintenanceOpStats* stats) override;
+
+  // If this op is already running, we shouldn't run it again.
+  bool Prepare() override {
+    bool false_ref = false;
+    return running_.compare_exchange_strong(false_ref, true);
+  }
+
+  // Deletes ancient deleted rowsets from disk.
+  void Perform() override;
+
+  // Metrics for this op.
+  scoped_refptr<Histogram> DurationHistogram() const override;
+  scoped_refptr<AtomicGauge<uint32_t> > RunningGauge() const override;
+ private:
+  std::string LogPrefix() const;
+
+  // Used to ensure only a single instance of this op is scheduled per tablet
+  // at a time.
+  std::atomic<bool> running_;
+
+  DISALLOW_COPY_AND_ASSIGN(DeletedRowsetGCOp);
+};
 
 } // namespace tablet
 } // namespace kudu
 
-#endif /* KUDU_TABLET_TABLET_MM_OPS_H_ */
diff --git a/src/kudu/tablet/tablet_replica-test.cc b/src/kudu/tablet/tablet_replica-test.cc
index d805735..c4b88d3 100644
--- a/src/kudu/tablet/tablet_replica-test.cc
+++ b/src/kudu/tablet/tablet_replica-test.cc
@@ -52,6 +52,7 @@
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/rpc/messenger.h"
 #include "kudu/rpc/result_tracker.h"
+#include "kudu/tablet/tablet-harness.h"
 #include "kudu/tablet/tablet-test-util.h"
 #include "kudu/tablet/tablet.h"
 #include "kudu/tablet/tablet_bootstrap.h"
@@ -76,7 +77,9 @@
 #include "kudu/util/test_util.h"
 #include "kudu/util/threadpool.h"
 
+DECLARE_bool(enable_maintenance_manager);
 DECLARE_int32(flush_threshold_mb);
+DECLARE_int32(tablet_history_max_age_sec);
 
 METRIC_DECLARE_entity(tablet);
 
@@ -115,7 +118,8 @@ static Schema GetTestSchema() {
 class TabletReplicaTest : public KuduTabletTest {
  public:
   TabletReplicaTest()
-      : KuduTabletTest(GetTestSchema()),
+      : KuduTabletTest(GetTestSchema(),
+                       TabletHarness::Options::ClockType::HYBRID_CLOCK),
         insert_counter_(0),
         delete_counter_(0),
         dns_resolver_(new DnsResolver) {
@@ -341,22 +345,20 @@ class TabletReplicaTest : public KuduTabletTest {
   // Execute insert requests and roll log after each one.
   Status ExecuteInsertsAndRollLogs(int num_inserts) {
     for (int i = 0; i < num_inserts; i++) {
-      unique_ptr<WriteRequestPB> req(new WriteRequestPB());
-      RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), req.get()));
-      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+      WriteRequestPB req;
+      RETURN_NOT_OK(GenerateSequentialInsertRequest(GetTestSchema(), &req));
+      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), req));
     }
-
     return Status::OK();
   }
 
   // Execute delete requests and roll log after each one.
   Status ExecuteDeletesAndRollLogs(int num_deletes) {
     for (int i = 0; i < num_deletes; i++) {
-      unique_ptr<WriteRequestPB> req(new WriteRequestPB());
-      CHECK_OK(GenerateSequentialDeleteRequest(req.get()));
-      CHECK_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), *req));
+      WriteRequestPB req;
+      RETURN_NOT_OK(GenerateSequentialDeleteRequest(&req));
+      RETURN_NOT_OK(ExecuteWriteAndRollLog(tablet_replica_.get(), req));
     }
-
     return Status::OK();
   }
 
@@ -797,5 +799,44 @@ TEST_F(TabletReplicaTest, TestLiveRowCountMetric) {
   ASSERT_EQ(kNumInsert - kNumDelete, live_row_count->value());
 }
 
+TEST_F(TabletReplicaTest, TestRestartAfterGCDeletedRowsets) {
+  FLAGS_enable_maintenance_manager = false;
+  FLAGS_tablet_history_max_age_sec = 1;
+  const int kNumRows = 10;
+  ConsensusBootstrapInfo info;
+  ASSERT_OK(StartReplicaAndWaitUntilLeader(info));
+  auto* tablet = tablet_replica_->tablet();
+  // Metrics are already registered so pass a dummy lambda.
+  auto live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
+      tablet->GetMetricEntity(), [] () { return 0; });
+
+  // Insert some rows and flush so we get a DRS.
+  ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
+  ASSERT_OK(tablet->Flush());
+  ASSERT_OK(ExecuteDeletesAndRollLogs(kNumRows));
+  ASSERT_EQ(1, tablet->num_rowsets());
+  ASSERT_EQ(0, live_row_count->value());
+  SleepFor(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+
+  // Insert some fresh rows so we can validate that we don't GC everything.
+  ASSERT_OK(ExecuteInsertsAndRollLogs(kNumRows));
+  ASSERT_OK(tablet->Flush());
+  ASSERT_EQ(2, tablet->num_rowsets());
+  ASSERT_EQ(kNumRows, live_row_count->value());
+
+  // Now GC what we can. The first rowset should be gone.
+  ASSERT_OK(tablet->DeleteAncientDeletedRowsets());
+  ASSERT_EQ(1, tablet->num_rowsets());
+  ASSERT_EQ(kNumRows, live_row_count->value());
+
+  // Restart and ensure we can get online okay.
+  NO_FATALS(RestartReplica());
+  tablet = tablet_replica_->tablet();
+  ASSERT_EQ(1, tablet->num_rowsets());
+  live_row_count = METRIC_live_row_count.InstantiateFunctionGauge(
+      tablet->GetMetricEntity(), [] () { return 0; });
+  ASSERT_EQ(kNumRows, live_row_count->value());
+}
+
 } // namespace tablet
 } // namespace kudu


[kudu] 01/04: test: add some buffer in timing metrics

Posted by aw...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 7dff767a8ebd6ea4d61c78ea84ebaeb85e0f07d4
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Mar 20 20:53:09 2020 -0700

    test: add some buffer in timing metrics
    
    We've seen some flakiness in TestEchoSubprocess where the measured
    outbound queue time comes in 1ms short of the expected value, with logs
    like:
    
    [ERROR - main] (RetryRule.java:219) org.apache.kudu.subprocess.echo.TestEchoSubprocess.testSlowWriterMetrics: failed attempt 1
    java.lang.AssertionError: Expected a higher outbound queue time: 399 ms
    	at org.junit.Assert.fail(Assert.java:89) ~[junit-4.13.jar:4.13]
    	at org.junit.Assert.assertTrue(Assert.java:42) ~[junit-4.13.jar:4.13]
    	at org.apache.kudu.subprocess.echo.TestEchoSubprocess.testSlowWriterMetrics(TestEchoSubprocess.java:194) ~[test/:?]
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_141]
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_141]
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_141]
            ...
    
    This patch adds some buffer (5ms to be conservative) to the outbound
    queue time assertions to allow for such small timing errors.
    
    Change-Id: I5a96dd3804cedece2faf16b28d5c48396d87bcb7
    Reviewed-on: http://gerrit.cloudera.org:8080/15518
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java   | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
index 294e643..704d036 100644
--- a/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
+++ b/java/kudu-subprocess/src/test/java/org/apache/kudu/subprocess/echo/TestEchoSubprocess.java
@@ -186,17 +186,20 @@ public class TestEchoSubprocess extends SubprocessTestUtil {
     SubprocessMetricsPB m = receiveResponse().getMetrics();
     Assert.assertEquals(2, m.getOutboundQueueLength());
 
+    // NOTE: timing on the exact slept time sometimes yields a small error, so
+    // leave some buffer in checking for correctness.
+    final int BUFFER_MS = 5;
     m = receiveResponse().getMetrics();
     Assert.assertEquals(1, m.getOutboundQueueLength());
     Assert.assertTrue(
         String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
-        m.getOutboundQueueTimeMs() >= BLOCK_MS);
+        m.getOutboundQueueTimeMs() + BUFFER_MS >= BLOCK_MS);
 
     m = receiveResponse().getMetrics();
     Assert.assertEquals(0, m.getOutboundQueueLength());
     Assert.assertTrue(
         String.format("Expected a higher outbound queue time: %s ms", m.getOutboundQueueTimeMs()),
-        m.getOutboundQueueTimeMs() >= 2 * BLOCK_MS);
+        m.getOutboundQueueTimeMs() + BUFFER_MS >= 2 * BLOCK_MS);
   }
 
   /**