You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by aw...@apache.org on 2020/11/03 05:37:17 UTC

[kudu] branch master updated: tablet: support merge compacting a MRS with invisible rows

This is an automated email from the ASF dual-hosted git repository.

awong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new b3de5e3  tablet: support merge compacting a MRS with invisible rows
b3de5e3 is described below

commit b3de5e38c128277a7e9e48bd8c1d0f0a691157a5
Author: Andrew Wong <aw...@cloudera.com>
AuthorDate: Fri Oct 30 18:30:42 2020 -0700

    tablet: support merge compacting a MRS with invisible rows
    
    In testing an upcoming patch, I hit a check failure when merging
    multiple MemRowSets in which some rows are effectively invisible (i.e.
    inserted and deleted in the same write request).
    
    F1030 18:30:04.936852 96696 compaction.cc:413] Check failed: !empty() 0 of 0
    *** Check failure stack trace: ***
    *** Aborted at 1604107804 (unix time) try "date -d @1604107804" if you are using GNU date ***
    PC: @     0x7f7ca8b311d7 __GI_raise
    *** SIGABRT (@0x1117000179b8) received by PID 96696 (TID 0x7f7cb5fcb9c0) from PID 96696; stack trace: ***
        @     0x7f7cb204e370 (unknown)
        @     0x7f7ca8b311d7 __GI_raise
        @     0x7f7ca8b328c8 __GI_abort
        @     0x7f7cab6b97b9 google::logging_fail()
        @     0x7f7cab6baf8d google::LogMessage::Fail()
        @     0x7f7cab6bcee3 google::LogMessage::SendToLog()
        @     0x7f7cab6baae9 google::LogMessage::Flush()
        @     0x7f7cab6bd86f google::LogMessageFatal::~LogMessageFatal()
        @     0x7f7cb4e751b1 kudu::tablet::(anonymous namespace)::MergeCompactionInput::MergeState::Dominates()
        @     0x7f7cb4e76dbf kudu::tablet::(anonymous namespace)::MergeCompactionInput::TryInsertIntoDominanceList()
        @     0x7f7cb4e76bea kudu::tablet::(anonymous namespace)::MergeCompactionInput::ProcessEmptyInputs()
        @     0x7f7cb4e7565f kudu::tablet::(anonymous namespace)::MergeCompactionInput::Init()
        @     0x7f7cb4e7b0e5 kudu::tablet::FlushCompactionInput()
        @           0x48020d kudu::tablet::TestCompaction::DoFlushAndReopen()
        @           0x46bcfc kudu::tablet::TestCompaction_TestMergeMRSWithInvisibleRows_Test::TestBody()
        @     0x7f7cb531fb39 testing::internal::HandleExceptionsInMethodIfSupported<>()
        @     0x7f7cb531104f testing::Test::Run()
        @     0x7f7cb531110d testing::TestInfo::Run()
        @     0x7f7cb5311225 testing::TestCase::Run()
        @     0x7f7cb53114e8 testing::internal::UnitTestImpl::RunAllTests()
        @     0x7f7cb5311789 testing::UnitTest::Run()
        @     0x7f7cb59cacbd RUN_ALL_TESTS()
        @     0x7f7cb59c8c91 main
        @     0x7f7ca8b1db35 __libc_start_main
        @           0x461509 (unknown)
    Aborted
    
    The issue is that when we iterate through the merge compaction states,
    we would previously only cull the input merge state (i.e. the state
    representing a rowset being merged) if it had no blocks to read. Based
    on this, compaction code assumed that no inputs could possible be empty.
    However, it did not take into account the case where an input has
    blocks, but the rowset is effectively empty because rows were inserted
    and deleted in the same op. In such cases,
    MemRowSetCompactionInput::PrepareBlock() leaves the input in an empty
    state if there are no further blocks, hence the crash.
    
    This updates the behavior to check for both blocks _and_ emptiness
    following the block preparation.
    
    NOTE: this code path isn't exercised today, but it will be in an
    upcoming patch that will merge multiple MRSs from multiple transactions.
    
    Change-Id: I9016f2cd3714a4ac7b5fb1abd80b9e6bd86d5ac5
    Reviewed-on: http://gerrit.cloudera.org:8080/16691
    Tested-by: Kudu Jenkins
    Reviewed-by: Alexey Serbin <as...@cloudera.com>
---
 src/kudu/tablet/compaction-test.cc | 77 ++++++++++++++++++++++++++++++++------
 src/kudu/tablet/compaction.cc      | 14 ++++---
 2 files changed, 73 insertions(+), 18 deletions(-)

diff --git a/src/kudu/tablet/compaction-test.cc b/src/kudu/tablet/compaction-test.cc
index e96ecbc..5e5cf44 100644
--- a/src/kudu/tablet/compaction-test.cc
+++ b/src/kudu/tablet/compaction-test.cc
@@ -298,7 +298,6 @@ class TestCompaction : public KuduRowSetTest {
 
     vector<shared_ptr<RowSetMetadata> > metas;
     rsw.GetWrittenRowSetMetadata(&metas);
-    ASSERT_GE(metas.size(), 1);
     for (const shared_ptr<RowSetMetadata>& meta : metas) {
       ASSERT_TRUE(meta->HasBloomDataBlockForTests());
     }
@@ -372,6 +371,31 @@ class TestCompaction : public KuduRowSetTest {
     *result_rs = rowsets[0];
   }
 
+  // Create an invisible MRS -- one whose inserts and deletes were applied at
+  // the same timestamp.
+  shared_ptr<MemRowSet> CreateInvisibleMRS() {
+    shared_ptr<MemRowSet> mrs;
+    CHECK_OK(MemRowSet::Create(0, schema_, log_anchor_registry_.get(),
+                               mem_trackers_.tablet_tracker, &mrs));
+    ScopedOp op(&mvcc_, clock_.Now());
+    op.StartApplying();
+    InsertRowInOp(mrs.get(), op, 0, 0);
+    DeleteRowInOp(mrs.get(), op, 0);
+    op.FinishApplying();
+    return mrs;
+  }
+
+  // Count the number of rows in the given rowsets.
+  static uint64_t CountRows(const vector<shared_ptr<DiskRowSet>>& rowsets) {
+    uint64_t total_num_rows = 0;
+    for (const auto& rs : rowsets) {
+      uint64_t rs_live_rows;
+      CHECK_OK(rs->CountLiveRows(&rs_live_rows));
+      total_num_rows += rs_live_rows;
+    }
+    return total_num_rows;
+  }
+
   // Test compaction where each of the input rowsets has
   // each of the input schemas. The output rowset will
   // have the 'projection' schema.
@@ -1047,23 +1071,52 @@ TEST_F(TestCompaction, TestMergeMRS) {
   InsertRows(mrs_a.get(), 5, 1);
 
   MvccSnapshot snap(mvcc_);
-  vector<shared_ptr<CompactionInput> > merge_inputs;
-  merge_inputs.push_back(
-        shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)));
-  merge_inputs.push_back(
-        shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap)));
+  vector<shared_ptr<CompactionInput> > merge_inputs {
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
+  };
   unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
   vector<shared_ptr<DiskRowSet>> result_rs;
   DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
-  uint64_t total_num_rows = 0;
-  for (const auto& rs : result_rs) {
-    uint64_t rs_live_rows;
-    ASSERT_OK(rs->CountLiveRows(&rs_live_rows));
-    total_num_rows += rs_live_rows;
-  }
+  uint64_t total_num_rows = CountRows(result_rs);
   ASSERT_EQ(20, total_num_rows);
 }
 
+// Test MergeCompactionInput against MemRowSets, where there are rows that were
+// inserted and deleted in the same op, and can thus never be seen.
+TEST_F(TestCompaction, TestMergeMRSWithInvisibleRows) {
+  shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS();
+  shared_ptr<MemRowSet> mrs_b;
+  ASSERT_OK(MemRowSet::Create(1, schema_, log_anchor_registry_.get(),
+                              mem_trackers_.tablet_tracker, &mrs_b));
+  InsertRows(mrs_b.get(), 10, 0);
+  MvccSnapshot snap(mvcc_);
+  vector<shared_ptr<CompactionInput> > merge_inputs {
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
+  };
+  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
+  vector<shared_ptr<DiskRowSet>> result_rs;
+  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_EQ(1, result_rs.size());
+  ASSERT_EQ(10, CountRows(result_rs));
+}
+
+// Like the above test, but with all invisible rowsets.
+TEST_F(TestCompaction, TestMergeMRSWithAllInvisibleRows) {
+  shared_ptr<MemRowSet> mrs_a = CreateInvisibleMRS();
+  shared_ptr<MemRowSet> mrs_b = CreateInvisibleMRS();
+  MvccSnapshot snap(mvcc_);
+  vector<shared_ptr<CompactionInput> > merge_inputs {
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_a, &schema_, snap)),
+    shared_ptr<CompactionInput>(CompactionInput::Create(*mrs_b, &schema_, snap))
+  };
+  unique_ptr<CompactionInput> input(CompactionInput::Merge(merge_inputs, &schema_));
+  vector<shared_ptr<DiskRowSet>> result_rs;
+  DoFlushAndReopen(input.get(), schema_, snap, kSmallRollThreshold, &result_rs);
+  ASSERT_TRUE(result_rs.empty());
+}
+
 #ifdef NDEBUG
 // Benchmark for the compaction merge input for the case where the inputs
 // contain non-overlapping data. In this case the merge can be optimized
diff --git a/src/kudu/tablet/compaction.cc b/src/kudu/tablet/compaction.cc
index f6a8804..a4a8258 100644
--- a/src/kudu/tablet/compaction.cc
+++ b/src/kudu/tablet/compaction.cc
@@ -428,7 +428,7 @@ class MergeCompactionInput : public CompactionInput {
                        const Schema* schema)
     : schema_(schema),
       num_dup_rows_(0) {
-    for (const shared_ptr<CompactionInput> &input : inputs) {
+    for (const shared_ptr<CompactionInput>& input : inputs) {
       unique_ptr<MergeState> state(new MergeState);
       state->input = input;
       states_.push_back(state.release());
@@ -573,8 +573,13 @@ class MergeCompactionInput : public CompactionInput {
 
       // If an input is fully exhausted, no need to consider it
       // in the merge anymore.
-      if (!state->input->HasMoreBlocks()) {
-
+      bool has_blocks = state->input->HasMoreBlocks();
+      if (has_blocks) {
+        state->Reset();
+        RETURN_NOT_OK(state->input->PrepareBlock(&state->pending));
+        has_blocks = !state->empty();
+      }
+      if (!has_blocks) {
         // Any inputs that were dominated by the last block of this input
         // need to be re-added into the merge.
         states_.insert(states_.end(), state->dominated.begin(), state->dominated.end());
@@ -584,9 +589,6 @@ class MergeCompactionInput : public CompactionInput {
         continue;
       }
 
-      state->Reset();
-      RETURN_NOT_OK(state->input->PrepareBlock(&state->pending));
-
       // Now that this input has moved to its next block, it's possible that
       // it no longer dominates the inputs in it 'dominated' list. Re-check
       // all of those dominance relations and remove any that are no longer