You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/02/07 19:45:08 UTC
[2/2] kudu git commit: compaction: Add additional validation in
DeltaTracker
compaction: Add additional validation in DeltaTracker
This adds DEBUG-mode validation to AtomicUpdateStores() in the
DeltaTracker. It also updates the doc comment on the method.
Change-Id: I05fed515bc5d6a09484fadb61e73ac1346f6654a
Reviewed-on: http://gerrit.cloudera.org:8080/5919
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans <jd...@apache.org>
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/a72ed600
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/a72ed600
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/a72ed600
Branch: refs/heads/master
Commit: a72ed600c0916906359025b2afc0668ffb9d0f5d
Parents: 32abe5b
Author: Mike Percy <mp...@apache.org>
Authored: Fri Feb 3 18:04:39 2017 -0800
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Feb 7 19:44:34 2017 +0000
----------------------------------------------------------------------
src/kudu/tablet/delta_compaction.cc | 5 +-
src/kudu/tablet/delta_tracker.cc | 81 +++++++++++++++++++++++++++-----
src/kudu/tablet/delta_tracker.h | 2 +
src/kudu/tablet/deltafile.h | 3 +-
src/kudu/util/status.h | 12 +++++
5 files changed, 87 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/delta_compaction.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_compaction.cc b/src/kudu/tablet/delta_compaction.cc
index f66e402..d2b69b9 100644
--- a/src/kudu/tablet/delta_compaction.cc
+++ b/src/kudu/tablet/delta_compaction.cc
@@ -355,12 +355,11 @@ Status MajorDeltaCompaction::UpdateDeltaTracker(DeltaTracker* tracker) {
if (undo_delta_mutations_written_ > 0) {
vector<BlockId> new_undo_blocks;
new_undo_blocks.push_back(new_undo_delta_block_);
- return tracker->AtomicUpdateStores(SharedDeltaStoreVector(),
+ return tracker->AtomicUpdateStores({},
new_undo_blocks,
UNDO);
- } else {
- return Status::OK();
}
+ return Status::OK();
}
} // namespace tablet
http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/delta_tracker.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.cc b/src/kudu/tablet/delta_tracker.cc
index ea09750..7d25345 100644
--- a/src/kudu/tablet/delta_tracker.cc
+++ b/src/kudu/tablet/delta_tracker.cc
@@ -169,9 +169,37 @@ string JoinDeltaStoreStrings(const SharedDeltaStoreVector& stores) {
return ::JoinStrings(strings, ",");
}
+// Validate that 'first' may precede 'second' in an ordered list of deltas,
+// given a delta type of 'type'.
+void ValidateDeltaOrder(const std::shared_ptr<DeltaStore>& first,
+ const std::shared_ptr<DeltaStore>& second,
+ DeltaType type) {
+ DCHECK_OK(first->Init());
+ DCHECK_OK(second->Init());
+ switch (type) {
+ case REDO:
+ DCHECK_LE(first->delta_stats().min_timestamp(), second->delta_stats().min_timestamp())
+ << "Found out-of-order deltas: [{" << first->ToString() << "}, {"
+ << second->ToString() << "}]: type = " << type;
+ break;
+ case UNDO:
+ DCHECK_GE(first->delta_stats().min_timestamp(), second->delta_stats().min_timestamp())
+ << "Found out-of-order deltas: [{" << first->ToString() << "}, {"
+ << second->ToString() << "}]: type = " << type;
+ break;
+ }
+}
+
+// Validate the relative ordering of the deltas in the specified list.
+void ValidateDeltasOrdered(const SharedDeltaStoreVector& list, DeltaType type) {
+ for (size_t i = 0; i < list.size() - 1; i++) {
+ ValidateDeltaOrder(list[i], list[i + 1], type);
+ }
+}
+
} // anonymous namespace
-Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& to_remove,
+Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
const vector<BlockId>& new_delta_blocks,
DeltaType type) {
SharedDeltaStoreVector new_stores;
@@ -182,30 +210,59 @@ Status DeltaTracker::AtomicUpdateStores(const SharedDeltaStoreVector& to_remove,
SharedDeltaStoreVector* stores_to_update =
type == REDO ? &redo_delta_stores_ : &undo_delta_stores_;
SharedDeltaStoreVector::iterator start_it;
- // TODO this is hacky, we do this because UNDOs don't currently get replaced and we need to
- // front-load them. When we start GCing UNDO files (KUDU-236) we'll need to be able to atomically
- // replace them too, and in their right order.
- if (!to_remove.empty()) {
+
+ // We only support two scenarios in this function:
+ //
+ // 1. Prepending deltas to the specified list.
+ // In the case of prepending REDO deltas, that means we should only be
+ // prepending older-timestamped data because REDOs are stored in ascending
+ // timestamp order, and in the case of UNDO deltas, that means we should
+ // only be prepending newer-timestamped data because UNDOs are stored in
+ // descending timestamp order.
+ //
+ // 2. Replacing a range of deltas with a replacement range.
+ // In the case of major REDO delta compaction, we are simply compacting a
+ // range of REDOs into a smaller set of REDOs.
+ //
+ // We validate these assumptions (in DEBUG mode only) below.
+
+ if (stores_to_replace.empty()) {
+ // With nothing to remove, we always prepend to the front of the list.
+ start_it = stores_to_update->begin();
+
+ } else {
+ // With something to remove, we do a range-replace.
start_it =
- std::find(stores_to_update->begin(), stores_to_update->end(), to_remove[0]);
+ std::find(stores_to_update->begin(), stores_to_update->end(), stores_to_replace[0]);
auto end_it = start_it;
- for (const shared_ptr<DeltaStore>& ds : to_remove) {
+ for (const shared_ptr<DeltaStore>& ds : stores_to_replace) {
if (end_it == stores_to_update->end() || *end_it != ds) {
return Status::InvalidArgument(
strings::Substitute("Cannot find deltastore sequence <$0> in <$1>",
- JoinDeltaStoreStrings(to_remove),
+ JoinDeltaStoreStrings(stores_to_replace),
JoinDeltaStoreStrings(*stores_to_update)));
}
++end_it;
}
- // Remove the old stores
+ // Remove the old stores.
stores_to_update->erase(start_it, end_it);
- } else {
- start_it = stores_to_update->begin();
}
- // Insert the new store
+#ifndef NDEBUG
+ // Perform validation checks to ensure callers do not violate our contract.
+ if (!new_stores.empty()) {
+ // Make sure the new stores are already ordered.
+ ValidateDeltasOrdered(new_stores, type);
+ if (start_it != stores_to_update->end()) {
+ // Sanity check that the last store we are adding would logically appear
+ // before the first store that would follow it.
+ ValidateDeltaOrder(*new_stores.rbegin(), *start_it, type);
+ }
+ }
+#endif // NDEBUG
+
+ // Insert the new stores.
stores_to_update->insert(start_it, new_stores.begin(), new_stores.end());
VLOG(1) << "New " << DeltaType_Name(type) << " stores: "
http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/delta_tracker.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_tracker.h b/src/kudu/tablet/delta_tracker.h
index ba56d31..a65d263 100644
--- a/src/kudu/tablet/delta_tracker.h
+++ b/src/kudu/tablet/delta_tracker.h
@@ -152,6 +152,8 @@ class DeltaTracker {
// Replace the subsequence of stores that matches 'stores_to_replace' with
// delta file readers corresponding to 'new_delta_blocks', which may be empty.
+ // If 'stores_to_replace' is empty then the stores represented by
+ // 'new_delta_blocks' are prepended to the relevant delta stores list.
Status AtomicUpdateStores(const SharedDeltaStoreVector& stores_to_replace,
const std::vector<BlockId>& new_delta_blocks,
DeltaType type);
http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 5706b08..35060d8 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -149,7 +149,8 @@ class DeltaFileReader : public DeltaStore,
}
virtual std::string ToString() const OVERRIDE {
- return reader_->ToString();
+ if (!init_once_.initted()) return reader_->ToString();
+ return strings::Substitute("$0 ($1)", reader_->ToString(), delta_stats_->ToString());
}
// Returns true if this delta file may include any deltas which need to be
http://git-wip-us.apache.org/repos/asf/kudu/blob/a72ed600/src/kudu/util/status.h
----------------------------------------------------------------------
diff --git a/src/kudu/util/status.h b/src/kudu/util/status.h
index 289d4e1..75f0699 100644
--- a/src/kudu/util/status.h
+++ b/src/kudu/util/status.h
@@ -82,6 +82,15 @@
/// logged message.
#define KUDU_CHECK_OK(s) KUDU_CHECK_OK_PREPEND(s, "Bad status")
+/// @brief If @c to_call returns a bad status, DCHECK immediately with
+/// a logged message of @c msg followed by the status.
+#define KUDU_DCHECK_OK_PREPEND(to_call, msg) do { \
+ const ::kudu::Status& _s = (to_call); \
+ KUDU_DCHECK(_s.ok()) << (msg) << ": " << _s.ToString(); \
+ } while (0);
+
+#define KUDU_DCHECK_OK(s) KUDU_DCHECK_OK_PREPEND(s, "Bad status")
+
/// @file status.h
///
/// This header is used in both the Kudu build as well as in builds of
@@ -104,10 +113,13 @@
#define RETURN_NOT_OK_LOG KUDU_RETURN_NOT_OK_LOG
#define CHECK_OK_PREPEND KUDU_CHECK_OK_PREPEND
#define CHECK_OK KUDU_CHECK_OK
+#define DCHECK_OK_PREPEND KUDU_DCHECK_OK_PREPEND
+#define DCHECK_OK KUDU_DCHECK_OK
// These are standard glog macros.
#define KUDU_LOG LOG
#define KUDU_CHECK CHECK
+#define KUDU_DCHECK DCHECK
#endif
namespace kudu {