You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/08/07 00:25:36 UTC
[2/2] kudu git commit: memrowset: support iteration with is_deleted virtual column
memrowset: support iteration with is_deleted virtual column
This patch rounds out the MemRowSet changes for incremental backups.
Taken together, these changes make it possible to iterate over a specific time
range and to learn which rows were deleted during that time range.
Data-type-based virtual columns show a wart here: the virtual column definition
itself does not specify whether it is nullable or has a read default value. To
simplify, we require that it be non-nullable and have a read default value
(checked with DCHECKs when the MemRowSet iterator is constructed).
Change-Id: Ic6b053f5a3696eb9d7c26b8e3d96752f4f87bcd8
Reviewed-on: http://gerrit.cloudera.org:8080/10990
Reviewed-by: Grant Henke <gr...@apache.org>
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/cec6aa37
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/cec6aa37
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/cec6aa37
Branch: refs/heads/master
Commit: cec6aa370ada43fc0d3893df4e4342515470de25
Parents: 3e61d75
Author: Adar Dembo <ad...@cloudera.com>
Authored: Tue Jul 17 17:15:21 2018 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Aug 7 00:25:21 2018 +0000
----------------------------------------------------------------------
src/kudu/tablet/memrowset-test.cc | 162 +++++++++++++++++++++++----------
src/kudu/tablet/memrowset.cc | 22 ++++-
src/kudu/tablet/memrowset.h | 11 ++-
3 files changed, 146 insertions(+), 49 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/cec6aa37/src/kudu/tablet/memrowset-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset-test.cc b/src/kudu/tablet/memrowset-test.cc
index 627cbad..ba5134e 100644
--- a/src/kudu/tablet/memrowset-test.cc
+++ b/src/kudu/tablet/memrowset-test.cc
@@ -43,6 +43,7 @@
#include "kudu/gutil/gscoped_ptr.h"
#include "kudu/gutil/ref_counted.h"
#include "kudu/gutil/stringprintf.h"
+#include "kudu/gutil/strings/strcat.h"
#include "kudu/tablet/mvcc.h"
#include "kudu/tablet/rowset.h"
#include "kudu/tablet/tablet-test-util.h"
@@ -68,6 +69,7 @@ using consensus::OpId;
using log::LogAnchorRegistry;
using std::shared_ptr;
using std::string;
+using std::unique_ptr;
using std::vector;
class TestMemRowSet : public KuduTest {
@@ -81,10 +83,15 @@ class TestMemRowSet : public KuduTest {
}
static Schema CreateSchema() {
- SchemaBuilder builder;
- CHECK_OK(builder.AddKeyColumn("key", STRING));
- CHECK_OK(builder.AddColumn("val", UINT32));
- return builder.Build();
+ unique_ptr<SchemaBuilder> sb(CreateSchemaBuilder());
+ return sb->Build();
+ }
+
+ static unique_ptr<SchemaBuilder> CreateSchemaBuilder() {
+ unique_ptr<SchemaBuilder> builder(new SchemaBuilder());
+ CHECK_OK(builder->AddKeyColumn("key", STRING));
+ CHECK_OK(builder->AddColumn("val", UINT32));
+ return builder;
}
protected:
@@ -204,6 +211,40 @@ class TestMemRowSet : public KuduTest {
return fetched;
}
+ Status GenerateTestData(MemRowSet* mrs) {
+ // row 0 - insert
+ // row 1 - insert, update
+ // row 2 - insert, delete
+ // row 3 - insert, update, delete
+ // row 4 - insert, update, delete, reinsert
+ // row 5 - insert, update, update, delete, reinsert
+ // row 6 - insert, delete, reinsert, delete
+ RETURN_NOT_OK(InsertRow(mrs, "row 0", 0));
+ RETURN_NOT_OK(InsertRow(mrs, "row 1", 0));
+ OperationResultPB result;
+ RETURN_NOT_OK(UpdateRow(mrs, "row 1", 1, &result));
+ RETURN_NOT_OK(InsertRow(mrs, "row 2", 0));
+ RETURN_NOT_OK(DeleteRow(mrs, "row 2", &result));
+ RETURN_NOT_OK(InsertRow(mrs, "row 3", 0));
+ RETURN_NOT_OK(UpdateRow(mrs, "row 3", 1, &result));
+ RETURN_NOT_OK(DeleteRow(mrs, "row 3", &result));
+ RETURN_NOT_OK(InsertRow(mrs, "row 4", 0));
+ RETURN_NOT_OK(UpdateRow(mrs, "row 4", 1, &result));
+ RETURN_NOT_OK(DeleteRow(mrs, "row 4", &result));
+ RETURN_NOT_OK(InsertRow(mrs, "row 4", 2));
+ RETURN_NOT_OK(InsertRow(mrs, "row 5", 0));
+ RETURN_NOT_OK(UpdateRow(mrs, "row 5", 1, &result));
+ RETURN_NOT_OK(UpdateRow(mrs, "row 5", 2, &result));
+ RETURN_NOT_OK(DeleteRow(mrs, "row 5", &result));
+ RETURN_NOT_OK(InsertRow(mrs, "row 5", 3));
+ RETURN_NOT_OK(InsertRow(mrs, "row 6", 0));
+ RETURN_NOT_OK(DeleteRow(mrs, "row 6", &result));
+ RETURN_NOT_OK(InsertRow(mrs, "row 6", 1));
+ RETURN_NOT_OK(DeleteRow(mrs, "row 6", &result));
+
+ return Status::OK();
+ }
+
OpId op_id_;
scoped_refptr<LogAnchorRegistry> log_anchor_registry_;
@@ -578,13 +619,16 @@ TEST_F(TestMemRowSet, TestUpdateMVCC) {
}
class ParameterizedTestMemRowSet : public TestMemRowSet,
- public ::testing::WithParamInterface<bool> {
+ public ::testing::WithParamInterface<std::tuple<bool, bool>> {
};
-// Parameterized on opts.include_deleted_rows true or false.
-INSTANTIATE_TEST_CASE_P(IncludeDeletedRowsOrNot, ParameterizedTestMemRowSet,
- ::testing::Bool());
+// Tests the Cartesian product of two boolean parameters:
+// 1. Whether to include deleted rows in the scan.
+// 2. Whether to include the "is deleted" virtual column in the scan's projection.
+INSTANTIATE_TEST_CASE_P(RowIteratorOptionsPermutations, ParameterizedTestMemRowSet,
+ ::testing::Combine(::testing::Bool(),
+ ::testing::Bool()));
TEST_P(ParameterizedTestMemRowSet, TestScanSnapToExclude) {
shared_ptr<MemRowSet> mrs;
@@ -609,19 +653,44 @@ TEST_P(ParameterizedTestMemRowSet, TestScanSnapToExclude) {
ASSERT_OK(InsertRow(mrs.get(), "row", 2));
snaps.emplace_back(mvcc_);
+ bool include_deleted_rows = std::get<0>(GetParam());
+ bool add_vc_is_deleted = std::get<1>(GetParam());
+
auto DumpAndCheck = [&](const MvccSnapshot& exclude,
const MvccSnapshot& include,
- boost::optional<int> row_value) {
+ boost::optional<int> row_value,
+ bool is_deleted = false) {
+ // Set up the iterator options.
+ unique_ptr<SchemaBuilder> sb = CreateSchemaBuilder();
+ if (add_vc_is_deleted) {
+ const bool kFalse = false;
+ ASSERT_OK(sb->AddColumn("deleted", IS_DELETED,
+ /*is_nullable=*/false,
+ &kFalse, /*write_default=*/nullptr));
+ }
+ Schema projection = sb->Build();
RowIteratorOptions opts;
- opts.projection = &schema_;
+ opts.projection = &projection;
opts.snap_to_include = include;
opts.snap_to_exclude = exclude;
- opts.include_deleted_rows = GetParam();
+ opts.include_deleted_rows = include_deleted_rows;
+
+ // Iterate.
vector<string> rows;
ASSERT_OK(DumpRowSet(*mrs, opts, &rows));
+
+ // Test the results.
if (row_value.is_initialized()) {
ASSERT_EQ(1, rows.size());
- ASSERT_EQ(StringPrintf(R"((string key="row", uint32 val=%d))", row_value.get()), rows[0]);
+ string expected;
+ StrAppend(&expected, "(string key=\"row\", uint32 val=");
+ StrAppend(&expected, row_value.get());
+ if (add_vc_is_deleted) {
+ StrAppend(&expected, ", is_deleted deleted=");
+ StrAppend(&expected, is_deleted ? "true" : "false");
+ }
+ StrAppend(&expected, ")");
+ ASSERT_EQ(expected, rows[0]);
} else {
ASSERT_TRUE(rows.empty());
}
@@ -634,23 +703,23 @@ TEST_P(ParameterizedTestMemRowSet, TestScanSnapToExclude) {
// If we include deleted rows, the row's value will be 1 due to the UPDATE
// that preceeded it.
- boost::optional<int> deleted_v = GetParam() ? boost::optional<int>(1) : boost::none;
+ boost::optional<int> deleted_v = include_deleted_rows ? boost::optional<int>(1) : boost::none;
{
NO_FATALS(DumpAndCheck(snaps[0], snaps[1], 0)); // INSERT
NO_FATALS(DumpAndCheck(snaps[1], snaps[2], 1)); // UPDATE
- NO_FATALS(DumpAndCheck(snaps[2], snaps[3], deleted_v)); // DELETE
+ NO_FATALS(DumpAndCheck(snaps[2], snaps[3], deleted_v, true)); // DELETE
NO_FATALS(DumpAndCheck(snaps[3], snaps[4], 2)); // REINSERT
}
{
NO_FATALS(DumpAndCheck(snaps[0], snaps[2], 1)); // INSERT, UPDATE
- NO_FATALS(DumpAndCheck(snaps[1], snaps[3], deleted_v)); // UPDATE, DELETE
+ NO_FATALS(DumpAndCheck(snaps[1], snaps[3], deleted_v, true)); // UPDATE, DELETE
NO_FATALS(DumpAndCheck(snaps[2], snaps[4], 2)); // DELETE, REINSERT
}
{
- NO_FATALS(DumpAndCheck(snaps[0], snaps[3], deleted_v)); // INSERT, UPDATE, DELETE
+ NO_FATALS(DumpAndCheck(snaps[0], snaps[3], deleted_v, true)); // INSERT, UPDATE, DELETE
NO_FATALS(DumpAndCheck(snaps[1], snaps[4], 2)); // UPDATE, DELETE, REINSERT
}
@@ -661,36 +730,7 @@ TEST_F(TestMemRowSet, TestScanIncludeDeletedRows) {
shared_ptr<MemRowSet> mrs;
ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
MemTracker::GetRootTracker(), &mrs));
-
- // row 0 - insert
- // row 1 - insert, update
- // row 2 - insert, delete
- // row 3 - insert, update, delete
- // row 4 - insert, update, delete, reinsert
- // row 5 - insert, update, update, delete, reinsert
- // row 6 - insert, delete, reinsert, delete
- ASSERT_OK(InsertRow(mrs.get(), "row 0", 0));
- ASSERT_OK(InsertRow(mrs.get(), "row 1", 0));
- OperationResultPB result;
- ASSERT_OK(UpdateRow(mrs.get(), "row 1", 1, &result));
- ASSERT_OK(InsertRow(mrs.get(), "row 2", 0));
- ASSERT_OK(DeleteRow(mrs.get(), "row 2", &result));
- ASSERT_OK(InsertRow(mrs.get(), "row 3", 0));
- ASSERT_OK(UpdateRow(mrs.get(), "row 3", 1, &result));
- ASSERT_OK(DeleteRow(mrs.get(), "row 3", &result));
- ASSERT_OK(InsertRow(mrs.get(), "row 4", 0));
- ASSERT_OK(UpdateRow(mrs.get(), "row 4", 1, &result));
- ASSERT_OK(DeleteRow(mrs.get(), "row 4", &result));
- ASSERT_OK(InsertRow(mrs.get(), "row 4", 2));
- ASSERT_OK(InsertRow(mrs.get(), "row 5", 0));
- ASSERT_OK(UpdateRow(mrs.get(), "row 5", 1, &result));
- ASSERT_OK(UpdateRow(mrs.get(), "row 5", 2, &result));
- ASSERT_OK(DeleteRow(mrs.get(), "row 5", &result));
- ASSERT_OK(InsertRow(mrs.get(), "row 5", 3));
- ASSERT_OK(InsertRow(mrs.get(), "row 6", 0));
- ASSERT_OK(DeleteRow(mrs.get(), "row 6", &result));
- ASSERT_OK(InsertRow(mrs.get(), "row 6", 1));
- ASSERT_OK(DeleteRow(mrs.get(), "row 6", &result));
+ ASSERT_OK(GenerateTestData(mrs.get()));
RowIteratorOptions opts;
opts.projection = &schema_;
@@ -701,5 +741,35 @@ TEST_F(TestMemRowSet, TestScanIncludeDeletedRows) {
ASSERT_EQ(7, ScanAndCount(mrs.get(), opts));
}
+TEST_F(TestMemRowSet, TestScanVirtualColumnIsDeleted) {
+ shared_ptr<MemRowSet> mrs;
+ ASSERT_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+ MemTracker::GetRootTracker(), &mrs));
+ ASSERT_OK(GenerateTestData(mrs.get()));
+
+ SchemaBuilder sb;
+ ASSERT_OK(sb.AddKeyColumn("key", STRING));
+ ASSERT_OK(sb.AddColumn("val", UINT32));
+ const bool kFalse = false;
+ ASSERT_OK(sb.AddColumn("deleted", IS_DELETED,
+ /*is_nullable=*/false,
+ &kFalse, /*write_default=*/nullptr));
+ Schema projection = sb.Build();
+
+ RowIteratorOptions opts;
+ opts.projection = &projection;
+ opts.snap_to_include = MvccSnapshot::CreateSnapshotIncludingAllTransactions();
+ opts.include_deleted_rows = true;
+ vector<string> rows;
+ ASSERT_OK(DumpRowSet(*mrs, opts, &rows));
+ ASSERT_EQ(7, rows.size());
+ for (const auto& row_idx_present : { 0, 1, 4, 5 }) {
+ ASSERT_STR_CONTAINS(rows[row_idx_present], "=false");
+ }
+ for (const auto& row_idx_deleted : { 2, 3, 6 }) {
+ ASSERT_STR_CONTAINS(rows[row_idx_deleted], "=true");
+ }
+}
+
} // namespace tablet
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/kudu/blob/cec6aa37/src/kudu/tablet/memrowset.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.cc b/src/kudu/tablet/memrowset.cc
index ec435c5..0b68782 100644
--- a/src/kudu/tablet/memrowset.cc
+++ b/src/kudu/tablet/memrowset.cc
@@ -29,11 +29,13 @@
#include "kudu/codegen/compilation_manager.h"
#include "kudu/codegen/row_projector.h"
#include "kudu/common/columnblock.h"
+#include "kudu/common/common.pb.h"
#include "kudu/common/encoded_key.h"
#include "kudu/common/row.h"
#include "kudu/common/row_changelist.h"
#include "kudu/common/rowblock.h"
#include "kudu/common/scan_spec.h"
+#include "kudu/common/types.h"
#include "kudu/consensus/log_anchor_registry.h"
#include "kudu/consensus/opid.pb.h"
#include "kudu/gutil/dynamic_annotations.h"
@@ -397,6 +399,21 @@ MemRowSet::Iterator::Iterator(const std::shared_ptr<const MemRowSet>& mrs,
// seek. Could make this lazy instead, or change the semantics so that
// a seek is required (probably the latter)
iter_->SeekToStart();
+
+ // Find the first IS_DELETED virtual column, if one exists.
+ projection_vc_is_deleted_idx_ = Schema::kColumnNotFound;
+ for (int i = 0; i < opts_.projection->num_columns(); i++) {
+ const auto& col = opts_.projection->column(i);
+ if (col.type_info()->type() == IS_DELETED) {
+ // Enforce some properties on the virtual column that simplify our
+ // implementation.
+ DCHECK(!col.is_nullable());
+ DCHECK(col.has_read_default());
+
+ projection_vc_is_deleted_idx_ = i;
+ break;
+ }
+ }
}
MemRowSet::Iterator::~Iterator() {}
@@ -517,13 +534,13 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
bool insert_excluded = opts_.snap_to_exclude &&
opts_.snap_to_exclude->IsCommitted(row.insertion_timestamp());
bool unset_in_sel_vector;
+ ApplyStatus apply_status;
if (insert_excluded || opts_.snap_to_include.IsCommitted(row.insertion_timestamp())) {
RETURN_NOT_OK(projector_->ProjectRowForRead(row, &dst_row, dst->arena()));
// Roll-forward MVCC for committed updates.
Mutation* redo_head = reinterpret_cast<Mutation*>(
base::subtle::Acquire_Load(reinterpret_cast<AtomicWord*>(&row.header_->redo_head)));
- ApplyStatus apply_status;
RETURN_NOT_OK(ApplyMutationsToProjectedRow(
redo_head, &dst_row, dst->arena(), &apply_status));
unset_in_sel_vector = (apply_status == APPLIED_AND_DELETED && !opts_.include_deleted_rows) ||
@@ -544,6 +561,9 @@ Status MemRowSet::Iterator::FetchRows(RowBlock* dst, size_t* fetched) {
"MVCCMVCCMVCCMVCCMVCCMVCC");
}
#endif
+ } else if (projection_vc_is_deleted_idx_ != Schema::kColumnNotFound) {
+ UnalignedStore(dst_row.mutable_cell_ptr(projection_vc_is_deleted_idx_),
+ apply_status == APPLIED_AND_DELETED);
}
++*fetched;
http://git-wip-us.apache.org/repos/asf/kudu/blob/cec6aa37/src/kudu/tablet/memrowset.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/memrowset.h b/src/kudu/tablet/memrowset.h
index 9b10187..429f734 100644
--- a/src/kudu/tablet/memrowset.h
+++ b/src/kudu/tablet/memrowset.h
@@ -571,15 +571,22 @@ class MemRowSet::Iterator : public RowwiseIterator {
const std::shared_ptr<const MemRowSet> memrowset_;
gscoped_ptr<MemRowSet::MSBTIter> iter_;
- RowIteratorOptions opts_;
+ const RowIteratorOptions opts_;
// Mapping from projected column index back to memrowset column index.
// Relies on the MRSRowProjector interface to abstract from the two
// different implementations of the RowProjector, which may change
// at runtime (using vs. not using code generation).
- gscoped_ptr<MRSRowProjector> projector_;
+ const gscoped_ptr<MRSRowProjector> projector_;
DeltaProjector delta_projector_;
+ // The index of the first IS_DELETED virtual column in the projection schema,
+ // or kColumnNotFound if one doesn't exist.
+ //
+ // The virtual column must not be nullable and must have a read default value.
+ // The process will crash if these constraints are not met.
+ int projection_vc_is_deleted_idx_;
+
// Temporary buffer used for RowChangeList projection.
faststring delta_buf_;