You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/03/21 05:16:55 UTC

[kudu] 04/04: tablet: plumb delta stats into delta compaction outputs

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 58f7a3015c967e7590e91d729cea5327d253d74d
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Thu Mar 19 02:30:08 2020 -0700

    tablet: plumb delta stats into delta compaction outputs
    
    This plumbs delta stats from a delta compaction into the delta tracker's
    call to open the output delta readers. This means that in-memory delta
    stats aren't "lost" when doing a delta compaction, and that we can GC
    ancient deleted rowsets after a delta compaction without reading the
    stats from disk.
    
    I considered also plumbing stats into merge compaction outputs, but
    opted not to. The plumbing for compactions seems more significant, and
    chances are, large bulk deletes will come after data has had time to be
    compacted (e.g. deleting old ranges of keyspace).
    
    Change-Id: Iea2f28fb2905ddcc007c88ab80ae2185587400f0
    Reviewed-on: http://gerrit.cloudera.org:8080/15506
    Reviewed-by: Adar Dembo <ad...@cloudera.com>
    Tested-by: Kudu Jenkins
---
 .../integration-tests/tablet_history_gc-itest.cc   |  1 -
 src/kudu/tablet/delta_compaction-test.cc           |  8 +--
 src/kudu/tablet/delta_compaction.cc                | 28 +++++----
 src/kudu/tablet/delta_store.cc                     |  6 +-
 src/kudu/tablet/delta_tracker.cc                   | 42 +++++++++----
 src/kudu/tablet/delta_tracker.h                    | 22 ++++---
 src/kudu/tablet/deltafile-test.cc                  |  6 +-
 src/kudu/tablet/deltafile.cc                       |  5 +-
 src/kudu/tablet/deltafile.h                        | 10 ++-
 src/kudu/tablet/deltamemstore-test.cc              |  4 +-
 src/kudu/tablet/deltamemstore.cc                   |  7 +--
 src/kudu/tablet/deltamemstore.h                    |  4 +-
 src/kudu/tablet/diskrowset.cc                      |  8 +--
 src/kudu/tablet/tablet-test-util.h                 |  6 +-
 src/kudu/tablet/tablet.cc                          |  2 +
 src/kudu/tablet/tablet_history_gc-test.cc          | 73 ++++++++++++++++------
 16 files changed, 147 insertions(+), 85 deletions(-)

diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index 02928cc..205d793 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -284,7 +284,6 @@ TEST_F(TabletHistoryGcITest, TestUndoDeltaBlockGc) {
 TEST_F(TabletHistoryGcITest, TestDeletedRowsetGc) {
   // Disable merge compactions, since they may also cull deleted rowsets.
   FLAGS_enable_rowset_compaction = false;
-  FLAGS_enable_flush_deltamemstores = false; // Don't flush DMS so we don't have to init deltafiles.
   FLAGS_flush_threshold_secs = 0; // Flush as aggressively as possible.
   FLAGS_maintenance_manager_num_threads = 4; // Encourage concurrency.
   FLAGS_maintenance_manager_polling_interval_ms = 1; // Spin on MM for a quick test.
diff --git a/src/kudu/tablet/delta_compaction-test.cc b/src/kudu/tablet/delta_compaction-test.cc
index a602b11..77f9f20 100644
--- a/src/kudu/tablet/delta_compaction-test.cc
+++ b/src/kudu/tablet/delta_compaction-test.cc
@@ -143,7 +143,7 @@ TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) {
     // and update number (see update_value assignment).
     size_t kNumUpdates = 10;
     size_t kNumMultipleUpdates = kNumUpdates / 2;
-    DeltaStats stats;
+    unique_ptr<DeltaStats> stats(new DeltaStats);
     for (size_t i = 0; i < kNumUpdates; ++i) {
       buf.clear();
       RowChangeListEncoder update(&buf);
@@ -151,7 +151,7 @@ TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) {
         ColumnId col_id = schema.column_id(col_idx);
         DCHECK_GE(col_id, 0);
 
-        stats.IncrUpdateCount(col_id, 1);
+        stats->IncrUpdateCount(col_id, 1);
         const ColumnSchema& col_schema = schema.column(col_idx);
         int update_value = deltafile_idx * 100 + i;
         switch (col_schema.type_info()->physical_type()) {
@@ -180,12 +180,12 @@ TEST_F(TestDeltaCompaction, TestMergeMultipleSchemas) {
       DeltaKey key((i < kNumMultipleUpdates) ? i : row_id, Timestamp(curr_timestamp));
       RowChangeList row_changes = update.as_changelist();
       ASSERT_OK(dfw->AppendDelta<REDO>(key, row_changes));
-      ASSERT_OK(stats.UpdateStats(key.timestamp(), row_changes));
+      ASSERT_OK(stats->UpdateStats(key.timestamp(), row_changes));
       curr_timestamp++;
       row_id++;
     }
 
-    dfw->WriteDeltaStats(stats);
+    dfw->WriteDeltaStats(std::move(stats));
     ASSERT_OK(dfw->Finish());
     shared_ptr<DeltaFileReader> dfr;
     ASSERT_OK(GetDeltaFileReader(block_id, &dfr));
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index 21eefdd..1e9dda0 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -132,8 +132,8 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   RowBlock block(&partial_schema_, kRowsPerBlock, &arena);
 
   DVLOG(1) << "Applying deltas and rewriting columns (" << partial_schema_.ToString() << ")";
-  DeltaStats redo_stats;
-  DeltaStats undo_stats;
+  unique_ptr<DeltaStats> redo_stats(new DeltaStats);
+  unique_ptr<DeltaStats> undo_stats(new DeltaStats);
   size_t nrows = 0;
   // We know that we're reading everything from disk so we're including all transactions.
   MvccSnapshot snap = MvccSnapshot::CreateSnapshotIncludingAllTransactions();
@@ -199,7 +199,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
       for (const Mutation *mut = new_undos_head; mut != nullptr; mut = mut->next()) {
         DeltaKey undo_key(nrows + dst_row.row_index(), mut->timestamp());
         RETURN_NOT_OK(new_undo_delta_writer_->AppendDelta<UNDO>(undo_key, mut->changelist()));
-        undo_stats.UpdateStats(mut->timestamp(), mut->changelist());
+        undo_stats->UpdateStats(mut->timestamp(), mut->changelist());
         undo_delta_mutations_written_++;
       }
     }
@@ -227,7 +227,7 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
                << key_and_update.Stringify(DeltaType::REDO, base_schema_);
       RETURN_NOT_OK_PREPEND(new_redo_delta_writer_->AppendDelta<REDO>(key_and_update.key, update),
                             "Failed to append a delta");
-      WARN_NOT_OK(redo_stats.UpdateStats(key_and_update.key.timestamp(), update),
+      WARN_NOT_OK(redo_stats->UpdateStats(key_and_update.key.timestamp(), update),
                   "Failed to update stats");
     }
     redo_delta_mutations_written_ += out.size();
@@ -239,12 +239,12 @@ Status MajorDeltaCompaction::FlushRowSetAndDeltas(const IOContext* io_context) {
   RETURN_NOT_OK(base_data_writer_->FinishAndReleaseBlocks(transaction.get()));
 
   if (redo_delta_mutations_written_ > 0) {
-    new_redo_delta_writer_->WriteDeltaStats(redo_stats);
+    new_redo_delta_writer_->WriteDeltaStats(std::move(redo_stats));
     RETURN_NOT_OK(new_redo_delta_writer_->FinishAndReleaseBlock(transaction.get()));
   }
 
   if (undo_delta_mutations_written_ > 0) {
-    new_undo_delta_writer_->WriteDeltaStats(undo_stats);
+    new_undo_delta_writer_->WriteDeltaStats(std::move(undo_stats));
     RETURN_NOT_OK(new_undo_delta_writer_->FinishAndReleaseBlock(transaction.get()));
   }
   transaction->CommitCreatedBlocks();
@@ -417,19 +417,23 @@ Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker,
   // diskrowset operations.
 
   // Create blocks for the new redo deltas.
-  vector<BlockId> new_redo_blocks;
+  vector<DeltaBlockIdAndStats> new_redo_blocks;
   if (redo_delta_mutations_written_ > 0) {
-    new_redo_blocks.push_back(new_redo_delta_block_);
+    new_redo_blocks.emplace_back(std::make_pair(new_redo_delta_block_,
+        new_redo_delta_writer_->release_delta_stats()));
   }
   SharedDeltaStoreVector new_redo_stores;
-  RETURN_NOT_OK(tracker->OpenDeltaReaders(new_redo_blocks, io_context, &new_redo_stores, REDO));
+  RETURN_NOT_OK(tracker->OpenDeltaReaders(std::move(new_redo_blocks), io_context,
+                                          &new_redo_stores, REDO));
 
   // Create blocks for the new undo deltas.
   SharedDeltaStoreVector new_undo_stores;
   if (undo_delta_mutations_written_ > 0) {
-    vector<BlockId> new_undo_blocks;
-    new_undo_blocks.push_back(new_undo_delta_block_);
-    RETURN_NOT_OK(tracker->OpenDeltaReaders(new_undo_blocks, io_context, &new_undo_stores, UNDO));
+    vector<DeltaBlockIdAndStats> new_undo_blocks;
+    new_undo_blocks.emplace_back(std::make_pair(new_undo_delta_block_,
+        new_undo_delta_writer_->release_delta_stats()));
+    RETURN_NOT_OK(tracker->OpenDeltaReaders(std::move(new_undo_blocks), io_context,
+                                            &new_undo_stores, UNDO));
   }
 
   // 2. Only now that we cannot fail do we update the in-memory state.
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 4ddfc9c..5938a34 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -623,7 +623,7 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter,
   RETURN_NOT_OK(iter->SeekToOrdinal(0));
 
   const size_t kRowsPerBlock = 100;
-  DeltaStats stats;
+  std::unique_ptr<DeltaStats> stats(new DeltaStats);
   Arena arena(32 * 1024);
   for (size_t i = 0; iter->HasNext(); ) {
     size_t n;
@@ -646,12 +646,12 @@ Status WriteDeltaIteratorToFile(DeltaIterator* iter,
     for (const DeltaKeyAndUpdate& cell : cells) {
       RowChangeList rcl(cell.cell);
       RETURN_NOT_OK(out->AppendDelta<Type>(cell.key, rcl));
-      RETURN_NOT_OK(stats.UpdateStats(cell.key.timestamp(), rcl));
+      RETURN_NOT_OK(stats->UpdateStats(cell.key.timestamp(), rcl));
     }
 
     i += n;
   }
-  out->WriteDeltaStats(stats);
+  out->WriteDeltaStats(std::move(stats));
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 2e8c2ea..8b975a5 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -100,12 +100,14 @@ DeltaTracker::DeltaTracker(shared_ptr<RowSetMetadata> rowset_metadata,
       dms_exists_(false),
       deleted_row_count_(0) {}
 
-Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
+Status DeltaTracker::OpenDeltaReaders(vector<DeltaBlockIdAndStats> blocks,
                                       const IOContext* io_context,
                                       vector<shared_ptr<DeltaStore> >* stores,
                                       DeltaType type) {
   FsManager* fs = rowset_metadata_->fs_manager();
-  for (const BlockId& block_id : blocks) {
+  for (auto& block_and_stats : blocks) {
+    const auto& block_id = block_and_stats.first;
+    unique_ptr<DeltaStats> stats = std::move(block_and_stats.second);
     unique_ptr<ReadableBlock> block;
     Status s = fs->OpenBlock(block_id, &block);
     if (!s.ok()) {
@@ -122,7 +124,7 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
     s = DeltaFileReader::OpenNoInit(std::move(block),
                                     type,
                                     std::move(options),
-                                    /*delta_stats*/nullptr,
+                                    std::move(stats),
                                     &dfr);
     if (!s.ok()) {
       LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type)
@@ -145,11 +147,19 @@ Status DeltaTracker::DoOpen(const IOContext* io_context) {
   CHECK(undo_delta_stores_.empty()) << "should call before opening any readers";
   CHECK(!open_);
 
-  RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->redo_delta_blocks(),
+  vector<DeltaBlockIdAndStats> redos;
+  for (auto block_id : rowset_metadata_->redo_delta_blocks()) {
+    redos.emplace_back(std::make_pair(block_id, nullptr));
+  }
+  RETURN_NOT_OK(OpenDeltaReaders(std::move(redos),
                                  io_context,
                                  &redo_delta_stores_,
                                  REDO));
-  RETURN_NOT_OK(OpenDeltaReaders(rowset_metadata_->undo_delta_blocks(),
+  vector<DeltaBlockIdAndStats> undos;
+  for (auto block_id : rowset_metadata_->undo_delta_blocks()) {
+    undos.emplace_back(std::make_pair(block_id, nullptr));
+  }
+  RETURN_NOT_OK(OpenDeltaReaders(std::move(undos),
                                  io_context,
                                  &undo_delta_stores_,
                                  UNDO));
@@ -353,7 +363,7 @@ Status DeltaTracker::Compact(const IOContext* io_context) {
 
 Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
                                                     const SharedDeltaStoreVector& to_remove,
-                                                    const vector<BlockId>& new_delta_blocks,
+                                                    vector<DeltaBlockIdAndStats> new_delta_blocks,
                                                     const IOContext* io_context,
                                                     DeltaType type,
                                                     MetadataFlushType flush_type) {
@@ -365,7 +375,7 @@ Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate&
   DCHECK(!to_remove.empty());
 
   SharedDeltaStoreVector new_stores;
-  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(new_delta_blocks, io_context,
+  RETURN_NOT_OK_PREPEND(OpenDeltaReaders(std::move(new_delta_blocks), io_context,
                                          &new_stores, type),
                         "Unable to open delta blocks");
 
@@ -428,12 +438,13 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
   // Merge and compact the stores.
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
-  RETURN_NOT_OK(DoCompactStores(io_context, start_idx, end_idx, std::move(block),
+  unique_ptr<DeltaStats> stats;
+  RETURN_NOT_OK(DoCompactStores(io_context, start_idx, end_idx, std::move(block), &stats,
                                 &compacted_stores, &compacted_blocks));
 
-  vector<BlockId> new_blocks = { new_block_id };
+  vector<BlockId> new_block = { new_block_id };
   RowSetMetadataUpdate update;
-  update.ReplaceRedoDeltaBlocks(compacted_blocks, new_blocks);
+  update.ReplaceRedoDeltaBlocks(compacted_blocks, new_block);
 
   const auto num_blocks_compacted = compacted_blocks.size();
   TRACE_COUNTER_INCREMENT("delta_blocks_compacted", num_blocks_compacted);
@@ -442,7 +453,10 @@ Status DeltaTracker::CompactStores(const IOContext* io_context, int start_idx, i
                                     num_blocks_compacted,
                                     BlockId::JoinStrings(compacted_blocks),
                                     new_block_id.ToString());
-  RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores, new_blocks,
+  vector<DeltaBlockIdAndStats> new_block_and_stats;
+  new_block_and_stats.emplace_back(std::make_pair(new_block_id, std::move(stats)));
+  RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores,
+                                                       std::move(new_block_and_stats),
                                                        io_context, REDO, FLUSH_METADATA),
                         "DeltaTracker: CompactStores: Unable to commit delta update");
   return Status::OK();
@@ -580,6 +594,7 @@ Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
 Status DeltaTracker::DoCompactStores(const IOContext* io_context,
                                      size_t start_idx, size_t end_idx,
                                      unique_ptr<WritableBlock> block,
+                                     unique_ptr<DeltaStats>* output_stats,
                                      SharedDeltaStoreVector* compacted_stores,
                                      vector<BlockId> *compacted_blocks) {
   unique_ptr<DeltaIterator> inputs_merge;
@@ -598,6 +613,7 @@ Status DeltaTracker::DoCompactStores(const IOContext* io_context,
                                                ITERATE_OVER_ALL_ROWS,
                                                &dfw));
   RETURN_NOT_OK(dfw.Finish());
+  *output_stats = dfw.release_delta_stats();
   return Status::OK();
 }
 
@@ -738,9 +754,9 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
                         Substitute("Unable to start writing to delta block $0",
                                    block_id.ToString()));
 
-  unique_ptr<DeltaStats> stats;
-  RETURN_NOT_OK(dms->FlushToFile(&dfw, &stats));
+  RETURN_NOT_OK(dms->FlushToFile(&dfw));
   RETURN_NOT_OK(dfw.Finish());
+  unique_ptr<DeltaStats> stats = dfw.release_delta_stats();
   const auto bytes_written = dfw.written_size();
   TRACE_COUNTER_INCREMENT("bytes_written", bytes_written);
   TRACE_COUNTER_INCREMENT("delete_count", stats->delete_count());
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index b317788..dfec6a3 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -20,14 +20,17 @@
 #include <cstdint>
 #include <memory>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <gtest/gtest_prod.h>
 
 #include "kudu/common/rowid.h"
+#include "kudu/fs/block_id.h"
 #include "kudu/gutil/macros.h"
 #include "kudu/tablet/cfile_set.h"
 #include "kudu/tablet/delta_key.h"
+#include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/delta_store.h"
 #include "kudu/tablet/tablet_mem_trackers.h"
 #include "kudu/util/atomic.h"
@@ -37,7 +40,6 @@
 
 namespace kudu {
 
-class BlockId;
 class ColumnwiseIterator;
 class MonoTime;
 class RowChangeList;
@@ -68,6 +70,8 @@ class RowSetMetadataUpdate;
 struct ProbeStats;
 struct RowIteratorOptions;
 
+typedef std::pair<BlockId, std::unique_ptr<DeltaStats>> DeltaBlockIdAndStats;
+
 // The DeltaTracker is the part of a DiskRowSet which is responsible for
 // tracking modifications against the base data. It consists of a set of
 // DeltaStores which each contain a set of mutations against the base data.
@@ -160,7 +164,7 @@ class DeltaTracker {
   // The 'compact_flush_lock_' should be acquired before calling this method.
   Status CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
                                         const SharedDeltaStoreVector& to_remove,
-                                        const std::vector<BlockId>& new_delta_blocks,
+                                        std::vector<DeltaBlockIdAndStats> new_delta_blocks,
                                         const fs::IOContext* io_context,
                                         DeltaType type,
                                         MetadataFlushType flush_type);
@@ -194,7 +198,7 @@ class DeltaTracker {
 
   // Opens the input 'blocks' of type 'type' and returns the opened delta file
   // readers in 'stores'.
-  Status OpenDeltaReaders(const std::vector<BlockId>& blocks,
+  Status OpenDeltaReaders(std::vector<DeltaBlockIdAndStats> blocks,
                           const fs::IOContext* io_context,
                           std::vector<std::shared_ptr<DeltaStore>>* stores,
                           DeltaType type);
@@ -296,16 +300,18 @@ class DeltaTracker {
   void CollectStores(std::vector<std::shared_ptr<DeltaStore>>* stores,
                      WhichStores which) const;
 
-  // Performs the actual compaction. Results of compaction are written to "block",
-  // while delta stores that underwent compaction are appended to "compacted_stores", while
-  // their corresponding block ids are appended to "compacted_blocks".
+  // Performs the actual compaction. Results of compaction are written to
+  // 'block', and stats are populated in 'output_stats'. Delta stores that
+  // underwent compaction are appended to 'compacted_stores', and their
+  // corresponding block ids are appended to 'compacted_blocks'.
   //
   // NOTE: the caller of this method should acquire or already hold an
-  // exclusive lock on 'compact_flush_lock_' before calling this
-  // method in order to protect 'redo_delta_stores_'.
+  // exclusive lock on 'compact_flush_lock_' before calling this method in
+  // order to protect 'redo_delta_stores_'.
   Status DoCompactStores(const fs::IOContext* io_context,
                          size_t start_idx, size_t end_idx,
                          std::unique_ptr<fs::WritableBlock> block,
+                         std::unique_ptr<DeltaStats>* output_stats,
                          std::vector<std::shared_ptr<DeltaStore>>* compacted_stores,
                          std::vector<BlockId>* compacted_blocks);
 
diff --git a/src/kudu/tablet/deltafile-test.cc b/src/kudu/tablet/deltafile-test.cc
index c88396f..1acdb1a 100644
--- a/src/kudu/tablet/deltafile-test.cc
+++ b/src/kudu/tablet/deltafile-test.cc
@@ -110,7 +110,7 @@ class TestDeltaFile : public KuduTest {
     // Update even numbered rows.
     faststring buf;
 
-    DeltaStats stats;
+    unique_ptr<DeltaStats> stats(new DeltaStats);
     for (int i = FLAGS_first_row_to_update; i <= FLAGS_last_row_to_update; i += 2) {
       for (int timestamp = min_timestamp; timestamp <= max_timestamp; timestamp++) {
         buf.clear();
@@ -120,10 +120,10 @@ class TestDeltaFile : public KuduTest {
         DeltaKey key(i, Timestamp(timestamp));
         RowChangeList rcl(buf);
         ASSERT_OK_FAST(dfw.AppendDelta<REDO>(key, rcl));
-        ASSERT_OK_FAST(stats.UpdateStats(key.timestamp(), rcl));
+        ASSERT_OK_FAST(stats->UpdateStats(key.timestamp(), rcl));
       }
     }
-    dfw.WriteDeltaStats(stats);
+    dfw.WriteDeltaStats(std::move(stats));
     ASSERT_OK(dfw.Finish());
   }
 
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 5e83342..864c20c 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -195,13 +195,14 @@ Status DeltaFileWriter::AppendDelta<UNDO>(
   return DoAppendDelta(key, delta);
 }
 
-void DeltaFileWriter::WriteDeltaStats(const DeltaStats& stats) {
+void DeltaFileWriter::WriteDeltaStats(std::unique_ptr<DeltaStats> stats) {
   DeltaStatsPB delta_stats_pb;
-  stats.ToPB(&delta_stats_pb);
+  stats->ToPB(&delta_stats_pb);
 
   faststring buf;
   pb_util::SerializeToString(delta_stats_pb, &buf);
   writer_->AddMetadataPair(DeltaFileReader::kDeltaStatsEntryName, buf.ToString());
+  delta_stats_ = std::move(stats);
 }
 
 
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 1850d1a..777069e 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -22,6 +22,7 @@
 #include <memory>
 #include <mutex>
 #include <string>
+#include <utility>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
@@ -101,7 +102,11 @@ class DeltaFileWriter {
   template<DeltaType Type>
   Status AppendDelta(const DeltaKey &key, const RowChangeList &delta);
 
-  void WriteDeltaStats(const DeltaStats& stats);
+  void WriteDeltaStats(std::unique_ptr<DeltaStats> stats);
+
+  std::unique_ptr<DeltaStats> release_delta_stats() {
+    return std::move(delta_stats_);
+  }
 
   size_t written_size() const {
     return writer_->written_size();
@@ -110,6 +115,7 @@ class DeltaFileWriter {
  private:
   Status DoAppendDelta(const DeltaKey &key, const RowChangeList &delta);
 
+  std::unique_ptr<DeltaStats> delta_stats_;
   std::unique_ptr<cfile::CFileWriter> writer_;
 
   // Buffer used as a temporary for storing the serialized form
@@ -219,6 +225,8 @@ class DeltaFileReader : public DeltaStore,
 
   std::shared_ptr<cfile::CFileReader> reader_;
 
+  // TODO(awong): it'd be nice to not heap-allocate this and other usages of
+  // delta stats.
   mutable simple_spinlock stats_lock_;
   std::unique_ptr<DeltaStats> delta_stats_;
 
diff --git a/src/kudu/tablet/deltamemstore-test.cc b/src/kudu/tablet/deltamemstore-test.cc
index 51099e5..60d04c9 100644
--- a/src/kudu/tablet/deltamemstore-test.cc
+++ b/src/kudu/tablet/deltamemstore-test.cc
@@ -211,8 +211,8 @@ TEST_F(TestDeltaMemStore, TestUpdateCount) {
   ASSERT_OK(fs.CreateNewBlock({}, &block));
   DeltaFileWriter dfw(std::move(block));
   ASSERT_OK(dfw.Start());
-  unique_ptr<DeltaStats> stats;
-  dms_->FlushToFile(&dfw, &stats);
+  dms_->FlushToFile(&dfw);
+  unique_ptr<DeltaStats> stats = dfw.release_delta_stats();
 
   ASSERT_EQ(n_rows / 2, stats->update_count_for_col_id(schema_.column_id(kIntColumn)));
   ASSERT_EQ(n_rows / 4, stats->update_count_for_col_id(schema_.column_id(kStringColumn)));
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index d6aaddb..0309ca6 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -129,8 +129,7 @@ Status DeltaMemStore::Update(Timestamp timestamp,
   return Status::OK();
 }
 
-Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw,
-                                  unique_ptr<DeltaStats>* stats_ret) {
+Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw) {
   unique_ptr<DeltaStats> stats(new DeltaStats());
 
   unique_ptr<DMSTreeIter> iter(tree_.NewIterator());
@@ -146,9 +145,7 @@ Status DeltaMemStore::FlushToFile(DeltaFileWriter *dfw,
     stats->UpdateStats(key.timestamp(), rcl);
     iter->Next();
   }
-  dfw->WriteDeltaStats(*stats);
-
-  stats_ret->swap(stats);
+  dfw->WriteDeltaStats(std::move(stats));
   return Status::OK();
 }
 
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 7603ebc..49dfaa1 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -105,9 +105,7 @@ class DeltaMemStore : public DeltaStore,
   void DebugPrint() const;
 
   // Flush the DMS to the given file writer.
-  // Returns statistics in *stats.
-  Status FlushToFile(DeltaFileWriter *dfw,
-                     std::unique_ptr<DeltaStats>* stats);
+  Status FlushToFile(DeltaFileWriter* dfw);
 
   // Create an iterator for applying deltas from this DMS.
   //
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 8a939fb..caef925 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -449,8 +449,8 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     RETURN_NOT_OK(writer_status);
     CHECK_GT(cur_writer_->written_count(), 0);
 
-    cur_undo_writer_->WriteDeltaStats(*cur_undo_delta_stats_);
-    cur_redo_writer_->WriteDeltaStats(*cur_redo_delta_stats_);
+    cur_undo_writer_->WriteDeltaStats(std::move(cur_undo_delta_stats_));
+    cur_redo_writer_->WriteDeltaStats(std::move(cur_redo_delta_stats_));
 
     // Commit the UNDO block. Status::Aborted() indicates that there
     // were no UNDOs written.
@@ -458,8 +458,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
       cur_drs_metadata_->CommitUndoDeltaDataBlock(cur_undo_ds_block_id_);
-    } else {
-      DCHECK_EQ(cur_undo_delta_stats_->min_timestamp(), Timestamp::kMax);
     }
 
     // Same for the REDO block.
@@ -467,8 +465,6 @@ Status RollingDiskRowSetWriter::FinishCurrentWriter() {
     if (!s.IsAborted()) {
       RETURN_NOT_OK(s);
       cur_drs_metadata_->CommitRedoDeltaDataBlock(0, 0, cur_redo_ds_block_id_);
-    } else {
-      DCHECK_EQ(cur_redo_delta_stats_->min_timestamp(), Timestamp::kMax);
     }
 
     written_size_ += cur_writer_->written_size();
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 937221b..8734a98 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -870,16 +870,16 @@ Status CreateRandomDeltaFile(const Schema& schema,
   BlockId block_id = wb->id();
   std::unique_ptr<DeltaFileWriter> writer(new DeltaFileWriter(std::move(wb)));
   RETURN_NOT_OK(writer->Start());
-  DeltaStats stats;
+  std::unique_ptr<DeltaStats> stats(new DeltaStats);
   for (const auto& e1 : mirror->all_deltas()) {
     for (const auto& e2 : e1.second) {
       DeltaKey k(e1.first, e2.first);
       RowChangeList changes(e2.second);
       RETURN_NOT_OK(writer->AppendDelta<T::kTag>(k, changes));
-      RETURN_NOT_OK(stats.UpdateStats(k.timestamp(), changes));
+      RETURN_NOT_OK(stats->UpdateStats(k.timestamp(), changes));
     }
   }
-  writer->WriteDeltaStats(stats);
+  writer->WriteDeltaStats(std::move(stats));
   RETURN_NOT_OK(writer->Finish());
 
   // Open a reader for this newly written delta file.
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 36ac2ff..d638284 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -1588,6 +1588,8 @@ Status Tablet::DoMergeCompactionOrFlush(const RowSetsInCompaction &input,
   {
     TRACE_EVENT0("tablet", "Opening compaction results");
     for (const shared_ptr<RowSetMetadata>& meta : new_drs_metas) {
+      // TODO(awong): it'd be nice to plumb delta stats from the rowset writer
+      // into the new deltafile readers opened here.
       shared_ptr<DiskRowSet> new_rowset;
       Status s = DiskRowSet::Open(meta,
                                   log_anchor_registry_.get(),
diff --git a/src/kudu/tablet/tablet_history_gc-test.cc b/src/kudu/tablet/tablet_history_gc-test.cc
index a7e4ea0..857ea89 100644
--- a/src/kudu/tablet/tablet_history_gc-test.cc
+++ b/src/kudu/tablet/tablet_history_gc-test.cc
@@ -98,6 +98,20 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
     NO_FLUSH
   };
 
+  // Attempt to run the deleted rowset GC, triggering an assertion failure if
+  // it failed or if our metrics don't make sense.
+  void TryRunningDeletedRowsetGC() {
+    const auto& metrics = tablet()->metrics();
+    int orig_bytes = metrics->deleted_rowset_gc_bytes_deleted->value();
+    int orig_count = metrics->deleted_rowset_gc_duration->TotalCount();
+    int64_t bytes = 0;
+    ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
+    ASSERT_LT(0, bytes);
+    ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+    ASSERT_EQ(bytes + orig_bytes, metrics->deleted_rowset_gc_bytes_deleted->value());
+    ASSERT_GT(metrics->deleted_rowset_gc_duration->TotalCount(), orig_count);
+  }
+
   // Helper functions that mutate rows in batches of keys:
   //   [0, rows_per_rowset)
   //   [rows_per_rowset, 2*rows_per_rowset)
@@ -107,7 +121,7 @@ class TabletHistoryGcTest : public TabletTestBase<IntKeyTestSetup<INT64>> {
   // ...flushing MRS or DMS (depending on the workload) in between batches.
   void InsertOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset);
   void UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, int32_t val);
-  void DeleteOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset, bool flush_dms);
+  void DeleteOriginalRows(int64_t num_batches, int64_t rows_per_batch, bool flush_dms);
 
   void AddTimeToHybridClock(MonoDelta delta) {
     uint64_t now = HybridClock::GetPhysicalValueMicros(clock()->Now());
@@ -142,15 +156,14 @@ void TabletHistoryGcTest::InsertOriginalRows(int64_t num_rowsets, int64_t rows_p
   ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
 }
 
-void TabletHistoryGcTest::DeleteOriginalRows(int64_t num_rowsets,
-    int64_t rows_per_rowset, bool flush_dms) {
-  for (int rowset_id = 0; rowset_id < num_rowsets; rowset_id++) {
-    NO_FATALS(DeleteTestRows(rowset_id * rows_per_rowset, rows_per_rowset));
+void TabletHistoryGcTest::DeleteOriginalRows(int64_t num_batches,
+    int64_t rows_per_batch, bool flush_dms) {
+  for (int rowset_id = 0; rowset_id < num_batches; rowset_id++) {
+    NO_FATALS(DeleteTestRows(rowset_id * rows_per_batch, rows_per_batch));
     if (flush_dms) {
       ASSERT_OK(tablet()->FlushAllDMSForTests());
     }
   }
-  ASSERT_EQ(num_rowsets, tablet()->num_rowsets());
 }
 
 void TabletHistoryGcTest::UpdateOriginalRows(int64_t num_rowsets, int64_t rows_per_rowset,
@@ -686,15 +699,9 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithRedoFiles) {
   // Move the clock so all rowsets are ancient. Our GC should succeed and we
   // should be left with no rowsets.
   NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
-  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
-  ASSERT_LT(0, bytes);
-  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
+  NO_FATALS(TryRunningDeletedRowsetGC());
   ASSERT_EQ(0, tablet()->CountUndoDeltasForTests());
   ASSERT_EQ(0, tablet()->CountRedoDeltasForTests());
-
-  // Check that these show up.
-  ASSERT_EQ(bytes, metrics->deleted_rowset_gc_bytes_deleted->value());
-  ASSERT_EQ(1, metrics->deleted_rowset_gc_duration->TotalCount());
 }
 
 TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithDMS) {
@@ -702,12 +709,40 @@ TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsWithDMS) {
   NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
   NO_FATALS(DeleteOriginalRows(kNumRowsets, rows_per_rowset_, /*flush_dms*/false));
   NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
-  int64_t bytes = 0;
-  ASSERT_OK(tablet()->GetBytesInAncientDeletedRowsets(&bytes));
-  ASSERT_LT(0, bytes);
-  ASSERT_OK(tablet()->DeleteAncientDeletedRowsets());
-  ASSERT_EQ(bytes, tablet()->metrics()->deleted_rowset_gc_bytes_deleted->value());
-  ASSERT_EQ(1, tablet()->metrics()->deleted_rowset_gc_duration->TotalCount());
+  NO_FATALS(TryRunningDeletedRowsetGC());
+}
+
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsAfterMinorCompaction) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  // Flush twice as frequently when deleting so we end up with multiple delta
+  // files per DRS.
+  NO_FATALS(DeleteOriginalRows(kNumRowsets * 2, rows_per_rowset_ / 2, /*flush_dms*/true));
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+  int num_redos_before_compaction = tablet()->CountRedoDeltasForTests();
+  for (int i = 0; i < kNumRowsets; i++) {
+    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MINOR_DELTA_COMPACTION));
+  }
+  ASSERT_GT(num_redos_before_compaction, tablet()->CountRedoDeltasForTests());
+  NO_FATALS(TryRunningDeletedRowsetGC());
+}
+
+TEST_F(TabletHistoryGcNoMaintMgrTest, TestGCDeletedRowsetsAfterMajorCompaction) {
+  FLAGS_tablet_history_max_age_sec = 1000;
+  NO_FATALS(InsertOriginalRows(kNumRowsets, rows_per_rowset_));
+  // Major delta compaction is a no-op if we only have deletes, so trick the
+  // rowsets into major compacting by throwing in some updates.
+  NO_FATALS(UpdateOriginalRows(kNumRowsets, rows_per_rowset_, 5));
+  // Flush twice as frequently when deleting so we end up with multiple delta
+  // files per DRS.
+  NO_FATALS(DeleteOriginalRows(kNumRowsets * 2, rows_per_rowset_ / 2, /*flush_dms*/true));
+  NO_FATALS(AddTimeToHybridClock(MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec + 1)));
+  int num_redos_before_compaction = tablet()->CountRedoDeltasForTests();
+  for (int i = 0; i < kNumRowsets; i++) {
+    ASSERT_OK(tablet()->CompactWorstDeltas(RowSet::MAJOR_DELTA_COMPACTION));
+  }
+  ASSERT_GT(num_redos_before_compaction, tablet()->CountRedoDeltasForTests());
+  NO_FATALS(TryRunningDeletedRowsetGC());
 }
 
 } // namespace tablet