You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/06/08 00:23:28 UTC

[kudu] 04/05: KUDU-2809 (4/6): skip unobservable rows when iterating a DRS

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ff656763a3d24623964c6a7cc7074899b83441ea
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jun 5 17:38:38 2019 -0700

    KUDU-2809 (4/6): skip unobservable rows when iterating a DRS
    
    The counterpart to the MRS patch, this one makes an equivalent change to the
    DRS diff scanning logic. It's far more complicated than the MRS version, for
    several reasons:
    1. A row's history is fragmented across multiple delta stores.
    2. UNDOs' view of time is inverted, complicating attempts at tracing a row's
       liveness across stores.
    
    To implement this, we need to reconsider how the DeltaApplier initializes
    its selection vector. The overall structure remains the same:
    1. The DeltaApplier sets up a data structure to capture selectivity info.
    2. It invokes a method on each store to add that store's selectivity info
       to the structure.
    3. Finally, it converts the structure into the selection vector.
    
    Before this patch, it was sufficient for the data structure to be the
    selection vector itself. But now the selection vector is insufficient.
    Instead we use a new data structure which tracks, for each row, the row's
    oldest and newest selected deltas. Armed with this information, we can omit
    unobservable rows at conversion time: if a row was dead before the oldest
    delta and after the newest delta, its lifetime exists entirely within the
    diff scan and it should be omitted from the results.
    
    Much of the complexity here lies with the need to totally order all deltas,
    regardless of whether they're UNDOs or REDOs, even if they share the same
    timestamps (fuzz-itest is a super effective way of testing this). The
    ordering starts with timestamps, but then orders deltas in some potentially
    non-obvious ways based on Kudu invariants:
    1. Timestamps (obviously)
    2. UNDOs, then REDOs. Within a rowset all UNDOs come before REDOs.
    3. The order in which the deltas were observed in the store. This is
       equivalent to the order in which they are applied.
    
    Change-Id: I2b616c4e8b99dbc7063b940bcb35352f87ab0226
    Reviewed-on: http://gerrit.cloudera.org:8080/13536
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/row_changelist.cc        |  16 +++
 src/kudu/common/row_changelist.h         |  12 +-
 src/kudu/common/rowblock.h               |   5 +
 src/kudu/tablet/compaction.cc            |   4 +-
 src/kudu/tablet/delta_applier.cc         |   6 +-
 src/kudu/tablet/delta_iterator_merger.cc |   4 +-
 src/kudu/tablet/delta_iterator_merger.h  |   2 +-
 src/kudu/tablet/delta_store.cc           | 185 ++++++++++++++++++++++++++-----
 src/kudu/tablet/delta_store.h            | 138 +++++++++++++++++++++--
 src/kudu/tablet/deltafile.cc             |   4 +-
 src/kudu/tablet/deltafile.h              |   2 +-
 src/kudu/tablet/deltamemstore.cc         |   4 +-
 src/kudu/tablet/deltamemstore.h          |   2 +-
 src/kudu/tablet/tablet-test-util.h       |  60 ++++++++--
 src/kudu/tablet/tablet-test.cc           |   3 +
 15 files changed, 387 insertions(+), 60 deletions(-)

diff --git a/src/kudu/common/row_changelist.cc b/src/kudu/common/row_changelist.cc
index 5681439..c20fce1 100644
--- a/src/kudu/common/row_changelist.cc
+++ b/src/kudu/common/row_changelist.cc
@@ -106,6 +106,21 @@ string RowChangeList::ToString(const Schema &schema) const {
   return ret;
 }
 
+const char* RowChangeList::ChangeType_Name(RowChangeList::ChangeType t) {
+  switch (t) {
+    case kUninitialized:
+      return "UNINITIALIZED";
+    case kUpdate:
+      return "UPDATE";
+    case kDelete:
+      return "DELETE";
+    case kReinsert:
+      return "REINSERT";
+    default:
+      return "UNKNOWN";
+  }
+}
+
 void RowChangeListEncoder::AddColumnUpdate(const ColumnSchema& col_schema,
                                            int col_id,
                                            const void* cell_ptr) {
@@ -151,6 +166,7 @@ void RowChangeListEncoder::EncodeColumnMutationRaw(int col_id, bool is_null, Sli
 }
 
 Status RowChangeListDecoder::Init() {
+  DCHECK_EQ(type_, RowChangeList::kUninitialized) << "Already called Init()";
   if (PREDICT_FALSE(remaining_.empty())) {
     return Status::Corruption("empty changelist - expected type");
   }
diff --git a/src/kudu/common/row_changelist.h b/src/kudu/common/row_changelist.h
index e415c27..e648d3d 100644
--- a/src/kudu/common/row_changelist.h
+++ b/src/kudu/common/row_changelist.h
@@ -21,6 +21,7 @@
 #define KUDU_COMMON_ROW_CHANGELIST_H
 
 #include <cstddef>
+#include <ostream>
 #include <string>
 #include <vector>
 
@@ -136,6 +137,8 @@ class RowChangeList {
     ChangeType_max = 3
   };
 
+  static const char* ChangeType_Name(ChangeType t);
+
   Slice encoded_data_;
 };
 
@@ -290,24 +293,32 @@ class RowChangeListDecoder {
 #endif
   }
 
+  bool IsInitialized() const {
+    return type_ != RowChangeList::kUninitialized;
+  }
+
   bool HasNext() const {
     DCHECK(!is_delete());
     return !remaining_.empty();
   }
 
   bool is_update() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_ == RowChangeList::kUpdate;
   }
 
   bool is_delete() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_ == RowChangeList::kDelete;
   }
 
   bool is_reinsert() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_ == RowChangeList::kReinsert;
   }
 
   const RowChangeList::ChangeType get_type() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_;
   }
 
@@ -429,7 +440,6 @@ class RowChangeListDecoder {
   RowChangeList::ChangeType type_;
 };
 
-
 } // namespace kudu
 
 // Defined for tight_enum_test_cast<> -- has to be defined outside of any namespace.
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 8d3c4ec..678b355 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <cstring>
+#include <string>
 #include <vector>
 
 #include <glog/logging.h>
@@ -144,6 +145,10 @@ class SelectionVector {
                bitmap_.get(), src_row_off, num_rows);
   }
 
+  std::string ToString() const {
+    return BitmapToString(&bitmap_[0], n_rows_);
+  }
+
  private:
 
   // Pads any non-byte-aligned bits at the end of the SelectionVector with zeroes.
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index b760776..ecebbf8 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -135,7 +135,8 @@ class MemRowSetCompactionInput : public CompactionInput {
       // Handle the rare case where a row was inserted and deleted in the same operation.
       // This row can never be observed and should not be compacted/flushed. This saves
       // us some trouble later on on compactions.
-      // See: MergeCompactionInput::CompareAndMergeDuplicatedRows().
+      //
+      // See CompareDuplicatedRows().
       if (PREDICT_FALSE(input_row.redo_head != nullptr &&
           input_row.redo_head->timestamp() == insertion_timestamp)) {
         // Get the latest mutation.
@@ -1222,7 +1223,6 @@ Status ReupdateMissedDeltas(const IOContext* io_context,
   // updates. So, this can be made much faster.
   vector<CompactionInputRow> rows;
   const Schema* schema = &input->schema();
-  const Schema key_schema(input->schema().CreateKeyProjection());
 
   rowid_t output_row_offset = 0;
   while (input->HasMoreBlocks()) {
diff --git a/src/kudu/tablet/delta_applier.cc b/src/kudu/tablet/delta_applier.cc
index 7c767a6..43c489f 100644
--- a/src/kudu/tablet/delta_applier.cc
+++ b/src/kudu/tablet/delta_applier.cc
@@ -116,8 +116,10 @@ Status DeltaApplier::InitializeSelectionVector(SelectionVector *sel_vec) {
   //
   // See delta_relevancy.h for more details.
   if (opts_.snap_to_exclude) {
-    sel_vec->SetAllFalse();
-    RETURN_NOT_OK(delta_iter_->SelectUpdates(sel_vec));
+    SelectedDeltas deltas(sel_vec->nrows());
+    RETURN_NOT_OK(delta_iter_->SelectDeltas(&deltas));
+    VLOG(4) << "Final deltas:\n" << deltas.ToString();
+    deltas.ToSelectionVector(sel_vec);
   } else {
     RETURN_NOT_OK(base_iter_->InitializeSelectionVector(sel_vec));
   }
diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc
index 8bdf648..24ef27b 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -79,9 +79,9 @@ Status DeltaIteratorMerger::ApplyDeletes(SelectionVector* sel_vec) {
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::SelectUpdates(SelectionVector* sel_vec) {
+Status DeltaIteratorMerger::SelectDeltas(SelectedDeltas* deltas) {
   for (const unique_ptr<DeltaIterator>& iter : iters_) {
-    RETURN_NOT_OK(iter->SelectUpdates(sel_vec));
+    RETURN_NOT_OK(iter->SelectDeltas(deltas));
   }
   return Status::OK();
 }
diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h
index c5a36e8..bedf552 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -67,7 +67,7 @@ class DeltaIteratorMerger : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 3fd7e52..5240b57 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -34,6 +34,7 @@
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_relevancy.h"
@@ -63,13 +64,121 @@ string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool p
                                                  key.timestamp().ToString()))));
 }
 
+SelectedDeltas::SelectedDeltas(size_t nrows) {
+  Reset(nrows);
+}
+
+void SelectedDeltas::MergeFrom(const SelectedDeltas& other) {
+  DCHECK_EQ(rows_.size(), other.rows_.size());
+
+  for (rowid_t idx = 0; idx < rows_.size(); idx++) {
+    const auto& src = other.rows_[idx];
+    if (!src) {
+      continue;
+    }
+    if (src->same_delta) {
+      ProcessDelta(idx, src->oldest);
+    } else {
+      ProcessDelta(idx, src->oldest);
+      ProcessDelta(idx, src->newest);
+    }
+  }
+}
+
+void SelectedDeltas::ToSelectionVector(SelectionVector* sel_vec) const {
+  DCHECK_EQ(rows_.size(), sel_vec->nrows());
+
+  for (rowid_t idx = 0; idx < rows_.size(); idx++) {
+    const auto& row = rows_[idx];
+
+    if (!row) {
+      // There were no relevant deltas for this row.
+      sel_vec->SetRowUnselected(idx);
+      continue;
+    }
+
+    if (row->same_delta) {
+      // There was exactly one relevant delta; the row must be selected.
+      sel_vec->SetRowSelected(idx);
+      continue;
+    }
+
+    // There was more than one relevant delta.
+    //
+    // Before we mark the row as selected, we must first determine whether the
+    // row was dead at the beginning and end of the time range: such rows should
+    // be deselected. We've captured the oldest and newest deltas; the table
+    // below indicates whether, for a given type of delta, the row is live or
+    // dead at that endpoint.
+    //
+    // delta type    | oldest | newest
+    // --------------+--------+-------
+    // REDO DELETE   | L      | D
+    // REDO REINSERT | D      | L
+    // UNDO DELETE   | D      | L
+    // UNDO REINSERT | L      | D
+    const auto& oldest = row->oldest;
+    const auto& newest = row->newest;
+    if (((oldest.dtype == REDO && oldest.ctype == RowChangeList::kReinsert) ||
+         (oldest.dtype == UNDO && oldest.ctype == RowChangeList::kDelete)) &&
+        ((newest.dtype == REDO && newest.ctype == RowChangeList::kDelete) ||
+         (newest.dtype == UNDO && newest.ctype == RowChangeList::kReinsert))) {
+      sel_vec->SetRowUnselected(idx);
+    } else {
+      sel_vec->SetRowSelected(idx);
+    }
+  }
+}
+
+void SelectedDeltas::ProcessDelta(rowid_t row_idx, Delta new_delta) {
+  DCHECK_LT(row_idx, rows_.size());
+  auto& existing = rows_[row_idx];
+
+  if (!existing) {
+    existing = DeltaPair();
+    existing->same_delta = true;
+    existing->oldest = new_delta;
+    existing->newest = new_delta;
+    return;
+  }
+
+  existing->oldest = std::min(existing->oldest, new_delta, DeltaLessThanFunctor());
+  existing->newest = std::max(existing->newest, new_delta, DeltaLessThanFunctor());
+  existing->same_delta = false;
+}
+
+string SelectedDeltas::ToString() const {
+  rowid_t idx = 0;
+  return JoinMapped(rows_, [&idx](const boost::optional<DeltaPair>& dp) {
+      if (!dp) {
+        return Substitute("$0: UNSELECTED", idx++);
+      }
+      return Substitute("$0: @tx$1 $2 dis=$3 ($4) @tx$5 $6 dis=$7 ($8)$9", idx++,
+                        dp->oldest.ts.ToString(),
+                        DeltaType_Name(dp->oldest.dtype),
+                        dp->oldest.disambiguator,
+                        RowChangeList::ChangeType_Name(dp->oldest.ctype),
+                        dp->newest.ts.ToString(),
+                        DeltaType_Name(dp->newest.dtype),
+                        dp->newest.disambiguator,
+                        RowChangeList::ChangeType_Name(dp->newest.ctype),
+                        dp->same_delta ? " (same delta)" : "");
+    }, "\n");
+}
+
+void SelectedDeltas::Reset(size_t nrows) {
+  rows_.clear();
+  rows_.resize(nrows);
+}
+
 template<class Traits>
 DeltaPreparer<Traits>::DeltaPreparer(RowIteratorOptions opts)
     : opts_(std::move(opts)),
       cur_prepared_idx_(0),
       prev_prepared_idx_(0),
       prepared_flags_(DeltaIterator::PREPARE_NONE),
-      deletion_state_(UNKNOWN) {
+      deletion_state_(UNKNOWN),
+      deltas_selected_(0) {
 }
 
 template<class Traits>
@@ -85,12 +194,7 @@ void DeltaPreparer<Traits>::Start(size_t nrows, int prepare_flags) {
 
   if (prepare_flags & DeltaIterator::PREPARE_FOR_SELECT) {
     DCHECK(opts_.snap_to_exclude);
-
-    // Ensure we have a selection vector at least 'nrows' long.
-    if (!selected_ || selected_->nrows() < nrows) {
-      selected_.reset(new SelectionVector(nrows));
-    }
-    selected_->SetAllFalse();
+    selected_.Reset(nrows);
   }
   prepared_flags_ = prepare_flags;
   if (updates_by_col_.empty()) {
@@ -138,13 +242,32 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* fin
   // to be finished, but that short-circuit can only be used if we're not also
   // handling a preparation with a different criteria.
 
+  RowChangeListDecoder decoder((RowChangeList(val)));
+
   if (prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT) {
     bool finished_row_for_select;
     if (IsDeltaRelevantForSelect<Traits::kType>(*opts_.snap_to_exclude,
                                                 opts_.snap_to_include,
                                                 key.timestamp(),
                                                 &finished_row_for_select)) {
-      selected_->SetRowSelected(key.row_idx() - cur_prepared_idx_);
+      RETURN_NOT_OK(InitDecoderIfNecessary(&decoder));
+
+      // The logical ordering of UNDOs is the opposite of their counting order.
+      int64_t disambiguator = Traits::kType == REDO ?
+                              deltas_selected_ : -deltas_selected_;
+
+      // We use the address of the DeltaPreparer itself as a "delta store" ID.
+      // That's safe because it is globally unique and remains so for the
+      // duration of the scan, which outlives this delta.
+      SelectedDeltas::Delta new_delta = { key.timestamp(),
+                                          Traits::kType,
+                                          disambiguator,
+                                          reinterpret_cast<int64_t>(this),
+                                          decoder.get_type() };
+
+      selected_.ProcessDelta(key.row_idx() - cur_prepared_idx_, new_delta);
+      deltas_selected_++;
+      VLOG(4) << "Selected deltas after AddDelta:\n" << selected_.ToString();
     }
 
     if (finished_row_for_select &&
@@ -164,16 +287,7 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* fin
 
   if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY &&
       relevant_for_apply_or_collect) {
-    RowChangeListDecoder decoder((RowChangeList(val)));
-    if (Traits::kInitializeDecodersWithSafetyChecks) {
-      RETURN_NOT_OK(decoder.Init());
-    } else {
-      decoder.InitNoSafetyChecks();
-    }
-    if (!Traits::kAllowReinserts && decoder.is_reinsert()) {
-      LOG(DFATAL) << "Attempted to reinsert but not supported" << GetStackTrace();
-      return Status::InvalidArgument("Reinserts are not supported");
-    }
+    RETURN_NOT_OK(InitDecoderIfNecessary(&decoder));
     UpdateDeletionState(decoder.get_type());
     if (!decoder.is_delete()) {
       while (decoder.HasNext()) {
@@ -297,17 +411,13 @@ Status DeltaPreparer<Traits>::ApplyDeletes(SelectionVector* sel_vec) {
 }
 
 template<class Traits>
-Status DeltaPreparer<Traits>::SelectUpdates(SelectionVector* sel_vec) {
+Status DeltaPreparer<Traits>::SelectDeltas(SelectedDeltas* deltas) {
   DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT);
-  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
-
-  // SelectUpdates() is additive: it should never exclude rows, only include them.
-  for (rowid_t idx = 0; idx < sel_vec->nrows(); idx++) {
-    if (selected_->IsRowSelected(idx)) {
-      sel_vec->SetRowSelected(idx);
-    }
-  }
-
+  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, deltas->rows_.size());
+  VLOG(4) << "Selected deltas before SelectDeltas:\n" << selected_.ToString();
+  VLOG(4) << "Pre-merge deltas:\n" << deltas->ToString();
+  deltas->MergeFrom(selected_);
+  VLOG(4) << "Post-merge deltas:\n" << deltas->ToString();
   return Status::OK();
 }
 
@@ -372,7 +482,7 @@ bool DeltaPreparer<Traits>::MayHaveDeltas() const {
   if (!reinserted_.empty()) {
     return true;
   }
-  for (auto& col : updates_by_col_) {
+  for (const auto& col : updates_by_col_) {
     if (!col.empty()) {
       return true;
     }
@@ -381,6 +491,23 @@ bool DeltaPreparer<Traits>::MayHaveDeltas() const {
 }
 
 template<class Traits>
+Status DeltaPreparer<Traits>::InitDecoderIfNecessary(RowChangeListDecoder* decoder) {
+  if (decoder->IsInitialized()) {
+    return Status::OK();
+  }
+  if (Traits::kInitializeDecodersWithSafetyChecks) {
+    RETURN_NOT_OK(decoder->Init());
+  } else {
+    decoder->InitNoSafetyChecks();
+  }
+  if (!Traits::kAllowReinserts && decoder->is_reinsert()) {
+    LOG(DFATAL) << "Attempted to reinsert but not supported" << GetStackTrace();
+    return Status::InvalidArgument("Reinserts are not supported");
+  }
+  return Status::OK();
+}
+
+template<class Traits>
 void DeltaPreparer<Traits>::MaybeProcessPreviousRowChange(boost::optional<rowid_t> cur_row_idx) {
   if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY &&
       last_added_idx_ &&
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 9333581..77961f3 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -21,14 +21,18 @@
 #include <cstdint>
 #include <deque>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <glog/logging.h>
 
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowid.h"
+#include "kudu/common/timestamp.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/util/slice.h"
@@ -54,6 +58,118 @@ class DeltaIterator;
 class DeltaStats;
 class Mutation;
 
+// Tracks deltas that have been selected by PreparedDeltas::SelectDeltas.
+//
+// May track deltas belonging to a single delta store, or to multiple stores
+// whose SelectedDeltas have been merged together.
+class SelectedDeltas {
+ public:
+  SelectedDeltas() = default;
+
+  // Equivalent to calling:
+  //
+  //   SelectedDeltas sd;
+  //   sd.Reset(nrows);
+  explicit SelectedDeltas(size_t nrows);
+
+  // Converts the selected deltas into a simpler SelectionVector.
+  void ToSelectionVector(SelectionVector* sel_vec) const;
+
+  // Returns a textual representation suitable for debugging.
+  std::string ToString() const;
+
+ private:
+  template<class Traits>
+  friend class DeltaPreparer;
+
+  // Mutation that has met the 'select' criteria in a delta store.
+  struct Delta {
+    // Key fields.
+
+    // The delta's timestamp.
+    Timestamp ts;
+
+    // Whether this delta was an UNDO or a REDO.
+    DeltaType dtype;
+
+    // It's possible for multiple UNDOs or REDOs in the same delta store to
+    // share a common timestamp. To ensure a total ordering, this additional key
+    // field reflects the logical ordering of such deltas.
+    //
+    // For example, consider the sequence of REDOs:
+    // D1: @tx10 UPDATE key=1
+    // D2: @tx10 DELETE key=1
+    //
+    // D1 and D2 are identical as far as 'ts' and 'dtype' are concerned, so D1's
+    // disambiguator must be less than that of D2.
+    int64_t disambiguator;
+
+    // Non-key fields.
+
+    // Identifier of the delta store that provided this delta. It must:
+    // 1. Be unique for the owning rowset, but needn't be more unique than that.
+    // 2. Remain unique for the lifetime of this delta scan.
+    int64_t delta_store_id;
+
+    // Whether this delta was an UPDATE, DELETE, or REINSERT.
+    RowChangeList::ChangeType ctype;
+  };
+
+  // Tracks the oldest and newest deltas for a given row.
+  //
+  // When there's only one delta, 'oldest' and 'newest' are equal and
+  // 'same_delta' is true. Otherwise, oldest is guaranteed to be less than
+  // newest as per the rules defined in DeltaLessThanFunctor.
+  //
+  // Most of the time "oldest" and "newest" are determined purely by timestamp,
+  // but some deltas can share timestamps, in which case additional rules are
+  // used to maintain a total ordering.
+  struct DeltaPair {
+    Delta oldest;
+    Delta newest;
+    bool same_delta;
+  };
+
+  // Comparator that establishes a total ordering amongst Deltas for the same row.
+  struct DeltaLessThanFunctor {
+    bool operator() (const Delta& a, const Delta& b) const {
+      // Most of the time, deltas are ordered using timestamp.
+      if (PREDICT_TRUE(a.ts != b.ts)) {
+        return a.ts < b.ts;
+      }
+
+      // If the timestamps match, we can order by observing that UNDO < REDO, an
+      // invariant that is preserved inside of a rowset.
+      if (a.dtype != b.dtype) {
+        return a.dtype == UNDO;
+      }
+
+      // The timestamps and delta types match. It should only be possible to get
+      // here if we're comparing deltas from within the same store, in which
+      // case the disambiguators must not match.
+      CHECK_EQ(a.delta_store_id, b.delta_store_id);
+      if (a.disambiguator != b.disambiguator) {
+        return a.disambiguator < b.disambiguator;
+      }
+      LOG(FATAL) << "Could not differentiate between two deltas";
+    }
+  };
+
+  // Merges two SelectedDeltas together on a row-by-row basis.
+  void MergeFrom(const SelectedDeltas& other);
+
+  // Considers a new delta, possibly adding it to 'rows_'.
+  void ProcessDelta(rowid_t row_idx, Delta new_delta);
+
+  // Clears out 'rows_' and makes it suitable for handling 'nrows'.
+  void Reset(size_t nrows);
+
+  // All tracked deltas, indexed by row ordinal.
+  //
+  // If an element is boost::none, there are no deltas for that row.
+  std::vector<boost::optional<DeltaPair>> rows_;
+};
+
 // Interface for the pieces of the system that track deltas/updates.
 // This is implemented by DeltaMemStore and by DeltaFileReader.
 class DeltaStore {
@@ -155,13 +271,11 @@ class PreparedDeltas {
   // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
   virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;
 
-  // Updates the given selection vector to reflect the snapshotted updates.
-  //
-  // Rows which have been updated or deleted in the associated MVCC snapshot are
-  // set to 1 in the selection vector so that they show up in the output.
+  // Modifies the given SelectedDeltas to include rows with relevant deltas from
+  // the current prepared batch.
   //
   // Deltas must have been prepared with the flag PREPARE_FOR_SELECT.
-  virtual Status SelectUpdates(SelectionVector* sel_vec) = 0;
+  virtual Status SelectDeltas(SelectedDeltas* deltas) = 0;
 
   // Collects the mutations associated with each row in the current prepared batch.
   //
@@ -282,7 +396,7 @@ struct DeltaFilePreparerTraits {
 // is responsible for loading encoded deltas from a backing store, passing them
 // to the DeltaPreparer to be transformed, and later, calling the DeltaPreparer
 // to serve the deltas.
-template <class Traits>
+template<class Traits>
 class DeltaPreparer : public PreparedDeltas {
  public:
   explicit DeltaPreparer(RowIteratorOptions opts);
@@ -320,7 +434,7 @@ class DeltaPreparer : public PreparedDeltas {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
@@ -335,6 +449,10 @@ class DeltaPreparer : public PreparedDeltas {
   const RowIteratorOptions& opts() const { return opts_; }
 
  private:
+  // If 'decoder' is not yet initialized, initializes it in accordance with the
+  // preparer's traits.
+  static Status InitDecoderIfNecessary(RowChangeListDecoder* decoder);
+
   // Checks whether we are done processing a row's deltas. If so, attempts to
   // convert the row's latest deletion state into a saved deletion or
   // reinsertion. By deferring this work to when a row is finished, we avoid
@@ -401,7 +519,11 @@ class DeltaPreparer : public PreparedDeltas {
 
   // State when prepared_for_ & PREPARED_FOR_SELECT
   // ------------------------------------------------------------
-  std::unique_ptr<SelectionVector> selected_;
+  SelectedDeltas selected_;
+
+  // The number of deltas selected so far by this DeltaPreparer. Used to build
+  // disambiguators (see SelectedDeltas::Delta). Never reset.
+  int64_t deltas_selected_;
 
   DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
 };
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index e6b7a0a..26a3495 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -681,8 +681,8 @@ Status DeltaFileIterator<Type>::ApplyDeletes(SelectionVector* sel_vec) {
 }
 
 template<DeltaType Type>
-Status DeltaFileIterator<Type>::SelectUpdates(SelectionVector* sel_vec) {
-  return preparer_.SelectUpdates(sel_vec);
+Status DeltaFileIterator<Type>::SelectDeltas(SelectedDeltas* deltas) {
+  return preparer_.SelectDeltas(deltas);
 }
 
 template<DeltaType Type>
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index a93d5a3..56632ff 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -236,7 +236,7 @@ class DeltaFileIterator : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>*dst, Arena* arena) override;
 
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index d2faea3..e356655 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -289,8 +289,8 @@ Status DMSIterator::ApplyDeletes(SelectionVector* sel_vec) {
   return preparer_.ApplyDeletes(sel_vec);
 }
 
-Status DMSIterator::SelectUpdates(SelectionVector* sel_vec) {
-  return preparer_.SelectUpdates(sel_vec);
+Status DMSIterator::SelectDeltas(SelectedDeltas* deltas) {
+  return preparer_.SelectDeltas(deltas);
 }
 
 Status DMSIterator::CollectMutations(vector<Mutation*>*dst, Arena* arena) {
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 01b942f..687f45e 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -212,7 +212,7 @@ class DMSIterator : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 07ddb66..1b67fa9 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -354,8 +354,8 @@ class MirroredDeltas {
   using ComparatorType = typename std::conditional<T::kTag == REDO,
                                                    std::less<Timestamp>,
                                                    std::greater<Timestamp>>::type;
-  using MirroredDeltaMap = std::map<rowid_t,
-                                    std::map<Timestamp, faststring, ComparatorType>>;
+  using MirroredDeltaTimestampMap = std::map<Timestamp, faststring, ComparatorType>;
+  using MirroredDeltaMap = std::map<rowid_t, MirroredDeltaTimestampMap>;
 
   explicit MirroredDeltas(const Schema* schema)
       : schema_(schema),
@@ -499,17 +499,53 @@ class MirroredDeltas {
   //
   // Deltas not relevant to 'lower_ts' or 'upper_ts' are skipped. The set of
   // rows considered is determined by 'start_row_idx' and the number of rows in 'sel_vec'.
-  void SelectUpdates(Timestamp lower_ts, Timestamp upper_ts,
+  void SelectDeltas(Timestamp lower_ts, Timestamp upper_ts,
                      rowid_t start_row_idx, SelectionVector* sel_vec) {
     for (int i = 0; i < sel_vec->nrows(); i++) {
+      boost::optional<const typename MirroredDeltaTimestampMap::mapped_type&> first;
+      boost::optional<const typename MirroredDeltaTimestampMap::mapped_type&> last;
       for (const auto& e : all_deltas_[start_row_idx + i]) {
         if (!IsDeltaRelevantForSelect(lower_ts, upper_ts, e.first)) {
           // Must keep iterating; short-circuit out of the select criteria is
           // complex and not worth using in test code.
           continue;
         }
+        if (!first.is_initialized()) {
+          first = e.second;
+        }
+        last = e.second;
+      }
+
+      // No relevant deltas.
+      if (!first) {
+        continue;
+      }
+
+      // One relevant delta.
+      if (first == last) {
+        sel_vec->SetRowSelected(i);
+        continue;
+      }
+
+      // At least two relevant deltas.
+      bool first_liveness;
+      {
+        RowChangeList changes(*first);
+        RowChangeListDecoder decoder(changes);
+        decoder.InitNoSafetyChecks();
+        first_liveness = !decoder.is_reinsert();
+      }
+      bool last_liveness;
+      {
+        RowChangeList changes(*last);
+        RowChangeListDecoder decoder(changes);
+        decoder.InitNoSafetyChecks();
+        last_liveness = !decoder.is_delete();
+      }
+      if (!first_liveness && !last_liveness) {
+        sel_vec->SetRowUnselected(i);
+      } else {
         sel_vec->SetRowSelected(i);
-        break;
       }
     }
   }
@@ -936,7 +972,7 @@ void RunDeltaFuzzTest(const DeltaStore& store,
       }
       ASSERT_OK(iter->PrepareBatch(batch_size, prepare_flags));
 
-      // Test SelectUpdates: the selection vector begins all false and a row is
+      // Test SelectDeltas: the selection vector begins all false and a row is
       // set if there is at least one relevant update for it.
       //
       // Note: we retain 'actual_selected' for use as a possible filter in the
@@ -946,9 +982,13 @@ void RunDeltaFuzzTest(const DeltaStore& store,
         SelectionVector expected_selected(batch_size);
         expected_selected.SetAllFalse();
         actual_selected.SetAllFalse();
-        mirror->SelectUpdates(*lower_ts, upper_ts, start_row_idx, &expected_selected);
-        ASSERT_OK(iter->SelectUpdates(&actual_selected));
-        ASSERT_EQ(expected_selected, actual_selected);
+        mirror->SelectDeltas(*lower_ts, upper_ts, start_row_idx, &expected_selected);
+        SelectedDeltas deltas(batch_size);
+        ASSERT_OK(iter->SelectDeltas(&deltas));
+        deltas.ToSelectionVector(&actual_selected);
+        ASSERT_EQ(expected_selected, actual_selected)
+            << "Expected selvec: " << expected_selected.ToString()
+            << "\nActual selvec: " << actual_selected.ToString();
       }
 
       // Test ApplyDeletes: the selection vector is all true and a row is unset
@@ -963,7 +1003,9 @@ void RunDeltaFuzzTest(const DeltaStore& store,
         actual_deleted.SetAllTrue();
         ASSERT_OK(mirror->ApplyDeletes(upper_ts, start_row_idx, &expected_deleted));
         ASSERT_OK(iter->ApplyDeletes(&actual_deleted));
-        ASSERT_EQ(expected_deleted, actual_deleted);
+        ASSERT_EQ(expected_deleted, actual_deleted)
+            << "Expected selvec: " << expected_deleted.ToString()
+            << "\nActual selvec: " << actual_deleted.ToString();
       }
 
       // Test ApplyUpdates: all relevant updates are applied to the column block.
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index d714fad..27a0724 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -1510,6 +1510,9 @@ TYPED_TEST(TestTablet, TestDiffScanUnobservableOperations) {
   };
 
   NO_FATALS(diff_scan_no_rows());
+
+  ASSERT_OK(this->tablet()->Flush());
+  NO_FATALS(diff_scan_no_rows());
 }
 
 } // namespace tablet