You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/12/07 03:47:52 UTC
[3/5] kudu git commit: (03/05) delta_store: support iteration with
snap_to_exclude
(03/05) delta_store: support iteration with snap_to_exclude
This patch changes the delta stores (DMS and delta files) to respect
snap_to_exclude during iteration. The key changes are:
- The introduction of the "selection" criteria, a new delta relevancy
formula for determining whether a delta applies to a scan with both
snap_to_exclude and snap_to_include. The existing "application" criteria
was formalized and moved into delta_relevancy.h. There was also a
non-trivial change to DeltaFileReader::IsRelevantForSnapshot() to use both
criteria when culling entire delta files.
- A new SelectUpdates() method for using the selection criteria on a batch
of prepared deltas. SelectUpdates() requires new in-memory state, the
creation of which is gated behind a new PREPARE_FOR_SELECT flag so as not
to affect regular scans.
- Updates to the delta fuzz testing logic to test iterators with two
timestamps, and to provide SelectUpdates() coverage.
A future patch will modify DeltaApplier to use SelectUpdates for diff scans.
Change-Id: I7811e185fef270f40fdbbb38f491eee8b4aa043c
Reviewed-on: http://gerrit.cloudera.org:8080/11858
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9f4657ab
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9f4657ab
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9f4657ab
Branch: refs/heads/master
Commit: 9f4657ab56366df541eddce12850726038d95fd0
Parents: d927675
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Nov 1 17:07:18 2018 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Dec 7 03:45:13 2018 +0000
----------------------------------------------------------------------
src/kudu/tablet/delta_iterator_merger.cc | 7 +
src/kudu/tablet/delta_iterator_merger.h | 2 +
src/kudu/tablet/delta_relevancy.h | 181 ++++++++++++++++++++++++++
src/kudu/tablet/delta_store.cc | 132 ++++++++++++-------
src/kudu/tablet/delta_store.h | 28 +++-
src/kudu/tablet/deltafile.cc | 41 ++++--
src/kudu/tablet/deltafile.h | 8 +-
src/kudu/tablet/deltamemstore.cc | 5 +-
src/kudu/tablet/deltamemstore.h | 2 +
src/kudu/tablet/tablet-test-util.cc | 17 ++-
src/kudu/tablet/tablet-test-util.h | 142 ++++++++++++++++----
11 files changed, 473 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_iterator_merger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc
index f6f5840..58d14da 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -79,6 +79,13 @@ Status DeltaIteratorMerger::ApplyDeletes(SelectionVector* sel_vec) {
return Status::OK();
}
+Status DeltaIteratorMerger::SelectUpdates(SelectionVector* sel_vec) {
+  // Ask each sub-iterator in turn to add its selected rows to 'sel_vec';
+  // the first error encountered is returned to the caller.
+  for (const auto& sub_iter : iters_) {
+    RETURN_NOT_OK(sub_iter->SelectUpdates(sel_vec));
+  }
+  return Status::OK();
+}
+
Status DeltaIteratorMerger::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
for (const unique_ptr<DeltaIterator> &iter : iters_) {
RETURN_NOT_OK(iter->CollectMutations(dst, arena));
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_iterator_merger.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h
index a8efeb6..c5a36e8 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -67,6 +67,8 @@ class DeltaIteratorMerger : public DeltaIterator {
Status ApplyDeletes(SelectionVector* sel_vec) override;
+ Status SelectUpdates(SelectionVector* sel_vec) override;
+
Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_relevancy.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_relevancy.h b/src/kudu/tablet/delta_relevancy.h
new file mode 100644
index 0000000..69640ac
--- /dev/null
+++ b/src/kudu/tablet/delta_relevancy.h
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/common/timestamp.h"
+#include "kudu/tablet/delta_key.h"
+#include "kudu/tablet/mvcc.h"
+
+namespace kudu {
+namespace tablet {
+
+// Functions that evaluate the relevancy of deltas against a snapshot or pair
+// of snapshots.
+//
+// When constructing an iterator tree, one must specify either one or two MVCC
+// snapshots. The first always serves as an upper bound, ensuring that the data
+// may be further mutated by writers without affecting the results of the
+// iteration. If present, the second serves as a lower bound, excluding any
+// deltas that were committed before it in time.
+//
+// Together, the snapshots affect the behavior of deltas encountered during the
+// iteration. Various criteria are used to determine whether a particular delta
+// is relevant to the iteration; which criterion is used depends on the context.
+//
+// Selection criteria
+// ==================
+// The selection (or "select") criteria determines whether the particular delta
+// should cause its row to be included in the iteration. It's only applicable to
+// two snapshots. In detail, regardless of delta type, if the delta's timestamp
+// is "between" the two snapshots (i.e. not committed in the lower bound
+// snapshot but committed in the upper bound snapshot), its row should be included.
+//
+// Application criteria
+// ====================
+// The application (or "apply") criteria determines whether the logical contents
+// of the UPDATE, DELETE, or REINSERT in a particular delta should be applied to
+// a row block during a scan. It only considers the first snapshot. In detail:
+// - For REDO deltas: if the delta's timestamp is committed in the snapshot,
+// the mutation should be applied.
+// - For UNDO deltas: if the delta's timestamp is not committed in the snapshot,
+// the mutation should be applied.
+
+// Returns whether a delta at 'delta_ts' is relevant under the apply criteria
+// to 'snap'. If the delta is not relevant, further checks whether any
+// remaining deltas for this row can be skipped and reports that through
+// 'finished_row'; this is an optimization and not necessary for correctness.
+//
+// Only the REDO and UNDO specializations are defined.
+template<DeltaType Type>
+inline bool IsDeltaRelevantForApply(const MvccSnapshot& snap,
+                                    const Timestamp& delta_ts,
+                                    bool* finished_row);
+
+// Returns whether a delta at 'delta_ts' is relevant under the apply criteria to 'snap'.
+//
+// This is a point check for a single timestamp. It is NOT a conservative test
+// for a range of timestamps (e.g. a delta file's min/max), because an MVCC
+// snapshot may contain in-flight transactions.
+//
+// Defined after the three-argument declaration above so that the call below
+// is found by ordinary lookup rather than relying on argument-dependent
+// lookup at instantiation time.
+template<DeltaType Type>
+inline bool IsDeltaRelevantForApply(const MvccSnapshot& snap,
+                                    const Timestamp& delta_ts) {
+  bool ignored;
+  return IsDeltaRelevantForApply<Type>(snap, delta_ts, &ignored);
+}
+
+template<>
+inline bool IsDeltaRelevantForApply<REDO>(const MvccSnapshot& snap,
+                                          const Timestamp& delta_ts,
+                                          bool* finished_row) {
+  // Apply criteria for REDOs: the mutation applies iff it is committed in 'snap'.
+  const bool committed = snap.IsCommitted(delta_ts);
+
+  // REDO deltas are sorted first in ascending row ordinal order, then in
+  // ascending timestamp order. So if this delta is uncommitted and 'snap' has
+  // no committed transactions whose timestamps are >= 'delta_ts', every
+  // remaining delta for this row is irrelevant too and the caller can skip to
+  // the next row. The && short-circuits, so the second snapshot query is only
+  // made for uncommitted deltas.
+  *finished_row = !committed &&
+      !snap.MayHaveCommittedTransactionsAtOrAfter(delta_ts);
+  return committed;
+}
+
+template<>
+inline bool IsDeltaRelevantForApply<UNDO>(const MvccSnapshot& snap,
+                                          const Timestamp& delta_ts,
+                                          bool* finished_row) {
+  // Apply criteria for UNDOs: the mutation applies iff it is NOT committed in 'snap'.
+  const bool committed = snap.IsCommitted(delta_ts);
+
+  // UNDO deltas are sorted first in ascending row ordinal order, then in
+  // descending timestamp order. So if this delta is committed and 'snap' has
+  // no uncommitted transactions whose timestamps are <= 'delta_ts', every
+  // remaining delta for this row is irrelevant too and the caller can skip to
+  // the next row. The && short-circuits, so the second snapshot query is only
+  // made for committed deltas.
+  *finished_row = committed &&
+      !snap.MayHaveUncommittedTransactionsAtOrBefore(delta_ts);
+  return !committed;
+}
+
+// Returns whether any delta within the time range 'delta_ts_start' to
+// 'delta_ts_end' (inclusive) may be relevant under the select criteria to
+// 'snap_start' and 'snap_end'.
+//
+// This is meant for culling whole groups of deltas (e.g. a delta file, using
+// its min/max timestamps), so it must be conservative: it may return true
+// when no delta in the range is actually relevant, but must not return false
+// when one is. Checking only the range endpoints with IsCommitted() is not
+// enough, because an MVCC snapshot may contain in-flight transactions. The
+// commit status of the endpoints therefore does not determine the status of
+// the timestamps between them. (Assumes the MayHave*() predicates are
+// conservative, as their names indicate.)
+inline bool IsDeltaRelevantForSelect(const MvccSnapshot& snap_start,
+                                     const MvccSnapshot& snap_end,
+                                     const Timestamp& delta_ts_start,
+                                     const Timestamp& delta_ts_end) {
+  // A selected delta must be uncommitted in 'snap_start' (so some timestamp
+  // <= 'delta_ts_end' must possibly be uncommitted there) and committed in
+  // 'snap_end' (so some timestamp >= 'delta_ts_start' must possibly be
+  // committed there).
+  return snap_start.MayHaveUncommittedTransactionsAtOrBefore(delta_ts_end) &&
+      snap_end.MayHaveCommittedTransactionsAtOrAfter(delta_ts_start);
+}
+
+// A variant of IsDeltaRelevantForSelect that operates on a single delta's
+// timestamp given by 'delta_ts', and if the delta is not relevant, further
+// checks whether any remaining deltas for this row can be skipped; this is an
+// optimization and not necessary for correctness.
+//
+// '*finished_row' is always written: it is reset to false and set to true only
+// on a non-relevant path where the remaining deltas for the row can be skipped.
+// Only the REDO and UNDO specializations are defined.
+template<DeltaType Type>
+inline bool IsDeltaRelevantForSelect(const MvccSnapshot& snap_start,
+ const MvccSnapshot& snap_end,
+ const Timestamp& delta_ts,
+ bool* finished_row);
+
+template<>
+inline bool IsDeltaRelevantForSelect<REDO>(const MvccSnapshot& snap_start,
+                                           const MvccSnapshot& snap_end,
+                                           const Timestamp& delta_ts,
+                                           bool* finished_row) {
+  *finished_row = false;
+  if (snap_start.IsCommitted(delta_ts)) {
+    // Already visible in the lower bound snapshot, so not selected.
+    // No short-circuit available here; because REDO deltas for a given row are
+    // sorted in ascending timestamp order, the next REDO may be uncommitted in
+    // 'snap_start'.
+    return false;
+  }
+  if (!snap_end.IsCommitted(delta_ts)) {
+    // Not visible in the upper bound snapshot either, so not selected.
+    // Moreover, if 'snap_end' has no committed transactions at or after
+    // 'delta_ts', then every remaining REDO for this row (all of which have
+    // timestamps >= 'delta_ts') must also be uncommitted in 'snap_end', so
+    // the row is finished.
+    if (!snap_end.MayHaveCommittedTransactionsAtOrAfter(delta_ts)) {
+      *finished_row = true;
+    }
+    return false;
+  }
+  // Uncommitted in 'snap_start' but committed in 'snap_end': selected.
+  return true;
+}
+
+template<>
+inline bool IsDeltaRelevantForSelect<UNDO>(const MvccSnapshot& snap_start,
+                                           const MvccSnapshot& snap_end,
+                                           const Timestamp& delta_ts,
+                                           bool* finished_row) {
+  *finished_row = false;
+  if (!snap_end.IsCommitted(delta_ts)) {
+    // Not visible in the upper bound snapshot, so not selected.
+    // No short-circuit available here; because UNDO deltas for a given row are
+    // sorted in descending timestamp order, the next UNDO may be committed in
+    // 'snap_end'.
+    return false;
+  }
+  if (snap_start.IsCommitted(delta_ts)) {
+    // Already visible in the lower bound snapshot, so not selected.
+    // Moreover, if 'snap_start' has no uncommitted transactions at or before
+    // 'delta_ts', then every remaining UNDO for this row (all of which have
+    // timestamps <= 'delta_ts') must also be committed in 'snap_start', so
+    // the row is finished.
+    if (!snap_start.MayHaveUncommittedTransactionsAtOrBefore(delta_ts)) {
+      *finished_row = true;
+    }
+    return false;
+  }
+  // Committed in 'snap_end' but not in 'snap_start': selected.
+  return true;
+}
+
+} // namespace tablet
+} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index bb7a91f..4f97476 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -35,6 +35,7 @@
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/strcat.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_relevancy.h"
#include "kudu/tablet/delta_stats.h"
#include "kudu/tablet/deltafile.h"
#include "kudu/tablet/mutation.h"
@@ -51,47 +52,6 @@ using std::string;
using std::vector;
using strings::Substitute;
-namespace {
-
-// Returns whether a mutation at 'ts' is relevant under 'snap'.
-//
-// If not relevant, further checks whether any remaining deltas for this row can
-// be skipped; this is an optimization and not necessary for correctness.
-template<DeltaType Type>
-bool IsDeltaRelevant(const MvccSnapshot& snap,
- const Timestamp& ts,
- bool* finished_row);
-
-template<>
-bool IsDeltaRelevant<REDO>(const MvccSnapshot& snap,
- const Timestamp& ts,
- bool* finished_row) {
- *finished_row = false;
- if (!snap.IsCommitted(ts)) {
- if (!snap.MayHaveCommittedTransactionsAtOrAfter(ts)) {
- *finished_row = true;
- }
- return false;
- }
- return true;
-}
-
-template<>
-bool IsDeltaRelevant<UNDO>(const MvccSnapshot& snap,
- const Timestamp& ts,
- bool* finished_row) {
- *finished_row = false;
- if (snap.IsCommitted(ts)) {
- if (!snap.MayHaveUncommittedTransactionsAtOrBefore(ts)) {
- *finished_row = true;
- }
- return false;
- }
- return true;
-}
-
-} // anonymous namespace
-
string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool pad_key) const {
return StrCat(Substitute("($0 delta key=$2, change_list=$1)",
DeltaType_Name(type),
@@ -119,8 +79,18 @@ void DeltaPreparer<Traits>::Seek(rowid_t row_idx) {
}
template<class Traits>
-void DeltaPreparer<Traits>::Start(int prepare_flags) {
+void DeltaPreparer<Traits>::Start(size_t nrows, int prepare_flags) {
DCHECK_NE(prepare_flags, DeltaIterator::PREPARE_NONE);
+
+ if (prepare_flags & DeltaIterator::PREPARE_FOR_SELECT) {
+ DCHECK(opts_.snap_to_exclude);
+
+ // Ensure we have a selection vector at least 'nrows' long.
+ if (!selected_ || selected_->nrows() < nrows) {
+ selected_.reset(new SelectionVector(nrows));
+ }
+ selected_->SetAllFalse();
+ }
prepared_flags_ = prepare_flags;
if (updates_by_col_.empty()) {
updates_by_col_.resize(opts_.projection->num_columns());
@@ -132,6 +102,13 @@ void DeltaPreparer<Traits>::Start(int prepare_flags) {
reinserted_.clear();
prepared_deltas_.clear();
deletion_state_ = UNKNOWN;
+
+ if (VLOG_IS_ON(3)) {
+ string snap_to_exclude = opts_.snap_to_exclude ?
+ opts_.snap_to_exclude->ToString() : "INF";
+ VLOG(3) << "Starting batch for [" << snap_to_exclude << ","
+ << opts_.snap_to_include.ToString() << ")";
+ }
}
template<class Traits>
@@ -139,17 +116,53 @@ void DeltaPreparer<Traits>::Finish(size_t nrows) {
MaybeProcessPreviousRowChange(boost::none);
prev_prepared_idx_ = cur_prepared_idx_;
cur_prepared_idx_ += nrows;
+
+ if (VLOG_IS_ON(3)) {
+ string snap_to_exclude = opts_.snap_to_exclude ?
+ opts_.snap_to_exclude->ToString() : "INF";
+ VLOG(3) << "Finishing batch for [" << snap_to_exclude << ","
+ << opts_.snap_to_include.ToString() << ")";
+ }
}
template<class Traits>
Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* finished_row) {
- if (!IsDeltaRelevant<Traits::kType>(opts_.snap_to_include,
- key.timestamp(), finished_row)) {
- return Status::OK();
- }
MaybeProcessPreviousRowChange(key.row_idx());
- if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY) {
+ VLOG(4) << "Considering delta " << key.ToString() << ": "
+ << RowChangeList(val).ToString(*opts_.projection);
+
+ // Different preparations may use different criteria for delta relevancy. Each
+ // criteria offers a short-circuit when processing of the current row is known
+ // to be finished, but that short-circuit can only be used if we're not also
+ // handling a preparation with a different criteria.
+
+ if (prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT) {
+ bool finished_row_for_select;
+ if (IsDeltaRelevantForSelect<Traits::kType>(*opts_.snap_to_exclude,
+ opts_.snap_to_include,
+ key.timestamp(),
+ &finished_row_for_select)) {
+ selected_->SetRowSelected(key.row_idx() - cur_prepared_idx_);
+ }
+
+ if (finished_row_for_select &&
+ !(prepared_flags_ & ~DeltaIterator::PREPARE_FOR_SELECT)) {
+ *finished_row = true;
+ }
+ }
+
+ // Apply and collect use the same relevancy criteria.
+ bool relevant_for_apply_or_collect = false;
+ bool finished_row_for_apply_or_collect = false;
+ if (prepared_flags_ & (DeltaIterator::PREPARE_FOR_APPLY |
+ DeltaIterator::PREPARE_FOR_COLLECT)) {
+ relevant_for_apply_or_collect = IsDeltaRelevantForApply<Traits::kType>(
+ opts_.snap_to_include, key.timestamp(), &finished_row_for_apply_or_collect);
+ }
+
+ if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY &&
+ relevant_for_apply_or_collect) {
RowChangeListDecoder decoder((RowChangeList(val)));
if (Traits::kInitializeDecodersWithSafetyChecks) {
RETURN_NOT_OK(decoder.Init());
@@ -194,13 +207,21 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* fin
}
}
}
- if (prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT) {
+
+ if (prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT &&
+ relevant_for_apply_or_collect) {
PreparedDelta d;
d.key = key;
d.val = val;
prepared_deltas_.emplace_back(d);
}
+ if (finished_row_for_apply_or_collect &&
+ !(prepared_flags_ & ~(DeltaIterator::PREPARE_FOR_APPLY |
+ DeltaIterator::PREPARE_FOR_COLLECT))) {
+ *finished_row = true;
+ }
+
last_added_idx_ = key.row_idx();
return Status::OK();
}
@@ -245,6 +266,21 @@ Status DeltaPreparer<Traits>::ApplyDeletes(SelectionVector* sel_vec) {
}
template<class Traits>
+Status DeltaPreparer<Traits>::SelectUpdates(SelectionVector* sel_vec) {
+  DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT);
+  // Number of rows in the batch prepared by the last Start()/Finish() pair.
+  const size_t nrows = cur_prepared_idx_ - prev_prepared_idx_;
+  DCHECK_LE(nrows, sel_vec->nrows());
+  DCHECK_LE(nrows, selected_->nrows());
+
+  // SelectUpdates() is additive: it should never exclude rows, only include them.
+  //
+  // Only the rows of the current prepared batch are visited. Start() sizes
+  // 'selected_' to hold at least that many rows, but 'sel_vec' may be longer
+  // than 'selected_'. Iterating up to sel_vec->nrows() could therefore read
+  // past the end of 'selected_'.
+  for (rowid_t idx = 0; idx < nrows; idx++) {
+    if (selected_->IsRowSelected(idx)) {
+      sel_vec->SetRowSelected(idx);
+    }
+  }
+
+  return Status::OK();
+}
+
+template<class Traits>
Status DeltaPreparer<Traits>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT);
DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->size());
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 955f4a7..cd359c4 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -155,6 +155,14 @@ class PreparedDeltas {
// Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;
+ // Updates the given selection vector to reflect the snapshotted updates.
+ //
+  // Rows with at least one delta (of any type) that is committed in the upper
+  // bound snapshot but not in the lower bound snapshot are set to 1 in the
+  // selection vector so that they show up in the output. Rows are never set to 0.
+ //
+ // Deltas must have been prepared with the flag PREPARE_FOR_SELECT.
+ virtual Status SelectUpdates(SelectionVector* sel_vec) = 0;
+
// Collects the mutations associated with each row in the current prepared batch.
//
// Each entry in the vector will be treated as a singly linked list of Mutation
@@ -226,7 +234,14 @@ class DeltaIterator : public PreparedDeltas {
// in the order that they were loaded from the backing store.
//
// On success, CollectMutations and FilterColumnIdsAndCollectDeltas will be callable.
- PREPARE_FOR_COLLECT = 1 << 1
+ PREPARE_FOR_COLLECT = 1 << 1,
+
+ // Prepare a batch of deltas for selecting. All deltas in the batch will be
+ // decoded, and a data structure describing which rows had deltas will be
+ // populated.
+ //
+ // On success, SelectUpdates will be callable.
+ PREPARE_FOR_SELECT = 1 << 2
};
virtual Status PrepareBatch(size_t nrows, int prepare_flags) = 0;
@@ -281,7 +296,7 @@ class DeltaPreparer : public PreparedDeltas {
// on the part of a DeltaIterator.
//
// Call at the beginning of DeltaIterator::PrepareBatch.
- void Start(int prepare_flags);
+ void Start(size_t nrows, int prepare_flags);
// Updates internal state to reflect the end of delta batch preparation on the
// part of a DeltaIterator.
@@ -305,6 +320,8 @@ class DeltaPreparer : public PreparedDeltas {
Status ApplyDeletes(SelectionVector* sel_vec) override;
+ Status SelectUpdates(SelectionVector* sel_vec) override;
+
Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
@@ -359,6 +376,9 @@ class DeltaPreparer : public PreparedDeltas {
std::deque<rowid_t> reinserted_;
// The deletion state of the row last processed by AddDelta().
+ //
+ // As a row's DELETEs and REINSERTs are processed, the deletion state
+ // alternates between the values below.
enum RowDeletionState {
UNKNOWN,
DELETED,
@@ -374,6 +394,10 @@ class DeltaPreparer : public PreparedDeltas {
};
std::deque<PreparedDelta> prepared_deltas_;
+  // State when prepared_flags_ & PREPARE_FOR_SELECT
+ // ------------------------------------------------------------
+ std::unique_ptr<SelectionVector> selected_;
+
DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
};
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 7ca6719..30bb01b 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -44,6 +44,7 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/stringprintf.h"
#include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_relevancy.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet.pb.h"
@@ -285,20 +286,30 @@ Status DeltaFileReader::ReadDeltaStats() {
return Status::OK();
}
-bool DeltaFileReader::IsRelevantForSnapshot(const MvccSnapshot& snap) const {
+bool DeltaFileReader::IsRelevantForSnapshots(
+    const boost::optional<MvccSnapshot>& snap_to_exclude,
+    const MvccSnapshot& snap_to_include) const {
if (!init_once_.init_succeeded()) {
// If we're not initted, it means we have no delta stats and must
// assume that this file is relevant for every snapshot.
return true;
}
- if (delta_type_ == REDO) {
- return snap.MayHaveCommittedTransactionsAtOrAfter(delta_stats_->min_timestamp());
- }
- if (delta_type_ == UNDO) {
- return snap.MayHaveUncommittedTransactionsAtOrBefore(delta_stats_->max_timestamp());
+
+  // We don't know whether the caller's intent is to apply deltas, to select
+  // them, or both. As such, we must be conservative and assume 'both', which
+  // means the file is relevant if any relevancy criterion may hold for any
+  // delta in the file's [min_timestamp, max_timestamp] range.
+  //
+  // The apply check must be range-based, not a point check of one endpoint.
+  // A snapshot may contain in-flight transactions, so a REDO file whose
+  // min_timestamp is uncommitted in 'snap_to_include' may still contain later
+  // deltas that are committed there. Likewise, an UNDO file whose
+  // max_timestamp is committed may still contain earlier uncommitted deltas.
+  bool relevant = delta_type_ == REDO ?
+      snap_to_include.MayHaveCommittedTransactionsAtOrAfter(
+          delta_stats_->min_timestamp()) :
+      snap_to_include.MayHaveUncommittedTransactionsAtOrBefore(
+          delta_stats_->max_timestamp());
+  if (snap_to_exclude) {
+    // The select criterion is the same regardless of delta_type_.
+    relevant |= IsDeltaRelevantForSelect(*snap_to_exclude, snap_to_include,
+                                         delta_stats_->min_timestamp(),
+                                         delta_stats_->max_timestamp());
}
- LOG(DFATAL) << "Cannot reach here";
- return false;
+  return relevant;
}
Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
@@ -313,7 +324,7 @@ Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
DeltaIterator** iterator) const {
- if (IsRelevantForSnapshot(opts.snap_to_include)) {
+ if (IsRelevantForSnapshots(opts.snap_to_exclude, opts.snap_to_include)) {
if (VLOG_IS_ON(2)) {
if (!init_once_.init_succeeded()) {
TRACE_COUNTER_INCREMENT("delta_iterators_lazy_initted", 1);
@@ -429,11 +440,12 @@ Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
// Finish the initialization of any lazily-initialized state.
RETURN_NOT_OK(dfr_->Init(preparer_.opts().io_context));
- // Check again whether this delta file is relevant given the snapshot
+ // Check again whether this delta file is relevant given the snapshots
// that we are querying. We did this already before creating the
// DeltaFileIterator, but due to lazy initialization, it's possible
// that we weren't able to check at that time.
- if (!dfr_->IsRelevantForSnapshot(preparer_.opts().snap_to_include)) {
+ if (!dfr_->IsRelevantForSnapshots(preparer_.opts().snap_to_exclude,
+ preparer_.opts().snap_to_include)) {
exhausted_ = true;
delta_blocks_.clear();
return Status::OK();
@@ -587,7 +599,7 @@ Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, int prepare_flags) {
#endif
prepared_ = true;
- preparer_.Start(prepare_flags);
+ preparer_.Start(nrows, prepare_flags);
RETURN_NOT_OK(AddDeltas(start_row, stop_row));
preparer_.Finish(nrows);
return Status::OK();
@@ -672,6 +684,11 @@ Status DeltaFileIterator<Type>::ApplyDeletes(SelectionVector* sel_vec) {
}
template<DeltaType Type>
+// Delegates to the DeltaPreparer that PrepareBatch() fed via Start(),
+// AddDeltas() and Finish(); the batch must have been prepared with
+// PREPARE_FOR_SELECT.
+Status DeltaFileIterator<Type>::SelectUpdates(SelectionVector* sel_vec) {
+ return preparer_.SelectUpdates(sel_vec);
+}
+
+template<DeltaType Type>
Status DeltaFileIterator<Type>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
return preparer_.CollectMutations(dst, arena);
}
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 217dd4c..e330ce5 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -24,6 +24,7 @@
#include <string>
#include <vector>
+#include <boost/optional/optional.hpp>
#include <glog/logging.h>
#include "kudu/cfile/block_handle.h"
@@ -180,9 +181,10 @@ class DeltaFileReader : public DeltaStore,
}
// Returns true if this delta file may include any deltas which need to be
- // applied when scanning the given snapshot, or if the file has not yet
+  // applied or selected when scanning the given snapshots, or if the file has not yet
// been fully initialized.
- bool IsRelevantForSnapshot(const MvccSnapshot& snap) const;
+ bool IsRelevantForSnapshots(const boost::optional<MvccSnapshot>& snap_to_exclude,
+ const MvccSnapshot& snap_to_include) const;
// Clone this DeltaFileReader for testing and validation purposes (such as
// while in DEBUG mode). The resulting object will not be Initted().
@@ -234,6 +236,8 @@ class DeltaFileIterator : public DeltaIterator {
Status ApplyDeletes(SelectionVector* sel_vec) override;
+ Status SelectUpdates(SelectionVector* sel_vec) override;
+
Status CollectMutations(std::vector<Mutation*>*dst, Arena* arena) override;
Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index a25c110..c2a8f0d 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -234,7 +234,7 @@ Status DMSIterator::PrepareBatch(size_t nrows, int prepare_flags) {
rowid_t start_row = preparer_.cur_prepared_idx();
rowid_t stop_row = start_row + nrows - 1;
- preparer_.Start(prepare_flags);
+ preparer_.Start(nrows, prepare_flags);
bool finished_row = false;
while (iter_->IsValid()) {
Slice key_slice, val;
@@ -284,6 +284,9 @@ Status DMSIterator::ApplyDeletes(SelectionVector* sel_vec) {
return preparer_.ApplyDeletes(sel_vec);
}
+// Delegates to the DeltaPreparer initialized by PrepareBatch() via Start();
+// the batch must have been prepared with PREPARE_FOR_SELECT.
+Status DMSIterator::SelectUpdates(SelectionVector* sel_vec) {
+ return preparer_.SelectUpdates(sel_vec);
+}
Status DMSIterator::CollectMutations(vector<Mutation*>*dst, Arena* arena) {
return preparer_.CollectMutations(dst, arena);
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltamemstore.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index a5a92f7..3cc02a0 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -206,6 +206,8 @@ class DMSIterator : public DeltaIterator {
Status ApplyDeletes(SelectionVector* sel_vec) override;
+ Status SelectUpdates(SelectionVector* sel_vec) override;
+
Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/tablet-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.cc b/src/kudu/tablet/tablet-test-util.cc
index 0575e3b..5453c30 100644
--- a/src/kudu/tablet/tablet-test-util.cc
+++ b/src/kudu/tablet/tablet-test-util.cc
@@ -21,16 +21,29 @@ namespace kudu {
namespace tablet {
+// Test model of the apply criteria: a timestamp counts as committed in a
+// snapshot iff it is strictly below that snapshot's timestamp. A REDO
+// applies iff it is committed at 'to_include'.
template<>
-bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevant(
+bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevantForApply(
Timestamp to_include, Timestamp ts) const {
return ts < to_include;
}
+// An UNDO applies iff it is NOT committed at 'to_include'.
template<>
-bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevant(
+bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevantForApply(
Timestamp to_include, Timestamp ts) const {
return ts >= to_include;
}
+template<typename T>
+bool MirroredDeltas<T>::IsDeltaRelevantForSelect(
+    Timestamp to_exclude, Timestamp to_include, Timestamp ts) const {
+  // In this test model a timestamp is committed in a snapshot iff it is
+  // strictly below the snapshot's timestamp. A delta is therefore selected iff
+  // it is committed at 'to_include' but not at 'to_exclude'. Unlike the apply
+  // criteria, this does not depend on the delta type.
+  const bool committed_at_exclude = ts < to_exclude;
+  const bool committed_at_include = ts < to_include;
+  return !committed_at_exclude && committed_at_include;
+}
+
+// The generic IsDeltaRelevantForSelect definition above lives in this .cc
+// file, so it is explicitly instantiated for both delta types to make it
+// available to callers in other translation units (e.g. the header's
+// CheckAllDeltasCulled and ApplyUpdates).
+template
+bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevantForSelect(
+ Timestamp to_exclude, Timestamp to_include, Timestamp ts) const;
+template
+bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevantForSelect(
+ Timestamp to_exclude, Timestamp to_include, Timestamp ts) const;
+
} // namespace tablet
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/tablet-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 87daee6..f43e40d 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -365,10 +365,15 @@ class MirroredDeltas {
}
// Returns true if all tracked deltas are irrelevant to 'ts', false otherwise.
- bool CheckAllDeltasCulled(Timestamp ts) const {
+ bool CheckAllDeltasCulled(const boost::optional<Timestamp>& lower_ts,
+ Timestamp upper_ts) const {
for (const auto& e1 : all_deltas_) {
for (const auto& e2 : e1.second) {
- if (IsDeltaRelevant(ts, e2.first)) {
+ bool relevant = IsDeltaRelevantForApply(upper_ts, e2.first);
+ if (lower_ts) {
+ relevant |= IsDeltaRelevantForSelect(*lower_ts, upper_ts, e2.first);
+ }
+ if (relevant) {
return false;
}
}
@@ -401,15 +406,44 @@ class MirroredDeltas {
//
// Rows not set in 'filter' are skipped.
//
- // Deltas not relevant to 'ts' are skipped. The set of rows considered is
- // determined by 'start_row_idx' and the number of rows in 'cb'.
- Status ApplyUpdates(const Schema& projection, Timestamp ts,
+ // Deltas not relevant to 'lower_ts' or 'upper_ts' are skipped. The set of
+ // rows considered is determined by 'start_row_idx' and the number of rows in 'cb'.
+ Status ApplyUpdates(const Schema& projection,
+ const boost::optional<Timestamp>& lower_ts, Timestamp upper_ts,
rowid_t start_row_idx, int col_idx, ColumnBlock* cb,
const SelectionVector& filter) {
+ if (VLOG_IS_ON(3)) {
+ std::string lower_ts_str = lower_ts ? lower_ts->ToString() : "INF";
+ VLOG(3) << "Begin applying for timestamps [" << lower_ts_str << ","
+ << upper_ts << ")";
+ }
for (int i = 0; i < cb->nrows(); i++) {
rowid_t row_idx = start_row_idx + i;
+
+ if (lower_ts) {
+ // First pass: establish whether this row should be applied at all.
+ bool at_least_one_delta_relevant = false;
+ for (const auto& e : all_deltas_[row_idx]) {
+ bool is_relevant = IsDeltaRelevantForSelect(*lower_ts, upper_ts, e.first);
+ if (VLOG_IS_ON(3)) {
+ RowChangeList changes(e.second);
+ VLOG(3) << "Row " << i << " ts " << e.first << " (relevant: "
+ << is_relevant << "): " << changes.ToString(*schema_);
+ }
+ if (is_relevant) {
+ at_least_one_delta_relevant = true;
+ break;
+ }
+ }
+ if (!at_least_one_delta_relevant) {
+ // Not one delta was relevant; skip the row.
+ continue;
+ }
+ }
+
+ // Second pass: apply all relevant deltas.
for (const auto& e : all_deltas_[row_idx]) {
- if (!IsDeltaRelevant(ts, e.first)) {
+ if (!IsDeltaRelevantForApply(upper_ts, e.first)) {
// No need to keep iterating; all future deltas for this row will also
// be irrelevant.
break;
@@ -438,7 +472,7 @@ class MirroredDeltas {
for (int i = 0; i < sel_vec->nrows(); i++) {
bool deleted = false;
for (const auto& e : all_deltas_[start_row_idx + i]) {
- if (!IsDeltaRelevant(ts, e.first)) {
+ if (!IsDeltaRelevantForApply(ts, e.first)) {
// No need to keep iterating; all future deltas for this row will also
// be irrelevant.
break;
@@ -455,6 +489,25 @@ class MirroredDeltas {
return Status::OK();
}
+ // Selects all rows in 'sel_vec' for which there exists a tracked delta.
+ //
+ // Deltas not relevant to 'lower_ts' or 'upper_ts' are skipped. The set of
+ // rows considered is determined by 'start_row_idx' and the number of rows in 'sel_vec'.
+ void SelectUpdates(Timestamp lower_ts, Timestamp upper_ts,
+ rowid_t start_row_idx, SelectionVector* sel_vec) {
+ for (int row = 0; row < sel_vec->nrows(); ++row) {
+ // A row is selected as soon as any one of its deltas falls within the
+ // selection window. Every delta may need to be examined: short-circuiting
+ // the select criteria is complex and not worth using in test code.
+ for (const auto& delta : all_deltas_[start_row_idx + row]) {
+ if (IsDeltaRelevantForSelect(lower_ts, upper_ts, delta.first)) {
+ sel_vec->SetRowSelected(row);
+ break;
+ }
+ }
+ }
+ }
+
// Transforms and writes deltas into 'deltas', a vector of "delta lists", each
// of which represents a particular row, and each entry of which is a
// Timestamp and encoded delta pair. The encoded delta is a Slice into
@@ -467,7 +520,7 @@ class MirroredDeltas {
std::vector<DeltaList>* deltas) {
for (int i = 0; i < deltas->size(); i++) {
for (const auto& e : all_deltas_[start_row_idx + i]) {
- if (!IsDeltaRelevant(ts, e.first)) {
+ if (!IsDeltaRelevantForApply(ts, e.first)) {
// No need to keep iterating; all future deltas for this row will also
// be irrelevant.
break;
@@ -513,7 +566,13 @@ class MirroredDeltas {
private:
// Returns true if 'ts' is relevant to 'to_include', false otherwise.
- bool IsDeltaRelevant(Timestamp to_include, Timestamp ts) const;
+ bool IsDeltaRelevantForApply(Timestamp to_include, Timestamp ts) const;
+
+ // Returns true if 'ts' is relevant with respect to both 'to_exclude' and
+ // 'to_include', false otherwise.
+ bool IsDeltaRelevantForSelect(Timestamp to_exclude,
+ Timestamp to_include,
+ Timestamp ts) const;
// All encoded deltas, arranged in DeltaKey order.
MirroredDeltaMap all_deltas_;
@@ -779,15 +838,23 @@ void RunDeltaFuzzTest(const DeltaStore& store,
for (int i = 0; i < kNumScans + 1; i++) {
// Pick a timestamp for the iterator. The last iteration will use a snapshot
// that includes all deltas.
- Timestamp ts;
+ Timestamp upper_ts;
+ boost::optional<Timestamp> lower_ts;
if (i < kNumScans) {
- ts = Timestamp(prng->Uniform(ts_range.second - ts_range.first) +
- ts_range.first);
+ uint64_t upper_ts_val = prng->Uniform(ts_range.second - ts_range.first) +
+ ts_range.first;
+ upper_ts = Timestamp(upper_ts_val);
+
+ // Use a lower bound in half the scans.
+ if (prng->Uniform(2)) {
+ uint64_t lower_ts_val = upper_ts_val > 0 ? prng->Uniform(upper_ts_val) : 0;
+ lower_ts = Timestamp(lower_ts_val);
+ }
} else if (T::kTag == REDO) {
- ts = Timestamp::kMax;
+ upper_ts = Timestamp::kMax;
} else {
DCHECK(T::kTag == UNDO);
- ts = Timestamp::kMin;
+ upper_ts = Timestamp::kMin;
}
// Create and initialize the iterator. If none iterator is returned, it's
@@ -796,13 +863,18 @@ void RunDeltaFuzzTest(const DeltaStore& store,
SCOPED_TRACE(strings::Substitute("Projection $0", projection.ToString()));
RowIteratorOptions opts;
opts.projection = &projection;
- opts.snap_to_include = MvccSnapshot(ts);
- SCOPED_TRACE(strings::Substitute("Timestamp $0", ts.ToString()));
+ if (lower_ts) {
+ opts.snap_to_exclude = MvccSnapshot(*lower_ts);
+ }
+ opts.snap_to_include = MvccSnapshot(upper_ts);
+ SCOPED_TRACE(strings::Substitute("Timestamps: [$0,$1)",
+ lower_ts ? lower_ts->ToString() : "INF",
+ upper_ts.ToString()));
DeltaIterator* raw_iter;
Status s = store.NewDeltaIterator(opts, &raw_iter);
if (s.IsNotFound()) {
ASSERT_STR_CONTAINS(s.ToString(), "MvccSnapshot outside the range of this delta");
- ASSERT_TRUE(mirror->CheckAllDeltasCulled(ts));
+ ASSERT_TRUE(mirror->CheckAllDeltasCulled(lower_ts, upper_ts));
continue;
}
ASSERT_OK(s);
@@ -816,20 +888,39 @@ void RunDeltaFuzzTest(const DeltaStore& store,
int batch_size = prng->Uniform(kMaxBatchSize) + 1;
SCOPED_TRACE(strings::Substitute("batch starting at $0 ($1 rows)",
start_row_idx, batch_size));
- ASSERT_OK(iter->PrepareBatch(batch_size, DeltaIterator::PREPARE_FOR_APPLY |
- DeltaIterator::PREPARE_FOR_COLLECT));
+ int prepare_flags = DeltaIterator::PREPARE_FOR_APPLY |
+ DeltaIterator::PREPARE_FOR_COLLECT;
+ if (lower_ts) {
+ prepare_flags |= DeltaIterator::PREPARE_FOR_SELECT;
+ }
+ ASSERT_OK(iter->PrepareBatch(batch_size, prepare_flags));
+
+ // Test SelectUpdates: the selection vector begins all false and a row is
+ // set if there is at least one relevant update for it.
+ //
+ // Note: we retain 'actual_selected' for use as a possible filter in the
+ // ApplyUpdates test below.
+ SelectionVector actual_selected(batch_size);
+ if (lower_ts) {
+ SelectionVector expected_selected(batch_size);
+ expected_selected.SetAllFalse();
+ actual_selected.SetAllFalse();
+ mirror->SelectUpdates(*lower_ts, upper_ts, start_row_idx, &expected_selected);
+ ASSERT_OK(iter->SelectUpdates(&actual_selected));
+ ASSERT_EQ(expected_selected, actual_selected);
+ }
// Test ApplyDeletes: the selection vector is all true and a row is unset
// if the last relevant update deleted it.
//
- // Note: we retain 'actual_deleted' for use as a filter in the
+ // Note: we retain 'actual_deleted' for use as a possible filter in the
// ApplyUpdates test below.
SelectionVector actual_deleted(batch_size);
{
SelectionVector expected_deleted(batch_size);
expected_deleted.SetAllTrue();
actual_deleted.SetAllTrue();
- ASSERT_OK(mirror->ApplyDeletes(ts, start_row_idx, &expected_deleted));
+ ASSERT_OK(mirror->ApplyDeletes(upper_ts, start_row_idx, &expected_deleted));
ASSERT_OK(iter->ApplyDeletes(&actual_deleted));
ASSERT_EQ(expected_deleted, actual_deleted);
}
@@ -843,9 +934,10 @@ void RunDeltaFuzzTest(const DeltaStore& store,
expected_scb[k] = 0;
actual_scb[k] = 0;
}
- ASSERT_OK(mirror->ApplyUpdates(*opts.projection, ts, start_row_idx, j,
- &expected_scb, actual_deleted));
- ASSERT_OK(iter->ApplyUpdates(j, &actual_scb, actual_deleted));
+ const SelectionVector& filter = lower_ts ? actual_selected : actual_deleted;
+ ASSERT_OK(mirror->ApplyUpdates(*opts.projection, lower_ts, upper_ts,
+ start_row_idx, j, &expected_scb, filter));
+ ASSERT_OK(iter->ApplyUpdates(j, &actual_scb, filter));
ASSERT_EQ(expected_scb, actual_scb)
<< "Expected column block: " << expected_scb.ToString()
<< "\nActual column block: " << actual_scb.ToString();
@@ -857,7 +949,7 @@ void RunDeltaFuzzTest(const DeltaStore& store,
std::vector<typename MirroredDeltas<T>::DeltaList> expected_muts(batch_size);
std::vector<Mutation*> actual_muts(batch_size);
ASSERT_OK(iter->CollectMutations(&actual_muts, &arena));
- mirror->CollectMutations(ts, start_row_idx, &expected_muts);
+ mirror->CollectMutations(upper_ts, start_row_idx, &expected_muts);
for (int i = 0; i < expected_muts.size(); i++) {
const auto& expected = expected_muts[i];
auto* actual = actual_muts[i];