You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/02/07 19:45:08 UTC

[2/2] kudu git commit: compaction: Add additional validation in DeltaTracker

compaction: Add additional validation in DeltaTracker

This adds DEBUG-mode validation to AtomicUpdateStores() in the
DeltaTracker. It also updates the doc comment on the method.

Change-Id: I05fed515bc5d6a09484fadb61e73ac1346f6654a
Reviewed-on: http://gerrit.cloudera.org:8080/5919
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a72ed600
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a72ed600
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a72ed600

Branch: refs/heads/master
Commit: a72ed600c0916906359025b2afc0668ffb9d0f5d
Parents: 32abe5b
Author: Mike Percy <mp...@apache.org>
Authored: Fri Feb 3 18:04:39 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Feb 7 19:44:34 2017 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_compaction.cc |  5 +-
 src/kudu/tablet/delta_tracker.cc    | 81 +++++++++++++++++++++++++++-----
 src/kudu/tablet/delta_tracker.h     |  2 +
 src/kudu/tablet/deltafile.h         |  3 +-
 src/kudu/util/status.h              | 12 +++++
 5 files changed, 87 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/delta_compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index f66e402..d2b69b9 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -355,12 +355,11 @@ Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker) {
   if (undo_delta_mutations_written_ > 0) {
     vector<BlockId> new_undo_blocks;
     new_undo_blocks.push_back(new_undo_delta_block_);
-    return tracker->AtomicUpdateStores(SharedDeltaStoreVector(),
+    return tracker->AtomicUpdateStores({},
                                        new_undo_blocks,
                                        UNDO);
-  } else {
-    return Status::OK();
   }
+  return Status::OK();
 }
 
 } // namespace tablet

http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index ea09750..7d25345 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -169,9 +169,37 @@ string JoinDeltaStoreStrings(const SharedDeltaStoreVector& stores) {
   return ::JoinStrings(strings, ",");
 }
 
+// Validate that 'first' may precede 'second' in an ordered list of deltas,
+// given a delta type of 'type'.
+void ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
+                        const std::shared_ptr<DeltaStore>& second,
+                        DeltaType type) {
+  DCHECK_OK(first->Init());
+  DCHECK_OK(second->Init());
+  switch (type) {
+    case REDO:
+      DCHECK_LE(first->delta_stats().min_timestamp(), second->delta_stats().min_timestamp())
+          << "Found out-of-order deltas: [{" << first->ToString() << "}, {"
+          << second->ToString() << "}]: type = " << type;
+      break;
+    case UNDO:
+      DCHECK_GE(first->delta_stats().min_timestamp(), second->delta_stats().min_timestamp())
+          << "Found out-of-order deltas: [{" << first->ToString() << "}, {"
+          << second->ToString() << "}]: type = " << type;
+      break;
+  }
+}
+
+// Validate the relative ordering of the deltas in the specified list.
+void ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
+  for (size_t i = 0; i < list.size() - 1; i++) {
+    ValidateDeltaOrder(list[i], list[i + 1], type);
+  }
+}
+
 } // anonymous namespace
 
-Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& to_remove,
+Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
                                         const vector<BlockId>& new_delta_blocks,
                                         DeltaType type) {
   SharedDeltaStoreVector new_stores;
@@ -182,30 +210,59 @@ Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& to_remove,
   SharedDeltaStoreVector* stores_to_update =
       type == REDO ? &redo_delta_stores_ : &undo_delta_stores_;
   SharedDeltaStoreVector::iterator start_it;
-  // TODO this is hacky, we do this because UNDOs don't currently get replaced and we need to
-  // front-load them. When we start GCing UNDO files (KUDU-236) we'll need to be able to atomically
-  // replace them too, and in their right order.
-  if (!to_remove.empty()) {
+
+  // We only support two scenarios in this function:
+  //
+  // 1. Prepending deltas to the specified list.
+  //    In the case of prepending REDO deltas, that means we should only be
+  //    prepending older-timestamped data because REDOs are stored in ascending
+  //    timestamp order, and in the case of UNDO deltas, that means we should
+  //    only be prepending newer-timestamped data because UNDOs are stored in
+  //    descending timestamp order.
+  //
+  // 2. Replacing a range of deltas with a replacement range.
+  //    In the case of major REDO delta compaction, we are simply compacting a
+  //    range of REDOs into a smaller set of REDOs.
+  //
+  // We validate these assumptions (in DEBUG mode only) below.
+
+  if (stores_to_replace.empty()) {
+    // With nothing to remove, we always prepend to the front of the list.
+    start_it = stores_to_update->begin();
+
+  } else {
+    // With something to remove, we do a range-replace.
     start_it =
-        std::find(stores_to_update->begin(), stores_to_update->end(), to_remove[0]);
+        std::find(stores_to_update->begin(), stores_to_update->end(), stores_to_replace[0]);
 
     auto end_it = start_it;
-    for (const shared_ptr<DeltaStore>& ds : to_remove) {
+    for (const shared_ptr<DeltaStore>& ds : stores_to_replace) {
       if (end_it == stores_to_update->end() || *end_it != ds) {
         return Status::InvalidArgument(
             strings::Substitute("Cannot find deltastore sequence <$0> in <$1>",
-                                JoinDeltaStoreStrings(to_remove),
+                                JoinDeltaStoreStrings(stores_to_replace),
                                 JoinDeltaStoreStrings(*stores_to_update)));
       }
       ++end_it;
     }
-    // Remove the old stores
+    // Remove the old stores.
     stores_to_update->erase(start_it, end_it);
-  } else {
-    start_it = stores_to_update->begin();
   }
 
-  // Insert the new store
+#ifndef NDEBUG
+  // Perform validation checks to ensure callers do not violate our contract.
+  if (!new_stores.empty()) {
+    // Make sure the new stores are already ordered.
+    ValidateDeltasOrdered(new_stores, type);
+    if (start_it != stores_to_update->end()) {
+      // Sanity check that the last store we are adding would logically appear
+      // before the first store that would follow it.
+      ValidateDeltaOrder(*new_stores.rbegin(), *start_it, type);
+    }
+  }
+#endif // NDEBUG
+
+  // Insert the new stores.
   stores_to_update->insert(start_it, new_stores.begin(), new_stores.end());
 
   VLOG(1) << "New " << DeltaType_Name(type) << " stores: "

http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/delta_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index ba56d31..a65d263 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -152,6 +152,8 @@ class DeltaTracker {
 
   // Replace the subsequence of stores that matches 'stores_to_replace' with
   // delta file readers corresponding to 'new_delta_blocks', which may be empty.
+  // If 'stores_to_replace' is empty then the stores represented by
+  // 'new_delta_blocks' are prepended to the relevant delta stores list.
   Status AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
                             const std::vector<BlockId>& new_delta_blocks,
                             DeltaType type);

http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 5706b08..35060d8 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -149,7 +149,8 @@ class DeltaFileReader : public DeltaStore,
   }
 
   virtual std::string ToString() const OVERRIDE {
-    return reader_->ToString();
+    if (!init_once_.initted()) return reader_->ToString();
+    return strings::Substitute("$0 ($1)", reader_->ToString(), delta_stats_->ToString());
   }
 
   // Returns true if this delta file may include any deltas which need to be

http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/util/status.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/status.h b/src/kudu/util/status.h
index 289d4e1..75f0699 100644
--- a/src/kudu/util/status.h
+++ b/src/kudu/util/status.h
@@ -82,6 +82,15 @@
 ///   logged message.
 #define KUDU_CHECK_OK(s) KUDU_CHECK_OK_PREPEND(s, "Bad status")
 
+/// @brief If @c to_call returns a bad status, DCHECK immediately with
+///   a logged message of @c msg followed by the status.
+#define KUDU_DCHECK_OK_PREPEND(to_call, msg) do { \
+    const ::kudu::Status& _s = (to_call);                   \
+    KUDU_DCHECK(_s.ok()) << (msg) << ": " << _s.ToString();  \
+  } while (0);
+
+#define KUDU_DCHECK_OK(s) KUDU_DCHECK_OK_PREPEND(s, "Bad status")
+
 /// @file status.h
 ///
 /// This header is used in both the Kudu build as well as in builds of
@@ -104,10 +113,13 @@
 #define RETURN_NOT_OK_LOG     KUDU_RETURN_NOT_OK_LOG
 #define CHECK_OK_PREPEND      KUDU_CHECK_OK_PREPEND
 #define CHECK_OK              KUDU_CHECK_OK
+#define DCHECK_OK_PREPEND     KUDU_DCHECK_OK_PREPEND
+#define DCHECK_OK             KUDU_DCHECK_OK
 
 // These are standard glog macros.
 #define KUDU_LOG              LOG
 #define KUDU_CHECK            CHECK
+#define KUDU_DCHECK           DCHECK
 #endif
 
 namespace kudu {