You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/08/07 00:25:36 UTC

[2/2] kudu git commit: memrowset: support iteration with is_deleted virtual column

memrowset: support iteration with is_deleted virtual column

This patch rounds out the MemRowSet changes for incremental backups.
Taken together, these changes make it possible to iterate over a specific
time range and to learn which rows were deleted during that time range.

Data-type-based virtual columns expose a wart here: the virtual column
definition itself does not specify whether the column is nullable or has a
read default value. To simplify the implementation, we require that the
column be non-nullable and have a read default value (enforced with DCHECKs).

Change-Id: Ic6b053f5a3696eb9d7c26b8e3d96752f4f87bcd8
Reviewed-on: http://gerrit.cloudera.org:8080/10990
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cec6aa37
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cec6aa37
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cec6aa37

Branch: refs/heads/master
Commit: cec6aa370ada43fc0d3893df4e4342515470de25
Parents: 3e61d75
Author: Adar Dembo <ad...@cloudera.com>
Authored: Tue Jul 17 17:15:21 2018 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Aug 7 00:25:21 2018 +0000

----------------------------------------------------------------------
 src/kudu/tablet/memrowset-test.cc | 162 +++++++++++++++++++++++----------
 src/kudu/tablet/memrowset.cc      |  22 ++++-
 src/kudu/tablet/memrowset.h       |  11 ++-
 3 files changed, 146 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/cec6aa37/src/kudu/tablet/memrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index 627cbad..ba5134e 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -43,6 +43,7 @@
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/ref_counted.h"
 #include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/strcat.h"
 #include "kudu/tablet/mvcc.h"
 #include "kudu/tablet/rowset.h"
 #include "kudu/tablet/tablet-test-util.h"
@@ -68,6 +69,7 @@ using consensus::OpId;
 using log::LogAnchorRegistry;
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 
 class TestMemRowSet : public KuduTest {
@@ -81,10 +83,15 @@ class TestMemRowSet : public KuduTest {
   }
 
   static Schema CreateSchema() {
-    SchemaBuilder builder;
-    CHECK_OK(builder.AddKeyColumn("key", STRING));
-    CHECK_OK(builder.AddColumn("val", UINT32));
-    return builder.Build();
+    unique_ptr<SchemaBuilder> sb(CreateSchemaBuilder());
+    return sb->Build();
+  }
+
+  static unique_ptr<SchemaBuilder> CreateSchemaBuilder() {
+    unique_ptr<SchemaBuilder> builder(new SchemaBuilder());
+    CHECK_OK(builder->AddKeyColumn("key", STRING));
+    CHECK_OK(builder->AddColumn("val", UINT32));
+    return builder;
   }
 
  protected:
@@ -204,6 +211,40 @@ class TestMemRowSet : public KuduTest {
     return fetched;
   }
 
+  Status GenerateTestData(MemRowSet* mrs) {
+    // row 0 - insert
+    // row 1 - insert, update
+    // row 2 - insert, delete
+    // row 3 - insert, update, delete
+    // row 4 - insert, update, delete, reinsert
+    // row 5 - insert, update, update, delete, reinsert
+    // row 6 - insert, delete, reinsert, delete
+    RETURN_NOT_OK(InsertRow(mrs, "row 0", 0));
+    RETURN_NOT_OK(InsertRow(mrs, "row 1", 0));
+    OperationResultPB result;
+    RETURN_NOT_OK(UpdateRow(mrs, "row 1", 1, &result));
+    RETURN_NOT_OK(InsertRow(mrs, "row 2", 0));
+    RETURN_NOT_OK(DeleteRow(mrs, "row 2", &result));
+    RETURN_NOT_OK(InsertRow(mrs, "row 3", 0));
+    RETURN_NOT_OK(UpdateRow(mrs, "row 3", 1, &result));
+    RETURN_NOT_OK(DeleteRow(mrs, "row 3", &result));
+    RETURN_NOT_OK(InsertRow(mrs, "row 4", 0));
+    RETURN_NOT_OK(UpdateRow(mrs, "row 4", 1, &result));
+    RETURN_NOT_OK(DeleteRow(mrs, "row 4", &result));
+    RETURN_NOT_OK(InsertRow(mrs, "row 4", 2));
+    RETURN_NOT_OK(InsertRow(mrs, "row 5", 0));
+    RETURN_NOT_OK(UpdateRow(mrs, "row 5", 1, &result));
+    RETURN_NOT_OK(UpdateRow(mrs, "row 5", 2, &result));
+    RETURN_NOT_OK(DeleteRow(mrs, "row 5", &result));
+    RETURN_NOT_OK(InsertRow(mrs, "row 5", 3));
+    RETURN_NOT_OK(InsertRow(mrs, "row 6", 0));
+    RETURN_NOT_OK(DeleteRow(mrs, "row 6", &result));
+    RETURN_NOT_OK(InsertRow(mrs, "row 6", 1));
+    RETURN_NOT_OK(DeleteRow(mrs, "row 6", &result));
+
+    return Status::OK();
+  }
+
   OpId op_id_;
   scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
 
@@ -578,13 +619,16 @@ TEST_F(TestMemRowSet, TestUpdateMVCC) {
 }
 
 class ParameterizedTestMemRowSet : public TestMemRowSet,
-                                   public ::testing::WithParamInterface<bool> {
+                                   public ::testing::WithParamInterface<std::tuple<bool, bool>> {
 };
 
 
-// Parameterized on opts.include_deleted_rows true or false.
-INSTANTIATE_TEST_CASE_P(IncludeDeletedRowsOrNot, ParameterizedTestMemRowSet,
-                        ::testing::Bool());
+// Tests the Cartesian product of two boolean parameters:
+// 1. Whether to include deleted rows in the scan.
+// 2. Whether to include the "is deleted" virtual column in the scan's projection.
+INSTANTIATE_TEST_CASE_P(RowIteratorOptionsPermutations, ParameterizedTestMemRowSet,
+                        ::testing::Combine(::testing::Bool(),
+                                           ::testing::Bool()));
 
 TEST_P(ParameterizedTestMemRowSet, TestScanSnapToExclude) {
   shared_ptr<MemRowSet> mrs;
@@ -609,19 +653,44 @@ TEST_P(ParameterizedTestMemRowSet, TestScanSnapToExclude) {
   ASSERT_OK(InsertRow(mrs.get(), "row", 2));
   snaps.emplace_back(mvcc_);
 
+  bool include_deleted_rows = std::get<0>(GetParam());
+  bool add_vc_is_deleted = std::get<1>(GetParam());
+
   auto DumpAndCheck = [&](const MvccSnapshot& exclude,
                           const MvccSnapshot& include,
-                          boost::optional<int> row_value) {
+                          boost::optional<int> row_value,
+                          bool is_deleted = false) {
+    // Set up the iterator options.
+    unique_ptr<SchemaBuilder> sb = CreateSchemaBuilder();
+    if (add_vc_is_deleted) {
+      const bool kFalse = false;
+      ASSERT_OK(sb->AddColumn("deleted", IS_DELETED,
+                              /*is_nullable=*/false,
+                              &kFalse, /*write_default=*/nullptr));
+    }
+    Schema projection = sb->Build();
     RowIteratorOptions opts;
-    opts.projection = &schema_;
+    opts.projection = &projection;
     opts.snap_to_include = include;
     opts.snap_to_exclude = exclude;
-    opts.include_deleted_rows = GetParam();
+    opts.include_deleted_rows = include_deleted_rows;
+
+    // Iterate.
     vector<string> rows;
     ASSERT_OK(DumpRowSet(*mrs, opts, &rows));
+
+    // Test the results.
     if (row_value.is_initialized()) {
       ASSERT_EQ(1, rows.size());
-      ASSERT_EQ(StringPrintf(R"((string key="row", uint32 val=%d))", row_value.get()), rows[0]);
+      string expected;
+      StrAppend(&expected, "(string key=\"row\", uint32 val=");
+      StrAppend(&expected, row_value.get());
+      if (add_vc_is_deleted) {
+        StrAppend(&expected, ", is_deleted deleted=");
+        StrAppend(&expected, is_deleted ? "true" : "false");
+      }
+      StrAppend(&expected, ")");
+      ASSERT_EQ(expected, rows[0]);
     } else {
       ASSERT_TRUE(rows.empty());
     }
@@ -634,23 +703,23 @@ TEST_P(ParameterizedTestMemRowSet, TestScanSnapToExclude) {
 
   // If we include deleted rows, the row's value will be 1 due to the UPDATE
   // that preceeded it.
-  boost::optional<int> deleted_v = GetParam() ? boost::optional<int>(1) : boost::none;
+  boost::optional<int> deleted_v = include_deleted_rows ? boost::optional<int>(1) : boost::none;
 
   {
     NO_FATALS(DumpAndCheck(snaps[0], snaps[1], 0)); // INSERT
     NO_FATALS(DumpAndCheck(snaps[1], snaps[2], 1)); // UPDATE
-    NO_FATALS(DumpAndCheck(snaps[2], snaps[3], deleted_v)); // DELETE
+    NO_FATALS(DumpAndCheck(snaps[2], snaps[3], deleted_v, true)); // DELETE
     NO_FATALS(DumpAndCheck(snaps[3], snaps[4], 2)); // REINSERT
   }
 
   {
     NO_FATALS(DumpAndCheck(snaps[0], snaps[2], 1)); // INSERT, UPDATE
-    NO_FATALS(DumpAndCheck(snaps[1], snaps[3], deleted_v)); // UPDATE, DELETE
+    NO_FATALS(DumpAndCheck(snaps[1], snaps[3], deleted_v, true)); // UPDATE, DELETE
     NO_FATALS(DumpAndCheck(snaps[2], snaps[4], 2)); // DELETE, REINSERT
   }
 
   {
-    NO_FATALS(DumpAndCheck(snaps[0], snaps[3], deleted_v)); // INSERT, UPDATE, DELETE
+    NO_FATALS(DumpAndCheck(snaps[0], snaps[3], deleted_v, true)); // INSERT, UPDATE, DELETE
     NO_FATALS(DumpAndCheck(snaps[1], snaps[4], 2)); // UPDATE, DELETE, REINSERT
   }
 
@@ -661,36 +730,7 @@ TEST_F(TestMemRowSet, TestScanIncludeDeletedRows) {
   shared_ptr<MemRowSet> mrs;
   ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
                               MemTracker::GetRootTracker(), &mrs));
-
-  // row 0 - insert
-  // row 1 - insert, update
-  // row 2 - insert, delete
-  // row 3 - insert, update, delete
-  // row 4 - insert, update, delete, reinsert
-  // row 5 - insert, update, update, delete, reinsert
-  // row 6 - insert, delete, reinsert, delete
-  ASSERT_OK(InsertRow(mrs.get(), "row 0", 0));
-  ASSERT_OK(InsertRow(mrs.get(), "row 1", 0));
-  OperationResultPB result;
-  ASSERT_OK(UpdateRow(mrs.get(), "row 1", 1, &result));
-  ASSERT_OK(InsertRow(mrs.get(), "row 2", 0));
-  ASSERT_OK(DeleteRow(mrs.get(), "row 2", &result));
-  ASSERT_OK(InsertRow(mrs.get(), "row 3", 0));
-  ASSERT_OK(UpdateRow(mrs.get(), "row 3", 1, &result));
-  ASSERT_OK(DeleteRow(mrs.get(), "row 3", &result));
-  ASSERT_OK(InsertRow(mrs.get(), "row 4", 0));
-  ASSERT_OK(UpdateRow(mrs.get(), "row 4", 1, &result));
-  ASSERT_OK(DeleteRow(mrs.get(), "row 4", &result));
-  ASSERT_OK(InsertRow(mrs.get(), "row 4", 2));
-  ASSERT_OK(InsertRow(mrs.get(), "row 5", 0));
-  ASSERT_OK(UpdateRow(mrs.get(), "row 5", 1, &result));
-  ASSERT_OK(UpdateRow(mrs.get(), "row 5", 2, &result));
-  ASSERT_OK(DeleteRow(mrs.get(), "row 5", &result));
-  ASSERT_OK(InsertRow(mrs.get(), "row 5", 3));
-  ASSERT_OK(InsertRow(mrs.get(), "row 6", 0));
-  ASSERT_OK(DeleteRow(mrs.get(), "row 6", &result));
-  ASSERT_OK(InsertRow(mrs.get(), "row 6", 1));
-  ASSERT_OK(DeleteRow(mrs.get(), "row 6", &result));
+  ASSERT_OK(GenerateTestData(mrs.get()));
 
   RowIteratorOptions opts;
   opts.projection = &schema_;
@@ -701,5 +741,35 @@ TEST_F(TestMemRowSet, TestScanIncludeDeletedRows) {
   ASSERT_EQ(7, ScanAndCount(mrs.get(), opts));
 }
 
+TEST_F(TestMemRowSet, TestScanVirtualColumnIsDeleted) {
+  shared_ptr<MemRowSet> mrs;
+  ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+                              MemTracker::GetRootTracker(), &mrs));
+  ASSERT_OK(GenerateTestData(mrs.get()));
+
+  SchemaBuilder sb;
+  ASSERT_OK(sb.AddKeyColumn("key", STRING));
+  ASSERT_OK(sb.AddColumn("val", UINT32));
+  const bool kFalse = false;
+  ASSERT_OK(sb.AddColumn("deleted", IS_DELETED,
+                         /*is_nullable=*/false,
+                         &kFalse, /*write_default=*/nullptr));
+  Schema projection = sb.Build();
+
+  RowIteratorOptions opts;
+  opts.projection = &projection;
+  opts.snap_to_include = MvccSnapshot::CreateSnapshotIncludingAllTransactions();
+  opts.include_deleted_rows = true;
+  vector<string> rows;
+  ASSERT_OK(DumpRowSet(*mrs, opts, &rows));
+  ASSERT_EQ(7, rows.size());
+  for (const auto& row_idx_present : { 0, 1, 4, 5 }) {
+    ASSERT_STR_CONTAINS(rows[row_idx_present], "=false");
+  }
+  for (const auto& row_idx_deleted : { 2, 3, 6 }) {
+    ASSERT_STR_CONTAINS(rows[row_idx_deleted], "=true");
+  }
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/cec6aa37/src/kudu/tablet/memrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index ec435c5..0b68782 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -29,11 +29,13 @@
 #include "kudu/codegen/compilation_manager.h"
 #include "kudu/codegen/row_projector.h"
 #include "kudu/common/columnblock.h"
+#include "kudu/common/common.pb.h"
 #include "kudu/common/encoded_key.h"
 #include "kudu/common/row.h"
 #include "kudu/common/row_changelist.h"
 #include "kudu/common/rowblock.h"
 #include "kudu/common/scan_spec.h"
+#include "kudu/common/types.h"
 #include "kudu/consensus/log_anchor_registry.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/gutil/dynamic_annotations.h"
@@ -397,6 +399,21 @@ MemRowSet::Iterator::Iterator(const std::shared_ptr<const MemRowSet>& mrs,
   // seek. Could make this lazy instead, or change the semantics so that
   // a seek is required (probably the latter)
   iter_->SeekToStart();
+
+  // Find the first IS_DELETED virtual column, if one exists.
+  projection_vc_is_deleted_idx_ = Schema::kColumnNotFound;
+  for (int i = 0; i < opts_.projection->num_columns(); i++) {
+    const auto& col = opts_.projection->column(i);
+    if (col.type_info()->type() == IS_DELETED) {
+      // Enforce some properties on the virtual column that simplify our
+      // implementation.
+      DCHECK(!col.is_nullable());
+      DCHECK(col.has_read_default());
+
+      projection_vc_is_deleted_idx_ = i;
+      break;
+    }
+  }
 }
 
 MemRowSet::Iterator::~Iterator() {}
@@ -517,13 +534,13 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
     bool insert_excluded = opts_.snap_to_exclude &&
                            opts_.snap_to_exclude->IsCommitted(row.insertion_timestamp());
     bool unset_in_sel_vector;
+    ApplyStatus apply_status;
     if (insert_excluded || opts_.snap_to_include.IsCommitted(row.insertion_timestamp())) {
       RETURN_NOT_OK(projector_->ProjectRowForRead(row, &dst_row, dst->arena()));
 
       // Roll-forward MVCC for committed updates.
       Mutation* redo_head = reinterpret_cast<Mutation*>(
           base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&row.header_->redo_head)));
-      ApplyStatus apply_status;
       RETURN_NOT_OK(ApplyMutationsToProjectedRow(
           redo_head, &dst_row, dst->arena(), &apply_status));
       unset_in_sel_vector = (apply_status == APPLIED_AND_DELETED && !opts_.include_deleted_rows) ||
@@ -544,6 +561,9 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
                                      "MVCCMVCCMVCCMVCCMVCCMVCC");
       }
       #endif
+    } else if (projection_vc_is_deleted_idx_ != Schema::kColumnNotFound) {
+      UnalignedStore(dst_row.mutable_cell_ptr(projection_vc_is_deleted_idx_),
+                     apply_status == APPLIED_AND_DELETED);
     }
 
     ++*fetched;

http://git-wip-us.apache.org/repos/asf/kudu/blob/cec6aa37/src/kudu/tablet/memrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 9b10187..429f734 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -571,15 +571,22 @@ class MemRowSet::Iterator : public RowwiseIterator {
   const std::shared_ptr<const MemRowSet> memrowset_;
   gscoped_ptr<MemRowSet::MSBTIter> iter_;
 
-  RowIteratorOptions opts_;
+  const RowIteratorOptions opts_;
 
   // Mapping from projected column index back to memrowset column index.
   // Relies on the MRSRowProjector interface to abstract from the two
   // different implementations of the RowProjector, which may change
   // at runtime (using vs. not using code generation).
-  gscoped_ptr<MRSRowProjector> projector_;
+  const gscoped_ptr<MRSRowProjector> projector_;
   DeltaProjector delta_projector_;
 
+  // The index of the first IS_DELETED virtual column in the projection schema,
+  // or kColumnNotFound if one doesn't exist.
+  //
+  // The virtual column must not be nullable and must have a read default value.
+  // The process will crash if these constraints are not met.
+  int projection_vc_is_deleted_idx_;
+
   // Temporary buffer used for RowChangeList projection.
   faststring delta_buf_;