You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/12/07 03:47:52 UTC

[3/5] kudu git commit: (03/05) delta_store: support iteration with snap_to_exclude

(03/05) delta_store: support iteration with snap_to_exclude

This patch changes the delta stores (DMS and delta files) to respect
snap_to_exclude during iteration. The key changes are:
- The introduction of the "selection" criteria, a new delta relevancy
  formula for determining whether a delta applies to a scan with both
  snap_to_exclude and snap_to_include. The existing "application" criteria
  was formalized and moved into delta_relevancy.h. There was also a
  non-trivial change to DeltaFileReader::IsRelevantForSnapshot() to use both
  criteria when culling entire delta files.
- A new SelectUpdates() method for using the selection criteria on a batch
  of prepared deltas. SelectUpdates() requires new in-memory state, the
  creation of which is gated behind a new PREPARE_FOR_SELECT flag so as not
  to affect regular scans.
- Updates to the delta fuzz testing logic to test iterators with two
  timestamps, and to provide SelectUpdates() coverage.

A future patch will modify DeltaApplier to use SelectUpdates for diff scans.

Change-Id: I7811e185fef270f40fdbbb38f491eee8b4aa043c
Reviewed-on: http://gerrit.cloudera.org:8080/11858
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/9f4657ab
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/9f4657ab
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/9f4657ab

Branch: refs/heads/master
Commit: 9f4657ab56366df541eddce12850726038d95fd0
Parents: d927675
Author: Adar Dembo <ad...@cloudera.com>
Authored: Thu Nov 1 17:07:18 2018 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri Dec 7 03:45:13 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/delta_iterator_merger.cc |   7 +
 src/kudu/tablet/delta_iterator_merger.h  |   2 +
 src/kudu/tablet/delta_relevancy.h        | 181 ++++++++++++++++++++++++++
 src/kudu/tablet/delta_store.cc           | 132 ++++++++++++-------
 src/kudu/tablet/delta_store.h            |  28 +++-
 src/kudu/tablet/deltafile.cc             |  41 ++++--
 src/kudu/tablet/deltafile.h              |   8 +-
 src/kudu/tablet/deltamemstore.cc         |   5 +-
 src/kudu/tablet/deltamemstore.h          |   2 +
 src/kudu/tablet/tablet-test-util.cc      |  17 ++-
 src/kudu/tablet/tablet-test-util.h       | 142 ++++++++++++++++----
 11 files changed, 473 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_iterator_merger.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc
index f6f5840..58d14da 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -79,6 +79,13 @@ Status DeltaIteratorMerger::ApplyDeletes(SelectionVector* sel_vec) {
   return Status::OK();
 }
 
+Status DeltaIteratorMerger::SelectUpdates(SelectionVector* sel_vec) {
+  for (const unique_ptr<DeltaIterator>& iter : iters_) {
+    RETURN_NOT_OK(iter->SelectUpdates(sel_vec));
+  }
+  return Status::OK();
+}
+
 Status DeltaIteratorMerger::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
   for (const unique_ptr<DeltaIterator> &iter : iters_) {
     RETURN_NOT_OK(iter->CollectMutations(dst, arena));

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_iterator_merger.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h
index a8efeb6..c5a36e8 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -67,6 +67,8 @@ class DeltaIteratorMerger : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
+  Status SelectUpdates(SelectionVector* sel_vec) override;
+
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_relevancy.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_relevancy.h b/src/kudu/tablet/delta_relevancy.h
new file mode 100644
index 0000000..69640ac
--- /dev/null
+++ b/src/kudu/tablet/delta_relevancy.h
@@ -0,0 +1,181 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "kudu/common/timestamp.h"
+#include "kudu/tablet/delta_key.h"
+#include "kudu/tablet/mvcc.h"
+
+namespace kudu {
+namespace tablet {
+
+// Functions that evaluate the relevancy of deltas against a snapshot or pair
+// of snapshots.
+//
+// When constructing an iterator tree, one must specify either one or two MVCC
+// snapshots. The first always serves as an upper bound, ensuring that the data
+// may be further mutated by writers without affecting the results of the
+// iteration. If present, the second serves as a lower bound, excluding any
+// deltas that were committed before it in time.
+//
+// Together, the snapshots affect the behavior of deltas encountered during the
+// iteration. Various criteria are used to determine whether a particular delta
+// is relevant to the iteration; which criteria are used depends on the context.
+//
+// Selection criteria
+// ==================
+// The selection (or "select") criteria determines whether the particular delta
+// should cause its row to be included in the iteration. It's only applicable to
+// two snapshots. In detail, regardless of delta type, if the delta's timestamp
+// is "between" the two snapshots (i.e. not committed in the lower bound
+// snapshot but committed in the upper bound snapshot), its row should be included.
+//
+// Application criteria
+// ====================
+// The application (or "apply") criteria determines whether the logical contents
+// of the UPDATE, DELETE, or REINSERT in a particular delta should be applied to
+// a row block during a scan. It only considers the first snapshot. In detail:
+// - For REDO deltas: if the delta's timestamp is committed in the snapshot,
+//   the mutation should be applied.
+// - For UNDO deltas: if the delta's timestamp is not committed in the snapshot,
+//   the mutation should be applied.
+
+// Returns whether a delta at 'delta_ts' is relevant under the apply criteria to 'snap'.
+template<DeltaType Type>
+inline bool IsDeltaRelevantForApply(const MvccSnapshot& snap,
+                                    const Timestamp& delta_ts) {
+  bool ignored;  // receives 'finished_row', which this overload discards
+  return IsDeltaRelevantForApply<Type>(snap, delta_ts, &ignored);
+}
+
+// A variant of IsDeltaRelevantForApply that, if the delta is not relevant,
+// further checks whether any remaining deltas for this row can be skipped; this
+// is an optimization and not necessary for correctness.
+template<DeltaType Type>
+inline bool IsDeltaRelevantForApply(const MvccSnapshot& snap,
+                                    const Timestamp& delta_ts,
+                                    bool* finished_row);
+
+template<>
+inline bool IsDeltaRelevantForApply<REDO>(const MvccSnapshot& snap,
+                                          const Timestamp& delta_ts,
+                                          bool* finished_row) {
+  *finished_row = false;  // default; set below only when the rest of the row can be skipped
+  if (snap.IsCommitted(delta_ts)) {
+    return true;
+  }
+  if (!snap.MayHaveCommittedTransactionsAtOrAfter(delta_ts)) {
+    // REDO deltas are sorted first in ascending row ordinal order, then in
+    // ascending timestamp order. Thus, if we know that there are no more
+    // committed transactions whose timestamps are >= 'delta_ts', we know that
+    // any future deltas belonging to this row aren't relevant (as per the apply
+    // criteria, REDOs are relevant if they are committed in the snapshot), and
+    // we can skip to the next row.
+    *finished_row = true;
+  }
+  return false;
+}
+
+template<>
+inline bool IsDeltaRelevantForApply<UNDO>(const MvccSnapshot& snap,
+                                          const Timestamp& delta_ts,
+                                          bool* finished_row) {
+  *finished_row = false;  // default; set below only when the rest of the row can be skipped
+  if (!snap.IsCommitted(delta_ts)) {
+    return true;
+  }
+  if (!snap.MayHaveUncommittedTransactionsAtOrBefore(delta_ts)) {
+    // UNDO deltas are sorted first in ascending row ordinal order, then in
+    // descending timestamp order. Thus, if we know that there are no more
+    // uncommitted transactions whose timestamps are <= 'delta_ts', we know that
+    // any future deltas belonging to this row aren't relevant (as per the apply
+    // criteria, UNDOs are relevant if they are uncommitted in the snapshot),
+    // and we can skip to the next row.
+    *finished_row = true;
+  }
+  return false;
+}
+
+// Returns whether deltas within the time range 'delta_ts_start' to
+// 'delta_ts_end' are relevant under the select criteria to 'snap_start' and 'snap_end'.
+inline bool IsDeltaRelevantForSelect(const MvccSnapshot& snap_start,
+                                     const MvccSnapshot& snap_end,
+                                     const Timestamp& delta_ts_start,
+                                     const Timestamp& delta_ts_end) {
+  return !snap_start.IsCommitted(delta_ts_end) &&  // range end not committed in lower bound
+      snap_end.IsCommitted(delta_ts_start);        // range start committed in upper bound
+}
+
+// A variant of IsDeltaRelevantForSelect that operates on a single delta's
+// timestamp given by 'delta_ts', and if the delta is not relevant, further
+// checks whether any remaining deltas for this row can be skipped; this is an
+// optimization and not necessary for correctness.
+template<DeltaType Type>
+inline bool IsDeltaRelevantForSelect(const MvccSnapshot& snap_start,
+                                     const MvccSnapshot& snap_end,
+                                     const Timestamp& delta_ts,
+                                     bool* finished_row);
+
+template<>
+inline bool IsDeltaRelevantForSelect<REDO>(const MvccSnapshot& snap_start,
+                                           const MvccSnapshot& snap_end,
+                                           const Timestamp& delta_ts,
+                                           bool* finished_row) {
+  *finished_row = false;
+  if (snap_start.IsCommitted(delta_ts)) {
+    // No short-circuit available here; because REDO deltas for a given row are
+    // sorted in ascending timestamp order, the next REDO may be uncommitted in
+    // 'snap_start'.
+    return false;
+  }
+  if (!snap_end.IsCommitted(delta_ts)) {
+    if (!snap_end.MayHaveCommittedTransactionsAtOrAfter(delta_ts)) {
+      // 'snap_end' has no committed transactions at or after 'delta_ts', so all
+      // future REDOs for this row are also uncommitted in 'snap_end'.
+      *finished_row = true;
+    }
+    return false;
+  }
+  return true;  // uncommitted in 'snap_start' and committed in 'snap_end'
+}
+
+template<>
+inline bool IsDeltaRelevantForSelect<UNDO>(const MvccSnapshot& snap_start,
+                                           const MvccSnapshot& snap_end,
+                                           const Timestamp& delta_ts,
+                                           bool* finished_row) {
+  *finished_row = false;
+  if (!snap_end.IsCommitted(delta_ts)) {
+    // No short-circuit available here; because UNDO deltas for a given row are
+    // sorted in descending timestamp order, the next UNDO may be committed in
+    // 'snap_end'.
+    return false;
+  }
+  if (snap_start.IsCommitted(delta_ts)) {
+    if (!snap_start.MayHaveUncommittedTransactionsAtOrBefore(delta_ts)) {
+      // 'snap_start' has no uncommitted transactions at or before 'delta_ts',
+      // so all future UNDOs for this row are also committed in 'snap_start'.
+      *finished_row = true;
+    }
+    return false;
+  }
+  return true;  // uncommitted in 'snap_start' and committed in 'snap_end'
+}
+
+} // namespace tablet
+} // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_store.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index bb7a91f..4f97476 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -35,6 +35,7 @@
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_relevancy.h"
 #include "kudu/tablet/delta_stats.h"
 #include "kudu/tablet/deltafile.h"
 #include "kudu/tablet/mutation.h"
@@ -51,47 +52,6 @@ using std::string;
 using std::vector;
 using strings::Substitute;
 
-namespace {
-
-// Returns whether a mutation at 'ts' is relevant under 'snap'.
-//
-// If not relevant, further checks whether any remaining deltas for this row can
-// be skipped; this is an optimization and not necessary for correctness.
-template<DeltaType Type>
-bool IsDeltaRelevant(const MvccSnapshot& snap,
-                     const Timestamp& ts,
-                     bool* finished_row);
-
-template<>
-bool IsDeltaRelevant<REDO>(const MvccSnapshot& snap,
-                           const Timestamp& ts,
-                           bool* finished_row) {
-  *finished_row = false;
-  if (!snap.IsCommitted(ts)) {
-    if (!snap.MayHaveCommittedTransactionsAtOrAfter(ts)) {
-      *finished_row = true;
-    }
-    return false;
-  }
-  return true;
-}
-
-template<>
-bool IsDeltaRelevant<UNDO>(const MvccSnapshot& snap,
-                           const Timestamp& ts,
-                           bool* finished_row) {
-  *finished_row = false;
-  if (snap.IsCommitted(ts)) {
-    if (!snap.MayHaveUncommittedTransactionsAtOrBefore(ts)) {
-      *finished_row = true;
-    }
-    return false;
-  }
-  return true;
-}
-
-} // anonymous namespace
-
 string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool pad_key) const {
   return StrCat(Substitute("($0 delta key=$2, change_list=$1)",
                            DeltaType_Name(type),
@@ -119,8 +79,18 @@ void DeltaPreparer<Traits>::Seek(rowid_t row_idx) {
 }
 
 template<class Traits>
-void DeltaPreparer<Traits>::Start(int prepare_flags) {
+void DeltaPreparer<Traits>::Start(size_t nrows, int prepare_flags) {
   DCHECK_NE(prepare_flags, DeltaIterator::PREPARE_NONE);
+
+  if (prepare_flags & DeltaIterator::PREPARE_FOR_SELECT) {
+    DCHECK(opts_.snap_to_exclude);
+
+    // Ensure we have a selection vector at least 'nrows' long.
+    if (!selected_ || selected_->nrows() < nrows) {
+      selected_.reset(new SelectionVector(nrows));
+    }
+    selected_->SetAllFalse();
+  }
   prepared_flags_ = prepare_flags;
   if (updates_by_col_.empty()) {
     updates_by_col_.resize(opts_.projection->num_columns());
@@ -132,6 +102,13 @@ void DeltaPreparer<Traits>::Start(int prepare_flags) {
   reinserted_.clear();
   prepared_deltas_.clear();
   deletion_state_ = UNKNOWN;
+
+  if (VLOG_IS_ON(3)) {
+    string snap_to_exclude = opts_.snap_to_exclude ?
+                             opts_.snap_to_exclude->ToString() : "INF";
+    VLOG(3) << "Starting batch for [" << snap_to_exclude << ","
+            << opts_.snap_to_include.ToString() << ")";
+  }
 }
 
 template<class Traits>
@@ -139,17 +116,53 @@ void DeltaPreparer<Traits>::Finish(size_t nrows) {
   MaybeProcessPreviousRowChange(boost::none);
   prev_prepared_idx_ = cur_prepared_idx_;
   cur_prepared_idx_ += nrows;
+
+  if (VLOG_IS_ON(3)) {
+    string snap_to_exclude = opts_.snap_to_exclude ?
+                             opts_.snap_to_exclude->ToString() : "INF";
+    VLOG(3) << "Finishing batch for [" << snap_to_exclude << ","
+            << opts_.snap_to_include.ToString() << ")";
+  }
 }
 
 template<class Traits>
 Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* finished_row) {
-  if (!IsDeltaRelevant<Traits::kType>(opts_.snap_to_include,
-                                      key.timestamp(), finished_row)) {
-    return Status::OK();
-  }
   MaybeProcessPreviousRowChange(key.row_idx());
 
-  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY) {
+  VLOG(4) << "Considering delta " << key.ToString() << ": "
+          << RowChangeList(val).ToString(*opts_.projection);
+
+  // Different preparations may use different criteria for delta relevancy. Each
+  // criteria offers a short-circuit when processing of the current row is known
+  // to be finished, but that short-circuit can only be used if we're not also
+  // handling a preparation with a different criteria.
+
+  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT) {
+    bool finished_row_for_select;
+    if (IsDeltaRelevantForSelect<Traits::kType>(*opts_.snap_to_exclude,
+                                                opts_.snap_to_include,
+                                                key.timestamp(),
+                                                &finished_row_for_select)) {
+      selected_->SetRowSelected(key.row_idx() - cur_prepared_idx_);
+    }
+
+    if (finished_row_for_select &&
+        !(prepared_flags_ & ~DeltaIterator::PREPARE_FOR_SELECT)) {
+      *finished_row = true;
+    }
+  }
+
+  // Apply and collect use the same relevancy criteria.
+  bool relevant_for_apply_or_collect = false;
+  bool finished_row_for_apply_or_collect = false;
+  if (prepared_flags_ & (DeltaIterator::PREPARE_FOR_APPLY |
+                         DeltaIterator::PREPARE_FOR_COLLECT)) {
+    relevant_for_apply_or_collect = IsDeltaRelevantForApply<Traits::kType>(
+        opts_.snap_to_include, key.timestamp(), &finished_row_for_apply_or_collect);
+  }
+
+  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY &&
+      relevant_for_apply_or_collect) {
     RowChangeListDecoder decoder((RowChangeList(val)));
     if (Traits::kInitializeDecodersWithSafetyChecks) {
       RETURN_NOT_OK(decoder.Init());
@@ -194,13 +207,21 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* fin
       }
     }
   }
-  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT) {
+
+  if (prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT &&
+      relevant_for_apply_or_collect) {
     PreparedDelta d;
     d.key = key;
     d.val = val;
     prepared_deltas_.emplace_back(d);
   }
 
+  if (finished_row_for_apply_or_collect &&
+      !(prepared_flags_ & ~(DeltaIterator::PREPARE_FOR_APPLY |
+                            DeltaIterator::PREPARE_FOR_COLLECT))) {
+    *finished_row = true;
+  }
+
   last_added_idx_ = key.row_idx();
   return Status::OK();
 }
@@ -245,6 +266,21 @@ Status DeltaPreparer<Traits>::ApplyDeletes(SelectionVector* sel_vec) {
 }
 
 template<class Traits>
+Status DeltaPreparer<Traits>::SelectUpdates(SelectionVector* sel_vec) {
+  DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT);
+  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
+
+  // SelectUpdates() is additive: it should never exclude rows, only include them.
+  for (rowid_t idx = 0; idx < sel_vec->nrows(); idx++) {
+    if (selected_->IsRowSelected(idx)) {  // NOTE(review): idx < selected_->nrows()? confirm
+      sel_vec->SetRowSelected(idx);
+    }
+  }
+
+  return Status::OK();
+}
+
+template<class Traits>
 Status DeltaPreparer<Traits>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
   DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_COLLECT);
   DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, dst->size());

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/delta_store.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 955f4a7..cd359c4 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -155,6 +155,14 @@ class PreparedDeltas {
   // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
   virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;
 
+  // Updates the given selection vector to reflect the snapshotted updates.
+  //
+  // Rows with a delta relevant under the selection criteria (delta_relevancy.h) are
+  // set to 1 in the selection vector so that they show up in the output.
+  //
+  // Deltas must have been prepared with the flag PREPARE_FOR_SELECT.
+  virtual Status SelectUpdates(SelectionVector* sel_vec) = 0;
+
   // Collects the mutations associated with each row in the current prepared batch.
   //
   // Each entry in the vector will be treated as a singly linked list of Mutation
@@ -226,7 +234,14 @@ class DeltaIterator : public PreparedDeltas {
     // in the order that they were loaded from the backing store.
     //
     // On success, CollectMutations and FilterColumnIdsAndCollectDeltas will be callable.
-    PREPARE_FOR_COLLECT = 1 << 1
+    PREPARE_FOR_COLLECT = 1 << 1,
+
+    // Prepare a batch of deltas for selecting. All deltas in the batch will be
+    // decoded, and a data structure describing which rows had deltas will be
+    // populated.
+    //
+    // On success, SelectUpdates will be callable.
+    PREPARE_FOR_SELECT = 1 << 2
   };
   virtual Status PrepareBatch(size_t nrows, int prepare_flags) = 0;
 
@@ -281,7 +296,7 @@ class DeltaPreparer : public PreparedDeltas {
   // on the part of a DeltaIterator.
   //
   // Call at the beginning of DeltaIterator::PrepareBatch.
-  void Start(int prepare_flags);
+  void Start(size_t nrows, int prepare_flags);
 
   // Updates internal state to reflect the end of delta batch preparation on the
   // part of a DeltaIterator.
@@ -305,6 +320,8 @@ class DeltaPreparer : public PreparedDeltas {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
+  Status SelectUpdates(SelectionVector* sel_vec) override;
+
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,
@@ -359,6 +376,9 @@ class DeltaPreparer : public PreparedDeltas {
   std::deque<rowid_t> reinserted_;
 
   // The deletion state of the row last processed by AddDelta().
+  //
+  // As a row's DELETEs and REINSERTs are processed, the deletion state
+  // alternates between the values below.
   enum RowDeletionState {
     UNKNOWN,
     DELETED,
@@ -374,6 +394,10 @@ class DeltaPreparer : public PreparedDeltas {
   };
   std::deque<PreparedDelta> prepared_deltas_;
 
+  // State when prepared_flags_ & PREPARE_FOR_SELECT
+  // ------------------------------------------------------------
+  std::unique_ptr<SelectionVector> selected_;
+
   DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
 };
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltafile.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index 7ca6719..30bb01b 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -44,6 +44,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/stringprintf.h"
 #include "kudu/gutil/strings/substitute.h"
+#include "kudu/tablet/delta_relevancy.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet.pb.h"
@@ -285,20 +286,30 @@ Status DeltaFileReader::ReadDeltaStats() {
   return Status::OK();
 }
 
-bool DeltaFileReader::IsRelevantForSnapshot(const MvccSnapshot& snap) const {
+bool DeltaFileReader::IsRelevantForSnapshots(
+    const boost::optional<MvccSnapshot>& snap_to_exclude,
+    const MvccSnapshot& snap_to_include) const {
   if (!init_once_.init_succeeded()) {
     // If we're not initted, it means we have no delta stats and must
     // assume that this file is relevant for every snapshot.
     return true;
   }
-  if (delta_type_ == REDO) {
-    return snap.MayHaveCommittedTransactionsAtOrAfter(delta_stats_->min_timestamp());
-  }
-  if (delta_type_ == UNDO) {
-    return snap.MayHaveUncommittedTransactionsAtOrBefore(delta_stats_->max_timestamp());
+
+  // We don't know whether the caller's intent is to apply deltas, to select
+  // them, or both. As such, we must be conservative and assume 'both', which
+  // means the file is relevant if any relevancy criteria is true.
+  bool relevant = delta_type_ == REDO ?
+                  IsDeltaRelevantForApply<REDO>(snap_to_include,
+                                                delta_stats_->min_timestamp()) :
+                  IsDeltaRelevantForApply<UNDO>(snap_to_include,
+                                                delta_stats_->max_timestamp());
+  if (snap_to_exclude) {
+    // The select criteria is the same regardless of delta_type_.
+    relevant |= IsDeltaRelevantForSelect(*snap_to_exclude, snap_to_include,
+                                         delta_stats_->min_timestamp(),
+                                         delta_stats_->max_timestamp());
   }
-  LOG(DFATAL) << "Cannot reach here";
-  return false;
+  return relevant;
 }
 
 Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
@@ -313,7 +324,7 @@ Status DeltaFileReader::CloneForDebugging(FsManager* fs_manager,
 
 Status DeltaFileReader::NewDeltaIterator(const RowIteratorOptions& opts,
                                          DeltaIterator** iterator) const {
-  if (IsRelevantForSnapshot(opts.snap_to_include)) {
+  if (IsRelevantForSnapshots(opts.snap_to_exclude, opts.snap_to_include)) {
     if (VLOG_IS_ON(2)) {
       if (!init_once_.init_succeeded()) {
         TRACE_COUNTER_INCREMENT("delta_iterators_lazy_initted", 1);
@@ -429,11 +440,12 @@ Status DeltaFileIterator<Type>::SeekToOrdinal(rowid_t idx) {
   // Finish the initialization of any lazily-initialized state.
   RETURN_NOT_OK(dfr_->Init(preparer_.opts().io_context));
 
-  // Check again whether this delta file is relevant given the snapshot
+  // Check again whether this delta file is relevant given the snapshots
   // that we are querying. We did this already before creating the
   // DeltaFileIterator, but due to lazy initialization, it's possible
   // that we weren't able to check at that time.
-  if (!dfr_->IsRelevantForSnapshot(preparer_.opts().snap_to_include)) {
+  if (!dfr_->IsRelevantForSnapshots(preparer_.opts().snap_to_exclude,
+                                    preparer_.opts().snap_to_include)) {
     exhausted_ = true;
     delta_blocks_.clear();
     return Status::OK();
@@ -587,7 +599,7 @@ Status DeltaFileIterator<Type>::PrepareBatch(size_t nrows, int prepare_flags) {
   #endif
   prepared_ = true;
 
-  preparer_.Start(prepare_flags);
+  preparer_.Start(nrows, prepare_flags);
   RETURN_NOT_OK(AddDeltas(start_row, stop_row));
   preparer_.Finish(nrows);
   return Status::OK();
@@ -672,6 +684,11 @@ Status DeltaFileIterator<Type>::ApplyDeletes(SelectionVector* sel_vec) {
 }
 
 template<DeltaType Type>
+Status DeltaFileIterator<Type>::SelectUpdates(SelectionVector* sel_vec) {
+  return preparer_.SelectUpdates(sel_vec);
+}
+
+template<DeltaType Type>
 Status DeltaFileIterator<Type>::CollectMutations(vector<Mutation*>* dst, Arena* arena) {
   return preparer_.CollectMutations(dst, arena);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltafile.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index 217dd4c..e330ce5 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -24,6 +24,7 @@
 #include <string>
 #include <vector>
 
+#include <boost/optional/optional.hpp>
 #include <glog/logging.h>
 
 #include "kudu/cfile/block_handle.h"
@@ -180,9 +181,10 @@ class DeltaFileReader : public DeltaStore,
   }
 
   // Returns true if this delta file may include any deltas which need to be
-  // applied when scanning the given snapshot, or if the file has not yet
+  // applied or selected when scanning the given snapshots, or if the file has not yet
   // been fully initialized.
-  bool IsRelevantForSnapshot(const MvccSnapshot& snap) const;
+  bool IsRelevantForSnapshots(const boost::optional<MvccSnapshot>& snap_to_exclude,
+                              const MvccSnapshot& snap_to_include) const;
 
   // Clone this DeltaFileReader for testing and validation purposes (such as
   // while in DEBUG mode). The resulting object will not be Initted().
@@ -234,6 +236,8 @@ class DeltaFileIterator : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
+  Status SelectUpdates(SelectionVector* sel_vec) override;
+
   Status CollectMutations(std::vector<Mutation*>*dst, Arena* arena) override;
 
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltamemstore.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index a25c110..c2a8f0d 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -234,7 +234,7 @@ Status DMSIterator::PrepareBatch(size_t nrows, int prepare_flags) {
   rowid_t start_row = preparer_.cur_prepared_idx();
   rowid_t stop_row = start_row + nrows - 1;
 
-  preparer_.Start(prepare_flags);
+  preparer_.Start(nrows, prepare_flags);
   bool finished_row = false;
   while (iter_->IsValid()) {
     Slice key_slice, val;
@@ -284,6 +284,9 @@ Status DMSIterator::ApplyDeletes(SelectionVector* sel_vec) {
   return preparer_.ApplyDeletes(sel_vec);
 }
 
+Status DMSIterator::SelectUpdates(SelectionVector* sel_vec) {
+  return preparer_.SelectUpdates(sel_vec);
+}
 
 Status DMSIterator::CollectMutations(vector<Mutation*>*dst, Arena* arena) {
   return preparer_.CollectMutations(dst, arena);

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/deltamemstore.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index a5a92f7..3cc02a0 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -206,6 +206,8 @@ class DMSIterator : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
+  Status SelectUpdates(SelectionVector* sel_vec) override;
+
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
   Status FilterColumnIdsAndCollectDeltas(const std::vector<ColumnId>& col_ids,

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/tablet-test-util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.cc b/src/kudu/tablet/tablet-test-util.cc
index 0575e3b..5453c30 100644
--- a/src/kudu/tablet/tablet-test-util.cc
+++ b/src/kudu/tablet/tablet-test-util.cc
@@ -21,16 +21,29 @@ namespace kudu {
 namespace tablet {
 
 template<>
-bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevant(
+bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevantForApply(
     Timestamp to_include, Timestamp ts) const {
   return ts < to_include;
 }
 
 template<>
-bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevant(
+bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevantForApply(
     Timestamp to_include, Timestamp ts) const {
   return ts >= to_include;
 }
 
+template<typename T>
+bool MirroredDeltas<T>::IsDeltaRelevantForSelect(
+    Timestamp to_exclude, Timestamp to_include, Timestamp ts) const {
+  return ts >= to_exclude && ts < to_include;
+}
+
+template
+bool MirroredDeltas<DeltaTypeSelector<REDO>>::IsDeltaRelevantForSelect(
+    Timestamp to_exclude, Timestamp to_include, Timestamp ts) const;
+template
+bool MirroredDeltas<DeltaTypeSelector<UNDO>>::IsDeltaRelevantForSelect(
+    Timestamp to_exclude, Timestamp to_include, Timestamp ts) const;
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/9f4657ab/src/kudu/tablet/tablet-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 87daee6..f43e40d 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -365,10 +365,15 @@ class MirroredDeltas {
   }
 
   // Returns true if all tracked deltas are irrelevant to 'ts', false otherwise.
-  bool CheckAllDeltasCulled(Timestamp ts) const {
+  bool CheckAllDeltasCulled(const boost::optional<Timestamp>& lower_ts,
+                            Timestamp upper_ts) const {
     for (const auto& e1 : all_deltas_) {
       for (const auto& e2 : e1.second) {
-        if (IsDeltaRelevant(ts, e2.first)) {
+        bool relevant = IsDeltaRelevantForApply(upper_ts, e2.first);
+        if (lower_ts) {
+          relevant |= IsDeltaRelevantForSelect(*lower_ts, upper_ts, e2.first);
+        }
+        if (relevant) {
           return false;
         }
       }
@@ -401,15 +406,44 @@ class MirroredDeltas {
   //
   // Rows not set in 'filter' are skipped.
   //
-  // Deltas not relevant to 'ts' are skipped. The set of rows considered is
-  // determined by 'start_row_idx' and the number of rows in 'cb'.
-  Status ApplyUpdates(const Schema& projection, Timestamp ts,
+  // Deltas not relevant to 'lower_ts' or 'upper_ts' are skipped. The set of
+  // rows considered is determined by 'start_row_idx' and the number of rows in 'cb'.
+  Status ApplyUpdates(const Schema& projection,
+                      const boost::optional<Timestamp>& lower_ts, Timestamp upper_ts,
                       rowid_t start_row_idx, int col_idx, ColumnBlock* cb,
                       const SelectionVector& filter) {
+    if (VLOG_IS_ON(3)) {
+      std::string lower_ts_str = lower_ts ? lower_ts->ToString() : "INF";
+      VLOG(3) << "Begin applying for timestamps [" << lower_ts_str << ","
+              << upper_ts << ")";
+    }
     for (int i = 0; i < cb->nrows(); i++) {
       rowid_t row_idx = start_row_idx + i;
+
+      if (lower_ts) {
+        // First pass: establish whether any deltas should be applied to this row.
+        bool at_least_one_delta_relevant = false;
+        for (const auto& e : all_deltas_[row_idx]) {
+          bool is_relevant = IsDeltaRelevantForSelect(*lower_ts, upper_ts, e.first);
+          if (VLOG_IS_ON(3)) {
+            RowChangeList changes(e.second);
+            VLOG(3) << "Row " << i << " ts " << e.first << " (relevant: "
+                    << is_relevant << "): " << changes.ToString(*schema_);
+          }
+          if (is_relevant) {
+            at_least_one_delta_relevant = true;
+            break;
+          }
+        }
+        if (!at_least_one_delta_relevant) {
+          // Not one delta was relevant; skip the row.
+          continue;
+        }
+      }
+
+      // Second pass: apply all relevant deltas.
       for (const auto& e : all_deltas_[row_idx]) {
-        if (!IsDeltaRelevant(ts, e.first)) {
+        if (!IsDeltaRelevantForApply(upper_ts, e.first)) {
           // No need to keep iterating; all future deltas for this row will also
           // be irrelevant.
           break;
@@ -438,7 +472,7 @@ class MirroredDeltas {
     for (int i = 0; i < sel_vec->nrows(); i++) {
       bool deleted = false;
       for (const auto& e : all_deltas_[start_row_idx + i]) {
-        if (!IsDeltaRelevant(ts, e.first)) {
+        if (!IsDeltaRelevantForApply(ts, e.first)) {
           // No need to keep iterating; all future deltas for this row will also
           // be irrelevant.
           break;
@@ -455,6 +489,25 @@ class MirroredDeltas {
     return Status::OK();
   }
 
+  // Selects all rows in 'sel_vec' for which there exists a relevant tracked delta.
+  //
+  // Deltas with timestamps outside ['lower_ts', 'upper_ts') are skipped. The set of
+  // rows considered is determined by 'start_row_idx' and the number of rows in 'sel_vec'.
+  void SelectUpdates(Timestamp lower_ts, Timestamp upper_ts,
+                     rowid_t start_row_idx, SelectionVector* sel_vec) {
+    for (int i = 0; i < sel_vec->nrows(); i++) {
+      for (const auto& e : all_deltas_[start_row_idx + i]) {
+        if (!IsDeltaRelevantForSelect(lower_ts, upper_ts, e.first)) {
+          // Must keep iterating; short-circuiting out of the selection
+          // criteria is complex and not worth doing in test code.
+          continue;
+        }
+        sel_vec->SetRowSelected(i);
+        break;
+      }
+    }
+  }
+
   // Transforms and writes deltas into 'deltas', a vector of "delta lists", each
   // of which represents a particular row, and each entry of which is a
   // Timestamp and encoded delta pair. The encoded delta is a Slice into
@@ -467,7 +520,7 @@ class MirroredDeltas {
                         std::vector<DeltaList>* deltas) {
     for (int i = 0; i < deltas->size(); i++) {
       for (const auto& e : all_deltas_[start_row_idx + i]) {
-        if (!IsDeltaRelevant(ts, e.first)) {
+        if (!IsDeltaRelevantForApply(ts, e.first)) {
           // No need to keep iterating; all future deltas for this row will also
           // be irrelevant.
           break;
@@ -513,7 +566,13 @@ class MirroredDeltas {
 
  private:
   // Returns true if 'ts' is relevant to 'to_include', false otherwise.
-  bool IsDeltaRelevant(Timestamp to_include, Timestamp ts) const;
+  bool IsDeltaRelevantForApply(Timestamp to_include, Timestamp ts) const;
+
+  // Returns true if 'ts' is relevant with respect to both 'to_exclude' and
+  // 'to_include', false otherwise.
+  bool IsDeltaRelevantForSelect(Timestamp to_exclude,
+                                Timestamp to_include,
+                                Timestamp ts) const;
 
   // All encoded deltas, arranged in DeltaKey order.
   MirroredDeltaMap all_deltas_;
@@ -779,15 +838,23 @@ void RunDeltaFuzzTest(const DeltaStore& store,
   for (int i = 0; i < kNumScans + 1; i++) {
     // Pick a timestamp for the iterator. The last iteration will use a snapshot
     // that includes all deltas.
-    Timestamp ts;
+    Timestamp upper_ts;
+    boost::optional<Timestamp> lower_ts;
     if (i < kNumScans) {
-      ts = Timestamp(prng->Uniform(ts_range.second - ts_range.first) +
-                     ts_range.first);
+      uint64_t upper_ts_val = prng->Uniform(ts_range.second - ts_range.first) +
+                              ts_range.first;
+      upper_ts = Timestamp(upper_ts_val);
+
+      // Use a lower bound in half the scans.
+      if (prng->Uniform(2)) {
+        uint64_t lower_ts_val = upper_ts_val > 0 ? prng->Uniform(upper_ts_val) : 0;
+        lower_ts = Timestamp(lower_ts_val);
+      }
     } else if (T::kTag == REDO) {
-      ts = Timestamp::kMax;
+      upper_ts = Timestamp::kMax;
     } else {
       DCHECK(T::kTag == UNDO);
-      ts = Timestamp::kMin;
+      upper_ts = Timestamp::kMin;
     }
 
     // Create and initialize the iterator. If none iterator is returned, it's
@@ -796,13 +863,18 @@ void RunDeltaFuzzTest(const DeltaStore& store,
     SCOPED_TRACE(strings::Substitute("Projection $0", projection.ToString()));
     RowIteratorOptions opts;
     opts.projection = &projection;
-    opts.snap_to_include = MvccSnapshot(ts);
-    SCOPED_TRACE(strings::Substitute("Timestamp $0", ts.ToString()));
+    if (lower_ts) {
+      opts.snap_to_exclude = MvccSnapshot(*lower_ts);
+    }
+    opts.snap_to_include = MvccSnapshot(upper_ts);
+    SCOPED_TRACE(strings::Substitute("Timestamps: [$0,$1)",
+                                     lower_ts ? lower_ts->ToString() : "INF",
+                                     upper_ts.ToString()));
     DeltaIterator* raw_iter;
     Status s = store.NewDeltaIterator(opts, &raw_iter);
     if (s.IsNotFound()) {
       ASSERT_STR_CONTAINS(s.ToString(), "MvccSnapshot outside the range of this delta");
-      ASSERT_TRUE(mirror->CheckAllDeltasCulled(ts));
+      ASSERT_TRUE(mirror->CheckAllDeltasCulled(lower_ts, upper_ts));
       continue;
     }
     ASSERT_OK(s);
@@ -816,20 +888,39 @@ void RunDeltaFuzzTest(const DeltaStore& store,
       int batch_size = prng->Uniform(kMaxBatchSize) + 1;
       SCOPED_TRACE(strings::Substitute("batch starting at $0 ($1 rows)",
                                        start_row_idx, batch_size));
-      ASSERT_OK(iter->PrepareBatch(batch_size, DeltaIterator::PREPARE_FOR_APPLY |
-                                               DeltaIterator::PREPARE_FOR_COLLECT));
+      int prepare_flags = DeltaIterator::PREPARE_FOR_APPLY |
+                          DeltaIterator::PREPARE_FOR_COLLECT;
+      if (lower_ts) {
+        prepare_flags |= DeltaIterator::PREPARE_FOR_SELECT;
+      }
+      ASSERT_OK(iter->PrepareBatch(batch_size, prepare_flags));
+
+      // Test SelectUpdates: the selection vector begins all false and a row is
+      // set if there is at least one relevant update for it.
+      //
+      // Note: we retain 'actual_selected' for use as a possible filter in the
+      // ApplyUpdates test below.
+      SelectionVector actual_selected(batch_size);
+      if (lower_ts) {
+        SelectionVector expected_selected(batch_size);
+        expected_selected.SetAllFalse();
+        actual_selected.SetAllFalse();
+        mirror->SelectUpdates(*lower_ts, upper_ts, start_row_idx, &expected_selected);
+        ASSERT_OK(iter->SelectUpdates(&actual_selected));
+        ASSERT_EQ(expected_selected, actual_selected);
+      }
 
       // Test ApplyDeletes: the selection vector is all true and a row is unset
       // if the last relevant update deleted it.
       //
-      // Note: we retain 'actual_deleted' for use as a filter in the
+      // Note: we retain 'actual_deleted' for use as a possible filter in the
       // ApplyUpdates test below.
       SelectionVector actual_deleted(batch_size);
       {
         SelectionVector expected_deleted(batch_size);
         expected_deleted.SetAllTrue();
         actual_deleted.SetAllTrue();
-        ASSERT_OK(mirror->ApplyDeletes(ts, start_row_idx, &expected_deleted));
+        ASSERT_OK(mirror->ApplyDeletes(upper_ts, start_row_idx, &expected_deleted));
         ASSERT_OK(iter->ApplyDeletes(&actual_deleted));
         ASSERT_EQ(expected_deleted, actual_deleted);
       }
@@ -843,9 +934,10 @@ void RunDeltaFuzzTest(const DeltaStore& store,
           expected_scb[k] = 0;
           actual_scb[k] = 0;
         }
-        ASSERT_OK(mirror->ApplyUpdates(*opts.projection, ts, start_row_idx, j,
-                                       &expected_scb, actual_deleted));
-        ASSERT_OK(iter->ApplyUpdates(j, &actual_scb, actual_deleted));
+        const SelectionVector& filter = lower_ts ? actual_selected : actual_deleted;
+        ASSERT_OK(mirror->ApplyUpdates(*opts.projection, lower_ts, upper_ts,
+                                       start_row_idx, j, &expected_scb, filter));
+        ASSERT_OK(iter->ApplyUpdates(j, &actual_scb, filter));
         ASSERT_EQ(expected_scb, actual_scb)
             << "Expected column block: " << expected_scb.ToString()
             << "\nActual column block: " << actual_scb.ToString();
@@ -857,7 +949,7 @@ void RunDeltaFuzzTest(const DeltaStore& store,
         std::vector<typename MirroredDeltas<T>::DeltaList> expected_muts(batch_size);
         std::vector<Mutation*> actual_muts(batch_size);
         ASSERT_OK(iter->CollectMutations(&actual_muts, &arena));
-        mirror->CollectMutations(ts, start_row_idx, &expected_muts);
+        mirror->CollectMutations(upper_ts, start_row_idx, &expected_muts);
         for (int i = 0; i < expected_muts.size(); i++) {
           const auto& expected = expected_muts[i];
           auto* actual = actual_muts[i];