You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/06/08 00:23:24 UTC

[kudu] branch master updated (bf2b3e8 -> 9dfe8f8)

This is an automated email from the ASF dual-hosted git repository.

adar pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git.


    from bf2b3e8  schema: cache first is_deleted virtual column index
     new 5aa6931  KUDU-2809 (1/6): temporarily disable TestKuduScanner.testDiffScan
     new 3afbb4a  KUDU-2809 (2/6): skip unobservable rows when iterating an MRS
     new 552e7f6  KUDU-2809 (3/6): C++ private API for diff scans
     new ff65676  KUDU-2809 (4/6): skip unobservable rows when iterating a DRS
     new 9dfe8f8  KUDU-2809 (5/6): add diff scan support to fuzz-itest

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../org/apache/kudu/client/TestKuduScanner.java    |   2 +
 src/kudu/client/client.cc                          |  14 ++
 src/kudu/client/client.h                           |  25 ++
 src/kudu/client/scan_batch.cc                      |   8 +
 src/kudu/client/scan_batch.h                       |   8 +
 src/kudu/client/scan_configuration.cc              |  62 ++++-
 src/kudu/client/scan_configuration.h               |  29 ++-
 src/kudu/client/scan_token-internal.cc             |   8 +-
 src/kudu/client/scanner-internal.cc                |   3 +
 src/kudu/common/row_changelist.cc                  |  16 ++
 src/kudu/common/row_changelist.h                   |  12 +-
 src/kudu/common/rowblock.h                         |   5 +
 src/kudu/integration-tests/fuzz-itest.cc           | 276 +++++++++++++++++++--
 src/kudu/tablet/compaction.cc                      |   4 +-
 src/kudu/tablet/delta_applier.cc                   |   6 +-
 src/kudu/tablet/delta_iterator_merger.cc           |   4 +-
 src/kudu/tablet/delta_iterator_merger.h            |   2 +-
 src/kudu/tablet/delta_store.cc                     | 185 +++++++++++---
 src/kudu/tablet/delta_store.h                      | 138 ++++++++++-
 src/kudu/tablet/deltafile.cc                       |   4 +-
 src/kudu/tablet/deltafile.h                        |   2 +-
 src/kudu/tablet/deltamemstore.cc                   |   4 +-
 src/kudu/tablet/deltamemstore.h                    |   2 +-
 src/kudu/tablet/key_value_test_schema.h            |   4 +
 src/kudu/tablet/memrowset-test.cc                  |   2 +-
 src/kudu/tablet/memrowset.cc                       |  49 +++-
 src/kudu/tablet/memrowset.h                        |   9 +-
 src/kudu/tablet/tablet-test-util.h                 |  60 ++++-
 src/kudu/tablet/tablet-test.cc                     |  86 +++++++
 29 files changed, 929 insertions(+), 100 deletions(-)


[kudu] 01/05: KUDU-2809 (1/6): temporarily disable TestKuduScanner.testDiffScan

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 5aa69318ff57b7f3455d7e4e3317778170e927ac
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Thu Jun 6 00:36:27 2019 -0700

    KUDU-2809 (1/6): temporarily disable TestKuduScanner.testDiffScan
    
    This test needs to be updated to reflect KUDU-2809, but since those changes
    are split into separate MRS and DRS patches, and since there's no telling
    where the data inserted by this test will ultimately land, we must disable
    it until both patches are applied.
    
    Change-Id: I981aaa47a532dd0c1271008e27de052e3c5ad1a6
    Reviewed-on: http://gerrit.cloudera.org:8080/13533
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Adar Dembo <ad...@cloudera.com>
---
 .../src/test/java/org/apache/kudu/client/TestKuduScanner.java           | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
index 911f98e..9860b45 100644
--- a/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
+++ b/java/kudu-client/src/test/java/org/apache/kudu/client/TestKuduScanner.java
@@ -27,6 +27,7 @@ import org.apache.kudu.test.RandomUtils;
 import org.apache.kudu.util.DataGenerator;
 import org.apache.kudu.util.Pair;
 import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.slf4j.Logger;
@@ -233,6 +234,7 @@ public class TestKuduScanner {
 
   @Test(timeout = 100000)
   @KuduTestHarness.TabletServerConfig(flags = { "--flush_threshold_secs=" + DIFF_FLUSH_SEC })
+  @Ignore
   public void testDiffScan() throws Exception {
     Schema schema = new Schema(Arrays.asList(
         new ColumnSchema.ColumnSchemaBuilder("key", Type.INT32).key(true).build(),


[kudu] 04/05: KUDU-2809 (4/6): skip unobservable rows when iterating a DRS

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit ff656763a3d24623964c6a7cc7074899b83441ea
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jun 5 17:38:38 2019 -0700

    KUDU-2809 (4/6): skip unobservable rows when iterating a DRS
    
    The counterpart to the MRS patch, this one makes an equivalent change to the
    DRS diff scanning logic. It's far more complicated than the MRS version, for
    several reasons:
    1. A row's history is fragmented across multiple delta stores.
    2. UNDOs' view of time is inverted, complicating attempts at tracing a row's
       liveness across stores.
    
    To implement this, we need to reconsider how the DeltaApplier initializes
    its selection vector. The overall structure remains the same:
    1. The DeltaApplier sets up a data structure to capture selectivity info.
    2. It invokes a method on each store to add that store's selectivity info
       to the structure.
    3. Finally, it converts the structure into the selection vector.
    
    Before this patch, it was sufficient for the data structure to be the
    selection vector itself. But now the selection vector is insufficient.
    Instead we use a new data structure which tracks, for each row, the row's
    oldest and newest selected deltas. Armed with this information, we can omit
    unobservable rows at conversion time: if a row was dead before the oldest
    delta and after the newest delta, its lifetime exists entirely within the
    diff scan and it should be omitted from the results.
    
    Much of the complexity here lies with the need to totally order all deltas,
    regardless of whether they're UNDOs or REDOs, even if they share the same
    timestamps (fuzz-itest is a super effective way of testing this). The
    ordering starts with timestamps, but then orders deltas in some potentially
    non-obvious ways based on Kudu invariants:
    1. Timestamps (obviously)
    2. UNDOs, then REDOs. Within a rowset all UNDOs come before REDOs.
    3. The order in which the deltas were observed in the store. This is
       equivalent to the order in which they are applied.
    
    Change-Id: I2b616c4e8b99dbc7063b940bcb35352f87ab0226
    Reviewed-on: http://gerrit.cloudera.org:8080/13536
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/common/row_changelist.cc        |  16 +++
 src/kudu/common/row_changelist.h         |  12 +-
 src/kudu/common/rowblock.h               |   5 +
 src/kudu/tablet/compaction.cc            |   4 +-
 src/kudu/tablet/delta_applier.cc         |   6 +-
 src/kudu/tablet/delta_iterator_merger.cc |   4 +-
 src/kudu/tablet/delta_iterator_merger.h  |   2 +-
 src/kudu/tablet/delta_store.cc           | 185 ++++++++++++++++++++++++++-----
 src/kudu/tablet/delta_store.h            | 138 +++++++++++++++++++++--
 src/kudu/tablet/deltafile.cc             |   4 +-
 src/kudu/tablet/deltafile.h              |   2 +-
 src/kudu/tablet/deltamemstore.cc         |   4 +-
 src/kudu/tablet/deltamemstore.h          |   2 +-
 src/kudu/tablet/tablet-test-util.h       |  60 ++++++++--
 src/kudu/tablet/tablet-test.cc           |   3 +
 15 files changed, 387 insertions(+), 60 deletions(-)

diff --git a/src/kudu/common/row_changelist.cc b/src/kudu/common/row_changelist.cc
index 5681439..c20fce1 100644
--- a/src/kudu/common/row_changelist.cc
+++ b/src/kudu/common/row_changelist.cc
@@ -106,6 +106,21 @@ string RowChangeList::ToString(const Schema &schema) const {
   return ret;
 }
 
+const char* RowChangeList::ChangeType_Name(RowChangeList::ChangeType t) {
+  switch (t) {
+    case kUninitialized:
+      return "UNINITIALIZED";
+    case kUpdate:
+      return "UPDATE";
+    case kDelete:
+      return "DELETE";
+    case kReinsert:
+      return "REINSERT";
+    default:
+      return "UNKNOWN";
+  }
+}
+
 void RowChangeListEncoder::AddColumnUpdate(const ColumnSchema& col_schema,
                                            int col_id,
                                            const void* cell_ptr) {
@@ -151,6 +166,7 @@ void RowChangeListEncoder::EncodeColumnMutationRaw(int col_id, bool is_null, Sli
 }
 
 Status RowChangeListDecoder::Init() {
+  DCHECK_EQ(type_, RowChangeList::kUninitialized) << "Already called Init()";
   if (PREDICT_FALSE(remaining_.empty())) {
     return Status::Corruption("empty changelist - expected type");
   }
diff --git a/src/kudu/common/row_changelist.h b/src/kudu/common/row_changelist.h
index e415c27..e648d3d 100644
--- a/src/kudu/common/row_changelist.h
+++ b/src/kudu/common/row_changelist.h
@@ -21,6 +21,7 @@
 #define KUDU_COMMON_ROW_CHANGELIST_H
 
 #include <cstddef>
+#include <ostream>
 #include <string>
 #include <vector>
 
@@ -136,6 +137,8 @@ class RowChangeList {
     ChangeType_max = 3
   };
 
+  static const char* ChangeType_Name(ChangeType t);
+
   Slice encoded_data_;
 };
 
@@ -290,24 +293,32 @@ class RowChangeListDecoder {
 #endif
   }
 
+  bool IsInitialized() const {
+    return type_ != RowChangeList::kUninitialized;
+  }
+
   bool HasNext() const {
     DCHECK(!is_delete());
     return !remaining_.empty();
   }
 
   bool is_update() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_ == RowChangeList::kUpdate;
   }
 
   bool is_delete() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_ == RowChangeList::kDelete;
   }
 
   bool is_reinsert() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_ == RowChangeList::kReinsert;
   }
 
   const RowChangeList::ChangeType get_type() const {
+    DCHECK_NE(type_, RowChangeList::kUninitialized) << "Must call Init()";
     return type_;
   }
 
@@ -429,7 +440,6 @@ class RowChangeListDecoder {
   RowChangeList::ChangeType type_;
 };
 
-
 } // namespace kudu
 
 // Defined for tight_enum_test_cast<> -- has to be defined outside of any namespace.
diff --git a/src/kudu/common/rowblock.h b/src/kudu/common/rowblock.h
index 8d3c4ec..678b355 100644
--- a/src/kudu/common/rowblock.h
+++ b/src/kudu/common/rowblock.h
@@ -20,6 +20,7 @@
 
 #include <cstdint>
 #include <cstring>
+#include <string>
 #include <vector>
 
 #include <glog/logging.h>
@@ -144,6 +145,10 @@ class SelectionVector {
                bitmap_.get(), src_row_off, num_rows);
   }
 
+  std::string ToString() const {
+    return BitmapToString(&bitmap_[0], n_rows_);
+  }
+
  private:
 
   // Pads any non-byte-aligned bits at the end of the SelectionVector with zeroes.
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index b760776..ecebbf8 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -135,7 +135,8 @@ class MemRowSetCompactionInput : public CompactionInput {
       // Handle the rare case where a row was inserted and deleted in the same operation.
       // This row can never be observed and should not be compacted/flushed. This saves
       // us some trouble later on on compactions.
-      // See: MergeCompactionInput::CompareAndMergeDuplicatedRows().
+      //
+      // See CompareDuplicatedRows().
       if (PREDICT_FALSE(input_row.redo_head != nullptr &&
           input_row.redo_head->timestamp() == insertion_timestamp)) {
         // Get the latest mutation.
@@ -1222,7 +1223,6 @@ Status ReupdateMissedDeltas(const IOContext* io_context,
   // updates. So, this can be made much faster.
   vector<CompactionInputRow> rows;
   const Schema* schema = &input->schema();
-  const Schema key_schema(input->schema().CreateKeyProjection());
 
   rowid_t output_row_offset = 0;
   while (input->HasMoreBlocks()) {
diff --git a/src/kudu/tablet/delta_applier.cc b/src/kudu/tablet/delta_applier.cc
index 7c767a6..43c489f 100644
--- a/src/kudu/tablet/delta_applier.cc
+++ b/src/kudu/tablet/delta_applier.cc
@@ -116,8 +116,10 @@ Status DeltaApplier::InitializeSelectionVector(SelectionVector *sel_vec) {
   //
   // See delta_relevancy.h for more details.
   if (opts_.snap_to_exclude) {
-    sel_vec->SetAllFalse();
-    RETURN_NOT_OK(delta_iter_->SelectUpdates(sel_vec));
+    SelectedDeltas deltas(sel_vec->nrows());
+    RETURN_NOT_OK(delta_iter_->SelectDeltas(&deltas));
+    VLOG(4) << "Final deltas:\n" << deltas.ToString();
+    deltas.ToSelectionVector(sel_vec);
   } else {
     RETURN_NOT_OK(base_iter_->InitializeSelectionVector(sel_vec));
   }
diff --git a/src/kudu/tablet/delta_iterator_merger.cc b/src/kudu/tablet/delta_iterator_merger.cc
index 8bdf648..24ef27b 100644
--- a/src/kudu/tablet/delta_iterator_merger.cc
+++ b/src/kudu/tablet/delta_iterator_merger.cc
@@ -79,9 +79,9 @@ Status DeltaIteratorMerger::ApplyDeletes(SelectionVector* sel_vec) {
   return Status::OK();
 }
 
-Status DeltaIteratorMerger::SelectUpdates(SelectionVector* sel_vec) {
+Status DeltaIteratorMerger::SelectDeltas(SelectedDeltas* deltas) {
   for (const unique_ptr<DeltaIterator>& iter : iters_) {
-    RETURN_NOT_OK(iter->SelectUpdates(sel_vec));
+    RETURN_NOT_OK(iter->SelectDeltas(deltas));
   }
   return Status::OK();
 }
diff --git a/src/kudu/tablet/delta_iterator_merger.h b/src/kudu/tablet/delta_iterator_merger.h
index c5a36e8..bedf552 100644
--- a/src/kudu/tablet/delta_iterator_merger.h
+++ b/src/kudu/tablet/delta_iterator_merger.h
@@ -67,7 +67,7 @@ class DeltaIteratorMerger : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
diff --git a/src/kudu/tablet/delta_store.cc b/src/kudu/tablet/delta_store.cc
index 3fd7e52..5240b57 100644
--- a/src/kudu/tablet/delta_store.cc
+++ b/src/kudu/tablet/delta_store.cc
@@ -34,6 +34,7 @@
 #include "kudu/common/types.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/join.h"
 #include "kudu/gutil/strings/strcat.h"
 #include "kudu/gutil/strings/substitute.h"
 #include "kudu/tablet/delta_relevancy.h"
@@ -63,13 +64,121 @@ string DeltaKeyAndUpdate::Stringify(DeltaType type, const Schema& schema, bool p
                                                  key.timestamp().ToString()))));
 }
 
+SelectedDeltas::SelectedDeltas(size_t nrows) {
+  Reset(nrows);
+}
+
+void SelectedDeltas::MergeFrom(const SelectedDeltas& other) {
+  DCHECK_EQ(rows_.size(), other.rows_.size());
+
+  for (rowid_t idx = 0; idx < rows_.size(); idx++) {
+    const auto& src = other.rows_[idx];
+    if (!src) {
+      continue;
+    }
+    if (src->same_delta) {
+      ProcessDelta(idx, src->oldest);
+    } else {
+      ProcessDelta(idx, src->oldest);
+      ProcessDelta(idx, src->newest);
+    }
+  }
+}
+
+void SelectedDeltas::ToSelectionVector(SelectionVector* sel_vec) const {
+  DCHECK_EQ(rows_.size(), sel_vec->nrows());
+
+  for (rowid_t idx = 0; idx < rows_.size(); idx++) {
+    const auto& row = rows_[idx];
+
+    if (!row) {
+      // There were no relevant deltas for this row.
+      sel_vec->SetRowUnselected(idx);
+      continue;
+    }
+
+    if (row->same_delta) {
+      // There was exactly one relevant delta; the row must be selected.
+      sel_vec->SetRowSelected(idx);
+      continue;
+    }
+
+    // There was more than one relevant delta.
+    //
+    // Before we mark the row as selected, we must first determine whether the
+    // row was dead at the beginning and end of the time range: such rows should
+    // be deselected. We've captured the oldest and newest deltas; the table
+    // below indicates whether, for a given type of delta, the row is live or
+    // dead at that endpoint.
+    //
+    // delta type    | oldest | newest
+    // --------------+--------+-------
+    // REDO DELETE   | L      | D
+    // REDO REINSERT | D      | L
+    // UNDO DELETE   | D      | L
+    // UNDO REINSERT | L      | D
+    const auto& oldest = row->oldest;
+    const auto& newest = row->newest;
+    if (((oldest.dtype == REDO && oldest.ctype == RowChangeList::kReinsert) ||
+         (oldest.dtype == UNDO && oldest.ctype == RowChangeList::kDelete)) &&
+        ((newest.dtype == REDO && newest.ctype == RowChangeList::kDelete) ||
+         (newest.dtype == UNDO && newest.ctype == RowChangeList::kReinsert))) {
+      sel_vec->SetRowUnselected(idx);
+    } else {
+      sel_vec->SetRowSelected(idx);
+    }
+  }
+}
+
+void SelectedDeltas::ProcessDelta(rowid_t row_idx, Delta new_delta) {
+  DCHECK_LT(row_idx, rows_.size());
+  auto& existing = rows_[row_idx];
+
+  if (!existing) {
+    existing = DeltaPair();
+    existing->same_delta = true;
+    existing->oldest = new_delta;
+    existing->newest = new_delta;
+    return;
+  }
+
+  existing->oldest = std::min(existing->oldest, new_delta, DeltaLessThanFunctor());
+  existing->newest = std::max(existing->newest, new_delta, DeltaLessThanFunctor());
+  existing->same_delta = false;
+}
+
+string SelectedDeltas::ToString() const {
+  rowid_t idx = 0;
+  return JoinMapped(rows_, [&idx](const boost::optional<DeltaPair>& dp) {
+      if (!dp) {
+        return Substitute("$0: UNSELECTED", idx++);
+      }
+      return Substitute("$0: @tx$1 $2 dis=$3 ($4) @tx$5 $6 dis=$7 ($8)$9", idx++,
+                        dp->oldest.ts.ToString(),
+                        DeltaType_Name(dp->oldest.dtype),
+                        dp->oldest.disambiguator,
+                        RowChangeList::ChangeType_Name(dp->oldest.ctype),
+                        dp->newest.ts.ToString(),
+                        DeltaType_Name(dp->newest.dtype),
+                        dp->newest.disambiguator,
+                        RowChangeList::ChangeType_Name(dp->newest.ctype),
+                        dp->same_delta ? " (same delta)" : "");
+    }, "\n");
+}
+
+void SelectedDeltas::Reset(size_t nrows) {
+  rows_.clear();
+  rows_.resize(nrows);
+}
+
 template<class Traits>
 DeltaPreparer<Traits>::DeltaPreparer(RowIteratorOptions opts)
     : opts_(std::move(opts)),
       cur_prepared_idx_(0),
       prev_prepared_idx_(0),
       prepared_flags_(DeltaIterator::PREPARE_NONE),
-      deletion_state_(UNKNOWN) {
+      deletion_state_(UNKNOWN),
+      deltas_selected_(0) {
 }
 
 template<class Traits>
@@ -85,12 +194,7 @@ void DeltaPreparer<Traits>::Start(size_t nrows, int prepare_flags) {
 
   if (prepare_flags & DeltaIterator::PREPARE_FOR_SELECT) {
     DCHECK(opts_.snap_to_exclude);
-
-    // Ensure we have a selection vector at least 'nrows' long.
-    if (!selected_ || selected_->nrows() < nrows) {
-      selected_.reset(new SelectionVector(nrows));
-    }
-    selected_->SetAllFalse();
+    selected_.Reset(nrows);
   }
   prepared_flags_ = prepare_flags;
   if (updates_by_col_.empty()) {
@@ -138,13 +242,32 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* fin
   // to be finished, but that short-circuit can only be used if we're not also
   // handling a preparation with a different criteria.
 
+  RowChangeListDecoder decoder((RowChangeList(val)));
+
   if (prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT) {
     bool finished_row_for_select;
     if (IsDeltaRelevantForSelect<Traits::kType>(*opts_.snap_to_exclude,
                                                 opts_.snap_to_include,
                                                 key.timestamp(),
                                                 &finished_row_for_select)) {
-      selected_->SetRowSelected(key.row_idx() - cur_prepared_idx_);
+      RETURN_NOT_OK(InitDecoderIfNecessary(&decoder));
+
+      // The logical ordering of UNDOs is the opposite of their counting order.
+      int64_t disambiguator = Traits::kType == REDO ?
+                              deltas_selected_ : -deltas_selected_;
+
+      // We use the address of the DeltaPreparer itself as a "delta store" ID.
+      // That's safe because it is globally unique and remains so for the
+      // duration of the scan, which outlives this delta.
+      SelectedDeltas::Delta new_delta = { key.timestamp(),
+                                          Traits::kType,
+                                          disambiguator,
+                                          reinterpret_cast<int64_t>(this),
+                                          decoder.get_type() };
+
+      selected_.ProcessDelta(key.row_idx() - cur_prepared_idx_, new_delta);
+      deltas_selected_++;
+      VLOG(4) << "Selected deltas after AddDelta:\n" << selected_.ToString();
     }
 
     if (finished_row_for_select &&
@@ -164,16 +287,7 @@ Status DeltaPreparer<Traits>::AddDelta(const DeltaKey& key, Slice val, bool* fin
 
   if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY &&
       relevant_for_apply_or_collect) {
-    RowChangeListDecoder decoder((RowChangeList(val)));
-    if (Traits::kInitializeDecodersWithSafetyChecks) {
-      RETURN_NOT_OK(decoder.Init());
-    } else {
-      decoder.InitNoSafetyChecks();
-    }
-    if (!Traits::kAllowReinserts && decoder.is_reinsert()) {
-      LOG(DFATAL) << "Attempted to reinsert but not supported" << GetStackTrace();
-      return Status::InvalidArgument("Reinserts are not supported");
-    }
+    RETURN_NOT_OK(InitDecoderIfNecessary(&decoder));
     UpdateDeletionState(decoder.get_type());
     if (!decoder.is_delete()) {
       while (decoder.HasNext()) {
@@ -297,17 +411,13 @@ Status DeltaPreparer<Traits>::ApplyDeletes(SelectionVector* sel_vec) {
 }
 
 template<class Traits>
-Status DeltaPreparer<Traits>::SelectUpdates(SelectionVector* sel_vec) {
+Status DeltaPreparer<Traits>::SelectDeltas(SelectedDeltas* deltas) {
   DCHECK(prepared_flags_ & DeltaIterator::PREPARE_FOR_SELECT);
-  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, sel_vec->nrows());
-
-  // SelectUpdates() is additive: it should never exclude rows, only include them.
-  for (rowid_t idx = 0; idx < sel_vec->nrows(); idx++) {
-    if (selected_->IsRowSelected(idx)) {
-      sel_vec->SetRowSelected(idx);
-    }
-  }
-
+  DCHECK_LE(cur_prepared_idx_ - prev_prepared_idx_, deltas->rows_.size());
+  VLOG(4) << "Selected deltas before SelectDeltas:\n" << selected_.ToString();
+  VLOG(4) << "Pre-merge deltas:\n" << deltas->ToString();
+  deltas->MergeFrom(selected_);
+  VLOG(4) << "Post-merge deltas:\n" << deltas->ToString();
   return Status::OK();
 }
 
@@ -372,7 +482,7 @@ bool DeltaPreparer<Traits>::MayHaveDeltas() const {
   if (!reinserted_.empty()) {
     return true;
   }
-  for (auto& col : updates_by_col_) {
+  for (const auto& col : updates_by_col_) {
     if (!col.empty()) {
       return true;
     }
@@ -381,6 +491,23 @@ bool DeltaPreparer<Traits>::MayHaveDeltas() const {
 }
 
 template<class Traits>
+Status DeltaPreparer<Traits>::InitDecoderIfNecessary(RowChangeListDecoder* decoder) {
+  if (decoder->IsInitialized()) {
+    return Status::OK();
+  }
+  if (Traits::kInitializeDecodersWithSafetyChecks) {
+    RETURN_NOT_OK(decoder->Init());
+  } else {
+    decoder->InitNoSafetyChecks();
+  }
+  if (!Traits::kAllowReinserts && decoder->is_reinsert()) {
+    LOG(DFATAL) << "Attempted to reinsert but not supported" << GetStackTrace();
+    return Status::InvalidArgument("Reinserts are not supported");
+  }
+  return Status::OK();
+}
+
+template<class Traits>
 void DeltaPreparer<Traits>::MaybeProcessPreviousRowChange(boost::optional<rowid_t> cur_row_idx) {
   if (prepared_flags_ & DeltaIterator::PREPARE_FOR_APPLY &&
       last_added_idx_ &&
diff --git a/src/kudu/tablet/delta_store.h b/src/kudu/tablet/delta_store.h
index 9333581..77961f3 100644
--- a/src/kudu/tablet/delta_store.h
+++ b/src/kudu/tablet/delta_store.h
@@ -21,14 +21,18 @@
 #include <cstdint>
 #include <deque>
 #include <memory>
+#include <ostream>
 #include <string>
 #include <vector>
 
 #include <boost/optional/optional.hpp>
+#include <glog/logging.h>
 
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowid.h"
+#include "kudu/common/timestamp.h"
 #include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
 #include "kudu/tablet/delta_key.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/util/slice.h"
@@ -54,6 +58,118 @@ class DeltaIterator;
 class DeltaStats;
 class Mutation;
 
+// Tracks deltas that have been selected by PreparedDeltas::SelectDeltas.
+//
+// May track deltas belonging to a single delta store, or to multiple stores
+// whose SelectedDeltas have been merged together.
+class SelectedDeltas {
+ public:
+  SelectedDeltas() = default;
+
+  // Equivalent to calling:
+  //
+  //   SelectedDeltas sd;
+  //   sd.Reset(nrows);
+  explicit SelectedDeltas(size_t nrows);
+
+  // Converts the selected deltas into a simpler SelectionVector.
+  void ToSelectionVector(SelectionVector* sel_vec) const;
+
+  // Returns a textual representation suitable for debugging.
+  std::string ToString() const;
+
+ private:
+  template<class Traits>
+  friend class DeltaPreparer;
+
+  // Mutation that has met the 'select' criteria in a delta store.
+  struct Delta {
+    // Key fields.
+
+    // The delta's timestamp.
+    Timestamp ts;
+
+    // Whether this delta was an UNDO or a REDO.
+    DeltaType dtype;
+
+    // It's possible for multiple UNDOs or REDOs in the same delta store to
+    // share a common timestamp. To ensure a total ordering, this additional key
+    // field reflects the logical ordering of such deltas.
+    //
+    // For example, consider the sequence of REDOs:
+    // D1: @tx10 UPDATE key=1
+    // D2: @tx10 DELETE key=1
+    //
+    // D1 and D2 are identical as far as 'ts' and 'dtype' are concerned, so D1's
+    // disambiguator must be less than that of D2.
+    int64_t disambiguator;
+
+    // Non-key fields.
+
+    // Identifier of the delta store that provided this delta. It must:
+    // 1. Be unique for the owning rowset, but needn't be more unique than that.
+    // 2. Remain unique for the lifetime of this delta scan.
+    int64_t delta_store_id;
+
+    // Whether this delta was an UPDATE, DELETE, or REINSERT.
+    RowChangeList::ChangeType ctype;
+  };
+
+  // Tracks the oldest and newest deltas for a given row.
+  //
+  // When there's only one delta, 'oldest' and 'newest' are equal and
+  // 'same_delta' is true. Otherwise, oldest is guaranteed to be less than
+  // newest as per the rules defined in DeltaLessThanFunctor.
+  //
+  // Most of the time "oldest" and "newest" is determined purely by timestamp,
+  // but some deltas can share timestamps, in which case additional rules are
+  // used to maintain a total ordering.
+  struct DeltaPair {
+    Delta oldest;
+    Delta newest;
+    bool same_delta;
+  };
+
+  // Comparator that establishes a total ordering amongst Deltas for the same row.
+  struct DeltaLessThanFunctor {
+    bool operator() (const Delta& a, const Delta& b) const {
+      // Most of the time, deltas are ordered using timestamp.
+      if (PREDICT_TRUE(a.ts != b.ts)) {
+        return a.ts < b.ts;
+      }
+
+      // If the timestamps match, we can order by observing that UNDO < REDO, an
+      // invariant that is preserved inside of a rowset.
+      if (a.dtype != b.dtype) {
+        return a.dtype == UNDO;
+      }
+
+      // The timestamps and delta types match. It should only be possible to get
+      // here if we're comparing deltas from within the same store, in which
+      // case the disambiguators must not match.
+      CHECK_EQ(a.delta_store_id, b.delta_store_id);
+      if (a.disambiguator != b.disambiguator) {
+        return a.disambiguator < b.disambiguator;
+      }
+      LOG(FATAL) << "Could not differentiate between two deltas";
+    }
+  };
+
+  // Merges two SelectedDeltas together on a row-by-row basis.
+  void MergeFrom(const SelectedDeltas& other);
+
+  // Considers a new delta, possibly adding it to 'rows_'.
+  void ProcessDelta(rowid_t row_idx, Delta new_delta);
+
+  // Clears out 'rows_' and makes it suitable for handling 'nrows'.
+  void Reset(size_t nrows);
+
+  // All tracked deltas, indexed by row ordinal.
+  //
+  // If an element is boost::none, there are no deltas for that row.
+  std::vector<boost::optional<DeltaPair>> rows_;
+};
+
 // Interface for the pieces of the system that track deltas/updates.
 // This is implemented by DeltaMemStore and by DeltaFileReader.
 class DeltaStore {
@@ -155,13 +271,11 @@ class PreparedDeltas {
   // Deltas must have been prepared with the flag PREPARE_FOR_APPLY.
   virtual Status ApplyDeletes(SelectionVector* sel_vec) = 0;
 
-  // Updates the given selection vector to reflect the snapshotted updates.
-  //
-  // Rows which have been updated or deleted in the associated MVCC snapshot are
-  // set to 1 in the selection vector so that they show up in the output.
+  // Modifies the given SelectedDeltas to include rows with relevant deltas from
+  // the current prepared batch.
   //
   // Deltas must have been prepared with the flag PREPARE_FOR_SELECT.
-  virtual Status SelectUpdates(SelectionVector* sel_vec) = 0;
+  virtual Status SelectDeltas(SelectedDeltas* deltas) = 0;
 
   // Collects the mutations associated with each row in the current prepared batch.
   //
@@ -282,7 +396,7 @@ struct DeltaFilePreparerTraits {
 // is responsible for loading encoded deltas from a backing store, passing them
 // to the DeltaPreparer to be transformed, and later, calling the DeltaPreparer
 // to serve the deltas.
-template <class Traits>
+template<class Traits>
 class DeltaPreparer : public PreparedDeltas {
  public:
   explicit DeltaPreparer(RowIteratorOptions opts);
@@ -320,7 +434,7 @@ class DeltaPreparer : public PreparedDeltas {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
@@ -335,6 +449,10 @@ class DeltaPreparer : public PreparedDeltas {
   const RowIteratorOptions& opts() const { return opts_; }
 
  private:
+  // If 'decoder' is not yet initialized, initializes it in accordance with the
+  // preparer's traits.
+  static Status InitDecoderIfNecessary(RowChangeListDecoder* decoder);
+
   // Checks whether we are done processing a row's deltas. If so, attempts to
   // convert the row's latest deletion state into a saved deletion or
   // reinsertion. By deferring this work to when a row is finished, we avoid
@@ -401,7 +519,11 @@ class DeltaPreparer : public PreparedDeltas {
 
   // State when prepared_for_ & PREPARED_FOR_SELECT
   // ------------------------------------------------------------
-  std::unique_ptr<SelectionVector> selected_;
+  SelectedDeltas selected_;
+
+  // The number of deltas selected so far by this DeltaPreparer. Used to build
+  // disambiguators (see SelectedDeltas::Delta). Never reset.
+  int64_t deltas_selected_;
 
   DISALLOW_COPY_AND_ASSIGN(DeltaPreparer);
 };
diff --git a/src/kudu/tablet/deltafile.cc b/src/kudu/tablet/deltafile.cc
index e6b7a0a..26a3495 100644
--- a/src/kudu/tablet/deltafile.cc
+++ b/src/kudu/tablet/deltafile.cc
@@ -681,8 +681,8 @@ Status DeltaFileIterator<Type>::ApplyDeletes(SelectionVector* sel_vec) {
 }
 
 template<DeltaType Type>
-Status DeltaFileIterator<Type>::SelectUpdates(SelectionVector* sel_vec) {
-  return preparer_.SelectUpdates(sel_vec);
+Status DeltaFileIterator<Type>::SelectDeltas(SelectedDeltas* deltas) {
+  return preparer_.SelectDeltas(deltas);
 }
 
 template<DeltaType Type>
diff --git a/src/kudu/tablet/deltafile.h b/src/kudu/tablet/deltafile.h
index a93d5a3..56632ff 100644
--- a/src/kudu/tablet/deltafile.h
+++ b/src/kudu/tablet/deltafile.h
@@ -236,7 +236,7 @@ class DeltaFileIterator : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>*dst, Arena* arena) override;
 
diff --git a/src/kudu/tablet/deltamemstore.cc b/src/kudu/tablet/deltamemstore.cc
index d2faea3..e356655 100644
--- a/src/kudu/tablet/deltamemstore.cc
+++ b/src/kudu/tablet/deltamemstore.cc
@@ -289,8 +289,8 @@ Status DMSIterator::ApplyDeletes(SelectionVector* sel_vec) {
   return preparer_.ApplyDeletes(sel_vec);
 }
 
-Status DMSIterator::SelectUpdates(SelectionVector* sel_vec) {
-  return preparer_.SelectUpdates(sel_vec);
+Status DMSIterator::SelectDeltas(SelectedDeltas* deltas) {
+  return preparer_.SelectDeltas(deltas);
 }
 
 Status DMSIterator::CollectMutations(vector<Mutation*>*dst, Arena* arena) {
diff --git a/src/kudu/tablet/deltamemstore.h b/src/kudu/tablet/deltamemstore.h
index 01b942f..687f45e 100644
--- a/src/kudu/tablet/deltamemstore.h
+++ b/src/kudu/tablet/deltamemstore.h
@@ -212,7 +212,7 @@ class DMSIterator : public DeltaIterator {
 
   Status ApplyDeletes(SelectionVector* sel_vec) override;
 
-  Status SelectUpdates(SelectionVector* sel_vec) override;
+  Status SelectDeltas(SelectedDeltas* deltas) override;
 
   Status CollectMutations(std::vector<Mutation*>* dst, Arena* arena) override;
 
diff --git a/src/kudu/tablet/tablet-test-util.h b/src/kudu/tablet/tablet-test-util.h
index 07ddb66..1b67fa9 100644
--- a/src/kudu/tablet/tablet-test-util.h
+++ b/src/kudu/tablet/tablet-test-util.h
@@ -354,8 +354,8 @@ class MirroredDeltas {
   using ComparatorType = typename std::conditional<T::kTag == REDO,
                                                    std::less<Timestamp>,
                                                    std::greater<Timestamp>>::type;
-  using MirroredDeltaMap = std::map<rowid_t,
-                                    std::map<Timestamp, faststring, ComparatorType>>;
+  using MirroredDeltaTimestampMap = std::map<Timestamp, faststring, ComparatorType>;
+  using MirroredDeltaMap = std::map<rowid_t, MirroredDeltaTimestampMap>;
 
   explicit MirroredDeltas(const Schema* schema)
       : schema_(schema),
@@ -499,17 +499,53 @@ class MirroredDeltas {
   //
   // Deltas not relevant to 'lower_ts' or 'upper_ts' are skipped. The set of
   // rows considered is determined by 'start_row_idx' and the number of rows in 'sel_vec'.
-  void SelectUpdates(Timestamp lower_ts, Timestamp upper_ts,
+  void SelectDeltas(Timestamp lower_ts, Timestamp upper_ts,
                      rowid_t start_row_idx, SelectionVector* sel_vec) {
     for (int i = 0; i < sel_vec->nrows(); i++) {
+      boost::optional<const typename MirroredDeltaTimestampMap::mapped_type&> first;
+      boost::optional<const typename MirroredDeltaTimestampMap::mapped_type&> last;
       for (const auto& e : all_deltas_[start_row_idx + i]) {
         if (!IsDeltaRelevantForSelect(lower_ts, upper_ts, e.first)) {
           // Must keep iterating; short-circuit out of the select criteria is
           // complex and not worth using in test code.
           continue;
         }
+        if (!first.is_initialized()) {
+          first = e.second;
+        }
+        last = e.second;
+      }
+
+      // No relevant deltas.
+      if (!first) {
+        continue;
+      }
+
+      // One relevant delta.
+      if (first == last) {
+        sel_vec->SetRowSelected(i);
+        continue;
+      }
+
+      // At least two relevant deltas.
+      bool first_liveness;
+      {
+        RowChangeList changes(*first);
+        RowChangeListDecoder decoder(changes);
+        decoder.InitNoSafetyChecks();
+        first_liveness = !decoder.is_reinsert();
+      }
+      bool last_liveness;
+      {
+        RowChangeList changes(*last);
+        RowChangeListDecoder decoder(changes);
+        decoder.InitNoSafetyChecks();
+        last_liveness = !decoder.is_delete();
+      }
+      if (!first_liveness && !last_liveness) {
+        sel_vec->SetRowUnselected(i);
+      } else {
         sel_vec->SetRowSelected(i);
-        break;
       }
     }
   }
@@ -936,7 +972,7 @@ void RunDeltaFuzzTest(const DeltaStore& store,
       }
       ASSERT_OK(iter->PrepareBatch(batch_size, prepare_flags));
 
-      // Test SelectUpdates: the selection vector begins all false and a row is
+      // Test SelectDeltas: the selection vector begins all false and a row is
       // set if there is at least one relevant update for it.
       //
       // Note: we retain 'actual_selected' for use as a possible filter in the
@@ -946,9 +982,13 @@ void RunDeltaFuzzTest(const DeltaStore& store,
         SelectionVector expected_selected(batch_size);
         expected_selected.SetAllFalse();
         actual_selected.SetAllFalse();
-        mirror->SelectUpdates(*lower_ts, upper_ts, start_row_idx, &expected_selected);
-        ASSERT_OK(iter->SelectUpdates(&actual_selected));
-        ASSERT_EQ(expected_selected, actual_selected);
+        mirror->SelectDeltas(*lower_ts, upper_ts, start_row_idx, &expected_selected);
+        SelectedDeltas deltas(batch_size);
+        ASSERT_OK(iter->SelectDeltas(&deltas));
+        deltas.ToSelectionVector(&actual_selected);
+        ASSERT_EQ(expected_selected, actual_selected)
+            << "Expected selvec: " << expected_selected.ToString()
+            << "\nActual selvec: " << actual_selected.ToString();
       }
 
       // Test ApplyDeletes: the selection vector is all true and a row is unset
@@ -963,7 +1003,9 @@ void RunDeltaFuzzTest(const DeltaStore& store,
         actual_deleted.SetAllTrue();
         ASSERT_OK(mirror->ApplyDeletes(upper_ts, start_row_idx, &expected_deleted));
         ASSERT_OK(iter->ApplyDeletes(&actual_deleted));
-        ASSERT_EQ(expected_deleted, actual_deleted);
+        ASSERT_EQ(expected_deleted, actual_deleted)
+            << "Expected selvec: " << expected_deleted.ToString()
+            << "\nActual selvec: " << actual_deleted.ToString();
       }
 
       // Test ApplyUpdates: all relevant updates are applied to the column block.
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index d714fad..27a0724 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -1510,6 +1510,9 @@ TYPED_TEST(TestTablet, TestDiffScanUnobservableOperations) {
   };
 
   NO_FATALS(diff_scan_no_rows());
+
+  ASSERT_OK(this->tablet()->Flush());
+  NO_FATALS(diff_scan_no_rows());
 }
 
 } // namespace tablet


[kudu] 05/05: KUDU-2809 (5/6): add diff scan support to fuzz-itest

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 9dfe8f8e671b436996867083a4e9d82df5e1997f
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jun 5 17:51:13 2019 -0700

    KUDU-2809 (5/6): add diff scan support to fuzz-itest
    
    This patch adds diff scans to the set of operations supported by fuzz-itest.
    The existing saved_values_ map is quite difficult to reuse for verifying
    diff scan results, so instead I added a saved_redos_ map that tracks every
    insertion/mutation as a discrete "redo".
    
    For coverage, there are two new tests for KUDU-2809. I also ran fuzz-itest
    in slow mode a few thousand times on this patch series without failure.
    
    Change-Id: I3f7dae20ef1b903dba80e90d5f491e4322815fbb
    Reviewed-on: http://gerrit.cloudera.org:8080/13537
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/integration-tests/fuzz-itest.cc | 276 +++++++++++++++++++++++++++++--
 src/kudu/tablet/key_value_test_schema.h  |   4 +
 2 files changed, 262 insertions(+), 18 deletions(-)

diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index dbc0ee4..2ee1a24 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -15,6 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <algorithm>
 #include <cstdint>
 #include <cstdlib>
 #include <functional>
@@ -31,6 +32,7 @@
 #include <gflags/gflags.h>
 #include <gflags/gflags_declare.h>
 #include <glog/logging.h>
+#include <glog/stl_logging.h>
 #include <gtest/gtest.h>
 
 #include "kudu/client/client-test-util.h"
@@ -115,6 +117,7 @@ enum TestOpType {
   TEST_COMPACT_TABLET,
   TEST_RESTART_TS,
   TEST_SCAN_AT_TIMESTAMP,
+  TEST_DIFF_SCAN,
   TEST_NUM_OP_TYPES // max value for enum
 };
 
@@ -133,24 +136,31 @@ const char* TestOpType_names[] = {
   "TEST_MAJOR_COMPACT_DELTAS",
   "TEST_COMPACT_TABLET",
   "TEST_RESTART_TS",
-  "TEST_SCAN_AT_TIMESTAMP"
+  "TEST_SCAN_AT_TIMESTAMP",
+  "TEST_DIFF_SCAN"
 };
 
 // An operation in a fuzz-test sequence.
 struct TestOp {
   // NOLINTNEXTLINE(google-explicit-constructor)
-  TestOp(TestOpType t, int v = 0) // NOLINT(runtime/explicit)
+  TestOp(TestOpType t, int v1 = 0, int v2 = 0) // NOLINT(runtime/explicit)
       : type(t),
-        val(v) {}
+        val(v1),
+        val2(v2) {}
 
   // The op to run.
   TestOpType type;
 
   // For INSERT/UPSERT/UPDATE/DELETE, the key of the row to be modified.
   // For SCAN_AT_TIMESTAMP the timestamp of the scan.
+  // For DIFF_SCAN the start timestamp of the scan.
   // Otherwise, unused.
   int val;
 
+  // For DIFF_SCAN, the end timestamp of the scan.
+  // Otherwise, unused.
+  int val2;
+
   string ToString() const {
     switch (type) {
       case TEST_FLUSH_OPS:
@@ -169,6 +179,8 @@ struct TestOp {
       case TEST_DELETE:
       case TEST_SCAN_AT_TIMESTAMP:
         return strings::Substitute("{$0, $1}", TestOpType_names[type], val);
+      case TEST_DIFF_SCAN:
+        return strings::Substitute("{$0, $1, $2}", TestOpType_names[type], val, val2);
       default:
         LOG(FATAL) << "Invalid op type: " << type;
     }
@@ -176,6 +188,31 @@ struct TestOp {
   }
 };
 
+enum RedoType {
+  INSERT,
+  UPDATE,
+  DELETE,
+};
+
+struct Redo {
+  Redo(RedoType t, int32_t k, optional<int32_t> v = boost::none)
+      : rtype(t),
+        key(k),
+        val(std::move(v)) {}
+
+  string ToString() const {
+    if (rtype == DELETE) {
+      return strings::Substitute("{DELETE key=$0}", key);
+    }
+    return strings::Substitute("{$0 key=$1 val=$2}",
+                               rtype == INSERT ? "INSERT" : "UPDATE", key,
+                               val ? std::to_string(*val) : "NULL");
+  }
+  RedoType rtype;
+  int32_t key;
+  optional<int32_t> val;
+};
+
 const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_INSERT_PK_ONLY,
                                   TEST_UPSERT,
@@ -189,7 +226,8 @@ const vector<TestOpType> kAllOps {TEST_INSERT,
                                   TEST_MAJOR_COMPACT_DELTAS,
                                   TEST_COMPACT_TABLET,
                                   TEST_RESTART_TS,
-                                  TEST_SCAN_AT_TIMESTAMP};
+                                  TEST_SCAN_AT_TIMESTAMP,
+                                  TEST_DIFF_SCAN};
 
 const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_UPSERT_PK_ONLY,
@@ -201,7 +239,8 @@ const vector<TestOpType> kPkOnlyOps {TEST_INSERT_PK_ONLY,
                                      TEST_MAJOR_COMPACT_DELTAS,
                                      TEST_COMPACT_TABLET,
                                      TEST_RESTART_TS,
-                                     TEST_SCAN_AT_TIMESTAMP};
+                                     TEST_SCAN_AT_TIMESTAMP,
+                                     TEST_DIFF_SCAN};
 
 // Test which does only random operations against a tablet, including update and random
 // get (ie scans with equal lower and upper bounds).
@@ -423,6 +462,33 @@ class FuzzTest : public KuduTest {
     }
   }
 
+  // Fully consume all rows in 'scanner', writing the results to 'rows'.
+  //
+  // If 'is_deleted' is provided (only in diff scans), will also write out the
+  // values of the IS_DELETED virtual column.
+  Status ScanAllRows(KuduScanner* scanner, vector<ExpectedKeyValueRow>* rows,
+                     vector<bool>* is_deleted) {
+    while (scanner->HasMoreRows()) {
+      KuduScanBatch batch;
+      RETURN_NOT_OK(scanner->NextBatch(&batch));
+      for (KuduScanBatch::RowPtr row : batch) {
+        ExpectedKeyValueRow ret;
+        RETURN_NOT_OK(row.GetInt32(0, &ret.key));
+        if (schema_.num_columns() == 2 && !row.IsNull(1)) {
+          ret.val = 0;
+          RETURN_NOT_OK(row.GetInt32(1, ret.val.get_ptr()));
+        }
+        if (is_deleted) {
+          bool b;
+          RETURN_NOT_OK(row.IsDeleted(&b));
+          is_deleted->push_back(b);
+        }
+        rows->emplace_back(std::move(ret));
+      }
+    }
+    return Status::OK();
+  }
+
   // Scan the tablet at 'timestamp' and compare the result to the saved values.
   void CheckScanAtTimestamp(int timestamp) {
     KuduScanner s(table_.get());
@@ -430,20 +496,9 @@ class FuzzTest : public KuduTest {
     ASSERT_OK(s.SetSnapshotRaw(timestamp));
     ASSERT_OK(s.SetFaultTolerant());
     ASSERT_OK(s.Open());
+
     vector<ExpectedKeyValueRow> found;
-    while (s.HasMoreRows()) {
-      KuduScanBatch batch;
-      ASSERT_OK(s.NextBatch(&batch));
-      for (KuduScanBatch::RowPtr row : batch) {
-        ExpectedKeyValueRow ret;
-        ASSERT_OK(row.GetInt32(0, &ret.key));
-        if (schema_.num_columns() > 1 && !row.IsNull(1)) {
-          ret.val = 0;
-          ASSERT_OK(row.GetInt32(1, ret.val.get_ptr()));
-        }
-        found.push_back(ret);
-      }
-    }
+    ASSERT_OK(ScanAllRows(&s, &found, nullptr));
 
     list<string> errors;
     CheckRowsMatchAtTimestamp(timestamp, std::move(found), &errors);
@@ -457,6 +512,133 @@ class FuzzTest : public KuduTest {
     }
   }
 
+  // Diff scan the tablet from 'start_timestamp' to 'end_timestamp' and compare
+  // the result to the saved values.
+  void CheckDiffScan(int start_timestamp, int end_timestamp) {
+    KuduScanner s(table_.get());
+    ASSERT_OK(s.SetDiffScan(start_timestamp, end_timestamp));
+    ASSERT_OK(s.Open());
+
+    vector<ExpectedKeyValueRow> found;
+    vector<bool> found_is_deleted;
+    ASSERT_OK(ScanAllRows(&s, &found, &found_is_deleted));
+
+    if (VLOG_IS_ON(1)) {
+      for (int i = 0; i < found.size(); i++) {
+        VLOG(1) << Substitute("Diff scan result: $0$1", found[i].ToString(),
+                              found_is_deleted[i] ? " (deleted)" : "");
+      }
+    }
+
+    // Use saved_redos_ to reconstruct the expected results of the diff scan.
+    //
+    // 'selected_rows' tracks which row keys are expected in the scan results
+    // using the select criteria.
+    //
+    // 'expected_rows' tracks expected values of rows and is built up using the
+    // apply criteria. After we've processed all relevant deltas, rows not in
+    // 'selected_rows' will be pruned and the results compared with the diff scan.
+    //
+    // 'is_deleted_start' and 'is_deleted_end' track liveness for each row at
+    // the beginning and end of the time range. If a row is dead in both, it
+    // shouldn't be in the diff scan results.
+    vector<bool> selected_rows(FLAGS_keyspace_size);
+    vector<ExpectedKeyValueRow> expected_rows(FLAGS_keyspace_size);
+    vector<bool> is_deleted_start(FLAGS_keyspace_size, true);
+    vector<bool> is_deleted_end(FLAGS_keyspace_size, true);
+
+    for (const auto& e : saved_redos_) {
+      int ts = e.first;
+      const auto& redos = e.second;
+      if (redos.empty()) {
+        continue;
+      }
+      VLOG(1) << "Processing redos for ts @" << ts;
+
+      if (ts >= end_timestamp) {
+        // The redo is beyond the end of the diff scan as per both the select
+        // and apply criteria. We're iterating in ascending timestamp order so
+        // this also means all future redos are irrelevant.
+        if (VLOG_IS_ON(1)) {
+          for (const auto& redo : redos) {
+            VLOG(1) << "Skipping redo " << redo.ToString();
+          }
+          continue;
+        }
+        break;
+      }
+
+      for (const auto& redo : redos) {
+        VLOG(1) << "Processing redo " << redo.ToString();
+        if (ts >= start_timestamp) {
+          // The redo is relevant as per the select criteria.
+          VLOG(1) << "Selected row " << redo.key;
+
+          if (!selected_rows[redo.key]) {
+            // This is the first relevant redo for this row.
+            is_deleted_start[redo.key] = redo.rtype == INSERT;
+            selected_rows[redo.key] = true;
+          }
+        }
+
+        // The redo is relevant as per the apply criteria.
+        is_deleted_end[redo.key] = redo.rtype == DELETE;
+        if (redo.rtype != DELETE) {
+          // Deleted rows still exist in 'expected_rows'. This is OK;
+          // 'expected_is_deleted' will reflect the deletion.
+          expected_rows[redo.key] = { redo.key, redo.val };
+        }
+        VLOG(1) << "New value for row " << redo.key << ": "
+                << expected_rows[redo.key].ToString();
+        VLOG(1) << "New is_deleted for row " << redo.key << ": "
+                << is_deleted_end[redo.key];
+      }
+    }
+    vector<bool> expected_is_deleted = is_deleted_end;
+
+    // Trim the expected results as per 'selected_rows' and start/end liveness.
+    int row_key = 0;
+    expected_rows.erase(std::remove_if(
+        expected_rows.begin(), expected_rows.end(),
+        [&](const ExpectedKeyValueRow& /*row*/) {
+          bool retval = !selected_rows[row_key] ||
+                        (is_deleted_start[row_key] && is_deleted_end[row_key]);
+          row_key++;
+          return retval;
+        }), expected_rows.end());
+    row_key = 0;
+    expected_is_deleted.erase(std::remove_if(
+        expected_is_deleted.begin(), expected_is_deleted.end(),
+        [&](bool /*is_deleted*/) {
+          bool retval = !selected_rows[row_key] ||
+                        (is_deleted_start[row_key] && is_deleted_end[row_key]);
+          row_key++;
+          return retval;
+        }), expected_is_deleted.end());
+
+    // Test the results. Note that for deleted rows, we can't compare column
+    // values; the server is free to pick whatever historical values it wants.
+    auto fail_diff_scan = [&]() {
+      FAIL() << "Diff scan verification failed\n"
+             << "Expected IS_DELETED: " << expected_is_deleted << "\n"
+             << "Found IS_DELETED: " << found_is_deleted << "\n"
+             << "Expected rows: " << expected_rows  << "\n"
+             << "Found rows: " << found;
+    };
+    if (expected_is_deleted != found_is_deleted) {
+      NO_FATALS(fail_diff_scan());
+    }
+    if (expected_rows.size() != found.size()) {
+      NO_FATALS(fail_diff_scan());
+    }
+    for (int i = 0; i < expected_rows.size(); i++) {
+      if ((expected_is_deleted[i] && expected_rows[i].key != found[i].key) ||
+          (!expected_is_deleted[i] && expected_rows[i] != found[i])) {
+        NO_FATALS(fail_diff_scan());
+      }
+    }
+  }
+
  protected:
   // Validate that the given sequence is valid and would not cause any
   // errors assuming that there are no bugs. For example, checks to make sure there
@@ -478,6 +660,8 @@ class FuzzTest : public KuduTest {
       vector<optional<ExpectedKeyValueRow>>,
       std::greater<int>> saved_values_;
 
+  map<int, vector<Redo>> saved_redos_;
+
   scoped_refptr<TabletReplica> tablet_replica_;
 };
 
@@ -630,6 +814,19 @@ void GenerateTestCase(vector<TestOp>* ops, int len, TestOpSets sets = ALL) {
         ops->emplace_back(TEST_SCAN_AT_TIMESTAMP, timestamp);
         break;
       }
+      case TEST_DIFF_SCAN: {
+        int start_timestamp = 1;
+        int end_timestamp = 1;
+        if (op_timestamps > 0) {
+          start_timestamp = (rand() % op_timestamps) + 1;
+          end_timestamp = (rand() % op_timestamps) + 1;
+          if (start_timestamp > end_timestamp) {
+            std::swap(start_timestamp, end_timestamp);
+          }
+        }
+        ops->emplace_back(TEST_DIFF_SCAN, start_timestamp, end_timestamp);
+        break;
+      }
       default:
         LOG(FATAL) << "Invalid op type: " << r;
     }
@@ -680,6 +877,7 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
 
   vector<optional<ExpectedKeyValueRow>> cur_val(FLAGS_keyspace_size);
   vector<optional<ExpectedKeyValueRow>> pending_val(FLAGS_keyspace_size);
+  vector<Redo> pending_redos;
 
   int i = 0;
   for (const TestOp& test_op : test_ops) {
@@ -693,24 +891,36 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_INSERT_PK_ONLY:
       case TEST_UPSERT:
       case TEST_UPSERT_PK_ONLY: {
+        RedoType rtype = pending_val[test_op.val] ? UPDATE : INSERT;
         pending_val[test_op.val] = InsertOrUpsertRow(
             test_op.val, i++, pending_val[test_op.val], test_op.type);
+
+        // A PK-only UPSERT that is converted into an UPDATE will be dropped
+        // server-side. We must do the same.
+        if (test_op.type != TEST_UPSERT_PK_ONLY || rtype != UPDATE) {
+          pending_redos.emplace_back(rtype, test_op.val, pending_val[test_op.val]->val);
+        }
         break;
       }
       case TEST_UPDATE:
         for (int j = 0; j < update_multiplier; j++) {
           pending_val[test_op.val] = MutateRow(test_op.val, i++);
+          pending_redos.emplace_back(UPDATE, test_op.val, pending_val[test_op.val]->val);
         }
         break;
       case TEST_DELETE:
         pending_val[test_op.val] = DeleteRow(test_op.val);
+        pending_redos.emplace_back(DELETE, test_op.val, boost::none);
         break;
       case TEST_FLUSH_OPS: {
         FlushSessionOrDie(session_);
         cur_val = pending_val;
         int current_time = down_cast<kudu::clock::LogicalClock*>(
             tablet()->clock().get())->GetCurrentTime();
+        VLOG(1) << "Current time: " << current_time;
         saved_values_[current_time] = cur_val;
+        saved_redos_[current_time] = pending_redos;
+        pending_redos.clear();
         break;
       }
       case TEST_FLUSH_TABLET:
@@ -734,6 +944,9 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_SCAN_AT_TIMESTAMP:
         NO_FATALS(CheckScanAtTimestamp(test_op.val));
         break;
+      case TEST_DIFF_SCAN:
+        NO_FATALS(CheckDiffScan(test_op.val, test_op.val2));
+        break;
       default:
         LOG(FATAL) << test_op.type;
     }
@@ -1029,6 +1242,33 @@ TEST_F(FuzzTest, TestUpsert_PKOnlySchema) {
      });
 }
 
+// MRS test for KUDU-2809: a row that has been inserted and deleted within the
+// time range of a diff scan is excluded from the results.
+TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanMRS) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+      {TEST_INSERT, 0},
+      {TEST_FLUSH_OPS},
+      {TEST_DELETE, 0},
+      {TEST_FLUSH_OPS},
+      {TEST_DIFF_SCAN, 4, 7}
+    });
+}
+
+// DRS test for KUDU-2809: a row that has been inserted and deleted within the
+// time range of a diff scan is excluded from the results.
+TEST_F(FuzzTest, TestDiffScanRowLifespanInOneScanDRS) {
+  CreateTabletAndStartClusterWithSchema(CreateKeyValueTestSchema());
+  RunFuzzCase({
+      {TEST_INSERT, 0},
+      {TEST_FLUSH_OPS},
+      {TEST_FLUSH_TABLET},
+      {TEST_DELETE, 0},
+      {TEST_FLUSH_OPS},
+      {TEST_DIFF_SCAN, 4, 7}
+    });
+}
+
 } // namespace tablet
 } // namespace kudu
 
diff --git a/src/kudu/tablet/key_value_test_schema.h b/src/kudu/tablet/key_value_test_schema.h
index da36990..b8025eb 100644
--- a/src/kudu/tablet/key_value_test_schema.h
+++ b/src/kudu/tablet/key_value_test_schema.h
@@ -42,6 +42,10 @@ struct ExpectedKeyValueRow {
     return key == other.key && val == other.val;
   }
 
+  bool operator!=(const ExpectedKeyValueRow& other) const {
+    return !(*this == other);
+  }
+
   std::string ToString() const {
     std::string ret = strings::Substitute("{$0,", key);
     if (val == boost::none) {


[kudu] 02/05: KUDU-2809 (2/6): skip unobservable rows when iterating an MRS

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 3afbb4af1313a0556c6534c7736746f5a5181342
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Sun May 19 21:31:23 2019 -0700

    KUDU-2809 (2/6): skip unobservable rows when iterating an MRS
    
    An unobservable row is one whose entire lifespan is contained within the
    time range of a diff scan. Such rows should be hidden from the diff scan
    results as they represent "changes" that the client wouldn't expect to see.
    
    This patch adds such a heuristic to the MRS iterator. It's relatively
    straightforward because the MRS only stores REDOs and always remembers its
    rows' insertion timestamps. Delta stores are more complicated; a patch for
    handling unobservable rows for them will follow.
    
    Change-Id: I801991260749d1f810540cb32ec84c3ea6a02160
    Reviewed-on: http://gerrit.cloudera.org:8080/13534
    Reviewed-by: Mike Percy <mp...@apache.org>
    Tested-by: Kudu Jenkins
---
 src/kudu/tablet/memrowset-test.cc |  2 +-
 src/kudu/tablet/memrowset.cc      | 49 ++++++++++++++++++-----
 src/kudu/tablet/memrowset.h       |  9 ++++-
 src/kudu/tablet/tablet-test.cc    | 83 +++++++++++++++++++++++++++++++++++++++
 4 files changed, 132 insertions(+), 11 deletions(-)

diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index f32002f..0e3b363 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -727,7 +727,7 @@ TEST_P(ParameterizedTestMemRowSet, TestScanSnapToExclude) {
   }
 
   {
-    NO_FATALS(DumpAndCheck(snaps[0], snaps[3], deleted_v, true)); // INSERT, UPDATE, DELETE
+    NO_FATALS(DumpAndCheck(snaps[0], snaps[3], boost::none)); // INSERT, UPDATE, DELETE
     NO_FATALS(DumpAndCheck(snaps[1], snaps[4], 2)); // UPDATE, DELETE, REINSERT
   }
 
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index 9f56851..58cabab 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -531,9 +531,10 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
       Mutation* redo_head = reinterpret_cast<Mutation*>(
           base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&row.header_->redo_head)));
       RETURN_NOT_OK(ApplyMutationsToProjectedRow(
-          redo_head, &dst_row, dst->arena(), &apply_status));
+          redo_head, &dst_row, dst->arena(), insert_excluded, &apply_status));
       unset_in_sel_vector = (apply_status == APPLIED_AND_DELETED && !opts_.include_deleted_rows) ||
-                            (apply_status == NONE_APPLIED && insert_excluded);
+                            (apply_status == NONE_APPLIED && insert_excluded) ||
+                            (apply_status == APPLIED_AND_UNOBSERVABLE);
     } else {
       // The insertion is too new; the entire row should be omitted.
       unset_in_sel_vector = true;
@@ -563,7 +564,7 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
 
 Status MemRowSet::Iterator::ApplyMutationsToProjectedRow(
     const Mutation* mutation_head, RowBlockRow* dst_row, Arena* dst_arena,
-    ApplyStatus* apply_status) {
+    bool insert_excluded, ApplyStatus* apply_status) {
   ApplyStatus local_apply_status = NONE_APPLIED;
 
   // Fast short-circuit the likely case of a row which was inserted and never
@@ -573,7 +574,23 @@ Status MemRowSet::Iterator::ApplyMutationsToProjectedRow(
     return Status::OK();
   }
 
-  bool is_deleted = false;
+  // In order to find unobservable rows, we need to track row liveness at the
+  // start and end of the time range. If a row was dead at both ends, its
+  // lifespan must have been a subset of the time range and it should be
+  // excluded from the results.
+  //
+  // Finding 'is_deleted_end' is relatively straight-forward: we use each
+  // relevant mutation to drive a liveness state machine, and after we're done
+  // applying, 'is_deleted_end' is just the final value of that state machine.
+  //
+  // Finding 'is_deleted_start' is trickier. If the insertion was inside the
+  // time range, we know the value is true because the row was dead prior to the
+  // insertion and the insertion happened after the start of the time range.
+  // However, if the insertion was excluded from the time range, the value is
+  // going to be whatever the value of the liveness state machine was at the
+  // start of the time range.
+  bool is_deleted_start = !insert_excluded;
+  bool is_deleted_end = false;
 
   for (const Mutation *mut = mutation_head;
        mut != nullptr;
@@ -591,6 +608,12 @@ Status MemRowSet::Iterator::ApplyMutationsToProjectedRow(
     // count towards the overall "application status".
     if (!opts_.snap_to_exclude ||
         !opts_.snap_to_exclude->IsCommitted(mut->timestamp_)) {
+
+      // This is the first mutation within the time range, so we may use it to
+      // initialize 'is_deleted_start'.
+      if (local_apply_status == NONE_APPLIED && insert_excluded) {
+        is_deleted_start = is_deleted_end;
+      }
       local_apply_status = APPLIED_AND_PRESENT;
     }
 
@@ -600,11 +623,11 @@ Status MemRowSet::Iterator::ApplyMutationsToProjectedRow(
     RowChangeListDecoder decoder(mut->changelist());
     RETURN_NOT_OK(decoder.Init());
     if (decoder.is_delete()) {
-      decoder.TwiddleDeleteStatus(&is_deleted);
+      decoder.TwiddleDeleteStatus(&is_deleted_end);
     } else {
       DCHECK(decoder.is_update() || decoder.is_reinsert());
       if (decoder.is_reinsert()) {
-        decoder.TwiddleDeleteStatus(&is_deleted);
+        decoder.TwiddleDeleteStatus(&is_deleted_end);
       }
 
       // TODO(todd): this is slow, since it makes multiple passes through the rowchangelist.
@@ -620,9 +643,17 @@ Status MemRowSet::Iterator::ApplyMutationsToProjectedRow(
     }
   }
 
-  // If the most recent mutation seen for the row was a DELETE, then set the selection
-  // vector bit to 0, so it doesn't show up in the results.
-  if (is_deleted && local_apply_status == APPLIED_AND_PRESENT) {
+  if (opts_.snap_to_exclude && is_deleted_start && is_deleted_end) {
+    // The row's lifespan was a subset of the time range. It can't be observed,
+    // so it should definitely not show up in the results.
+    //
+    // Note: we condition on 'snap_to_exclude' because although insert_excluded
+    // is false on some closed time range scans, it's also false in all open
+    // time range scans, and we don't want this heuristic to fire in the latter case.
+    local_apply_status = APPLIED_AND_UNOBSERVABLE;
+  } else if (is_deleted_end && local_apply_status == APPLIED_AND_PRESENT) {
+    // The row was selected and deleted. It may be omitted from the results,
+    // depending on whether the results should include deleted rows or not.
     local_apply_status = APPLIED_AND_DELETED;
   }
 
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 752140d..89a02ac 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -566,7 +566,8 @@ class MemRowSet::Iterator : public RowwiseIterator {
   Status FetchRows(RowBlock* dst, size_t* fetched);
 
   // Walks the mutations in 'mutation_head', applying relevant ones to 'dst_row'
-  // (performing any allocations out of 'dst_arena').
+  // (performing any allocations out of 'dst_arena'). 'insert_excluded' is true
+  // if the row's original insertion took place outside the iterator's time range.
   //
   // On success, 'apply_status' summarizes the application process.
   enum ApplyStatus {
@@ -580,10 +581,16 @@ class MemRowSet::Iterator : public RowwiseIterator {
     // At least one mutation was applied to the row, and the row's final state
     // was deleted (i.e. the last mutation was a DELETE).
     APPLIED_AND_DELETED,
+
+    // Some mutations were applied, but the sequence of applied mutations was
+    // such that clients should never see this row in their output (i.e. the row
+    // was inserted and deleted at the same timestamp).
+    APPLIED_AND_UNOBSERVABLE,
   };
   Status ApplyMutationsToProjectedRow(const Mutation* mutation_head,
                                       RowBlockRow* dst_row,
                                       Arena* dst_arena,
+                                      bool insert_excluded,
                                       ApplyStatus* apply_status);
 
   const std::shared_ptr<const MemRowSet> memrowset_;
diff --git a/src/kudu/tablet/tablet-test.cc b/src/kudu/tablet/tablet-test.cc
index 3f46522..d714fad 100644
--- a/src/kudu/tablet/tablet-test.cc
+++ b/src/kudu/tablet/tablet-test.cc
@@ -40,6 +40,7 @@
 #include "kudu/common/rowblock.h"
 #include "kudu/common/schema.h"
 #include "kudu/common/timestamp.h"
+#include "kudu/common/wire_protocol.pb.h"
 #include "kudu/fs/block_id.h"
 #include "kudu/fs/block_manager.h"
 #include "kudu/gutil/gscoped_ptr.h"
@@ -1429,5 +1430,87 @@ TEST_F(TestTabletStringKey, TestSplitKeyRangeWithMinimumValueRowSet) {
   }
 }
 
+TYPED_TEST(TestTablet, TestDiffScanUnobservableOperations) {
+  LocalTabletWriter writer(this->tablet().get(), &this->client_schema());
+  vector<LocalTabletWriter::Op> ops;
+
+  // Row 0: INSERT -> DELETE.
+
+  KuduPartialRow insert_row0(&this->client_schema());
+  this->setup_.BuildRow(&insert_row0, 0);
+  ops.emplace_back(RowOperationsPB::INSERT, &insert_row0);
+
+  KuduPartialRow delete_row0(&this->client_schema());
+  this->setup_.BuildRowKey(&delete_row0, 0);
+  ops.emplace_back(RowOperationsPB::DELETE, &delete_row0);
+
+  // Row 1: INSERT -> UPDATE -> DELETE.
+
+  KuduPartialRow insert_row1(&this->client_schema());
+  this->setup_.BuildRow(&insert_row1, 1);
+  ops.emplace_back(RowOperationsPB::INSERT, &insert_row1);
+
+  KuduPartialRow update_row1(&this->client_schema());
+  this->setup_.BuildRowKey(&update_row1, 1);
+  int col_idx = this->client_schema().num_key_columns() == 1 ? 2 : 3;
+  ASSERT_OK(update_row1.SetInt32(col_idx, 10));
+  ops.emplace_back(RowOperationsPB::UPDATE, &update_row1);
+
+  KuduPartialRow delete_row1(&this->client_schema());
+  this->setup_.BuildRowKey(&delete_row1, 1);
+  ops.emplace_back(RowOperationsPB::DELETE, &delete_row1);
+
+  // Row 2: INSERT -> DELETE -> REINSERT -> DELETE.
+
+  KuduPartialRow insert_row2(&this->client_schema());
+  this->setup_.BuildRow(&insert_row2, 2);
+  ops.emplace_back(RowOperationsPB::INSERT, &insert_row2);
+
+  KuduPartialRow first_delete_row2(&this->client_schema());
+  this->setup_.BuildRowKey(&first_delete_row2, 2);
+  ops.emplace_back(RowOperationsPB::DELETE, &first_delete_row2);
+
+  KuduPartialRow reinsert_row2(&this->client_schema());
+  this->setup_.BuildRow(&reinsert_row2, 2);
+  ops.emplace_back(RowOperationsPB::INSERT, &reinsert_row2);
+
+  KuduPartialRow second_delete_row2(&this->client_schema());
+  this->setup_.BuildRowKey(&second_delete_row2, 2);
+  ops.emplace_back(RowOperationsPB::DELETE, &second_delete_row2);
+
+  // Write all operations to the tablet as part of the same batch. This means
+  // that they will all be assigned the same timestamp.
+  ASSERT_OK(writer.WriteBatch(ops));
+
+  // Performs a diff scan, expecting an empty result set because all three rows
+  // are unobservable: each is inserted and deleted at the same timestamp.
+  auto diff_scan_no_rows = [&]() {
+    // Create a projection with an IS_DELETED virtual column.
+    vector<ColumnSchema> col_schemas(this->client_schema().columns());
+    bool read_default = false;
+    col_schemas.emplace_back("is_deleted", IS_DELETED, /*is_nullable=*/ false,
+                             &read_default);
+    Schema projection(col_schemas, this->client_schema().num_key_columns());
+
+    // Do the diff scan.
+    RowIteratorOptions opts;
+    opts.projection = &projection;
+    opts.snap_to_exclude = MvccSnapshot(Timestamp(1));
+    opts.snap_to_include = MvccSnapshot(Timestamp(2));
+    opts.include_deleted_rows = true;
+    unique_ptr<RowwiseIterator> iter;
+    ASSERT_OK(this->tablet()->NewRowIterator(std::move(opts), &iter));
+    ASSERT_OK(iter->Init(nullptr));
+    vector<string> lines;
+    ASSERT_OK(IterateToStringList(iter.get(), &lines));
+
+    // Test the results.
+    SCOPED_TRACE(JoinStrings(lines, "\n"));
+    ASSERT_TRUE(lines.empty());
+  };
+
+  NO_FATALS(diff_scan_no_rows());
+}
+
 } // namespace tablet
 } // namespace kudu


[kudu] 03/05: KUDU-2809 (3/6): C++ private API for diff scans

Posted by ad...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git

commit 552e7f66a6b584ff7c9971c14bab16f32214ddd5
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Wed Jun 5 17:39:42 2019 -0700

    KUDU-2809 (3/6): C++ private API for diff scans
    
    This patch adds just enough of a private diff scan API so that fuzz-itest
    can use it. I modeled it after Grant's Java work in commit 5e78e4a5c.
    
    No tests here, but this gets plenty of test coverage in an upcoming
    fuzz-itest patch, and I think that's fine given fuzz-itest is the only
    thing motivating the C++ API in the first place.
    
    Change-Id: I03bd5e51c553b239e36fbdfc4a94131c9599851e
    Reviewed-on: http://gerrit.cloudera.org:8080/13535
    Tested-by: Kudu Jenkins
    Reviewed-by: Mike Percy <mp...@apache.org>
---
 src/kudu/client/client.cc              | 14 ++++++++
 src/kudu/client/client.h               | 25 ++++++++++++++
 src/kudu/client/scan_batch.cc          |  8 +++++
 src/kudu/client/scan_batch.h           |  8 +++++
 src/kudu/client/scan_configuration.cc  | 62 ++++++++++++++++++++++++++++++----
 src/kudu/client/scan_configuration.h   | 29 +++++++++++++---
 src/kudu/client/scan_token-internal.cc |  8 ++++-
 src/kudu/client/scanner-internal.cc    |  3 ++
 8 files changed, 146 insertions(+), 11 deletions(-)

diff --git a/src/kudu/client/client.cc b/src/kudu/client/client.cc
index 041625a..8cde561 100644
--- a/src/kudu/client/client.cc
+++ b/src/kudu/client/client.cc
@@ -1334,6 +1334,13 @@ Status KuduScanner::SetSnapshotRaw(uint64_t snapshot_timestamp) {
   return Status::OK();
 }
 
+Status KuduScanner::SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp) {
+  if (data_->open_) {
+    return Status::IllegalState("Diff scan must be set before Open()");
+  }
+  return data_->mutable_configuration()->SetDiffScan(start_timestamp, end_timestamp);
+}
+
 Status KuduScanner::SetSelection(KuduClient::ReplicaSelection selection) {
   if (data_->open_) {
     return Status::IllegalState("Replica selection must be set before Open()");
@@ -1448,6 +1455,9 @@ string KuduScanner::ToString() const {
 Status KuduScanner::Open() {
   CHECK(!data_->open_) << "Scanner already open";
 
+  if (data_->configuration().has_start_timestamp()) {
+    RETURN_NOT_OK(data_->mutable_configuration()->AddIsDeletedColumn());
+  }
   data_->mutable_configuration()->OptimizeScanSpec();
   data_->partition_pruner_.Init(*data_->table_->schema().schema_,
                                 data_->table_->partition_schema(),
@@ -1719,6 +1729,10 @@ Status KuduScanTokenBuilder::SetSnapshotMicros(uint64_t snapshot_timestamp_micro
   return Status::OK();
 }
 
+Status KuduScanTokenBuilder::SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp) {
+  return data_->mutable_configuration()->SetDiffScan(start_timestamp, end_timestamp);
+}
+
 Status KuduScanTokenBuilder::SetSnapshotRaw(uint64_t snapshot_timestamp) {
   data_->mutable_configuration()->SetSnapshotRaw(snapshot_timestamp);
   return Status::OK();
diff --git a/src/kudu/client/client.h b/src/kudu/client/client.h
index a5ad313..fb5c0c0 100644
--- a/src/kudu/client/client.h
+++ b/src/kudu/client/client.h
@@ -2138,6 +2138,27 @@ class KUDU_EXPORT KuduScanner {
   /// @return Operation result status.
   Status SetSnapshotRaw(uint64_t snapshot_timestamp) WARN_UNUSED_RESULT;
 
+  /// @cond PRIVATE_API
+
+  /// Set the start and end timestamp for a diff scan. The timestamps should be
+  /// encoded HT timestamps.
+  ///
+  /// Additionally sets any other scan properties required by diff scans.
+  ///
+  /// Private API.
+  ///
+  /// @param [in] start_timestamp
+  ///   Start timestamp to set in raw encoded form
+  ///   (i.e. as returned by a previous call to a server).
+  /// @param [in] end_timestamp
+  ///   End timestamp to set in raw encoded form
+  ///   (i.e. as returned by a previous call to a server).
+  /// @return Operation result status.
+  Status SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp)
+      WARN_UNUSED_RESULT KUDU_NO_EXPORT;
+
+  /// @endcond
+
   /// Set the maximum time that Open() and NextBatch() are allowed to take.
   ///
   /// @param [in] millis
@@ -2385,6 +2406,10 @@ class KUDU_EXPORT KuduScanTokenBuilder {
   /// @copydoc KuduScanner::SetSnapshotRaw
   Status SetSnapshotRaw(uint64_t snapshot_timestamp) WARN_UNUSED_RESULT;
 
+  /// @copydoc KuduScanner::SetDiffScan
+  Status SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp)
+      WARN_UNUSED_RESULT KUDU_NO_EXPORT;
+
   /// @copydoc KuduScanner::SetTimeoutMillis
   Status SetTimeoutMillis(int millis) WARN_UNUSED_RESULT;
 
diff --git a/src/kudu/client/scan_batch.cc b/src/kudu/client/scan_batch.cc
index d79397f..8ecd347 100644
--- a/src/kudu/client/scan_batch.cc
+++ b/src/kudu/client/scan_batch.cc
@@ -122,6 +122,14 @@ bool KuduScanBatch::RowPtr::IsNull(const Slice& col_name) const {
   return IsNull(col_idx);
 }
 
+Status KuduScanBatch::RowPtr::IsDeleted(bool* val) const {
+  int col_idx = schema_->first_is_deleted_virtual_column_idx();
+  if (col_idx == Schema::kColumnNotFound) {
+    return Status::NotFound("IS_DELETED virtual column not found");
+  }
+  return Get<TypeTraits<IS_DELETED> >(col_idx, val);
+}
+
 Status KuduScanBatch::RowPtr::GetBool(const Slice& col_name, bool* val) const {
   return Get<TypeTraits<BOOL> >(col_name, val);
 }
diff --git a/src/kudu/client/scan_batch.h b/src/kudu/client/scan_batch.h
index e975e91..6a127e2 100644
--- a/src/kudu/client/scan_batch.h
+++ b/src/kudu/client/scan_batch.h
@@ -172,6 +172,14 @@ class KUDU_EXPORT KuduScanBatch::RowPtr {
   /// @return @c true iff the specified column of the row has @c NULL value.
   bool IsNull(int col_idx) const;
 
+  /// Get the value of the IS_DELETED virtual column.
+  ///
+  /// @param [out] val
+  ///   Placeholder for the result value.
+  /// @return Operation result status. Returns a bad Status if there is no
+  ///   IS_DELETED virtual column in the schema.
+  Status IsDeleted(bool* val) const WARN_UNUSED_RESULT KUDU_NO_EXPORT;
+
   /// @name Getters for integral type columns by column name.
   ///
   /// @param [in] col_name
diff --git a/src/kudu/client/scan_configuration.cc b/src/kudu/client/scan_configuration.cc
index 700e64e..3e74b0f 100644
--- a/src/kudu/client/scan_configuration.cc
+++ b/src/kudu/client/scan_configuration.cc
@@ -17,6 +17,7 @@
 
 #include "kudu/client/scan_configuration.h"
 
+#include <cstddef>
 #include <memory>
 #include <string>
 #include <utility>
@@ -26,6 +27,7 @@
 #include "kudu/client/scan_predicate-internal.h"
 #include "kudu/client/scan_predicate.h"
 #include "kudu/common/column_predicate.h"
+#include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/partial_row.h"
 #include "kudu/common/schema.h"
@@ -41,6 +43,7 @@ namespace client {
 
 const uint64_t ScanConfiguration::kNoTimestamp = KuduClient::kNoTimestamp;
 const int ScanConfiguration::kHtTimestampBitsToShift = 12;
+const char* ScanConfiguration::kDefaultIsDeletedColName = "is_deleted";
 
 ScanConfiguration::ScanConfiguration(KuduTable* table)
     : table_(table),
@@ -51,6 +54,7 @@ ScanConfiguration::ScanConfiguration(KuduTable* table)
       selection_(KuduClient::CLOSEST_REPLICA),
       read_mode_(KuduScanner::READ_LATEST),
       is_fault_tolerant_(false),
+      start_timestamp_(kNoTimestamp),
       snapshot_timestamp_(kNoTimestamp),
       lower_bound_propagation_timestamp_(kNoTimestamp),
       timeout_(MonoDelta::FromMilliseconds(KuduScanner::kScanTimeoutMillis)),
@@ -84,12 +88,7 @@ Status ScanConfiguration::SetProjectedColumnIndexes(const vector<int>& col_index
     }
     cols.push_back(table_schema->column(col_index));
   }
-
-  unique_ptr<Schema> s(new Schema());
-  RETURN_NOT_OK(s->Reset(cols, 0));
-  projection_ = pool_.Add(s.release());
-  client_projection_ = KuduSchema::FromSchema(*projection_);
-  return Status::OK();
+  return CreateProjection(cols);
 }
 
 Status ScanConfiguration::AddConjunctPredicate(KuduPredicate* pred) {
@@ -181,6 +180,21 @@ void ScanConfiguration::SetSnapshotRaw(uint64_t snapshot_timestamp) {
   snapshot_timestamp_ = snapshot_timestamp;
 }
 
+Status ScanConfiguration::SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp) {
+  if (start_timestamp == kNoTimestamp) {
+    return Status::IllegalState("Start timestamp must be set bigger than 0");
+  }
+  if (start_timestamp > end_timestamp) {
+    return Status::IllegalState("Start timestamp must be less than or equal to "
+                                "end timestamp");
+  }
+  RETURN_NOT_OK(SetFaultTolerant(true));
+  RETURN_NOT_OK(SetReadMode(KuduScanner::READ_AT_SNAPSHOT));
+  start_timestamp_ = start_timestamp;
+  snapshot_timestamp_ = end_timestamp;
+  return Status::OK();
+}
+
 void ScanConfiguration::SetScanLowerBoundTimestampRaw(uint64_t propagation_timestamp) {
   lower_bound_propagation_timestamp_ = propagation_timestamp;
 }
@@ -202,6 +216,34 @@ Status ScanConfiguration::SetLimit(int64_t limit) {
   return Status::OK();
 }
 
+Status ScanConfiguration::AddIsDeletedColumn() {
+  CHECK(has_start_timestamp());
+  CHECK(has_snapshot_timestamp());
+
+  // Convert the current projection back into ColumnSchemas.
+  vector<ColumnSchema> cols;
+  cols.reserve(projection_->num_columns() + 1);
+  for (size_t i = 0; i < projection_->num_columns(); i++) {
+    cols.emplace_back(projection_->column(i));
+  }
+
+  // Generate a unique name for the IS_DELETED virtual column.
+  string col_name = kDefaultIsDeletedColName;
+  while (table_->schema().schema_->find_column(col_name) != Schema::kColumnNotFound) {
+    col_name += "_";
+  }
+
+  // Add the IS_DELETED virtual column to the list of projected columns.
+  bool read_default = false;
+  ColumnSchema is_deleted(col_name,
+                          IS_DELETED,
+                          /*is_nullable=*/false,
+                          &read_default);
+  cols.emplace_back(std::move(is_deleted));
+
+  return CreateProjection(cols);
+}
+
 void ScanConfiguration::OptimizeScanSpec() {
   spec_.OptimizeScan(*table_->schema().schema_,
                      &arena_,
@@ -209,5 +251,13 @@ void ScanConfiguration::OptimizeScanSpec() {
                      /* remove_pushed_predicates */ false);
 }
 
+Status ScanConfiguration::CreateProjection(const vector<ColumnSchema>& cols) {
+  unique_ptr<Schema> s(new Schema());
+  RETURN_NOT_OK(s->Reset(cols, 0));
+  projection_ = pool_.Add(s.release());
+  client_projection_ = KuduSchema::FromSchema(*projection_);
+  return Status::OK();
+}
+
 } // namespace client
 } // namespace kudu
diff --git a/src/kudu/client/scan_configuration.h b/src/kudu/client/scan_configuration.h
index 4569bb4..f45e166 100644
--- a/src/kudu/client/scan_configuration.h
+++ b/src/kudu/client/scan_configuration.h
@@ -36,6 +36,7 @@
 namespace kudu {
 
 class ColumnPredicate;
+class ColumnSchema;
 class KuduPartialRow;
 class Schema;
 
@@ -82,24 +83,27 @@ class ScanConfiguration {
 
   Status SetFaultTolerant(bool fault_tolerant) WARN_UNUSED_RESULT;
 
-  // Sets the timestamp the scan must be executed at, in microseconds
-  // since the Unix epoch. Requires READ_AT_SNAPSHOT scan mode.
   void SetSnapshotMicros(uint64_t snapshot_timestamp_micros);
 
-  // Sets a previously encoded timestamp as a snapshot timestamp.
-  // Requires READ_AT_SNAPSHOT scan mode.
   void SetSnapshotRaw(uint64_t snapshot_timestamp);
 
   // Set the lower bound of scan's propagation timestamp.
   // It is only used in READ_YOUR_WRITES scan mode.
   void SetScanLowerBoundTimestampRaw(uint64_t propagation_timestamp);
 
+  Status SetDiffScan(uint64_t start_timestamp, uint64_t end_timestamp);
+
   void SetTimeoutMillis(int millis);
 
   Status SetRowFormatFlags(uint64_t flags);
 
   Status SetLimit(int64_t limit);
 
+  // Adds an IS_DELETED virtual column to the projection.
+  //
+  // Can only be used with diff scans.
+  Status AddIsDeletedColumn();
+
   void OptimizeScanSpec();
 
   const KuduTable& table() {
@@ -145,6 +149,15 @@ class ScanConfiguration {
     return is_fault_tolerant_;
   }
 
+  bool has_start_timestamp() const {
+    return start_timestamp_ != kNoTimestamp;
+  }
+
+  uint64_t start_timestamp() const {
+    CHECK(has_start_timestamp());
+    return start_timestamp_;
+  }
+
   bool has_snapshot_timestamp() const {
     return snapshot_timestamp_ != kNoTimestamp;
   }
@@ -180,6 +193,10 @@ class ScanConfiguration {
 
   static const uint64_t kNoTimestamp;
   static const int kHtTimestampBitsToShift;
+  static const char* kDefaultIsDeletedColName;
+
+  // Set projection_ and client_projection_ using a schema constructed from 'cols'.
+  Status CreateProjection(const std::vector<ColumnSchema>& cols);
 
   // Non-owned, non-null table.
   KuduTable* table_;
@@ -201,6 +218,10 @@ class ScanConfiguration {
 
   bool is_fault_tolerant_;
 
+  // Start and end timestamps in a diff scan.
+  //
+  // If just a regular snapshot scan, start_timestamp_ is ignored.
+  uint64_t start_timestamp_;
   uint64_t snapshot_timestamp_;
 
   uint64_t lower_bound_propagation_timestamp_;
diff --git a/src/kudu/client/scan_token-internal.cc b/src/kudu/client/scan_token-internal.cc
index 787f767..46bc270 100644
--- a/src/kudu/client/scan_token-internal.cc
+++ b/src/kudu/client/scan_token-internal.cc
@@ -180,7 +180,10 @@ Status KuduScanToken::Data::PBIntoScanner(KuduClient* client,
     RETURN_NOT_OK(scan_builder->SetFaultTolerant());
   }
 
-  if (message.has_snap_timestamp()) {
+  if (message.has_snap_start_timestamp() && message.has_snap_timestamp()) {
+    RETURN_NOT_OK(scan_builder->SetDiffScan(message.snap_start_timestamp(),
+                                            message.snap_timestamp()));
+  } else if (message.has_snap_timestamp()) {
     RETURN_NOT_OK(scan_builder->SetSnapshotRaw(message.snap_timestamp()));
   }
 
@@ -261,6 +264,9 @@ Status KuduScanTokenBuilder::Data::Build(vector<KuduScanToken*>* tokens) {
       break;
     case KuduScanner::READ_AT_SNAPSHOT:
       pb.set_read_mode(kudu::READ_AT_SNAPSHOT);
+      if (configuration_.has_start_timestamp()) {
+        pb.set_snap_start_timestamp(configuration_.start_timestamp());
+      }
       if (configuration_.has_snapshot_timestamp()) {
         pb.set_snap_timestamp(configuration_.snapshot_timestamp());
       }
diff --git a/src/kudu/client/scanner-internal.cc b/src/kudu/client/scanner-internal.cc
index 57fa87b..717e5de 100644
--- a/src/kudu/client/scanner-internal.cc
+++ b/src/kudu/client/scanner-internal.cc
@@ -395,6 +395,9 @@ Status KuduScanner::Data::OpenTablet(const string& partition_key,
       break;
     case KuduScanner::READ_AT_SNAPSHOT:
       scan->set_read_mode(kudu::READ_AT_SNAPSHOT);
+      if (configuration_.has_start_timestamp()) {
+        scan->set_snap_start_timestamp(configuration_.start_timestamp());
+      }
       if (configuration_.has_snapshot_timestamp()) {
         scan->set_snap_timestamp(configuration_.snapshot_timestamp());
       }