Posted to commits@kudu.apache.org by mp...@apache.org on 2017/02/24 04:07:31 UTC

[4/4] kudu git commit: KUDU-1601. Delete ancient UNDO delta blocks in the background

KUDU-1601. Delete ancient UNDO delta blocks in the background

This patch adds a maintenance manager background task that deletes
"ancient" UNDO delta blocks, meaning blocks that correspond to data that
is considered no longer reachable and is therefore a candidate for
garbage collection. The task only deletes entire blocks, so it does not
cause write amplification.

This maintenance task operates in the following way:

1. UpdateStats() returns an upper bound on the number of GC'able undo
   bytes in the rowset: the sum of the sizes of all undo delta stores,
   oldest first, up to the first initialized one with max_timestamp >=
   the AHM (ancient history mark). The estimate becomes more accurate
   over time, and in a steady state it is exact, because Perform()
   initializes undo delta blocks as it runs.

2. Perform() initializes undo delta stores for the tablet within a
   time budget. For each rowset, it initializes undo delta stores, oldest
   first, until it reaches the earliest one with max_timestamp >= AHM,
   which makes the next UpdateStats() call more accurate. Once it has
   exhausted its time budget, or has initialized all ancient undo blocks,
   it garbage-collects all of the known ancient undo delta blocks in the
   tablet.
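
The two steps above can be sketched as a small, single-threaded C++ model.
This is an illustration under stated assumptions, not Kudu's implementation:
UndoBlock, EstimateGcableBytes(), InitUndoBlocks() and
DeleteAncientUndoBlocks() are hypothetical names; blocks are kept oldest
first (Kudu's DeltaTracker collects them newest first and walks them in
reverse); timestamps are plain integers; and the Perform() time budget is
modeled as a cap on the number of Init() calls. A block counts as ancient
when its max_timestamp is below the AHM.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical model of one on-disk UNDO delta block. Only the fields the GC
// decision needs are modeled; this is not Kudu's DeltaStore interface.
struct UndoBlock {
  int64_t max_timestamp;  // Newest op in the block; trusted only once initted.
  int64_t size_bytes;     // Known without initializing, like EstimateSize().
  bool initted;           // Whether the block's stats have been loaded.
};

// UpdateStats()-style estimate: sum block sizes, oldest first, until reaching
// an initialized block with max_timestamp >= AHM. Uninitialized blocks are
// counted pessimistically, so the result is an upper bound.
int64_t EstimateGcableBytes(const std::vector<UndoBlock>& oldest_first, int64_t ahm) {
  int64_t bytes = 0;
  for (const UndoBlock& b : oldest_first) {
    if (b.initted && b.max_timestamp >= ahm) break;
    bytes += b.size_bytes;
  }
  return bytes;
}

// Perform(), phase 1: initialize blocks oldest first, stopping at the first
// non-ancient block or when the (assumed, count-based) budget runs out.
int InitUndoBlocks(std::vector<UndoBlock>* oldest_first, int64_t ahm, int init_budget) {
  assert(init_budget >= 0);
  int initialized = 0;
  for (UndoBlock& b : *oldest_first) {
    if (!b.initted) {
      if (initialized == init_budget) break;  // Budget exhausted.
      b.initted = true;                       // Stands in for loading stats from disk.
      ++initialized;
    }
    if (b.max_timestamp >= ahm) break;  // First non-ancient block: stop here.
  }
  return initialized;
}

// Perform(), phase 2: drop whole blocks, oldest first, that are initialized
// and entirely older than the AHM. This phase never initializes anything.
int64_t DeleteAncientUndoBlocks(std::vector<UndoBlock>* oldest_first, int64_t ahm) {
  std::size_t n = 0;
  int64_t bytes_deleted = 0;
  for (const UndoBlock& b : *oldest_first) {
    if (!b.initted || b.max_timestamp >= ahm) break;
    bytes_deleted += b.size_bytes;
    ++n;
  }
  oldest_first->erase(oldest_first->begin(),
                      oldest_first->begin() + static_cast<std::ptrdiff_t>(n));
  return bytes_deleted;
}
```

In this model an uninitialized block is always counted by the estimate but
never deleted, so the estimate errs high until Perform() has examined the
block, which matches the "accuracy improves over time" behavior described
above.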

To avoid starving performance-improvement maintenance ops, this patch
introduces a new flag, --data_gc_prioritization_prob, which adds some
randomness to the scheduler at the maintenance manager level. It controls
the fraction of the time that the scheduler considers data GC ops higher
priority than performance-improvement ops.
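
The idea behind --data_gc_prioritization_prob can be illustrated with a coin
flip per scheduling decision. This is a hypothetical sketch, not the
maintenance manager's actual selection logic: CandidateOp and PickOp() are
invented names, and the sketch ignores every other scheduling criterion the
real scheduler may apply.

```cpp
#include <cassert>
#include <random>
#include <vector>

// Hypothetical, simplified view of a maintenance op candidate. This is not
// Kudu's MaintenanceOp / MaintenanceOpStats interface.
struct CandidateOp {
  const char* name;
  bool is_data_gc;          // True for data GC ops such as UNDO delta block GC.
  double perf_improvement;  // Score used to rank performance-improvement ops.
  double gc_bytes;          // Reclaimable bytes used to rank data GC ops.
};

// With probability 'data_gc_prioritization_prob', try data GC ops first;
// otherwise try performance-improvement ops first. Either way, fall back to
// the other class if the preferred one has nothing to do. Returns nullptr if
// no op is worth running.
const CandidateOp* PickOp(const std::vector<CandidateOp>& ops,
                          double data_gc_prioritization_prob,
                          std::mt19937* rng) {
  assert(data_gc_prioritization_prob >= 0.0 && data_gc_prioritization_prob <= 1.0);
  const CandidateOp* best_gc = nullptr;
  const CandidateOp* best_perf = nullptr;
  for (const CandidateOp& op : ops) {
    if (op.is_data_gc) {
      if (op.gc_bytes > 0 && (best_gc == nullptr || op.gc_bytes > best_gc->gc_bytes)) {
        best_gc = &op;
      }
    } else if (op.perf_improvement > 0 &&
               (best_perf == nullptr || op.perf_improvement > best_perf->perf_improvement)) {
      best_perf = &op;
    }
  }
  // One biased coin flip per scheduling decision.
  std::bernoulli_distribution prefer_gc(data_gc_prioritization_prob);
  if (prefer_gc(*rng)) {
    return best_gc != nullptr ? best_gc : best_perf;
  }
  return best_perf != nullptr ? best_perf : best_gc;
}
```

With the probability at 1.0 a GC op with reclaimable bytes always wins; at
0.0 a performance op wins whenever one is available. Intermediate values
split the decisions accordingly, so neither class of op can starve the
other.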

This patch includes the following:

* New UNDO delta block GC MM task
* New UNDO delta block GC metrics (at the tablet level only)
* Flags to enable / disable the GC task as well as flags to throttle it
* A few minor improvements in the maintenance manager
* Fixes for a few preexisting clang-tidy lint complaints

Notable implementation details:

* When performing undo delta GC in Tablet::DeleteAncientUndoDeltas(), we
  only flush the tablet metadata after making the metadata changes
  across all rowsets. This is safe because we are not actually modifying
  any data; we are simply removing references to blocks that are no
  longer reachable by new scanners. The code path that handles the
  metadata update for compactions and ancient history data GC,
  DeltaTracker::CommitDeltaStoreMetadataUpdate(), has a DCHECK in place
  to ensure that it is never called without specifying blocks to remove.
  This guarantees that the DeltaMemStore flush code path located in
  DeltaTracker::FlushDMS(), the only delta-related code path that
  modifies user-visible data, does not utilize that routine for its
  flush. This fact was also verified by inspection -- FlushDMS()
  contains its own flush code path.

Includes the following tests:

* RowSet-level unit test in diskrowset-test
* Tablet-level functional test in tablet_history_gc-test
* Tablet-level concurrency test in mt-tablet-test
* Integration test utilizing the tserver-level MM task in
  tablet_history_gc-itest
* Incorporated into RandomizedTabletHistoryGcITest in
  tablet_history_gc-itest

Manual testing:

* I ran 300 iterations of TabletHistoryGcITest.TestUndoDeltaBlockGc on
  the dist-test cluster under TSAN with 12 stress threads:
  http://dist-test.cloudera.org/job?job_id=mpercy.1487901212.3733

* I also ran YCSB on a 10-node cluster on a table with 200 tablets with
  mostly default parameters except for --tablet_history_max_age_sec=60.
  YCSB was configured like so:

    recordcount=100000
    operationcount=6000000
    updateproportion=1.0
    requestdistribution=zipfian
    threadcount=10
    kudu_pre_split_num_tablets=200
    kudu_sync_ops=true

  This workload took 839 seconds to run, and I did not observe any
  increase in average update latency over time (there was a mild sawtooth
  pattern), which indicated to me that the compaction operations were
  keeping up with the updates. The undo delta GC operations also kept
  pace, and garbage was collected aggressively: generally only tens of MB
  or less of reclaimable data per tablet was present at any given time.
  The current defaults seem reasonable, although additional performance
  testing is likely warranted.

Change-Id: I0309bf7acfb6d018860c80f354012c3500da5c68
Reviewed-on: http://gerrit.cloudera.org:8080/4363
Tested-by: Mike Percy <mp...@apache.org>
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/fea449ac
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/fea449ac
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/fea449ac

Branch: refs/heads/master
Commit: fea449ace2f774ba648f4a847299bd290cd104d6
Parents: ac85351
Author: Mike Percy <mp...@apache.org>
Authored: Mon Feb 6 12:58:55 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Feb 24 04:06:26 2017 +0000

----------------------------------------------------------------------
 .../tablet_history_gc-itest.cc                  | 163 ++++++++++-
 src/kudu/master/master.cc                       |   2 +-
 src/kudu/tablet/delta_tracker.cc                | 287 +++++++++++++++----
 src/kudu/tablet/delta_tracker.h                 |  50 +++-
 src/kudu/tablet/deltafile.cc                    |  10 +
 src/kudu/tablet/deltafile.h                     |   6 +
 src/kudu/tablet/diskrowset-test.cc              |  57 +++-
 src/kudu/tablet/diskrowset.cc                   |  48 +++-
 src/kudu/tablet/diskrowset.h                    |  20 +-
 src/kudu/tablet/memrowset.h                     |  23 ++
 src/kudu/tablet/mock-rowsets.h                  |  21 ++
 src/kudu/tablet/mt-tablet-test.cc               |  81 +++++-
 src/kudu/tablet/rowset.h                        |  73 +++++
 src/kudu/tablet/rowset_metadata.cc              |  33 +++
 src/kudu/tablet/rowset_metadata.h               |   5 +
 src/kudu/tablet/tablet.cc                       | 206 +++++++++++++
 src/kudu/tablet/tablet.h                        |  29 +-
 src/kudu/tablet/tablet_history_gc-test.cc       |  88 +++++-
 src/kudu/tablet/tablet_metrics.cc               |  39 +++
 src/kudu/tablet/tablet_metrics.h                |   8 +
 src/kudu/tablet/tablet_mm_ops.cc                |  63 +++-
 src/kudu/tablet/tablet_mm_ops.h                 |  30 ++
 src/kudu/tserver/tablet_server.cc               |   2 +-
 src/kudu/util/maintenance_manager-test.cc       |  10 +-
 src/kudu/util/maintenance_manager.cc            | 114 +++++---
 src/kudu/util/maintenance_manager.h             |  29 +-
 26 files changed, 1353 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/integration-tests/tablet_history_gc-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/tablet_history_gc-itest.cc b/src/kudu/integration-tests/tablet_history_gc-itest.cc
index 66753db..93e780a 100644
--- a/src/kudu/integration-tests/tablet_history_gc-itest.cc
+++ b/src/kudu/integration-tests/tablet_history_gc-itest.cc
@@ -30,6 +30,7 @@
 #include "kudu/server/hybrid_clock.h"
 #include "kudu/tablet/local_tablet_writer.h"
 #include "kudu/tablet/tablet.h"
+#include "kudu/tablet/tablet_metrics.h"
 #include "kudu/tablet/tablet_peer.h"
 #include "kudu/tserver/mini_tablet_server.h"
 #include "kudu/tserver/tablet_server.h"
@@ -39,7 +40,7 @@
 using kudu::client::KuduScanner;
 using kudu::client::KuduTable;
 using kudu::client::sp::shared_ptr;
-using kudu::server::Clock;
+using kudu::server::HybridClock;
 using kudu::tablet::Tablet;
 using kudu::tablet::TabletPeer;
 using kudu::tserver::MiniTabletServer;
@@ -50,22 +51,30 @@ using std::unique_ptr;
 using std::vector;
 using strings::Substitute;
 
+DECLARE_bool(enable_maintenance_manager);
 DECLARE_bool(use_mock_wall_clock);
+DECLARE_double(missed_heartbeats_before_rejecting_snapshot_scans);
+DECLARE_int32(flush_threshold_secs);
+DECLARE_int32(maintenance_manager_num_threads);
+DECLARE_int32(maintenance_manager_polling_interval_ms);
+DECLARE_int32(safe_time_max_lag_ms);
 DECLARE_int32(scanner_ttl_ms);
 DECLARE_int32(tablet_history_max_age_sec);
+DECLARE_int32(undo_delta_block_gc_init_budget_millis);
 DECLARE_string(block_manager);
-DECLARE_bool(enable_maintenance_manager);
-DECLARE_double(missed_heartbeats_before_rejecting_snapshot_scans);
-DECLARE_int32(safe_time_max_lag_ms);
 
 DEFINE_int32(test_num_rounds, 200, "Number of rounds to loop "
                                    "RandomizedTabletHistoryGcITest.TestRandomHistoryGCWorkload");
 
-using kudu::server::HybridClock;
-
 namespace kudu {
 
 class TabletHistoryGcITest : public MiniClusterITestBase {
+ protected:
+  void AddTimeToHybridClock(HybridClock* clock, MonoDelta delta) {
+    uint64_t now = HybridClock::GetPhysicalValueMicros(clock->Now());
+    uint64_t new_time = now + delta.ToMicroseconds();
+    clock->SetMockClockWallTimeForTests(new_time);
+  }
 };
 
 // Check that attempts to scan prior to the ancient history mark fail.
@@ -92,6 +101,123 @@ TEST_F(TabletHistoryGcITest, TestSnapshotScanBeforeAHM) {
   ASSERT_STR_CONTAINS(s.ToString(), "Snapshot timestamp is earlier than the ancient history mark");
 }
 
+// Check that the maintenance manager op to delete undo deltas actually deletes them.
+TEST_F(TabletHistoryGcITest, TestUndoDeltaBlockGc) {
+  FLAGS_use_mock_wall_clock = true; // Allow moving the clock.
+  FLAGS_tablet_history_max_age_sec = 1000;
+  FLAGS_flush_threshold_secs = 0; // Flush as aggressively as possible.
+  FLAGS_maintenance_manager_polling_interval_ms = 1; // Spin on MM for a quick test.
+  FLAGS_maintenance_manager_num_threads = 4; // Encourage concurrency.
+
+  NO_FATALS(StartCluster(1)); // Single-node cluster.
+
+  TestWorkload workload(cluster_.get());
+  workload.set_num_replicas(1);
+  workload.Setup();
+
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table));
+  client::sp::shared_ptr<client::KuduSession> session = client_->NewSession();
+
+  // Find the tablet.
+  tserver::MiniTabletServer* mts = cluster_->mini_tablet_server(0);
+  vector<scoped_refptr<TabletPeer>> tablet_peers;
+  mts->server()->tablet_manager()->GetTabletPeers(&tablet_peers);
+  ASSERT_EQ(1, tablet_peers.size());
+  std::shared_ptr<Tablet> tablet = tablet_peers[0]->shared_tablet();
+
+  const int32_t kNumRows = AllowSlowTests() ? 100 : 10;
+
+  // Insert a few rows.
+  for (int32_t row_key = 0; row_key < kNumRows; row_key++) {
+    unique_ptr<client::KuduInsert> insert(table->NewInsert());
+    KuduPartialRow* row = insert->mutable_row();
+    ASSERT_OK_FAST(row->SetInt32(0, row_key));
+    ASSERT_OK_FAST(row->SetInt32(1, 0));
+    ASSERT_OK_FAST(row->SetString(2, ""));
+    ASSERT_OK_FAST(session->Apply(insert.release()));
+  }
+  ASSERT_OK_FAST(session->Flush());
+
+  // Update rows in a loop; wait until some undo deltas are generated.
+  int32_t row_value = 0;
+  while (true) {
+    for (int32_t row_key = 0; row_key < kNumRows; row_key++) {
+      unique_ptr<client::KuduUpdate> update(table->NewUpdate());
+      KuduPartialRow* row = update->mutable_row();
+      ASSERT_OK_FAST(row->SetInt32(0, row_key));
+      ASSERT_OK_FAST(row->SetInt32(1, row_value));
+      ASSERT_OK_FAST(session->Apply(update.release()));
+    }
+    ASSERT_OK_FAST(session->Flush());
+
+    VLOG(1) << "Number of undo deltas: " << tablet->CountUndoDeltasForTests();
+    VLOG(1) << "Number of redo deltas: " << tablet->CountRedoDeltasForTests();
+
+    // Only break out of the loop once we have undos.
+    if (tablet->CountUndoDeltasForTests() > 0) break;
+
+    SleepFor(MonoDelta::FromMilliseconds(5));
+    row_value++;
+  }
+
+  // Manually flush all mem stores so that we can measure the maximum disk
+  // size before moving the AHM. This ensures the test isn't flaky.
+  ASSERT_OK(tablet->Flush());
+  ASSERT_OK(tablet->FlushAllDMSForTests());
+
+  uint64_t measured_size_before_gc = 0;
+  ASSERT_OK(Env::Default()->GetFileSizeOnDiskRecursively(cluster_->GetTabletServerFsRoot(0),
+                                                         &measured_size_before_gc));
+
+  // Move the clock so all operations are in the past. Then wait until we have
+  // no more undo deltas.
+  HybridClock* c = down_cast<HybridClock*>(tablet->clock().get());
+  AddTimeToHybridClock(c, MonoDelta::FromSeconds(FLAGS_tablet_history_max_age_sec));
+  AssertEventually([&] {
+    ASSERT_EQ(0, tablet->CountUndoDeltasForTests());
+  });
+
+  // Verify that the latest values are still there.
+  KuduScanner scanner(table.get());
+  ASSERT_OK(scanner.SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+  ASSERT_OK(scanner.Open());
+  int num_rows_scanned = 0;
+  while (scanner.HasMoreRows()) {
+    client::KuduScanBatch batch;
+    ASSERT_OK(scanner.NextBatch(&batch));
+    for (const auto& row : batch) {
+      int32_t key;
+      int32_t value;
+      ASSERT_OK(row.GetInt32("key", &key));
+      ASSERT_OK(row.GetInt32("int_val", &value));
+      ASSERT_LT(key, kNumRows);
+      ASSERT_EQ(row_value, value) << "key=" << key << ", int_val=" << value;
+      num_rows_scanned++;
+    }
+  }
+  ASSERT_EQ(kNumRows, num_rows_scanned);
+
+  // Check that the tablet metrics have reasonable values.
+  AssertEventually([&] {
+    ASSERT_GT(tablet->metrics()->undo_delta_block_gc_init_duration->TotalCount(), 0);
+    ASSERT_GT(tablet->metrics()->undo_delta_block_gc_delete_duration->TotalCount(), 0);
+    ASSERT_GT(tablet->metrics()->undo_delta_block_gc_perform_duration->TotalCount(), 0);
+    ASSERT_EQ(0, tablet->metrics()->undo_delta_block_gc_running->value());
+    ASSERT_GT(tablet->metrics()->undo_delta_block_gc_bytes_deleted->value(), 0);
+    ASSERT_EQ(0, tablet->metrics()->undo_delta_block_estimated_retained_bytes->value());
+
+    // Check that we are now using less space.
+    // We manually flush the tablet metadata here because the list of orphaned
+    // blocks may take up space.
+    ASSERT_OK(tablet->metadata()->Flush());
+    uint64_t measured_size_after_gc = 0;
+    ASSERT_OK(Env::Default()->GetFileSizeOnDiskRecursively(cluster_->GetTabletServerFsRoot(0),
+                                                           &measured_size_after_gc));
+    ASSERT_LT(measured_size_after_gc, measured_size_before_gc);
+  });
+}
+
 // Whether a MaterializedTestRow is deleted or not.
 enum IsDeleted {
   NOT_DELETED,
@@ -130,6 +256,7 @@ class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest {
     kFlush,
     kMergeCompaction,
     kRedoDeltaCompaction,
+    kUndoDeltaBlockGc,
     kMoveTimeForward,
     kStartScan,
     kNumActions, // Count of items in this enum. Keep as last entry.
@@ -177,12 +304,6 @@ class RandomizedTabletHistoryGcITest : public TabletHistoryGcITest {
     latest_snapshot_ts_ = ts;
   }
 
-  void AddTimeToClock(MonoDelta delta) {
-    uint64_t now = HybridClock::GetPhysicalValueMicros(clock_->Now());
-    uint64_t new_time = now + delta.ToMicroseconds();
-    clock_->SetMockClockWallTimeForTests(new_time);
-  }
-
   void RegisterScanner(unique_ptr<client::KuduScanner> scanner, Timestamp snap_ts,
                        int verify_round) {
     CHECK_GE(verify_round, cur_round_);
@@ -646,9 +767,25 @@ TEST_F(RandomizedTabletHistoryGcITest, TestRandomHistoryGCWorkload) {
                                                      tablet::RowSet::MINOR_DELTA_COMPACTION));
         break;
       }
+      case kUndoDeltaBlockGc: {
+        VLOG(1) << "Running UNDO delta block GC";
+        ASSERT_OK(tablet->InitAncientUndoDeltas(
+            MonoDelta::FromMilliseconds(FLAGS_undo_delta_block_gc_init_budget_millis), nullptr));
+        int64_t blocks_deleted;
+        int64_t bytes_deleted;
+        ASSERT_OK(tablet->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
+        // If one of these equals zero, both should equal zero.
+        if (blocks_deleted == 0 || bytes_deleted == 0) {
+          ASSERT_EQ(0, blocks_deleted);
+          ASSERT_EQ(0, bytes_deleted);
+        }
+        VLOG(1) << "UNDO delta block GC deleted " << blocks_deleted
+                << " blocks and " << bytes_deleted << " bytes";
+        break;
+      }
       case kMoveTimeForward: {
         VLOG(1) << "Moving clock forward";
-        AddTimeToClock(MonoDelta::FromSeconds(200));
+        AddTimeToHybridClock(clock_, MonoDelta::FromSeconds(200));
         break;
       }
       case kStartScan: {

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/master/master.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/master.cc b/src/kudu/master/master.cc
index 753419b..ce7eb08 100644
--- a/src/kudu/master/master.cc
+++ b/src/kudu/master/master.cc
@@ -137,7 +137,7 @@ Status Master::Start() {
 Status Master::StartAsync() {
   CHECK_EQ(kInitialized, state_);
 
-  RETURN_NOT_OK(maintenance_manager_->Init());
+  RETURN_NOT_OK(maintenance_manager_->Init(fs_manager_->uuid()));
 
   gscoped_ptr<ServiceIf> impl(new MasterServiceImpl(this));
   gscoped_ptr<ServiceIf> consensus_service(new ConsensusServiceImpl(

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index 7d25345..a4b4d66 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -17,6 +17,8 @@
 
 #include "kudu/tablet/delta_tracker.h"
 
+#include <boost/range/adaptor/reversed.hpp>
+#include <glog/stl_logging.h>
 #include <mutex>
 #include <set>
 
@@ -77,9 +79,9 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
     gscoped_ptr<ReadableBlock> block;
     Status s = fs->OpenBlock(block_id, &block);
     if (!s.ok()) {
-      LOG(ERROR) << "Failed to open " << DeltaType_Name(type)
-                 << " delta file " << block_id.ToString() << ": "
-                 << s.ToString();
+      LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type)
+                             << " delta file " << block_id.ToString() << ": "
+                             << s.ToString();
       return s;
     }
 
@@ -91,14 +93,14 @@ Status DeltaTracker::OpenDeltaReaders(const vector<BlockId>& blocks,
                                     std::move(options),
                                     &dfr);
     if (!s.ok()) {
-      LOG(ERROR) << "Failed to open " << DeltaType_Name(type)
-                 << " delta file reader " << block_id.ToString() << ": "
-                 << s.ToString();
+      LOG_WITH_PREFIX(ERROR) << "Failed to open " << DeltaType_Name(type)
+                             << " delta file reader " << block_id.ToString() << ": "
+                             << s.ToString();
       return s;
     }
 
-    VLOG(1) << "Successfully opened " << DeltaType_Name(type)
-            << " delta file " << block_id.ToString();
+    VLOG_WITH_PREFIX(1) << "Successfully opened " << DeltaType_Name(type)
+                        << " delta file " << block_id.ToString();
     stores->push_back(dfr);
   }
   return Status::OK();
@@ -147,7 +149,7 @@ Status DeltaTracker::MakeDeltaIteratorMergerUnlocked(size_t start_idx, size_t en
     ignore_result(down_cast<DeltaFileReader*>(delta_store.get()));
     shared_ptr<DeltaFileReader> dfr = std::static_pointer_cast<DeltaFileReader>(delta_store);
 
-    LOG(INFO) << "Preparing to minor compact delta file: " << dfr->ToString();
+    LOG_WITH_PREFIX(INFO) << "Preparing to minor compact delta file: " << dfr->ToString();
 
     inputs.push_back(delta_store);
     target_stores->push_back(delta_store);
@@ -169,36 +171,53 @@ string JoinDeltaStoreStrings(const SharedDeltaStoreVector& stores) {
   return ::JoinStrings(strings, ",");
 }
 
-// Validate that 'first' may precede 'second' in an ordered list of deltas,
-// given a delta type of 'type'.
-void ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
-                        const std::shared_ptr<DeltaStore>& second,
-                        DeltaType type) {
-  DCHECK_OK(first->Init());
-  DCHECK_OK(second->Init());
+} // anonymous namespace
+
+void DeltaTracker::ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
+                                      const std::shared_ptr<DeltaStore>& second,
+                                      DeltaType type) {
+  shared_ptr<DeltaStore> first_copy = first;
+  shared_ptr<DeltaStore> second_copy = second;
+
+  // Make clones so we don't leave the original ones initted. That can affect
+  // tests. We know it's a DeltaFileReader if it's not Initted().
+  if (!first_copy->Initted()) {
+    shared_ptr<DeltaFileReader> first_clone;
+    DCHECK_OK(down_cast<DeltaFileReader*>(first.get())->CloneForDebugging(
+        rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &first_clone));
+    DCHECK_OK(first_clone->Init());
+    first_copy = first_clone;
+  }
+  if (!second_copy->Initted()) {
+    shared_ptr<DeltaFileReader> second_clone;
+    DCHECK_OK(down_cast<DeltaFileReader*>(second.get())->CloneForDebugging(
+        rowset_metadata_->fs_manager(), mem_trackers_.tablet_tracker, &second_clone));
+    DCHECK_OK(second_clone->Init());
+    second_copy = second_clone;
+  }
+
   switch (type) {
     case REDO:
-      DCHECK_LE(first->delta_stats().min_timestamp(), second->delta_stats().min_timestamp())
-          << "Found out-of-order deltas: [{" << first->ToString() << "}, {"
-          << second->ToString() << "}]: type = " << type;
+      DCHECK_LE(first_copy->delta_stats().min_timestamp(),
+                second_copy->delta_stats().min_timestamp())
+          << "Found out-of-order deltas: [{" << first_copy->ToString() << "}, {"
+          << second_copy->ToString() << "}]: type = " << type;
       break;
     case UNDO:
-      DCHECK_GE(first->delta_stats().min_timestamp(), second->delta_stats().min_timestamp())
-          << "Found out-of-order deltas: [{" << first->ToString() << "}, {"
-          << second->ToString() << "}]: type = " << type;
+      DCHECK_GE(first_copy->delta_stats().min_timestamp(),
+                second_copy->delta_stats().min_timestamp())
+          << "Found out-of-order deltas: [{" << first_copy->ToString() << "}, {"
+          << second_copy->ToString() << "}]: type = " << type;
       break;
   }
 }
 
-// Validate the relative ordering of the deltas in the specified list.
-void ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
+void DeltaTracker::ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
   for (size_t i = 0; i < list.size() - 1; i++) {
     ValidateDeltaOrder(list[i], list[i + 1], type);
   }
 }
 
-} // anonymous namespace
-
 Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
                                         const vector<BlockId>& new_delta_blocks,
                                         DeltaType type) {
@@ -239,7 +258,8 @@ Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_
     for (const shared_ptr<DeltaStore>& ds : stores_to_replace) {
       if (end_it == stores_to_update->end() || *end_it != ds) {
         return Status::InvalidArgument(
-            strings::Substitute("Cannot find deltastore sequence <$0> in <$1>",
+            strings::Substitute("Cannot find $0 deltastore sequence <$1> in <$2>",
+                                DeltaType_Name(type),
                                 JoinDeltaStoreStrings(stores_to_replace),
                                 JoinDeltaStoreStrings(*stores_to_update)));
       }
@@ -265,8 +285,8 @@ Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_
   // Insert the new stores.
   stores_to_update->insert(start_it, new_stores.begin(), new_stores.end());
 
-  VLOG(1) << "New " << DeltaType_Name(type) << " stores: "
-          << JoinDeltaStoreStrings(*stores_to_update);
+  VLOG_WITH_PREFIX(1) << "New " << DeltaType_Name(type) << " stores: "
+                      << JoinDeltaStoreStrings(*stores_to_update);
   return Status::OK();
 }
 
@@ -274,6 +294,36 @@ Status DeltaTracker::Compact() {
   return CompactStores(0, -1);
 }
 
+Status DeltaTracker::CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
+                                                    const SharedDeltaStoreVector& to_remove,
+                                                    const vector<BlockId>& new_delta_blocks,
+                                                    DeltaType type,
+                                                    MetadataFlushType flush_type) {
+  compact_flush_lock_.AssertAcquired();
+
+  // This method is only used for compactions and GC, not data modifications.
+  // Therefore, flushing is not required for safety.
+  // We enforce that with this DCHECK.
+  DCHECK(!to_remove.empty());
+
+  // Update the in-memory metadata.
+  RETURN_NOT_OK(rowset_metadata_->CommitUpdate(update));
+  // Once we successfully commit to the rowset metadata, let's ensure we update
+  // the delta stores to maintain consistency between the two. We enforce this
+  // with a CHECK_OK here.
+  CHECK_OK(AtomicUpdateStores(to_remove, new_delta_blocks, type));
+  if (flush_type == FLUSH_METADATA) {
+    // Flushing the metadata is considered best-effort in this function.
+    // No consistency problems will be visible if we don't successfully
+    // Flush(), so no need to CHECK_OK here, because this function is specified
+    // only to be used for compactions or ancient history data GC, which do not
+    // add or subtract any user-visible ops. Compactions only swap the location
+    // of ops on disk, and ancient history data GC has no user-visible effects.
+    RETURN_NOT_OK(rowset_metadata_->Flush());
+  }
+  return Status::OK();
+}
+
 Status DeltaTracker::CompactStores(int start_idx, int end_idx) {
   // Prevent concurrent compactions or a compaction concurrent with a flush
   //
@@ -301,32 +351,129 @@ Status DeltaTracker::CompactStores(int start_idx, int end_idx) {
                         "Could not allocate delta block");
   BlockId new_block_id(block->id());
 
-  // Merge and compact the stores and write and output to "data_writer"
+  // Merge and compact the stores.
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
   RETURN_NOT_OK(DoCompactStores(start_idx, end_idx, std::move(block),
-                &compacted_stores, &compacted_blocks));
-
-  // Update delta_stores_, removing the compacted delta files and inserted the new
-  RETURN_NOT_OK(AtomicUpdateStores(compacted_stores, { new_block_id }, REDO));
-  LOG(INFO) << "Opened delta block for read: " << new_block_id.ToString();
+                                &compacted_stores, &compacted_blocks));
 
-  // Update the metadata accordingly
+  vector<BlockId> new_blocks = { new_block_id };
   RowSetMetadataUpdate update;
-  update.ReplaceRedoDeltaBlocks(compacted_blocks, { new_block_id });
-  // TODO: need to have some error handling here -- if we somehow can't persist the
-  // metadata, do we end up losing data on recovery?
-  CHECK_OK(rowset_metadata_->CommitUpdate(update));
-
-  Status s = rowset_metadata_->Flush();
-  if (!s.ok()) {
-    // TODO: again need to figure out some way of making this safe. Should we be
-    // writing the metadata _ahead_ of the actual store swap? Probably.
-    LOG(FATAL) << "Unable to commit delta data block metadata for "
-               << new_block_id.ToString() << ": " << s.ToString();
-    return s;
+  update.ReplaceRedoDeltaBlocks(compacted_blocks, new_blocks);
+
+  LOG_WITH_PREFIX(INFO) << "Flushing compaction of redo delta blocks { " << compacted_blocks
+                        << " } into block " << new_block_id;
+  RETURN_NOT_OK_PREPEND(CommitDeltaStoreMetadataUpdate(update, compacted_stores, new_blocks, REDO,
+                                                       FLUSH_METADATA),
+                        "DeltaTracker: CompactStores: Unable to commit delta update");
+  return Status::OK();
+}
+
+Status DeltaTracker::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
+                                                                 int64_t* bytes) {
+  DCHECK_NE(Timestamp::kInvalidTimestamp, ancient_history_mark);
+  DCHECK(bytes);
+  SharedDeltaStoreVector undos_newest_first;
+  CollectStores(&undos_newest_first, UNDOS_ONLY);
+
+  int64_t tmp_bytes = 0;
+  for (const auto& undo : boost::adaptors::reverse(undos_newest_first)) {
+    // Short-circuit once we hit an initialized delta block with 'max_timestamp' >= AHM.
+    if (undo->Initted() &&
+        undo->delta_stats().max_timestamp() >= ancient_history_mark) {
+      break;
+    }
+    tmp_bytes += undo->EstimateSize(); // Can be called before Init().
+  }
+
+  *bytes = tmp_bytes;
+  return Status::OK();
+}
+
+Status DeltaTracker::InitUndoDeltas(Timestamp ancient_history_mark,
+                                    MonoTime deadline,
+                                    int64_t* delta_blocks_initialized,
+                                    int64_t* bytes_in_ancient_undos) {
+  SharedDeltaStoreVector undos_newest_first;
+  CollectStores(&undos_newest_first, UNDOS_ONLY);
+  int64_t tmp_blocks_initialized = 0;
+  int64_t tmp_bytes_in_ancient_undos = 0;
+
+  // Traverse oldest-first, initializing delta stores as we go.
+  for (auto& undo : boost::adaptors::reverse(undos_newest_first)) {
+    if (deadline.Initialized() && MonoTime::Now() >= deadline) break;
+
+    if (!undo->Initted()) {
+      RETURN_NOT_OK(undo->Init());
+      tmp_blocks_initialized++;
+    }
+
+    // Stop initializing delta files once we start hitting newer deltas that
+    // are not GC'able.
+    if (ancient_history_mark != Timestamp::kInvalidTimestamp &&
+        undo->delta_stats().max_timestamp() >= ancient_history_mark) break;
+
+    // We only want to count the bytes in the ancient undos so this needs to
+    // come after the short-circuit above.
+    tmp_bytes_in_ancient_undos += undo->EstimateSize();
   }
 
+  if (delta_blocks_initialized) *delta_blocks_initialized = tmp_blocks_initialized;
+  if (bytes_in_ancient_undos) *bytes_in_ancient_undos = tmp_bytes_in_ancient_undos;
+  return Status::OK();
+}
+
+Status DeltaTracker::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                             int64_t* blocks_deleted, int64_t* bytes_deleted) {
+  DCHECK_NE(Timestamp::kInvalidTimestamp, ancient_history_mark);
+
+  // Performing data GC is similar in many ways to a compaction. We are
+  // updating both the rowset metadata and the delta stores in this method, so
+  // we need to be the only thread doing a flush or a compaction on this RowSet
+  // while we do our work.
+  std::lock_guard<Mutex> l(compact_flush_lock_);
+
+  // Get the list of undo deltas.
+  SharedDeltaStoreVector undos_newest_first;
+  CollectStores(&undos_newest_first, UNDOS_ONLY);
+
+  if (undos_newest_first.empty()) {
+    *blocks_deleted = 0;
+    *bytes_deleted = 0;
+    return Status::OK();
+  }
+
+  SharedDeltaStoreVector undos_to_remove;
+  vector<BlockId> block_ids_to_remove;
+
+  int64_t tmp_blocks_deleted = 0;
+  int64_t tmp_bytes_deleted = 0;
+
+  // Traverse oldest-first.
+  for (auto& undo : boost::adaptors::reverse(undos_newest_first)) {
+    if (!undo->Initted()) break; // Never initialize the deltas in this code path (it's slow).
+    if (undo->delta_stats().max_timestamp() >= ancient_history_mark) break;
+    tmp_blocks_deleted++;
+    tmp_bytes_deleted += undo->EstimateSize();
+    // This is always a safe downcast because UNDO deltas are always on disk.
+    block_ids_to_remove.push_back(down_cast<DeltaFileReader*>(undo.get())->block_id());
+    undos_to_remove.push_back(std::move(undo));
+  }
+  undos_newest_first.clear(); // We did a std::move() on some elements from this vector above.
+
+  // Only flush the rowset metadata if we are going to modify it.
+  if (!undos_to_remove.empty()) {
+    // We iterated in reverse order and CommitDeltaStoreMetadataUpdate() requires storage order.
+    std::reverse(undos_to_remove.begin(), undos_to_remove.end());
+    RowSetMetadataUpdate update;
+    update.RemoveUndoDeltaBlocks(block_ids_to_remove);
+    // We do not flush the tablet metadata - that is the caller's responsibility.
+    RETURN_NOT_OK(CommitDeltaStoreMetadataUpdate(update, undos_to_remove, {}, UNDO,
+                                                 NO_FLUSH_METADATA));
+  }
+
+  if (blocks_deleted) *blocks_deleted = tmp_blocks_deleted;
+  if (bytes_deleted) *bytes_deleted = tmp_bytes_deleted;
   return Status::OK();
 }
 
@@ -343,14 +490,14 @@ Status DeltaTracker::DoCompactStores(size_t start_idx, size_t end_idx,
   Schema empty_schema;
   RETURN_NOT_OK(MakeDeltaIteratorMergerUnlocked(start_idx, end_idx, &empty_schema, compacted_stores,
                                                 compacted_blocks, &inputs_merge));
-  LOG(INFO) << "Compacting " << (end_idx - start_idx + 1) << " delta files.";
+  LOG_WITH_PREFIX(INFO) << "Compacting " << (end_idx - start_idx + 1) << " delta files.";
   DeltaFileWriter dfw(std::move(block));
   RETURN_NOT_OK(dfw.Start());
   RETURN_NOT_OK(WriteDeltaIteratorToFile<REDO>(inputs_merge.get(),
                                                ITERATE_OVER_ALL_ROWS,
                                                &dfw));
   RETURN_NOT_OK(dfw.Finish());
-  LOG(INFO) << "Succesfully compacted the specified delta files.";
+  LOG_WITH_PREFIX(INFO) << "Successfully compacted the specified delta files.";
   return Status::OK();
 }
 
@@ -390,7 +537,7 @@ Status DeltaTracker::NewDeltaFileIterator(
     } else if (type == REDO) {
       *included_stores = redo_delta_stores_;
     } else {
-      LOG(FATAL);
+      LOG_WITH_PREFIX(FATAL);
     }
   }
 
@@ -415,7 +562,6 @@ Status DeltaTracker::WrapIterator(const shared_ptr<CFileSet::Iterator> &base,
   return Status::OK();
 }
 
-
 Status DeltaTracker::Update(Timestamp timestamp,
                             rowid_t row_idx,
                             const RowChangeList &update,
@@ -479,8 +625,9 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
   gscoped_ptr<DeltaStats> stats;
   RETURN_NOT_OK(dms->FlushToFile(&dfw, &stats));
   RETURN_NOT_OK(dfw.Finish());
-  LOG(INFO) << "Flushed delta block: " << block_id.ToString()
-            << " ts range: [" << stats->min_timestamp() << ", " << stats->max_timestamp() << "]";
+  LOG_WITH_PREFIX(INFO) << "Flushed delta block: " << block_id.ToString()
+                        << " ts range: [" << stats->min_timestamp()
+                        << ", " << stats->max_timestamp() << "]";
 
   // Now re-open for read
   gscoped_ptr<ReadableBlock> readable_block;
@@ -491,7 +638,7 @@ Status DeltaTracker::FlushDMS(DeltaMemStore* dms,
                                             REDO,
                                             std::move(options),
                                             dfr));
-  LOG(INFO) << "Reopened delta block for read: " << block_id.ToString();
+  LOG_WITH_PREFIX(INFO) << "Reopened delta block for read: " << block_id.ToString();
 
   RETURN_NOT_OK(rowset_metadata_->CommitRedoDeltaDataBlock(dms->id(), block_id));
   if (flush_type == FLUSH_METADATA) {
@@ -536,10 +683,10 @@ Status DeltaTracker::Flush(MetadataFlushType flush_type) {
     redo_delta_stores_.push_back(old_dms);
   }
 
-  LOG(INFO) << "Flushing " << count << " deltas from DMS " << old_dms->id() << "...";
+  LOG_WITH_PREFIX(INFO) << "Flushing " << count << " deltas from DMS " << old_dms->id() << "...";
 
   // Now, actually flush the contents of the old DMS.
-  // TODO: need another lock to prevent concurrent flushers
+  // TODO(todd): need another lock to prevent concurrent flushers
   // at some point.
   shared_ptr<DeltaFileReader> dfr;
   Status s = FlushDMS(old_dms.get(), &dfr, flush_type);
@@ -577,6 +724,11 @@ int64_t DeltaTracker::MinUnflushedLogIndex() const {
   return dms_->MinLogIndex();
 }
 
+size_t DeltaTracker::CountUndoDeltaStores() const {
+  shared_lock<rw_spinlock> lock(component_lock_);
+  return undo_delta_stores_.size();
+}
+
 size_t DeltaTracker::CountRedoDeltaStores() const {
   shared_lock<rw_spinlock> lock(component_lock_);
   return redo_delta_stores_.size();
@@ -606,5 +758,26 @@ void DeltaTracker::GetColumnIdsWithUpdates(std::vector<ColumnId>* col_ids) const
   col_ids->assign(column_ids_with_updates.begin(), column_ids_with_updates.end());
 }
 
+Status DeltaTracker::InitAllDeltaStoresForTests(WhichStores stores) {
+  shared_lock<rw_spinlock> lock(component_lock_);
+  if (stores == UNDOS_AND_REDOS || stores == UNDOS_ONLY) {
+    for (const shared_ptr<DeltaStore>& ds : undo_delta_stores_) {
+      RETURN_NOT_OK(ds->Init());
+    }
+  }
+  if (stores == UNDOS_AND_REDOS || stores == REDOS_ONLY) {
+    for (const shared_ptr<DeltaStore>& ds : redo_delta_stores_) {
+      RETURN_NOT_OK(ds->Init());
+    }
+  }
+  return Status::OK();
+}
+
+string DeltaTracker::LogPrefix() const {
+  return Substitute("T $0 P $1: ",
+                    rowset_metadata_->tablet_metadata()->tablet_id(),
+                    rowset_metadata_->fs_manager()->uuid());
+}
+
 } // namespace tablet
 } // namespace kudu
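The block-selection step in DeltaTracker::DeleteAncientUndoDeltas() above can be illustrated with a small standalone sketch. This is not Kudu code. UndoStoreInfo and SelectAncientUndos are hypothetical names, and timestamps are modeled as plain uint64_t values rather than kudu::Timestamp. The sketch walks the newest-first list oldest-first and stops at the first store that is either not yet initialized or not ancient. It returns the removable indexes in storage (newest-first) order, which is the order CommitDeltaStoreMetadataUpdate() expects.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for an UNDO delta store; only the fields the
// selection logic reads are modeled.
struct UndoStoreInfo {
  bool initted;            // Whether the store's stats have been loaded.
  uint64_t max_timestamp;  // Largest timestamp in the store (valid only if initted).
  int64_t size_bytes;      // Estimated on-disk size.
};

// Returns the indexes (into 'newest_first') of stores that may be deleted,
// in storage (newest-first) order, and reports how many blocks and bytes
// that represents.
std::vector<size_t> SelectAncientUndos(const std::vector<UndoStoreInfo>& newest_first,
                                       uint64_t ancient_history_mark,
                                       int64_t* blocks_deleted,
                                       int64_t* bytes_deleted) {
  std::vector<size_t> to_remove;
  int64_t blocks = 0;
  int64_t bytes = 0;
  // Walk oldest-first, i.e. from the back of the newest-first vector.
  for (size_t i = newest_first.size(); i-- > 0;) {
    const UndoStoreInfo& undo = newest_first[i];
    if (!undo.initted) break;                               // Never initialize on this path.
    if (undo.max_timestamp >= ancient_history_mark) break;  // First non-ancient store.
    blocks++;
    bytes += undo.size_bytes;
    to_remove.push_back(i);
  }
  // The indexes were collected oldest-first; restore storage order.
  std::reverse(to_remove.begin(), to_remove.end());
  if (blocks_deleted) *blocks_deleted = blocks;
  if (bytes_deleted) *bytes_deleted = bytes;
  return to_remove;
}
```

Because the walk stops at the first uninitialized store, a GC pass in this sketch only removes a contiguous run of the oldest blocks. Stores that have not been initialized yet are left for a later initialization pass.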

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/delta_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index a65d263..0a43dc9 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -144,12 +144,47 @@ class DeltaTracker {
   // and if so, compact the stores.
   Status Compact();
 
+  // Persist the given delta updates to disk and then make them visible via the
+  // DeltaTracker. The 'compact_flush_lock_' should be acquired before calling
+  // this method. This method should only be used for compactions or ancient
+  // history data GC, not when adding mutations, since it makes the updated
+  // stores visible before attempting to flush the metadata to disk.
+  Status CommitDeltaStoreMetadataUpdate(const RowSetMetadataUpdate& update,
+                                        const SharedDeltaStoreVector& to_remove,
+                                        const vector<BlockId>& new_delta_blocks,
+                                        DeltaType type,
+                                        MetadataFlushType flush_type);
+
   // Performs minor compaction on all REDO delta files between index
   // "start_idx" and "end_idx" (inclusive) and writes this to a
   // new REDO delta block. If "end_idx" is set to -1, then delta files at
   // all indexes starting with "start_idx" will be compacted.
   Status CompactStores(int start_idx, int end_idx);
 
+  // See RowSet::EstimateBytesInPotentiallyAncientUndoDeltas().
+  Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
+                                                     int64_t* bytes);
+
+  // See RowSet::InitUndoDeltas().
+  Status InitUndoDeltas(Timestamp ancient_history_mark,
+                        MonoTime deadline,
+                        int64_t* delta_blocks_initialized,
+                        int64_t* bytes_in_ancient_undos);
+
+  // See RowSet::DeleteAncientUndoDeltas().
+  Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                 int64_t* blocks_deleted, int64_t* bytes_deleted);
+
+  // Validate that 'first' may precede 'second' in an ordered list of deltas,
+  // given a delta type of 'type'. This should only be run in DEBUG mode.
+  void ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
+                          const std::shared_ptr<DeltaStore>& second,
+                          DeltaType type);
+
+  // Validate the relative ordering of the deltas in the specified list. This
+  // should only be run in DEBUG mode.
+  void ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type);
+
   // Replace the subsequence of stores that matches 'stores_to_replace' with
   // delta file readers corresponding to 'new_delta_blocks', which may be empty.
   // If 'stores_to_replace' is empty then the stores represented by
@@ -175,6 +210,9 @@ class DeltaTracker {
   // Get the minimum log index for this tracker's DMS, -1 if it wasn't set.
   int64_t MinUnflushedLogIndex() const;
 
+  // Return the number of undo delta stores.
+  size_t CountUndoDeltaStores() const;
+
   // Return the number of redo delta stores, not including the DeltaMemStore.
   size_t CountRedoDeltaStores() const;
 
@@ -187,6 +225,9 @@ class DeltaTracker {
     return &compact_flush_lock_;
   }
 
+  // Init() all of the specified delta stores. For tests only.
+  Status InitAllDeltaStoresForTests(WhichStores stores);
+
  private:
   FRIEND_TEST(TestRowSet, TestRowSetUpdate);
   FRIEND_TEST(TestRowSet, TestDMSFlush);
@@ -239,6 +280,8 @@ class DeltaTracker {
                                          vector<BlockId> *target_blocks,
                                          std::unique_ptr<DeltaIterator> *out);
 
+  std::string LogPrefix() const;
+
   std::shared_ptr<RowSetMetadata> rowset_metadata_;
 
   // The number of rows in the DiskRowSet that this tracker is associated with.
@@ -274,9 +317,10 @@ class DeltaTracker {
   mutable rw_spinlock component_lock_;
 
   // Exclusive lock that ensures that only one flush or compaction can run
-  // at a time. Protects delta_stores_. NOTE: this lock cannot be acquired
-  // while component_lock is held: otherwise, Flush and Compaction threads
-  // (that both first acquire this lock and then component_lock) will deadlock.
+  // at a time. Protects redo_delta_stores_ and undo_delta_stores_.
+  // NOTE: this lock cannot be acquired while component_lock is held:
+  // otherwise, Flush and Compaction threads (that both first acquire this lock
+  // and then component_lock) will deadlock.
   //
   // TODO(perf): this needs to be more fine grained
   mutable Mutex compact_flush_lock_;
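The NOTE on compact_flush_lock_ above describes a lock ordering rule: the exclusive compact/flush lock is always taken before component_lock_, never the reverse. The following is a minimal sketch of that ordering. It assumes std::mutex and std::shared_mutex (C++17) as stand-ins for Kudu's Mutex and rw_spinlock. ToyDeltaTracker and its methods are hypothetical and only model a list of UNDO store ids.

```cpp
#include <algorithm>
#include <cstddef>
#include <mutex>
#include <shared_mutex>
#include <utility>
#include <vector>

// Hypothetical, simplified model of the two DeltaTracker locks.
class ToyDeltaTracker {
 public:
  explicit ToyDeltaTracker(std::vector<int> undos_newest_first)
      : undos_(std::move(undos_newest_first)) {}

  // GC/compaction path: acquire compact_flush_lock_ first, then take
  // component_lock_ exclusively only while the store list is swapped.
  size_t RemoveOldestUndos(size_t n) {
    std::lock_guard<std::mutex> flush_guard(compact_flush_lock_);
    // Slow work (e.g. writing metadata) would go here. Readers can still
    // proceed because component_lock_ is not held yet.
    std::unique_lock<std::shared_mutex> component_guard(component_lock_);
    size_t removed = std::min(n, undos_.size());
    undos_.resize(undos_.size() - removed);  // Oldest stores are at the back.
    return removed;
  }

  // Read path: only the shared component lock is needed.
  size_t CountUndos() const {
    std::shared_lock<std::shared_mutex> guard(component_lock_);
    return undos_.size();
  }

 private:
  mutable std::mutex compact_flush_lock_;
  mutable std::shared_mutex component_lock_;
  std::vector<int> undos_;  // Hypothetical store ids, newest first.
};
```

Because every writer follows the same order, two writers can never each hold one lock while waiting for the other. This is the deadlock the NOTE warns about.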

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 3975f4c..7a9e55f 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -260,6 +260,16 @@ bool DeltaFileReader::IsRelevantForSnapshot(const MvccSnapshot& snap) const {
   return false;
 }
 
+Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
+                                          const shared_ptr<MemTracker>& parent_mem_tracker,
+                                          shared_ptr<DeltaFileReader>* out) const {
+  gscoped_ptr<ReadableBlock> block;
+  RETURN_NOT_OK(fs_manager->OpenBlock(reader_->block_id(), &block));
+  ReaderOptions options;
+  options.parent_mem_tracker = parent_mem_tracker;
+  return DeltaFileReader::OpenNoInit(std::move(block), delta_type_, options, out);
+}
+
 Status DeltaFileReader::NewDeltaIterator(const Schema *projection,
                                          const MvccSnapshot &snap,
                                          DeltaIterator** iterator) const {

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 35060d8..78bea23 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -158,6 +158,12 @@ class DeltaFileReader : public DeltaStore,
   // been fully initialized.
   bool IsRelevantForSnapshot(const MvccSnapshot& snap) const;
 
+  // Clone this DeltaFileReader for testing and validation purposes (such as
+  // while in DEBUG mode). The resulting object will not be Initted().
+  Status CloneForDebugging(FsManager* fs_manager,
+                           const std::shared_ptr<MemTracker>& parent_mem_tracker,
+                           std::shared_ptr<DeltaFileReader>* out) const;
+
  private:
   friend class DeltaFileIterator;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/diskrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset-test.cc b/src/kudu/tablet/diskrowset-test.cc
index c64f8ba..c4e5ff7 100644
--- a/src/kudu/tablet/diskrowset-test.cc
+++ b/src/kudu/tablet/diskrowset-test.cc
@@ -489,21 +489,25 @@ TEST_F(TestRowSet, TestCompactStores) {
   ASSERT_OK(rs->FlushDeltas());
   // One file isn't enough for minor compactions, but a major compaction can run.
   ASSERT_EQ(0, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
-  BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
+  NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
+      RowSet::MAJOR_DELTA_COMPACTION)));
 
   // Write a second delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
   ASSERT_OK(rs->FlushDeltas());
   // Two files is enough for all delta compactions.
-  BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
-  BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
+  NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
+      RowSet::MINOR_DELTA_COMPACTION)));
+  NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
+      RowSet::MAJOR_DELTA_COMPACTION)));
 
   // Write a third delta file.
   UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
   ASSERT_OK(rs->FlushDeltas());
   // We're hitting the max for minor compactions but not for major compactions.
   ASSERT_EQ(1, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
-  BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
+  NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
+      RowSet::MAJOR_DELTA_COMPACTION)));
 
   // Compact the deltafiles
   DeltaTracker *dt = rs->delta_tracker();
@@ -516,24 +520,59 @@ TEST_F(TestRowSet, TestCompactStores) {
   ASSERT_EQ(1,  num_stores);
   // Back to one store, can't minor compact.
   ASSERT_EQ(0, rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MINOR_DELTA_COMPACTION));
-  BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(RowSet::MAJOR_DELTA_COMPACTION));
+  NO_FATALS(BetweenZeroAndOne(rs->DeltaStoresCompactionPerfImprovementScore(
+      RowSet::MAJOR_DELTA_COMPACTION)));
 
   // Verify that the resulting deltafile is valid
   vector<shared_ptr<DeltaStore> > compacted_stores;
   vector<BlockId> compacted_blocks;
   unique_ptr<DeltaIterator> merge_iter;
   ASSERT_OK(dt->MakeDeltaIteratorMergerUnlocked(0, num_stores - 1, &schema_,
-                                                       &compacted_stores,
-                                                       &compacted_blocks, &merge_iter));
+                                                &compacted_stores,
+                                                &compacted_blocks, &merge_iter));
   vector<string> results;
   ASSERT_OK(DebugDumpDeltaIterator(REDO, merge_iter.get(), schema_,
-                                          ITERATE_OVER_ALL_ROWS,
-                                          &results));
+                                   ITERATE_OVER_ALL_ROWS,
+                                   &results));
   for (const string &str : results) {
     VLOG(1) << str;
   }
   ASSERT_TRUE(is_sorted(results.begin(), results.end()));
 }
 
+TEST_F(TestRowSet, TestGCAncientStores) {
+  // Disable lazy open so that major delta compactions don't require manual REDO initialization.
+  FLAGS_cfile_lazy_open = false;
+
+  // Write some base data.
+  // Note: Our test methods here don't write UNDO delete deltas.
+  WriteTestRowSet();
+  shared_ptr<DiskRowSet> rs;
+  ASSERT_OK(OpenTestRowSet(&rs));
+  DeltaTracker *dt = rs->delta_tracker();
+  ASSERT_EQ(0, dt->CountUndoDeltaStores());
+  ASSERT_EQ(0, dt->CountRedoDeltaStores());
+
+  // Write and flush a new REDO delta file.
+  UpdateExistingRows(rs.get(), FLAGS_update_fraction, nullptr);
+  ASSERT_OK(rs->FlushDeltas());
+  ASSERT_EQ(0, dt->CountUndoDeltaStores());
+  ASSERT_EQ(1, dt->CountRedoDeltaStores());
+
+  // Convert the REDO delta to an UNDO delta.
+  ASSERT_OK(rs->MajorCompactDeltaStores(HistoryGcOpts::Disabled()));
+  ASSERT_EQ(1, dt->CountUndoDeltaStores()); // From doing the major delta compaction.
+  ASSERT_EQ(0, dt->CountRedoDeltaStores());
+
+  // Delete all the UNDO deltas. There shouldn't be any delta stores left.
+  int64_t blocks_deleted;
+  int64_t bytes_deleted;
+  ASSERT_OK(dt->DeleteAncientUndoDeltas(clock_->Now(), &blocks_deleted, &bytes_deleted));
+  ASSERT_GT(blocks_deleted, 0);
+  ASSERT_GT(bytes_deleted, 0);
+  ASSERT_EQ(0, dt->CountUndoDeltaStores());
+  ASSERT_EQ(0, dt->CountRedoDeltaStores());
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/diskrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.cc b/src/kudu/tablet/diskrowset.cc
index 462ee32..06b3897 100644
--- a/src/kudu/tablet/diskrowset.cc
+++ b/src/kudu/tablet/diskrowset.cc
@@ -17,6 +17,7 @@
 
 #include <algorithm>
 #include <glog/logging.h>
+#include <glog/stl_logging.h>
 #include <mutex>
 #include <vector>
 
@@ -515,6 +516,7 @@ Status DiskRowSet::MajorCompactDeltaStores(HistoryGcOpts history_gc_opts) {
   delta_tracker_->GetColumnIdsWithUpdates(&col_ids);
 
   if (col_ids.empty()) {
+    VLOG_WITH_PREFIX(2) << "There are no column ids with updates";
     return Status::OK();
   }
 
@@ -523,33 +525,39 @@ Status DiskRowSet::MajorCompactDeltaStores(HistoryGcOpts history_gc_opts) {
 
 Status DiskRowSet::MajorCompactDeltaStoresWithColumnIds(const vector<ColumnId>& col_ids,
                                                         HistoryGcOpts history_gc_opts) {
-  TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStores");
+  LOG_WITH_PREFIX(INFO) << "Major compacting REDO delta stores (cols: " << col_ids << ")";
+  TRACE_EVENT0("tablet", "DiskRowSet::MajorCompactDeltaStoresWithColumnIds");
   std::lock_guard<Mutex> l(*delta_tracker()->compact_flush_lock());
 
-  // TODO: do we need to lock schema or anything here?
+  // TODO(todd): do we need to lock schema or anything here?
   gscoped_ptr<MajorDeltaCompaction> compaction;
   RETURN_NOT_OK(NewMajorDeltaCompaction(col_ids, std::move(history_gc_opts), &compaction));
 
   RETURN_NOT_OK(compaction->Compact());
 
-  // Update and flush the metadata. This needs to happen before we make the new files visible to
-  // prevent inconsistencies after a server crash.
+  // Update the metadata.
   RowSetMetadataUpdate update;
   RETURN_NOT_OK(compaction->CreateMetadataUpdate(&update));
   RETURN_NOT_OK(rowset_metadata_->CommitUpdate(update));
-  RETURN_NOT_OK(rowset_metadata_->Flush());
 
-  // Make the new base data and delta files visible.
+  // Since we've already updated the metadata in memory, now we update the
+  // delta tracker's stores to match it. Those stores must match the blocks in
+  // the metadata, and because the metadata has already been updated, we use
+  // CHECK_OK here.
   shared_ptr<CFileSet> new_base;
   RETURN_NOT_OK(CFileSet::Open(rowset_metadata_,
                                mem_trackers_.tablet_tracker,
                                &new_base));
   {
     std::lock_guard<percpu_rwlock> lock(component_lock_);
-    RETURN_NOT_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
+    CHECK_OK(compaction->UpdateDeltaTracker(delta_tracker_.get()));
     base_data_.swap(new_base);
   }
-  return Status::OK();
+
+  // We don't CHECK_OK on Flush here because if we don't successfully flush we
+  // don't have consistency problems in the case of major delta compaction --
+  // we are not adding additional mutations that weren't already present.
+  return rowset_metadata_->Flush();
 }
 
 Status DiskRowSet::NewMajorDeltaCompaction(const vector<ColumnId>& col_ids,
@@ -735,11 +743,33 @@ double DiskRowSet::DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType
       perf_improv = static_cast<double>(store_count) / FLAGS_tablet_delta_store_minor_compact_max;
     }
   } else {
-    LOG(FATAL) << "Unknown delta compaction type " << type;
+    LOG_WITH_PREFIX(FATAL) << "Unknown delta compaction type " << type;
   }
   return std::min(1.0, perf_improv);
 }
 
+Status DiskRowSet::EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
+                                                               int64_t* bytes) {
+  TRACE_EVENT0("tablet", "DiskRowSet::EstimateBytesInPotentiallyAncientUndoDeltas");
+  return delta_tracker_->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark, bytes);
+}
+
+Status DiskRowSet::InitUndoDeltas(Timestamp ancient_history_mark,
+                                  MonoTime deadline,
+                                  int64_t* delta_blocks_initialized,
+                                  int64_t* bytes_in_ancient_undos) {
+  TRACE_EVENT0("tablet", "DiskRowSet::InitUndoDeltas");
+  return delta_tracker_->InitUndoDeltas(ancient_history_mark, deadline,
+                                        delta_blocks_initialized, bytes_in_ancient_undos);
+}
+
+Status DiskRowSet::DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                           int64_t* blocks_deleted, int64_t* bytes_deleted) {
+  TRACE_EVENT0("tablet", "DiskRowSet::DeleteAncientUndoDeltas");
+  return delta_tracker_->DeleteAncientUndoDeltas(ancient_history_mark,
+                                                 blocks_deleted, bytes_deleted);
+}
+
 Status DiskRowSet::DebugDump(vector<string> *lines) {
   // Using CompactionInput to dump our data is an easy way of seeing all the
   // rows and deltas.
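DiskRowSet::InitUndoDeltas() forwards a deadline to the DeltaTracker. The DeltaTracker initializes UNDO stores oldest-first until either the time budget runs out or it reaches a store whose max_timestamp is at or past the ancient history mark. The sketch below models that loop under several assumptions:

* LazyUndo and InitUndosWithinBudget are hypothetical names.
* std::chrono::steady_clock stands in for MonoTime.
* The deadline is checked before each store.
* Only newly initialized stores are counted.

The toy stores the true max timestamp up front so that Init() can "discover" it.

```cpp
#include <chrono>
#include <cstdint>
#include <vector>

// Hypothetical lazily-initialized UNDO store.
struct LazyUndo {
  bool initted = false;
  uint64_t max_timestamp = 0;  // Only trusted by the caller after Init().
  int64_t size_bytes = 0;
  void Init() { initted = true; }  // Real code would read the block's stats.
};

void InitUndosWithinBudget(std::vector<LazyUndo>* newest_first,
                           uint64_t ancient_history_mark,
                           std::chrono::steady_clock::time_point deadline,
                           int64_t* blocks_initialized,
                           int64_t* bytes_in_ancient_undos) {
  int64_t initialized = 0;
  int64_t ancient_bytes = 0;
  // Oldest-first: reverse iteration over the newest-first list.
  for (auto it = newest_first->rbegin(); it != newest_first->rend(); ++it) {
    if (std::chrono::steady_clock::now() >= deadline) break;  // Budget exhausted.
    if (!it->initted) {
      it->Init();
      ++initialized;
    }
    // Stop at the first store that is not GC'able; its bytes are not counted.
    if (it->max_timestamp >= ancient_history_mark) break;
    ancient_bytes += it->size_bytes;
  }
  if (blocks_initialized) *blocks_initialized = initialized;
  if (bytes_in_ancient_undos) *bytes_in_ancient_undos = ancient_bytes;
}
```

Running the loop a second time initializes nothing new but reports the same ancient byte count. This mirrors how, per the commit message, the UpdateStats() estimate becomes exact once the ancient blocks have been initialized.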

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/diskrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/diskrowset.h b/src/kudu/tablet/diskrowset.h
index 2398c77..8d0313b 100644
--- a/src/kudu/tablet/diskrowset.h
+++ b/src/kudu/tablet/diskrowset.h
@@ -348,6 +348,17 @@ class DiskRowSet : public RowSet {
 
   double DeltaStoresCompactionPerfImprovementScore(DeltaCompactionType type) const OVERRIDE;
 
+  Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
+                                                     int64_t* bytes) OVERRIDE;
+
+  Status InitUndoDeltas(Timestamp ancient_history_mark,
+                        MonoTime deadline,
+                        int64_t* delta_blocks_initialized,
+                        int64_t* bytes_in_ancient_undos) OVERRIDE;
+
+  Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                 int64_t* blocks_deleted, int64_t* bytes_deleted) OVERRIDE;
+
   // Major compacts all the delta files for all the columns.
   Status MajorCompactDeltaStores(HistoryGcOpts history_gc_opts);
 
@@ -367,7 +378,14 @@ class DiskRowSet : public RowSet {
     return rowset_metadata_->ToString();
   }
 
-  virtual Status DebugDump(std::vector<std::string> *out = NULL) OVERRIDE;
+  std::string LogPrefix() const {
+    return strings::Substitute("T $0 P $1: $2: ",
+        rowset_metadata_->tablet_metadata()->tablet_id(),
+        rowset_metadata_->fs_manager()->uuid(),
+        ToString());
+  }
+
+  virtual Status DebugDump(std::vector<std::string> *lines = NULL) OVERRIDE;
 
  private:
   FRIEND_TEST(TestRowSet, TestRowSetUpdate);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/memrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index dd438eb..5e180c5 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -338,6 +338,29 @@ class MemRowSet : public RowSet,
     return 0;
   }
 
+  Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                                     int64_t* bytes) OVERRIDE {
+    DCHECK(bytes);
+    *bytes = 0;
+    return Status::OK();
+  }
+
+  Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
+                        MonoTime /*deadline*/,
+                        int64_t* delta_blocks_initialized,
+                        int64_t* bytes_in_ancient_undos) OVERRIDE {
+    if (delta_blocks_initialized) *delta_blocks_initialized = 0;
+    if (bytes_in_ancient_undos) *bytes_in_ancient_undos = 0;
+    return Status::OK();
+  }
+
+  Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                 int64_t* blocks_deleted, int64_t* bytes_deleted) OVERRIDE {
+    if (blocks_deleted) *blocks_deleted = 0;
+    if (bytes_deleted) *bytes_deleted = 0;
+    return Status::OK();
+  }
+
   Status FlushDeltas() OVERRIDE { return Status::OK(); }
 
   Status MinorCompactDeltaStores() OVERRIDE { return Status::OK(); }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/mock-rowsets.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index 2b5622d..c93f985 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -118,6 +118,27 @@ class MockRowSet : public RowSet {
     return Status::OK();
   }
 
+  virtual Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                                             int64_t* /*bytes*/) OVERRIDE {
+    LOG(FATAL) << "Unimplemented";
+    return Status::OK();
+  }
+
+  virtual Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                MonoTime /*deadline*/,
+                                int64_t* /*delta_blocks_initialized*/,
+                                int64_t* /*bytes_in_ancient_undos*/) OVERRIDE {
+    LOG(FATAL) << "Unimplemented";
+    return Status::OK();
+  }
+
+  virtual Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                         int64_t* /*blocks_deleted*/,
+                                         int64_t* /*bytes_deleted*/) OVERRIDE {
+    LOG(FATAL) << "Unimplemented";
+    return Status::OK();
+  }
+
   virtual bool IsAvailableForCompaction() OVERRIDE {
     return true;
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/mt-tablet-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mt-tablet-test.cc b/src/kudu/tablet/mt-tablet-test.cc
index 5d320c0..bb6af9a 100644
--- a/src/kudu/tablet/mt-tablet-test.cc
+++ b/src/kudu/tablet/mt-tablet-test.cc
@@ -28,6 +28,7 @@
 #include "kudu/util/test_graph.h"
 #include "kudu/util/thread.h"
 
+DECLARE_int32(tablet_history_max_age_sec);
 DECLARE_double(tablet_delta_store_major_compact_min_ratio);
 DECLARE_int32(tablet_delta_store_minor_compact_max);
 DEFINE_int32(num_insert_threads, 8, "Number of inserting threads to launch");
@@ -37,6 +38,7 @@ DEFINE_int32(num_updater_threads, 1, "Number of updating threads to launch");
 DEFINE_int32(num_slowreader_threads, 1, "Number of 'slow' reader threads to launch");
 DEFINE_int32(num_flush_threads, 1, "Number of flusher reader threads to launch");
 DEFINE_int32(num_compact_threads, 1, "Number of compactor threads to launch");
+DEFINE_int32(num_undo_delta_gc_threads, 1, "Number of undo delta gc threads to launch");
 DEFINE_int32(num_flush_delta_threads, 1, "Number of delta flusher reader threads to launch");
 DEFINE_int32(num_minor_compact_deltas_threads, 1,
              "Number of delta minor compactor threads to launch");
@@ -81,8 +83,10 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     ts_collector_.StartDumperThread();
   }
 
-  MultiThreadedTabletTest()
-    : running_insert_count_(FLAGS_num_insert_threads),
+  explicit MultiThreadedTabletTest(TabletHarness::Options::ClockType clock_type =
+                                   TabletHarness::Options::ClockType::LOGICAL_CLOCK)
+    : TabletTestBase<SETUP>(clock_type),
+      running_insert_count_(FLAGS_num_insert_threads),
       ts_collector_(::testing::UnitTest::GetInstance()->current_test_info()->test_case_name()) {
   }
 
@@ -254,9 +258,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     return sum;
   }
 
-
-
-  void FlushThread(int tid) {
+  void FlushThread(int /*tid*/) {
     // Start off with a very short wait time between flushes.
     // But, especially in debug mode, this will only allow a few
     // rows to get inserted between each flush, and the test will take
@@ -301,6 +303,7 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
   void CompactDeltas(RowSet::DeltaCompactionType type) {
     int wait_time = 100;
     while (running_insert_count_.count() > 0) {
+      VLOG(1) << "Compacting worst deltas";
       CHECK_OK(tablet()->CompactWorstDeltas(type));
 
       // Wait, unless the inserters are all done.
@@ -318,6 +321,27 @@ class MultiThreadedTabletTest : public TabletTestBase<SETUP> {
     }
   }
 
+  void DeleteAncientUndoDeltasThread(int /*tid*/) {
+    int wait_time = 100;
+    while (running_insert_count_.count() > 0) {
+      MonoDelta time_budget = MonoDelta::FromMilliseconds(wait_time);
+      int64_t bytes_in_ancient_undos = 0;
+      CHECK_OK(tablet()->InitAncientUndoDeltas(time_budget, &bytes_in_ancient_undos));
+      VLOG(1) << "Found " << bytes_in_ancient_undos << " bytes of ancient delta undos";
+
+      int64_t blocks_deleted = 0;
+      int64_t bytes_deleted = 0;
+      CHECK_OK(tablet()->DeleteAncientUndoDeltas(&blocks_deleted, &bytes_deleted));
+      if (blocks_deleted > 0) {
+        LOG(INFO) << "Deleted " << blocks_deleted << " blocks (" << bytes_deleted << " bytes) "
+                  << "of ancient delta undos";
+      }
+
+      // Wait, unless the inserters are all done.
+      running_insert_count_.WaitFor(MonoDelta::FromMilliseconds(wait_time));
+    }
+  }
+
   // Thread which cycles between inserting and deleting a test row, each time
   // with a different value.
   void DeleteAndReinsertCycleThread(int tid) {
@@ -412,6 +436,7 @@ TYPED_TEST(MultiThreadedTabletTest, DoTestAllAtOnce) {
   this->StartThreads(FLAGS_num_summer_threads, &TestFixture::SummerThread);
   this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread);
   this->StartThreads(FLAGS_num_compact_threads, &TestFixture::CompactThread);
+  this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread);
   this->StartThreads(FLAGS_num_flush_delta_threads, &TestFixture::FlushDeltasThread);
   this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
                      &TestFixture::MinorCompactDeltasThread);
@@ -442,6 +467,7 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
   FLAGS_tablet_delta_store_minor_compact_max = 10;
   this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread);
   this->StartThreads(FLAGS_num_compact_threads, &TestFixture::CompactThread);
+  this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread);
   this->StartThreads(FLAGS_num_flush_delta_threads, &TestFixture::FlushDeltasThread);
   this->StartThreads(FLAGS_num_minor_compact_deltas_threads,
                      &TestFixture::MinorCompactDeltasThread);
@@ -465,5 +491,50 @@ TYPED_TEST(MultiThreadedTabletTest, DeleteAndReinsert) {
   this->JoinThreads();
 }
 
+// For tests where we want to use the hybrid clock. The hybrid clock is
+// required for tablet history gc.
+template<class SETUP>
+class MultiThreadedHybridClockTabletTest : public MultiThreadedTabletTest<SETUP> {
+ public:
+  MultiThreadedHybridClockTabletTest()
+    : MultiThreadedTabletTest<SETUP>(TabletHarness::Options::ClockType::HYBRID_CLOCK) {
+  }
+};
+
+TYPED_TEST_CASE(MultiThreadedHybridClockTabletTest, TabletTestHelperTypes);
+
+// Perform many updates and continuously flush and major compact deltas, as
+// well as run undo delta gc.
+TYPED_TEST(MultiThreadedHybridClockTabletTest, UpdateNoMergeCompaction) {
+  google::FlagSaver saver;
+  FLAGS_tablet_history_max_age_sec = 0; // GC data as aggressively as possible.
+
+  FLAGS_flusher_backoff = 1.0f;
+  FLAGS_flusher_initial_frequency_ms = 1;
+  FLAGS_tablet_delta_store_major_compact_min_ratio = 0.01f;
+  FLAGS_tablet_delta_store_minor_compact_max = 10;
+  this->StartThreads(FLAGS_num_flush_threads, &TestFixture::FlushThread);
+  this->StartThreads(FLAGS_num_flush_delta_threads, &TestFixture::FlushDeltasThread);
+  this->StartThreads(FLAGS_num_major_compact_deltas_threads,
+                     &TestFixture::MajorCompactDeltasThread);
+  this->StartThreads(10, &TestFixture::DeleteAndReinsertCycleThread);
+  this->StartThreads(10, &TestFixture::StubbornlyUpdateSameRowThread);
+  this->StartThreads(FLAGS_num_undo_delta_gc_threads, &TestFixture::DeleteAncientUndoDeltasThread);
+
+  // Run very quickly in dev builds, longer in slow builds.
+  float runtime_seconds = AllowSlowTests() ? 2 : 0.1;
+  Stopwatch sw;
+  sw.start();
+  while (sw.elapsed().wall < runtime_seconds * NANOS_PER_SECOND &&
+         !this->HasFatalFailure()) {
+    SleepFor(MonoDelta::FromMilliseconds(5));
+  }
+
+  // This is sort of a hack -- the flusher thread stops when it sees this
+  // countdown latch go to 0.
+  this->running_insert_count_.Reset(0);
+  this->JoinThreads();
+}
+
 } // namespace tablet
 } // namespace kudu

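The background threads in this test, such as DeleteAncientUndoDeltasThread above, do a unit of work and then call running_insert_count_.WaitFor(...). The test stops them with running_insert_count_.Reset(0). The sketch below shows that stop pattern using only the C++ standard library. SimpleLatch, GcLoop and RunGcLoopDemo are hypothetical names. SimpleLatch is not kudu::CountDownLatch, and having WaitFor() return true once the count reaches zero is this sketch's own assumption. GcLoop uses a do/while so that every thread runs at least once. The loop condition of the real test thread is not shown in this excerpt.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the test's countdown latch (not kudu::CountDownLatch).
class SimpleLatch {
 public:
  explicit SimpleLatch(int count) : count_(count) { assert(count >= 0); }

  // Blocks for at most 'timeout'. Returns true if the count is zero.
  bool WaitFor(std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> l(mu_);
    return cv_.wait_for(l, timeout, [this] { return count_ == 0; });
  }

  // Overwrites the count. Reset(0) releases every waiter at once.
  void Reset(int count) {
    assert(count >= 0);
    {
      std::lock_guard<std::mutex> l(mu_);
      count_ = count;
    }
    cv_.notify_all();
  }

 private:
  std::mutex mu_;
  std::condition_variable cv_;
  int count_;
};

// Background loop in the style of DeleteAncientUndoDeltasThread: do one unit
// of work, then sleep up to 'wait' unless the latch has already reached zero.
void GcLoop(SimpleLatch* latch, std::atomic<int64_t>* iterations,
            std::chrono::milliseconds wait) {
  do {
    iterations->fetch_add(1);  // Placeholder for the real GC call.
  } while (!latch->WaitFor(wait));
}

// Runs 'num_threads' loops for roughly 'run_for', then stops them the way the
// test does, by resetting the latch to zero. Returns the total iteration count.
int64_t RunGcLoopDemo(int num_threads, std::chrono::milliseconds run_for) {
  SimpleLatch latch(1);
  std::atomic<int64_t> iterations(0);
  std::vector<std::thread> threads;
  for (int i = 0; i < num_threads; i++) {
    threads.emplace_back(GcLoop, &latch, &iterations, std::chrono::milliseconds(5));
  }
  std::this_thread::sleep_for(run_for);
  latch.Reset(0);
  for (auto& t : threads) t.join();
  return iterations.load();
}
```

For example, RunGcLoopDemo(2, std::chrono::milliseconds(20)) returns at least 2, because each thread records one iteration before it first waits.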
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/rowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset.h b/src/kudu/tablet/rowset.h
index 094e76c..cd6dcaf 100644
--- a/src/kudu/tablet/rowset.h
+++ b/src/kudu/tablet/rowset.h
@@ -143,6 +143,56 @@ class RowSet {
   // Compact delta stores if more than one.
   virtual Status MinorCompactDeltaStores() = 0;
 
+  // Estimate the number of bytes in ancient undo delta stores. This may be an
+  // overestimate. The argument 'ancient_history_mark' must be valid (it must
+  // not be equal to Timestamp::kInvalidTimestamp).
+  virtual Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp ancient_history_mark,
+                                                             int64_t* bytes) = 0;
+
+  // Initialize undo delta blocks until the given 'deadline' is passed, or
+  // until all undo delta blocks with a max timestamp older than
+  // 'ancient_history_mark' have been initialized.
+  //
+  // Invoking this method may also improve the estimate given by
+  // EstimateBytesInPotentiallyAncientUndoDeltas().
+  //
+  // If this method returns OK, it returns the number of blocks actually
+  // initialized in the out-param 'delta_blocks_initialized' and the number of
+  // bytes that can be freed from disk in 'bytes_in_ancient_undos'.
+  //
+  // If 'ancient_history_mark' is set to Timestamp::kInvalidTimestamp then the
+  // 'max_timestamp' of the blocks being initialized is ignored and no
+  // age-based short-circuiting takes place.
+  // If 'deadline' is not Initialized() then no deadline is enforced.
+  //
+  // The out-parameters, 'delta_blocks_initialized' and 'bytes_in_ancient_undos',
+  // may be passed in as nullptr.
+  virtual Status InitUndoDeltas(Timestamp ancient_history_mark,
+                                MonoTime deadline,
+                                int64_t* delta_blocks_initialized,
+                                int64_t* bytes_in_ancient_undos) = 0;
+
+  // Delete all initialized undo delta blocks with a max timestamp earlier than
+  // the specified 'ancient_history_mark'.
+  //
+  // Note: This method does not flush updates to the rowset metadata. If this
+  // method returns OK, the caller is responsible for persisting changes to the
+  // rowset metadata by explicitly flushing it.
+  //
+  // Note also: Blocks are not actually deleted until the rowset metadata is
+  // flushed, because that invokes tablet metadata flush, which iterates over
+  // and deletes the blocks present in the metadata orphans list.
+  //
+  // If this method returns OK, it also returns the number of delta blocks
+  // deleted and the number of bytes deleted in the out-params 'blocks_deleted'
+  // and 'bytes_deleted', respectively.
+  //
+  // The out-parameters, 'blocks_deleted' and 'bytes_deleted', may be passed in
+  // as nullptr.
+  virtual Status DeleteAncientUndoDeltas(Timestamp ancient_history_mark,
+                                         int64_t* blocks_deleted,
+                                         int64_t* bytes_deleted) = 0;
+
   virtual ~RowSet() {}
 
   // Return true if this RowSet is available for compaction, based on
@@ -313,6 +363,29 @@ class DuplicatingRowSet : public RowSet {
     return Status::OK();
   }
 
+  Status EstimateBytesInPotentiallyAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                                     int64_t* bytes) OVERRIDE {
+    DCHECK(bytes);
+    *bytes = 0;
+    return Status::OK();
+  }
+
+  Status InitUndoDeltas(Timestamp /*ancient_history_mark*/,
+                        MonoTime /*deadline*/,
+                        int64_t* delta_blocks_initialized,
+                        int64_t* bytes_in_ancient_undos) OVERRIDE {
+    if (delta_blocks_initialized) *delta_blocks_initialized = 0;
+    if (bytes_in_ancient_undos) *bytes_in_ancient_undos = 0;
+    return Status::OK();
+  }
+
+  Status DeleteAncientUndoDeltas(Timestamp /*ancient_history_mark*/,
+                                 int64_t* blocks_deleted, int64_t* bytes_deleted) OVERRIDE {
+    if (blocks_deleted) *blocks_deleted = 0;
+    if (bytes_deleted) *bytes_deleted = 0;
+    return Status::OK();
+  }
+
   Status MinorCompactDeltaStores() OVERRIDE { return Status::OK(); }
 
  private:

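The rowset.h comments above describe how a rowset estimates and initializes its ancient undo blocks. The sketch below models that contract over a plain vector of blocks. UndoBlock, EstimateAncientBytes, InitUndoBlocks and kInvalidTs are hypothetical. The blocks are assumed to be ordered oldest first. A block counts as ancient when its max timestamp is earlier than the ancient history mark, which matches the wording of DeleteAncientUndoDeltas(). This is not DeltaTracker's actual code.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <limits>
#include <vector>

using Timestamp = uint64_t;
// Hypothetical sentinel meaning "no ancient history mark".
constexpr Timestamp kInvalidTs = std::numeric_limits<Timestamp>::max();
using Clock = std::chrono::steady_clock;

// Hypothetical model of one undo delta block. 'max_ts' may only be consulted
// once 'initialized' is true (i.e. once the block's stats have been read).
struct UndoBlock {
  int64_t size_bytes;
  Timestamp max_ts;
  bool initialized;
};

// Scans oldest first and sums sizes until the first initialized block that is
// not ancient. Uninitialized blocks are counted anyway, which is why the
// result may be an overestimate.
int64_t EstimateAncientBytes(const std::vector<UndoBlock>& blocks, Timestamp ahm) {
  assert(ahm != kInvalidTs);  // Mirrors the "must be valid" precondition.
  int64_t bytes = 0;
  for (const UndoBlock& b : blocks) {
    if (b.initialized && b.max_ts >= ahm) break;
    bytes += b.size_bytes;
  }
  return bytes;
}

// Initializes blocks oldest first until 'deadline' passes or the first
// non-ancient block is reached. With ahm == kInvalidTs there is no age-based
// short-circuit, and this sketch then reports zero ancient bytes.
// Either out-param may be nullptr.
void InitUndoBlocks(std::vector<UndoBlock>* blocks, Timestamp ahm,
                    Clock::time_point deadline,
                    int64_t* blocks_initialized, int64_t* ancient_bytes) {
  int64_t initialized = 0;
  int64_t bytes = 0;
  for (UndoBlock& b : *blocks) {
    if (!b.initialized) {
      if (Clock::now() >= deadline) break;
      b.initialized = true;  // Stand-in for reading the block's delta stats.
      initialized++;
    }
    if (ahm != kInvalidTs) {
      if (b.max_ts >= ahm) break;  // Age-based short-circuit.
      bytes += b.size_bytes;
    }
  }
  if (blocks_initialized) *blocks_initialized = initialized;
  if (ancient_bytes) *ancient_bytes = bytes;
}
```

The estimate can only shrink as initialization proceeds. Take three blocks of 100, 200 and 400 bytes with max timestamps 5, 8 and 20, and a mark of 10. The estimate is 700 before InitUndoBlocks() runs with no deadline and 300 afterwards.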
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/rowset_metadata.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset_metadata.cc b/src/kudu/tablet/rowset_metadata.cc
index 57be903..f0e80d0 100644
--- a/src/kudu/tablet/rowset_metadata.cc
+++ b/src/kudu/tablet/rowset_metadata.cc
@@ -19,13 +19,17 @@
 
 #include <mutex>
 #include <string>
+#include <unordered_set>
 #include <utility>
 #include <vector>
 
+#include <glog/stl_logging.h>
+
 #include "kudu/common/wire_protocol.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/gutil/map-util.h"
 
+using std::vector;
 using strings::Substitute;
 
 namespace kudu {
@@ -186,6 +190,29 @@ Status RowSetMetadata::CommitUpdate(const RowSetMetadataUpdate& update) {
       redo_delta_blocks_.push_back(b);
     }
 
+    // Remove undo blocks.
+    std::unordered_set<BlockId, BlockIdHash> undos_to_remove(update.remove_undo_blocks_.begin(),
+                                                             update.remove_undo_blocks_.end());
+    int64_t num_removed = 0;
+    auto iter = undo_delta_blocks_.begin();
+    while (iter != undo_delta_blocks_.end()) {
+      if (ContainsKey(undos_to_remove, *iter)) {
+        removed.push_back(*iter);
+        undos_to_remove.erase(*iter);
+        iter = undo_delta_blocks_.erase(iter);
+        num_removed++;
+      } else {
+        ++iter;
+      }
+    }
+    CHECK(undos_to_remove.empty())
+        << "Tablet " << tablet_metadata_->tablet_id() << " RowSet " << id_ << ": "
+        << "Attempted to remove an undo delta block from the RowSetMetadata that is not present. "
+        << "Removed: { " << removed << " }; "
+        << "Failed to remove: { "
+        << vector<BlockId>(undos_to_remove.begin(), undos_to_remove.end())
+        << " }";
+
     if (!update.new_undo_block_.IsNull()) {
       // Front-loading to keep the UNDO files in their natural order.
       undo_delta_blocks_.insert(undo_delta_blocks_.begin(), update.new_undo_block_);
@@ -259,6 +286,12 @@ RowSetMetadataUpdate& RowSetMetadataUpdate::ReplaceRedoDeltaBlocks(
   return *this;
 }
 
+RowSetMetadataUpdate& RowSetMetadataUpdate::RemoveUndoDeltaBlocks(
+    const std::vector<BlockId>& to_remove) {
+  remove_undo_blocks_.insert(remove_undo_blocks_.end(), to_remove.begin(), to_remove.end());
+  return *this;
+}
+
 RowSetMetadataUpdate& RowSetMetadataUpdate::SetNewUndoBlock(const BlockId& undo_block) {
   new_undo_block_ = undo_block;
   return *this;

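CommitUpdate() removes the requested undo blocks in a single pass over undo_delta_blocks_. It preserves the order of the remaining blocks and CHECK-fails if any requested block is missing. The sketch below reproduces that pass as a standalone function. It assumes a plain integer BlockId, whereas Kudu's BlockId is a class hashed with BlockIdHash. It throws std::logic_error where the real code uses CHECK. RemoveUndoBlocks is a hypothetical name.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <string>
#include <unordered_set>
#include <vector>

using BlockId = uint64_t;  // Hypothetical simplification of kudu::BlockId.

// Removes every id in 'to_remove' from 'undo_blocks', keeping the survivors in
// their original order, and returns the removed ids in the order they were
// found. Throws if any requested id was not present. Unlike a CHECK, which
// aborts, the throw leaves already-matched ids removed.
std::vector<BlockId> RemoveUndoBlocks(std::vector<BlockId>* undo_blocks,
                                      const std::vector<BlockId>& to_remove) {
  assert(undo_blocks != nullptr);
  std::unordered_set<BlockId> pending(to_remove.begin(), to_remove.end());
  std::vector<BlockId> removed;
  auto iter = undo_blocks->begin();
  while (iter != undo_blocks->end()) {
    if (pending.count(*iter) > 0) {
      removed.push_back(*iter);
      pending.erase(*iter);
      iter = undo_blocks->erase(iter);  // erase() returns the next position.
    } else {
      ++iter;
    }
  }
  if (!pending.empty()) {
    throw std::logic_error("attempted to remove " + std::to_string(pending.size()) +
                           " undo block(s) that are not present");
  }
  return removed;
}
```

For example, removing {1, 5} from {7, 5, 3, 1} leaves {7, 3} and returns {5, 1}. Removing an id that is absent throws.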
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/rowset_metadata.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset_metadata.h b/src/kudu/tablet/rowset_metadata.h
index 8813602..ee664b7 100644
--- a/src/kudu/tablet/rowset_metadata.h
+++ b/src/kudu/tablet/rowset_metadata.h
@@ -230,6 +230,9 @@ class RowSetMetadataUpdate {
   RowSetMetadataUpdate& ReplaceRedoDeltaBlocks(const std::vector<BlockId>& to_remove,
                                                const std::vector<BlockId>& to_add);
 
+  // Remove the specified undo delta blocks.
+  RowSetMetadataUpdate& RemoveUndoDeltaBlocks(const std::vector<BlockId>& to_remove);
+
   // Replace the CFile for the given column ID.
   RowSetMetadataUpdate& ReplaceColumnId(ColumnId col_id, const BlockId& block_id);
 
@@ -251,6 +254,8 @@ class RowSetMetadataUpdate {
     std::vector<BlockId> to_add;
   };
   std::vector<ReplaceDeltaBlocks> replace_redo_blocks_;
+
+  std::vector<BlockId> remove_undo_blocks_;
   BlockId new_undo_block_;
 
   DISALLOW_COPY_AND_ASSIGN(RowSetMetadataUpdate);

http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 5f24041..19e68c8 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -39,6 +39,7 @@
 #include "kudu/gutil/atomicops.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/stl_util.h"
+#include "kudu/gutil/strings/human_readable.h"
 #include "kudu/gutil/strings/numbers.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/server/hybrid_clock.h"
@@ -64,6 +65,7 @@
 #include "kudu/util/maintenance_manager.h"
 #include "kudu/util/mem_tracker.h"
 #include "kudu/util/metrics.h"
+#include "kudu/util/scoped_cleanup.h"
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/trace.h"
 #include "kudu/util/url-coding.h"
@@ -131,6 +133,13 @@ DEFINE_int32(max_encoded_key_size_bytes, 16 * 1024,
              "result in an error.");
 TAG_FLAG(max_encoded_key_size_bytes, unsafe);
 
+DEFINE_bool(enable_undo_delta_block_gc, true,
+    "Whether to enable undo delta block garbage collection. "
+    "This only affects the undo delta block deletion background task, and "
+    "doesn't control whether compactions delete ancient history. "
+    "To change what is considered ancient history use --tablet_history_max_age_sec");
+TAG_FLAG(enable_undo_delta_block_gc, evolving);
+
 METRIC_DEFINE_entity(tablet);
 METRIC_DEFINE_gauge_size(tablet, memrowset_size, "MemRowSet Memory Usage",
                          kudu::MetricUnit::kBytes,
@@ -1117,6 +1126,12 @@ void Tablet::RegisterMaintenanceOps(MaintenanceManager* maint_mgr) {
   gscoped_ptr<MaintenanceOp> major_delta_compact_op(new MajorDeltaCompactionOp(this));
   maint_mgr->RegisterOp(major_delta_compact_op.get());
   maintenance_ops_.push_back(major_delta_compact_op.release());
+
+  if (FLAGS_enable_undo_delta_block_gc) {
+    gscoped_ptr<MaintenanceOp> undo_delta_block_gc_op(new UndoDeltaBlockGCOp(this));
+    maint_mgr->RegisterOp(undo_delta_block_gc_op.get());
+    maintenance_ops_.push_back(undo_delta_block_gc_op.release());
+  }
 }
 
 void Tablet::UnregisterMaintenanceOps() {
@@ -1637,6 +1652,31 @@ Status Tablet::FlushBiggestDMS() {
   return max_size > 0 ? biggest_drs->FlushDeltas() : Status::OK();
 }
 
+Status Tablet::FlushAllDMSForTests() {
+  CHECK_EQ(state_, kOpen);
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+  for (const auto& rowset : comps->rowsets->all_rowsets()) {
+    RETURN_NOT_OK(rowset->FlushDeltas());
+  }
+  return Status::OK();
+}
+
+Status Tablet::MajorCompactAllDeltaStoresForTests() {
+  LOG_WITH_PREFIX(INFO) << "Major compacting all delta stores, for tests";
+  CHECK_EQ(state_, kOpen);
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+  for (const auto& rs : comps->rowsets->all_rowsets()) {
+    if (!rs->IsAvailableForCompaction()) continue;
+    DiskRowSet* drs = down_cast<DiskRowSet*>(rs.get());
+    RETURN_NOT_OK(drs->delta_tracker()->InitAllDeltaStoresForTests(DeltaTracker::REDOS_ONLY));
+    RETURN_NOT_OK_PREPEND(drs->MajorCompactDeltaStores(
+        GetHistoryGcOpts()), "Failed major delta compaction on " + rs->ToString());
+  }
+  return Status::OK();
+}
+
 Status Tablet::CompactWorstDeltas(RowSet::DeltaCompactionType type) {
   CHECK_EQ(state_, kOpen);
   shared_ptr<RowSet> rs;
@@ -1700,6 +1740,172 @@ double Tablet::GetPerfImprovementForBestDeltaCompactUnlocked(RowSet::DeltaCompac
   return worst_delta_perf;
 }
 
+Status Tablet::EstimateBytesInPotentiallyAncientUndoDeltas(int64_t* bytes) {
+  DCHECK(bytes);
+
+  Timestamp ancient_history_mark;
+  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
+    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
+                           "The clock is likely not a hybrid clock";
+    return Status::OK();
+  }
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+
+  int64_t tablet_bytes = 0;
+  for (const auto& rowset : comps->rowsets->all_rowsets()) {
+    int64_t rowset_bytes;
+    RETURN_NOT_OK(rowset->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark,
+                                                                      &rowset_bytes));
+    tablet_bytes += rowset_bytes;
+  }
+
+  metrics_->undo_delta_block_estimated_retained_bytes->set_value(tablet_bytes);
+  *bytes = tablet_bytes;
+  return Status::OK();
+}
+
+Status Tablet::InitAncientUndoDeltas(MonoDelta time_budget, int64_t* bytes_in_ancient_undos) {
+  MonoTime tablet_init_start = MonoTime::Now();
+
+  Timestamp ancient_history_mark;
+  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) {
+    VLOG_WITH_PREFIX(1) << "Cannot get ancient history mark. "
+                           "The clock is likely not a hybrid clock";
+    return Status::OK();
+  }
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+
+  RowSetVector rowsets = comps->rowsets->all_rowsets();
+
+  // Estimate the size of the ancient undos in each rowset so that we can
+  // initialize them greedily.
+  vector<pair<size_t, int64_t>> rowset_ancient_undos_est_sizes; // index, bytes
+  rowset_ancient_undos_est_sizes.reserve(rowsets.size());
+  for (size_t i = 0; i < rowsets.size(); i++) {
+    const auto& rowset = rowsets[i];
+    int64_t bytes;
+    RETURN_NOT_OK(rowset->EstimateBytesInPotentiallyAncientUndoDeltas(ancient_history_mark,
+                                                                      &bytes));
+    rowset_ancient_undos_est_sizes.emplace_back(i, bytes);
+  }
+
+  // Sort the rowsets in descending size order to optimize for the worst offenders.
+  std::sort(rowset_ancient_undos_est_sizes.begin(), rowset_ancient_undos_est_sizes.end(),
+            [&](const pair<size_t, int64_t>& a, const pair<size_t, int64_t>& b) {
+              return a.second > b.second; // Descending order.
+            });
+
+  // Begin timeout / deadline countdown here in case the above takes some time.
+  MonoTime deadline = time_budget.Initialized() ? MonoTime::Now() + time_budget : MonoTime();
+
+  // Initialize the rowsets largest-first.
+  int64_t tablet_bytes_in_ancient_undos = 0;
+  for (const auto& rs_est_size : rowset_ancient_undos_est_sizes) {
+    size_t index = rs_est_size.first;
+    const auto& rowset = rowsets[index];
+    int64_t rowset_blocks_initialized;
+    int64_t rowset_bytes_in_ancient_undos;
+    RETURN_NOT_OK(rowset->InitUndoDeltas(ancient_history_mark, deadline,
+                                         &rowset_blocks_initialized,
+                                         &rowset_bytes_in_ancient_undos));
+    tablet_bytes_in_ancient_undos += rowset_bytes_in_ancient_undos;
+  }
+
+  MonoDelta tablet_init_duration = MonoTime::Now() - tablet_init_start;
+  metrics_->undo_delta_block_gc_init_duration->Increment(
+      tablet_init_duration.ToMilliseconds());
+
+  VLOG_WITH_PREFIX(2) << Substitute("Bytes in ancient undos: $0. Init duration: $1",
+                                    HumanReadableNumBytes::ToString(tablet_bytes_in_ancient_undos),
+                                    tablet_init_duration.ToString());
+
+  if (bytes_in_ancient_undos) *bytes_in_ancient_undos = tablet_bytes_in_ancient_undos;
+  return Status::OK();
+}
+
+Status Tablet::DeleteAncientUndoDeltas(int64_t* blocks_deleted, int64_t* bytes_deleted) {
+  MonoTime tablet_delete_start = MonoTime::Now();
+
+  Timestamp ancient_history_mark;
+  if (!Tablet::GetTabletAncientHistoryMark(&ancient_history_mark)) return Status::OK();
+
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+
+  // We need to hold the compact_flush_lock for each rowset we GC undos from.
+  RowSetVector rowsets_to_gc_undos;
+  vector<std::unique_lock<std::mutex>> rowset_locks;
+  {
+    // We hold the selection lock so other threads will not attempt to select the
+    // same rowsets for compaction while we delete old undos.
+    std::lock_guard<std::mutex> compact_lock(compact_select_lock_);
+    for (const auto& rowset : comps->rowsets->all_rowsets()) {
+      if (!rowset->IsAvailableForCompaction()) {
+        continue;
+      }
+      std::unique_lock<std::mutex> lock(*rowset->compact_flush_lock(), std::try_to_lock);
+      CHECK(lock.owns_lock()) << rowset->ToString() << " unable to lock compact_flush_lock";
+      rowsets_to_gc_undos.push_back(rowset);
+      rowset_locks.push_back(std::move(lock));
+    }
+  }
+
+  int64_t tablet_blocks_deleted = 0;
+  int64_t tablet_bytes_deleted = 0;
+  for (const auto& rowset : rowsets_to_gc_undos) {
+    int64_t rowset_blocks_deleted;
+    int64_t rowset_bytes_deleted;
+    RETURN_NOT_OK(rowset->DeleteAncientUndoDeltas(ancient_history_mark,
+                                                  &rowset_blocks_deleted, &rowset_bytes_deleted));
+    tablet_blocks_deleted += rowset_blocks_deleted;
+    tablet_bytes_deleted += rowset_bytes_deleted;
+  }
+  // We flush the tablet metadata at the end because we don't flush per-RowSet
+  // for performance reasons.
+  if (tablet_blocks_deleted > 0) {
+    RETURN_NOT_OK(metadata_->Flush());
+  }
+
+  MonoDelta tablet_delete_duration = MonoTime::Now() - tablet_delete_start;
+  metrics_->undo_delta_block_gc_bytes_deleted->IncrementBy(tablet_bytes_deleted);
+  metrics_->undo_delta_block_gc_delete_duration->Increment(
+      tablet_delete_duration.ToMilliseconds());
+
+  if (blocks_deleted) *blocks_deleted = tablet_blocks_deleted;
+  if (bytes_deleted) *bytes_deleted = tablet_bytes_deleted;
+  return Status::OK();
+}
+
+int64_t Tablet::CountUndoDeltasForTests() const {
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+  int64_t sum = 0;
+  for (const auto& rowset : comps->rowsets->all_rowsets()) {
+    shared_ptr<RowSetMetadata> metadata = rowset->metadata();
+    if (metadata) {
+      sum += metadata->undo_delta_blocks().size();
+    }
+  }
+  return sum;
+}
+
+int64_t Tablet::CountRedoDeltasForTests() const {
+  scoped_refptr<TabletComponents> comps;
+  GetComponents(&comps);
+  int64_t sum = 0;
+  for (const auto& rowset : comps->rowsets->all_rowsets()) {
+    shared_ptr<RowSetMetadata> metadata = rowset->metadata();
+    if (metadata) {
+      sum += metadata->redo_delta_blocks().size();
+    }
+  }
+  return sum;
+}
+
 size_t Tablet::num_rowsets() const {
   shared_lock<rw_spinlock> l(component_lock_);
   return components_->rowsets->all_rowsets().size();

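Tablet::InitAncientUndoDeltas() first estimates every rowset and sorts the rowsets by estimated ancient bytes in descending order. Only then does it start the deadline, so the largest offenders are initialized before the time budget runs out. The sketch below shows that ordering, with the rowset operations abstracted as callbacks. RowSetHooks and InitLargestFirst are hypothetical. Treating a zero budget as "no limit" is this sketch's own convention. The real method uses an uninitialized MonoDelta for that.

```cpp
#include <algorithm>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <utility>
#include <vector>

using Clock = std::chrono::steady_clock;

// Hypothetical per-rowset hooks: an estimate of ancient undo bytes, and an
// init call that honors a deadline and returns the ancient bytes it found.
struct RowSetHooks {
  std::function<int64_t()> estimate_bytes;
  std::function<int64_t(Clock::time_point)> init_undos;
};

// Visits rowsets largest-estimate first and records the visit order in
// 'visit_order' (may be nullptr). Returns the total ancient bytes reported.
int64_t InitLargestFirst(const std::vector<RowSetHooks>& rowsets,
                         std::chrono::milliseconds budget,
                         std::vector<size_t>* visit_order) {
  assert(budget.count() >= 0);
  std::vector<std::pair<size_t, int64_t>> est;  // (index, estimated bytes)
  est.reserve(rowsets.size());
  for (size_t i = 0; i < rowsets.size(); i++) {
    est.emplace_back(i, rowsets[i].estimate_bytes());
  }
  std::sort(est.begin(), est.end(),
            [](const std::pair<size_t, int64_t>& a, const std::pair<size_t, int64_t>& b) {
              return a.second > b.second;  // Descending by estimated size.
            });

  // Start the clock only after estimating, as the tablet code does.
  Clock::time_point deadline = Clock::time_point::max();
  if (budget.count() > 0) deadline = Clock::now() + budget;

  int64_t total = 0;
  for (const auto& e : est) {
    if (visit_order) visit_order->push_back(e.first);
    total += rowsets[e.first].init_undos(deadline);
  }
  return total;
}
```

As in the original loop, every rowset receives the same deadline. A rowset visited after the budget is spent is expected to return without initializing anything.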
http://git-wip-us.apache.org/repos/asf/kudu/blob/fea449ac/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 0d4f82f..3749a71 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -17,6 +17,7 @@
 #ifndef KUDU_TABLET_TABLET_H
 #define KUDU_TABLET_TABLET_H
 
+#include <condition_variable>
 #include <iosfwd>
 #include <map>
 #include <memory>
@@ -254,6 +255,13 @@ class Tablet {
   // Flush only the biggest DMS
   Status FlushBiggestDMS();
 
+  // Flush all delta memstores. Only used for tests.
+  Status FlushAllDMSForTests();
+
+  // Run a major compaction on all delta stores. Initializes any un-initialized
+  // redo delta stores. Only used for tests.
+  Status MajorCompactAllDeltaStoresForTests();
+
   // Finds the RowSet which has the most separate delta files and
   // issues a delta compaction.
   Status CompactWorstDeltas(RowSet::DeltaCompactionType type);
@@ -270,6 +278,26 @@ class Tablet {
   double GetPerfImprovementForBestDeltaCompactUnlocked(RowSet::DeltaCompactionType type,
                                                        std::shared_ptr<RowSet>* rs) const;
 
+  // Estimate the number of bytes in ancient undo delta stores. This may be an
+  // overestimate.
+  Status EstimateBytesInPotentiallyAncientUndoDeltas(int64_t* bytes);
+
+  // Initialize undo delta blocks for up to 'time_budget' amount of time.
+  // If 'time_budget' is not Initialized() then there is no time limit.
+  // If this method returns OK, the number of bytes found in ancient undo files
+  // is returned in the out-param 'bytes_in_ancient_undos'.
+  Status InitAncientUndoDeltas(MonoDelta time_budget, int64_t* bytes_in_ancient_undos);
+
+  // Find and delete all undo delta blocks that have a maximum op timestamp
+  // prior to the current ancient history mark. If this method returns OK, the
+  // number of blocks and bytes deleted are returned in the out-parameters.
+  Status DeleteAncientUndoDeltas(int64_t* blocks_deleted = nullptr,
+                                 int64_t* bytes_deleted = nullptr);
+
+  // Count the undo (resp. redo) delta blocks in the tablet. Only used for tests.
+  int64_t CountUndoDeltasForTests() const;
+  int64_t CountRedoDeltasForTests() const;
+
   // Return the current number of rowsets in the tablet.
   size_t num_rowsets() const;
 
@@ -550,7 +578,6 @@ class Tablet {
 
   gscoped_ptr<CompactionPolicy> compaction_policy_;
 
-
   // Lock protecting the selection of rowsets for compaction.
   // Only one thread may run the compaction selection algorithm at a time
   // so that they don't both try to select the same rowset.