Posted to commits@kudu.apache.org by to...@apache.org on 2016/03/02 22:15:07 UTC

[04/11] incubator-kudu git commit: KUDU-1341 (part 2). Fix bootstrap to properly handle delete/insert/update sequences

KUDU-1341 (part 2). Fix bootstrap to properly handle delete/insert/update sequences

This adds a new test which reproduces the following bootstrap error:
- a row is written and flushed to a DRS
- the row is updated and deleted, but the DMS is not flushed
- the row is inserted again into MRS and flushed again
- the TS restarts

When bootstrap begins, we have the following tablet state:

- DRS 1: contains the row (original inserted version)
- DRS 2: contains the row (second inserted version)

and the following log:

  REPLICATE 1.1: INSERT orig version
  COMMIT 1.1: mrs_id = 1
    - bootstrap correctly decides that this is already flushed

  REPLICATE 1.2: UPDATE
  COMMIT 1.2: drs_id = 1, dms_id = 1
    - bootstrap correctly decides that the update is not flushed
    - it applies the update to the tablet
      - the tablet MutateRow code checks the interval tree and finds
        that this row could be in either DRS 1 or DRS 2
      - it tries applying the update to them in whatever order the
        RowSetTree returns the DiskRowSets
      - if it happens to try DRS 2 first, then we incorrectly apply
        the update to the new version of the row instead of the old
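
The order dependence described above can be illustrated with a toy model.
This is only a sketch, not Kudu code: ToyRowSet and ApplyUpdateInGivenOrder
are hypothetical names, and the model assumes that both on-disk copies of
the row look live at the point where the UPDATE is replayed.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical, heavily simplified stand-in for a DiskRowSet holding one row.
struct ToyRowSet {
  int64_t id;
  bool row_is_live;   // false once a DELETE has been applied here
  std::string value;
};

// Applies an UPDATE to the first candidate holding a live copy of the row,
// in whatever order the caller supplies. Returns the id of the rowset that
// accepted the update, or -1 if none did.
int64_t ApplyUpdateInGivenOrder(const std::vector<ToyRowSet*>& candidates,
                                const std::string& new_value) {
  for (ToyRowSet* rs : candidates) {
    assert(rs != nullptr);
    if (rs->row_is_live) {
      rs->value = new_value;
      return rs->id;
    }
  }
  return -1;
}
```

With candidates ordered {DRS 1, DRS 2} the update lands in rowset 1, as it
did originally. With the order reversed it lands in rowset 2, which is the
wrong version of the row.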

We haven't seen this bug in most of our testing because the RowSetTree
returns rowsets in a deterministic order. We only test the
insert-update-delete-insert sequence in a few test cases, and it appears
that those test cases either:
1) do not restart the tablet server, and thus don't see bootstrap issues
2) happen to result in RowSetTrees that give back the DRS in the lucky
   order that doesn't trigger the bug.

This patch changes the tablet code so that, in debug mode, the results
returned by RowSetTree are shuffled before being used. This makes the
bug more likely to reproduce. It adds a specific test case which reproduces
the bug reliably in debug builds with the shuffling enabled.
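
A minimal sketch of the debug-only shuffling idea follows. It is an
illustration under assumptions, not the patch itself: the helper name
ShuffleInDebugBuilds is hypothetical, and it uses std::shuffle with a
caller-supplied std::mt19937 where the patch calls std::random_shuffle
(which was removed in C++17).

```cpp
#include <algorithm>
#include <cassert>
#include <random>
#include <vector>

// Shuffles 'results' in debug builds only; release builds (NDEBUG defined)
// keep the original order. 'rng' is supplied by the caller so that a failing
// order can be replayed from a known seed.
template <typename T>
void ShuffleInDebugBuilds(std::vector<T>* results, std::mt19937* rng) {
  assert(results != nullptr);
  assert(rng != nullptr);
#ifndef NDEBUG
  std::shuffle(results->begin(), results->end(), *rng);
#else
  (void)results;
  (void)rng;
#endif
}
```

Callers must not depend on the order either way, so the shuffle changes
nothing for correct code and only makes order-dependent code fail sooner.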

Additionally, this patch extends fuzz-itest with a new fuzz test action:
restarting the tablet server. The improved test also supports generating
writes to multiple rows, since some errors don't show up with single-row
rowsets.

With the improved fuzz test, I was able to generate sequences that reliably
reproduce the crash described in KUDU-1341 as well as several other bugs:
incorrect results on reads, other CHECK failures in compactions, etc.
The fuzz tester should give us good confidence in the bug fix.

The fix in this patch is to pass the operation result from the COMMIT message
in the WAL back into the tablet while replaying. The tablet code then uses
this to specifically pick the correct rowset to reapply the update.
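
The selection logic can be sketched roughly as below. All names here
(ToyDiskRowSet, ToyLoggedResult, RowSetsToCheck) are hypothetical
simplifications for illustration; the real change works on RowOp,
OperationResultPB and RowSetTree, as shown in the diff.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_map>
#include <vector>

// Hypothetical stand-in for a DiskRowSet, identified only by its id.
struct ToyDiskRowSet {
  int64_t id;
};

// Hypothetical stand-in for the per-operation result stored in a COMMIT
// message: the ids of the rowsets the operation originally mutated.
struct ToyLoggedResult {
  std::vector<int64_t> mutated_rowset_ids;
};

// Normal writes (logged == nullptr) fall back to the key-range candidates.
// Replayed writes use only the rowsets named in the logged result; ids that
// no longer exist are skipped, so the result may be empty.
std::vector<ToyDiskRowSet*> RowSetsToCheck(
    const std::unordered_map<int64_t, ToyDiskRowSet*>& by_id,
    const std::vector<ToyDiskRowSet*>& key_range_candidates,
    const ToyLoggedResult* logged) {
  if (logged == nullptr) {
    return key_range_candidates;
  }
  std::vector<ToyDiskRowSet*> to_check;
  for (int64_t id : logged->mutated_rowset_ids) {
    auto it = by_id.find(id);
    if (it != by_id.end()) {
      assert(it->second != nullptr);
      to_check.push_back(it->second);
    }
  }
  return to_check;
}
```

Because the replay path never consults the key-range candidates, the order
in which the tree returns them no longer affects which rowset receives the
update.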

Cherry-picked from 1ff209e85b4c2cc4beda7560d328b7ed05e008d2

Change-Id: I6017ef67ae236021f7e6bd19d21b89310b8e6894
Reviewed-on: http://gerrit.cloudera.org:8080/2300
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <da...@cloudera.com>
Reviewed-on: http://gerrit.cloudera.org:8080/2391
Reviewed-by: Todd Lipcon <to...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/76231cab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/76231cab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/76231cab

Branch: refs/heads/branch-0.7.x
Commit: 76231cab4e2713d2272e261ada936a8622b10dd4
Parents: 3193ef9
Author: Todd Lipcon <to...@apache.org>
Authored: Wed Feb 24 07:37:42 2016 -0800
Committer: Todd Lipcon <to...@apache.org>
Committed: Wed Mar 2 21:04:19 2016 +0000

----------------------------------------------------------------------
 src/kudu/integration-tests/fuzz-itest.cc | 266 +++++++++++++++++---------
 src/kudu/tablet/mock-rowsets.h           |   4 +-
 src/kudu/tablet/row_op.cc                |   4 +-
 src/kudu/tablet/row_op.h                 |  13 ++
 src/kudu/tablet/rowset_tree.cc           |  10 +
 src/kudu/tablet/rowset_tree.h            |   9 +
 src/kudu/tablet/tablet.cc                |  47 ++++-
 src/kudu/tablet/tablet.h                 |   6 +
 src/kudu/tablet/tablet_bootstrap.cc      |   1 +
 src/kudu/tserver/tablet_server-test.cc   |  35 ++++
 10 files changed, 296 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/integration-tests/fuzz-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/fuzz-itest.cc b/src/kudu/integration-tests/fuzz-itest.cc
index e41648c..941b07a 100644
--- a/src/kudu/integration-tests/fuzz-itest.cc
+++ b/src/kudu/integration-tests/fuzz-itest.cc
@@ -39,7 +39,9 @@
 #include "kudu/util/stopwatch.h"
 #include "kudu/util/test_util.h"
 
+DEFINE_int32(keyspace_size, 2,  "number of distinct primary keys to test with");
 DECLARE_bool(enable_maintenance_manager);
+DECLARE_bool(use_hybrid_clock);
 
 using std::string;
 using std::vector;
@@ -47,7 +49,7 @@ using std::unique_ptr;
 
 // The type of operation in a sequence of operations generated by
 // the fuzz test.
-enum TestOp {
+enum TestOpType {
   TEST_INSERT,
   TEST_UPDATE,
   TEST_DELETE,
@@ -57,9 +59,10 @@ enum TestOp {
   TEST_MINOR_COMPACT_DELTAS,
   TEST_MAJOR_COMPACT_DELTAS,
   TEST_COMPACT_TABLET,
+  TEST_RESTART_TS,
   TEST_NUM_OP_TYPES // max value for enum
 };
-MAKE_ENUM_LIMITS(TestOp, TEST_INSERT, TEST_NUM_OP_TYPES);
+MAKE_ENUM_LIMITS(TestOpType, TEST_INSERT, TEST_NUM_OP_TYPES);
 
 const char* kTableName = "table";
 
@@ -84,7 +87,7 @@ using client::sp::shared_ptr;
 
 namespace tablet {
 
-const char* TestOp_names[] = {
+const char* TestOpType_names[] = {
   "TEST_INSERT",
   "TEST_UPDATE",
   "TEST_DELETE",
@@ -93,7 +96,21 @@ const char* TestOp_names[] = {
   "TEST_FLUSH_DELTAS",
   "TEST_MINOR_COMPACT_DELTAS",
   "TEST_MAJOR_COMPACT_DELTAS",
-  "TEST_COMPACT_TABLET"
+  "TEST_COMPACT_TABLET",
+  "TEST_RESTART_TS"
+};
+
+// An operation in a fuzz-test sequence.
+struct TestOp {
+  // The op to run.
+  TestOpType type;
+
+  // For INSERT/UPDATE/DELETE, the key of the row to be modified. Otherwise, unused.
+  int row_key;
+
+  string ToString() const {
+    return strings::Substitute("{$0, $1}", TestOpType_names[type], row_key);
+  }
 };
 
 // Test which does only random operations against a tablet, including update and random
@@ -106,6 +123,7 @@ class FuzzTest : public KuduTest {
  public:
   FuzzTest() {
     FLAGS_enable_maintenance_manager = false;
+    FLAGS_use_hybrid_clock = false;
 
     KuduSchemaBuilder b;
     b.AddColumn("key")->Type(KuduColumnSchema::INT32)->NotNull()->PrimaryKey();
@@ -244,52 +262,53 @@ class FuzzTest : public KuduTest {
 // Generate a random valid sequence of operations for use as a
 // fuzz test.
 void GenerateTestCase(vector<TestOp>* ops, int len) {
-  bool exists = false;
+  vector<bool> exists(FLAGS_keyspace_size);
   bool ops_pending = false;
   bool data_in_mrs = false;
   bool worth_compacting = false;
   bool data_in_dms = false;
   ops->clear();
   while (ops->size() < len) {
-    TestOp r = tight_enum_cast<TestOp>(rand() % enum_limits<TestOp>::max_enumerator);
+    TestOpType r = tight_enum_cast<TestOpType>(rand() % enum_limits<TestOpType>::max_enumerator);
+    int row_key = rand() % FLAGS_keyspace_size;
     switch (r) {
       case TEST_INSERT:
-        if (exists) continue;
-        ops->push_back(TEST_INSERT);
-        exists = true;
+        if (exists[row_key]) continue;
+        ops->push_back({TEST_INSERT, row_key});
+        exists[row_key] = true;
         ops_pending = true;
         data_in_mrs = true;
         break;
       case TEST_UPDATE:
-        if (!exists) continue;
-        ops->push_back(TEST_UPDATE);
+        if (!exists[row_key]) continue;
+        ops->push_back({TEST_UPDATE, row_key});
         ops_pending = true;
         if (!data_in_mrs) {
           data_in_dms = true;
         }
         break;
       case TEST_DELETE:
-        if (!exists) continue;
-        ops->push_back(TEST_DELETE);
+        if (!exists[row_key]) continue;
+        ops->push_back({TEST_DELETE, row_key});
         ops_pending = true;
-        exists = false;
+        exists[row_key] = false;
         if (!data_in_mrs) {
           data_in_dms = true;
         }
         break;
       case TEST_FLUSH_OPS:
         if (ops_pending) {
-          ops->push_back(TEST_FLUSH_OPS);
+          ops->push_back({TEST_FLUSH_OPS, 0});
           ops_pending = false;
         }
         break;
       case TEST_FLUSH_TABLET:
         if (data_in_mrs) {
           if (ops_pending) {
-            ops->push_back(TEST_FLUSH_OPS);
+            ops->push_back({TEST_FLUSH_OPS, 0});
             ops_pending = false;
           }
-          ops->push_back(TEST_FLUSH_TABLET);
+          ops->push_back({TEST_FLUSH_TABLET, 0});
           data_in_mrs = false;
           worth_compacting = true;
         }
@@ -297,28 +316,31 @@ void GenerateTestCase(vector<TestOp>* ops, int len) {
       case TEST_COMPACT_TABLET:
         if (worth_compacting) {
           if (ops_pending) {
-            ops->push_back(TEST_FLUSH_OPS);
+            ops->push_back({TEST_FLUSH_OPS, 0});
             ops_pending = false;
           }
-          ops->push_back(TEST_COMPACT_TABLET);
+          ops->push_back({TEST_COMPACT_TABLET, 0});
           worth_compacting = false;
         }
         break;
       case TEST_FLUSH_DELTAS:
         if (data_in_dms) {
           if (ops_pending) {
-            ops->push_back(TEST_FLUSH_OPS);
+            ops->push_back({TEST_FLUSH_OPS, 0});
             ops_pending = false;
           }
-                ops->push_back(TEST_FLUSH_DELTAS);
+          ops->push_back({TEST_FLUSH_DELTAS, 0});
           data_in_dms = false;
         }
         break;
       case TEST_MAJOR_COMPACT_DELTAS:
-        ops->push_back(TEST_MAJOR_COMPACT_DELTAS);
+        ops->push_back({TEST_MAJOR_COMPACT_DELTAS, 0});
         break;
       case TEST_MINOR_COMPACT_DELTAS:
-        ops->push_back(TEST_MINOR_COMPACT_DELTAS);
+        ops->push_back({TEST_MINOR_COMPACT_DELTAS, 0});
+        break;
+      case TEST_RESTART_TS:
+        ops->push_back({TEST_RESTART_TS, 0});
         break;
       default:
         LOG(FATAL);
@@ -329,7 +351,7 @@ void GenerateTestCase(vector<TestOp>* ops, int len) {
 string DumpTestCase(const vector<TestOp>& ops) {
   vector<string> strs;
   for (TestOp test_op : ops) {
-    strs.push_back(TestOp_names[test_op]);
+    strs.push_back(test_op.ToString());
   }
   return JoinStrings(strs, ",\n");
 }
@@ -341,26 +363,26 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
   // into a test method in order to reproduce a failure.
   LOG(INFO) << "test case:\n" << DumpTestCase(test_ops);
 
-  string cur_val = "";
-  string pending_val = "";
+  vector<string> cur_val(FLAGS_keyspace_size);
+  vector<string> pending_val(FLAGS_keyspace_size);
 
   int i = 0;
   for (const TestOp& test_op : test_ops) {
-    string val_in_table = GetRow(1);
-    EXPECT_EQ("(" + cur_val + ")", val_in_table);
+    string val_in_table = GetRow(test_op.row_key);
+    EXPECT_EQ("(" + cur_val[test_op.row_key] + ")", val_in_table);
 
-    LOG(INFO) << TestOp_names[test_op];
-    switch (test_op) {
+    LOG(INFO) << test_op.ToString();
+    switch (test_op.type) {
       case TEST_INSERT:
-        pending_val = InsertRow(1, i++);
+        pending_val[test_op.row_key] = InsertRow(test_op.row_key, i++);
         break;
       case TEST_UPDATE:
         for (int j = 0; j < update_multiplier; j++) {
-          pending_val = MutateRow(1, i++);
+          pending_val[test_op.row_key] = MutateRow(test_op.row_key, i++);
         }
         break;
       case TEST_DELETE:
-        pending_val = DeleteRow(1);
+        pending_val[test_op.row_key] = DeleteRow(test_op.row_key);
         break;
       case TEST_FLUSH_OPS:
         FlushSessionOrDie(session_);
@@ -381,8 +403,11 @@ void FuzzTest::RunFuzzCase(const vector<TestOp>& test_ops,
       case TEST_COMPACT_TABLET:
         ASSERT_OK(tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
         break;
+      case TEST_RESTART_TS:
+        NO_FATALS(RestartTabletServer());
+        break;
       default:
-        LOG(FATAL) << test_op;
+        LOG(FATAL) << test_op.type;
     }
   }
 }
@@ -399,7 +424,7 @@ TEST_F(FuzzTest, TestFuzz) {
 }
 
 // Generates a random test case, but the UPDATEs are all repeated 1000 times.
-// This results in very large batches which are likely to span multiple delta blocks
+// This results in very large batches which are likely to span multiple delta blocks
 // when flushed.
 TEST_F(FuzzTest, TestFuzzHugeBatches) {
   SeedRandom();
@@ -412,15 +437,15 @@ TEST_F(FuzzTest, TestFuzzHugeBatches) {
 TEST_F(FuzzTest, TestFuzz1) {
   vector<TestOp> test_ops = {
     // Get an inserted row in a DRS.
-    TEST_INSERT,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
+    {TEST_INSERT, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
 
     // DELETE in DMS, INSERT in MRS and flush again.
-    TEST_DELETE,
-    TEST_INSERT,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
+    {TEST_DELETE, 0},
+    {TEST_INSERT, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
 
     // State:
     // RowSet RowSet(0):
@@ -428,7 +453,7 @@ TEST_F(FuzzTest, TestFuzz1) {
     // RowSet RowSet(1):
     //   (int32 key=1, int32 val=NULL) Undos: [@2(DELETE)] Redos: []
 
-    TEST_COMPACT_TABLET,
+    {TEST_COMPACT_TABLET, 0},
   };
   RunFuzzCase(test_ops);
 }
@@ -436,31 +461,31 @@ TEST_F(FuzzTest, TestFuzz1) {
 // A particular test case which previously failed TestFuzz.
 TEST_F(FuzzTest, TestFuzz2) {
   vector<TestOp> test_ops = {
-    TEST_INSERT,
-    TEST_DELETE,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
+    {TEST_INSERT, 0},
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
     // (int32 key=1, int32 val=NULL)
     // Undo Mutations: [@1(DELETE)]
     // Redo Mutations: [@1(DELETE)]
 
-    TEST_INSERT,
-    TEST_DELETE,
-    TEST_INSERT,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
+    {TEST_INSERT, 0},
+    {TEST_DELETE, 0},
+    {TEST_INSERT, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
     // (int32 key=1, int32 val=NULL)
     // Undo Mutations: [@2(DELETE)]
     // Redo Mutations: []
 
-    TEST_COMPACT_TABLET,
+    {TEST_COMPACT_TABLET, 0},
     // Output Row: (int32 key=1, int32 val=NULL)
     // Undo Mutations: [@1(DELETE)]
     // Redo Mutations: [@1(DELETE)]
 
-    TEST_DELETE,
-    TEST_FLUSH_OPS,
-    TEST_COMPACT_TABLET,
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_COMPACT_TABLET, 0},
   };
   RunFuzzCase(test_ops);
 }
@@ -468,20 +493,20 @@ TEST_F(FuzzTest, TestFuzz2) {
 // A particular test case which previously failed TestFuzz.
 TEST_F(FuzzTest, TestFuzz3) {
   vector<TestOp> test_ops = {
-    TEST_INSERT,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
+    {TEST_INSERT, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
     // Output Row: (int32 key=1, int32 val=NULL)
     // Undo Mutations: [@1(DELETE)]
     // Redo Mutations: []
 
-    TEST_DELETE,
+    {TEST_DELETE, 0},
     // Adds a @2 DELETE to DMS for above row.
 
-    TEST_INSERT,
-    TEST_DELETE,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
+    {TEST_INSERT, 0},
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
     // (int32 key=1, int32 val=NULL)
     // Undo Mutations: [@2(DELETE)]
     // Redo Mutations: [@2(DELETE)]
@@ -494,7 +519,7 @@ TEST_F(FuzzTest, TestFuzz3) {
     //  Undo Mutations: [@1(DELETE)]
     //  Redo Mutations: [@2(DELETE)]
 
-    TEST_COMPACT_TABLET,
+    {TEST_COMPACT_TABLET, 0},
   };
   RunFuzzCase(test_ops);
 }
@@ -502,33 +527,96 @@ TEST_F(FuzzTest, TestFuzz3) {
 // A particular test case which previously failed TestFuzz.
 TEST_F(FuzzTest, TestFuzz4) {
   vector<TestOp> test_ops = {
-    TEST_INSERT,
-    TEST_FLUSH_OPS,
-    TEST_COMPACT_TABLET,
-    TEST_DELETE,
-    TEST_FLUSH_OPS,
-    TEST_COMPACT_TABLET,
-    TEST_INSERT,
-    TEST_UPDATE,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
-    TEST_DELETE,
-    TEST_INSERT,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
-    TEST_UPDATE,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
-    TEST_UPDATE,
-    TEST_DELETE,
-    TEST_INSERT,
-    TEST_DELETE,
-    TEST_FLUSH_OPS,
-    TEST_FLUSH_TABLET,
-    TEST_COMPACT_TABLET,
+    {TEST_INSERT, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_COMPACT_TABLET, 0},
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_COMPACT_TABLET, 0},
+    {TEST_INSERT, 0},
+    {TEST_UPDATE, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
+    {TEST_DELETE, 0},
+    {TEST_INSERT, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
+    {TEST_UPDATE, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
+    {TEST_UPDATE, 0},
+    {TEST_DELETE, 0},
+    {TEST_INSERT, 0},
+    {TEST_DELETE, 0},
+    {TEST_FLUSH_OPS, 0},
+    {TEST_FLUSH_TABLET, 0},
+    {TEST_COMPACT_TABLET, 0},
   };
   RunFuzzCase(test_ops);
 }
 
+// Previously caused incorrect data being read after restart.
+// Failure:
+//  Value of: val_in_table
+//  Actual: "()"
+//  Expected: "(" + cur_val + ")"
+TEST_F(FuzzTest, TestFuzzWithRestarts1) {
+  RunFuzzCase({
+      {TEST_INSERT, 1},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
+      {TEST_UPDATE, 1},
+      {TEST_RESTART_TS, 0},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_DELTAS, 0},
+      {TEST_INSERT, 0},
+      {TEST_DELETE, 1},
+      {TEST_INSERT, 1},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
+      {TEST_RESTART_TS, 0},
+      {TEST_MINOR_COMPACT_DELTAS, 0},
+      {TEST_COMPACT_TABLET, 0},
+      {TEST_UPDATE, 1},
+      {TEST_FLUSH_OPS, 0}
+    });
+}
+
+// Previously caused KUDU-1341:
+// deltafile.cc:134] Check failed: last_key_.CompareTo<UNDO>(key) <= 0 must
+// insert undo deltas in sorted order (ascending key, then descending ts):
+// got key (row 1@tx5965182714017464320) after (row 1@tx5965182713875046400)
+TEST_F(FuzzTest, TestFuzzWithRestarts2) {
+  RunFuzzCase({
+      {TEST_INSERT, 0},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
+      {TEST_DELETE, 0},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_DELTAS, 0},
+      {TEST_RESTART_TS, 0},
+      {TEST_INSERT, 1},
+      {TEST_INSERT, 0},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
+      {TEST_DELETE, 0},
+      {TEST_INSERT, 0},
+      {TEST_UPDATE, 1},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
+      {TEST_FLUSH_DELTAS, 0},
+      {TEST_RESTART_TS, 0},
+      {TEST_UPDATE, 1},
+      {TEST_DELETE, 1},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_RESTART_TS, 0},
+      {TEST_INSERT, 1},
+      {TEST_FLUSH_OPS, 0},
+      {TEST_FLUSH_TABLET, 0},
+      {TEST_RESTART_TS, 0},
+      {TEST_COMPACT_TABLET, 0}
+    });
+}
+
 } // namespace tablet
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/mock-rowsets.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/mock-rowsets.h b/src/kudu/tablet/mock-rowsets.h
index ca9b84b..ed050b6 100644
--- a/src/kudu/tablet/mock-rowsets.h
+++ b/src/kudu/tablet/mock-rowsets.h
@@ -82,9 +82,7 @@ class MockRowSet : public RowSet {
     return NULL;
   }
   virtual std::shared_ptr<RowSetMetadata> metadata() OVERRIDE {
-    LOG(FATAL) << "Unimplemented";
-    return std::shared_ptr<RowSetMetadata>(
-      reinterpret_cast<RowSetMetadata *>(NULL));
+    return NULL;
   }
 
   virtual size_t DeltaMemStoreSize() const OVERRIDE {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/row_op.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/row_op.cc b/src/kudu/tablet/row_op.cc
index 28dfeb9..989780e 100644
--- a/src/kudu/tablet/row_op.cc
+++ b/src/kudu/tablet/row_op.cc
@@ -23,7 +23,9 @@ namespace kudu {
 namespace tablet {
 
 RowOp::RowOp(DecodedRowOperation decoded_op)
-    : decoded_op(std::move(decoded_op)) {}
+    : decoded_op(std::move(decoded_op)),
+      orig_result_from_log_(nullptr) {
+}
 
 RowOp::~RowOp() {
 }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/row_op.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/row_op.h b/src/kudu/tablet/row_op.h
index 8ba2071..6220a87 100644
--- a/src/kudu/tablet/row_op.h
+++ b/src/kudu/tablet/row_op.h
@@ -44,6 +44,15 @@ struct RowOp {
   void SetMutateSucceeded(gscoped_ptr<OperationResultPB> result);
   void SetAlreadyFlushed();
 
+  // In the case that this operation is being replayed from the WAL
+  // during tablet bootstrap, we may need to look at the original result
+  // stored in the COMMIT message to know the correct RowSet to apply it to.
+  //
+  // This pointer must stay live as long as this RowOp.
+  void set_original_result_from_log(const OperationResultPB* orig_result) {
+    orig_result_from_log_ = orig_result;
+  }
+
   bool has_row_lock() const {
     return row_lock.acquired();
   }
@@ -64,6 +73,10 @@ struct RowOp {
 
   // The result of the operation, after Apply.
   gscoped_ptr<OperationResultPB> result;
+
+  // If this operation is being replayed from the log, set to the original
+  // result. Otherwise nullptr.
+  const OperationResultPB* orig_result_from_log_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/rowset_tree.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset_tree.cc b/src/kudu/tablet/rowset_tree.cc
index 69ca35a..b4f114c 100644
--- a/src/kudu/tablet/rowset_tree.cc
+++ b/src/kudu/tablet/rowset_tree.cc
@@ -25,6 +25,7 @@
 
 #include "kudu/gutil/stl_util.h"
 #include "kudu/tablet/rowset.h"
+#include "kudu/tablet/rowset_metadata.h"
 #include "kudu/util/interval_tree.h"
 #include "kudu/util/interval_tree-inl.h"
 #include "kudu/util/slice.h"
@@ -129,6 +130,15 @@ Status RowSetTree::Reset(const RowSetVector &rowsets) {
   tree_.reset(new IntervalTree<RowSetIntervalTraits>(entries_));
   key_endpoints_.swap(endpoints);
   all_rowsets_.assign(rowsets.begin(), rowsets.end());
+
+  // Build the mapping from DRS ID to DRS.
+  drs_by_id_.clear();
+  for (auto& rs : all_rowsets_) {
+    if (rs->metadata()) {
+      InsertOrDie(&drs_by_id_, rs->metadata()->id(), rs.get());
+    }
+  }
+
   initted_ = true;
 
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/rowset_tree.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/rowset_tree.h b/src/kudu/tablet/rowset_tree.h
index dbb518a..b3b4f2a 100644
--- a/src/kudu/tablet/rowset_tree.h
+++ b/src/kudu/tablet/rowset_tree.h
@@ -17,10 +17,12 @@
 #ifndef KUDU_TABLET_ROWSET_MANAGER_H
 #define KUDU_TABLET_ROWSET_MANAGER_H
 
+#include <unordered_map>
 #include <vector>
 #include <utility>
 
 #include "kudu/gutil/gscoped_ptr.h"
+#include "kudu/gutil/map-util.h"
 #include "kudu/util/status.h"
 #include "kudu/tablet/rowset.h"
 
@@ -77,6 +79,10 @@ class RowSetTree {
 
   const RowSetVector &all_rowsets() const { return all_rowsets_; }
 
+  RowSet* drs_by_id(int64_t drs_id) const {
+    return FindPtrOrNull(drs_by_id_, drs_id);
+  }
+
   // Iterates over RowSetTree::RSEndpoint, guaranteed to be ordered and for
   // any rowset to appear exactly twice, once at its start slice and once at
   // its stop slice, equivalent to its GetBounds() values.
@@ -100,6 +106,9 @@ class RowSetTree {
   // All of the rowsets which were put in this RowSetTree.
   RowSetVector all_rowsets_;
 
+  // The DiskRowSets in this RowSetTree, keyed by their id.
+  std::unordered_map<int64_t, RowSet*> drs_by_id_;
+
   // Rowsets for which the bounds are unknown -- e.g because they
   // are mutable (MemRowSets).
   //

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.cc b/src/kudu/tablet/tablet.cc
index 88f8417..030da8d 100644
--- a/src/kudu/tablet/tablet.cc
+++ b/src/kudu/tablet/tablet.cc
@@ -410,6 +410,46 @@ Status Tablet::InsertUnlocked(WriteTransactionState *tx_state,
   return s;
 }
 
+vector<RowSet*> Tablet::FindRowSetsToCheck(RowOp* mutate,
+                                           const TabletComponents* comps) {
+  vector<RowSet*> to_check;
+  if (PREDICT_TRUE(!mutate->orig_result_from_log_)) {
+    // TODO: could iterate the rowsets in a smart order
+    // based on recent statistics - eg if a rowset is getting
+    // updated frequently, pick that one first.
+    comps->rowsets->FindRowSetsWithKeyInRange(mutate->key_probe->encoded_key_slice(),
+                                              &to_check);
+#ifndef NDEBUG
+    // The order in which the rowset tree returns its results doesn't have semantic
+    // relevance. We've had bugs in the past (eg KUDU-1341) which were obscured by
+    // relying on the order of rowsets here. So, in debug builds, we shuffle the
+    // order to encourage finding such bugs more easily.
+    std::random_shuffle(to_check.begin(), to_check.end());
+#endif
+    return to_check;
+  }
+
+  // If we are replaying an operation during bootstrap, then we already have a
+  // COMMIT message which tells us specifically which memory store to apply it to.
+  for (const auto& store : mutate->orig_result_from_log_->mutated_stores()) {
+    if (store.has_mrs_id()) {
+      to_check.push_back(comps->memrowset.get());
+    } else {
+      DCHECK(store.has_rs_id());
+      RowSet* drs = comps->rowsets->drs_by_id(store.rs_id());
+      if (PREDICT_TRUE(drs)) {
+        to_check.push_back(drs);
+      }
+
+      // If for some reason we didn't find any stores that the COMMIT message indicated,
+      // then 'to_check' will be empty at this point. That will result in a NotFound()
+      // status below, which the bootstrap code catches and propagates as a tablet
+      // corruption.
+    }
+  }
+  return to_check;
+}
+
 Status Tablet::MutateRowUnlocked(WriteTransactionState *tx_state,
                                  RowOp* mutate) {
   DCHECK(tx_state != nullptr) << "you must have a WriteTransactionState";
@@ -457,12 +497,7 @@ Status Tablet::MutateRowUnlocked(WriteTransactionState *tx_state,
 
   // Next, check the disk rowsets.
 
-  // TODO: could iterate the rowsets in a smart order
-  // based on recent statistics - eg if a rowset is getting
-  // updated frequently, pick that one first.
-  vector<RowSet *> to_check;
-  comps->rowsets->FindRowSetsWithKeyInRange(mutate->key_probe->encoded_key_slice(),
-                                            &to_check);
+  vector<RowSet *> to_check = FindRowSetsToCheck(mutate, comps);
   for (RowSet *rs : to_check) {
     s = rs->MutateRow(ts,
                       *mutate->key_probe,

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/tablet.h
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet.h b/src/kudu/tablet/tablet.h
index 3a05f64..8edf396 100644
--- a/src/kudu/tablet/tablet.h
+++ b/src/kudu/tablet/tablet.h
@@ -390,6 +390,12 @@ class Tablet {
   Status MutateRowUnlocked(WriteTransactionState *tx_state,
                            RowOp* mutate);
 
+  // Return the list of RowSets that need to be consulted when processing the
+  // given mutation.
+  static std::vector<RowSet*> FindRowSetsToCheck(RowOp* mutate,
+                                                 const TabletComponents* comps);
+
+
   // Capture a set of iterators which, together, reflect all of the data in the tablet.
   //
   // These iterators are not true snapshot iterators, but they are safe against

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 38d119a..ec8ec09 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -1277,6 +1277,7 @@ Status TabletBootstrap::FilterAndApplyOperations(WriteTransactionState* tx_state
   int32_t op_idx = 0;
   for (RowOp* op : tx_state->row_ops()) {
     const OperationResultPB& orig_op_result = orig_result.ops(op_idx++);
+    op->set_original_result_from_log(&orig_op_result);
 
     // check if the operation failed in the original transaction
     if (PREDICT_FALSE(orig_op_result.has_failed_status())) {

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/76231cab/src/kudu/tserver/tablet_server-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_server-test.cc b/src/kudu/tserver/tablet_server-test.cc
index 5d54643..fe57f9f 100644
--- a/src/kudu/tserver/tablet_server-test.cc
+++ b/src/kudu/tserver/tablet_server-test.cc
@@ -874,6 +874,41 @@ TEST_F(TabletServerTest, TestKUDU_176_RecoveryAfterMajorDeltaCompaction) {
   ANFF(VerifyRows(schema_, { KeyValue(1, 2) }));
 }
 
+// Regression test for KUDU-1341, a case in which, during bootstrap,
+// we have a DELETE for a row which is still live in multiple on-disk
+// rowsets.
+TEST_F(TabletServerTest, TestKUDU_1341) {
+  const int kTid = 0;
+
+  for (int i = 0; i < 3; i++) {
+    // Insert a row to MRS and flush it.
+    ANFF(InsertTestRowsRemote(kTid, 1, 1));
+    ASSERT_OK(tablet_peer_->tablet()->Flush());
+
+    // Update and delete row (in DMS)
+    ANFF(UpdateTestRowRemote(kTid, 1, i));
+    ANFF(DeleteTestRowsRemote(1, 1));
+  }
+
+  // Insert row again, update it in MRS before flush, and
+  // flush.
+  ANFF(InsertTestRowsRemote(kTid, 1, 1));
+  ANFF(UpdateTestRowRemote(kTid, 1, 12345));
+  ASSERT_OK(tablet_peer_->tablet()->Flush());
+
+  ANFF(VerifyRows(schema_, { KeyValue(1, 12345) }));
+
+  // Test restart.
+  ASSERT_OK(ShutdownAndRebuildTablet());
+  ANFF(VerifyRows(schema_, { KeyValue(1, 12345) }));
+  ASSERT_OK(tablet_peer_->tablet()->Flush());
+  ANFF(VerifyRows(schema_, { KeyValue(1, 12345) }));
+
+  // Test compaction after restart.
+  ASSERT_OK(tablet_peer_->tablet()->Compact(Tablet::FORCE_COMPACT_ALL));
+  ANFF(VerifyRows(schema_, { KeyValue(1, 12345) }));
+}
+
 // Regression test for KUDU-177. Ensures that after a major delta compaction,
 // rows that were in the old DRS's DMS are properly replayed.
 TEST_F(TabletServerTest, TestKUDU_177_RecoveryOfDMSEditsAfterMajorDeltaCompaction) {