Posted to commits@kudu.apache.org by to...@apache.org on 2016/06/23 02:56:38 UTC

incubator-kudu git commit: KUDU-1477. Pending COMMIT message for failed write operation can prevent tablet startup

Repository: incubator-kudu
Updated Branches:
  refs/heads/master dceb0015d -> 6894438a4


KUDU-1477. Pending COMMIT message for failed write operation can prevent tablet startup

This fixes a bug seen in a recent YCSB stress test in which I was
accidentally writing tens of thousands of duplicate keys per second.
After a tablet server restarted, it failed to come back up due to a
pending commit that referred to no mutated stores (e.g. because all
of its operations were failed duplicate-key inserts).

This patch tweaks the logic of that safety check: a commit with no
mutated stores trivially has "no active stores", but that is not the
same as having "only inactive stores". The subtlety is in how the
check should behave when a commit references no stores at all.
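
To make the distinction concrete before the diff, here is a minimal,
self-contained sketch. The Fake* structs are hypothetical stand-ins for
the real CommitMsg / OperationResultPB / MemStoreTargetPB protobufs; the
actual implementation is TabletBootstrap::AnalyzeActiveStores() in
tablet_bootstrap.cc below.

  #include <vector>

  // Simplified, hypothetical stand-ins for the real protobuf types,
  // for illustration only.
  struct FakeStoreTarget { bool still_active; };        // ~ MemStoreTargetPB
  struct FakeOpResult {
    std::vector<FakeStoreTarget> mutated_stores;        // ~ OperationResultPB
  };
  struct FakeCommit {
    std::vector<FakeOpResult> ops;                      // ~ CommitMsg::result().ops()
  };

  enum ActiveStores { NO_MUTATED_STORES, NO_STORES_ACTIVE, SOME_STORES_ACTIVE };

  ActiveStores AnalyzeActiveStores(const FakeCommit& commit) {
    bool has_mutated_stores = false;
    bool has_active_stores = false;
    for (const FakeOpResult& op : commit.ops) {
      for (const FakeStoreTarget& store : op.mutated_stores) {
        has_mutated_stores = true;
        if (store.still_active) has_active_stores = true;
      }
    }
    // A commit whose operations all failed (e.g. duplicate-key inserts)
    // mutates no stores at all: there is nothing left to replay, so it
    // must not be lumped together with "all mutated stores were flushed".
    if (!has_mutated_stores) return NO_MUTATED_STORES;
    return has_active_stores ? SOME_STORES_ACTIVE : NO_STORES_ACTIVE;
  }

As the diff shows, the orphaned-commit check now complains only on
SOME_STORES_ACTIVE, and the end-of-replay check only on NO_STORES_ACTIVE,
so a pending commit whose operations all failed no longer blocks startup.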

The patch adds a new targeted test in tablet_bootstrap-test as well as
a more end-to-end test in ts_recovery-itest. Both reproduced the bug
reliably before this patch.

Change-Id: I8ecf8d780de1aa89fae4e0510d8291eb1f1cee11
Reviewed-on: http://gerrit.cloudera.org:8080/3321
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6894438a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6894438a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6894438a

Branch: refs/heads/master
Commit: 6894438a406a635dc8a8f3bd77862294163cc7fb
Parents: dceb001
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Jun 7 00:41:31 2016 +0200
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Jun 23 02:54:52 2016 +0000

----------------------------------------------------------------------
 src/kudu/consensus/log-test-base.h              | 19 ++++++++
 .../integration-tests/raft_consensus-itest.cc   |  2 +-
 src/kudu/integration-tests/test_workload.cc     | 11 +++--
 src/kudu/integration-tests/test_workload.h      | 23 +++++++--
 src/kudu/integration-tests/ts_recovery-itest.cc | 50 +++++++++++++++++---
 src/kudu/tablet/tablet_bootstrap-test.cc        | 24 ++++++++++
 src/kudu/tablet/tablet_bootstrap.cc             | 42 ++++++++++++----
 7 files changed, 148 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 05223ba..8c0b0b2 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -267,6 +267,25 @@ class LogTestBase : public KuduTest {
     AppendCommit(std::move(commit), sync);
   }
 
+  // Append a COMMIT message for 'original_opid', but with results
+  // indicating that the associated writes failed due to
+  // "NotFound" errors.
+  void AppendCommitWithNotFoundOpResults(const OpId& original_opid) {
+    gscoped_ptr<CommitMsg> commit(new CommitMsg);
+    commit->set_op_type(WRITE_OP);
+
+    commit->mutable_commited_op_id()->CopyFrom(original_opid);
+
+    TxResultPB* result = commit->mutable_result();
+
+    OperationResultPB* insert = result->add_ops();
+    StatusToPB(Status::NotFound("fake failed write"), insert->mutable_failed_status());
+    OperationResultPB* mutate = result->add_ops();
+    StatusToPB(Status::NotFound("fake failed write"), mutate->mutable_failed_status());
+
+    AppendCommit(std::move(commit));
+  }
+
   void AppendCommit(gscoped_ptr<CommitMsg> commit, bool sync = APPEND_SYNC) {
     if (sync) {
       Synchronizer s;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 36e2b6e..e112b82 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -2461,7 +2461,7 @@ TEST_F(RaftConsensusITest, TestHammerOneRow) {
 
   TestWorkload workload(cluster_.get());
   workload.set_table_name(kTableId);
-  workload.set_pathological_one_row_enabled(true);
+  workload.set_write_pattern(TestWorkload::UPDATE_ONE_ROW);
   workload.set_num_write_threads(20);
   workload.Setup();
   workload.Start();

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 44c2039..541647a 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -58,7 +58,6 @@ TestWorkload::TestWorkload(ExternalMiniCluster* cluster)
     write_timeout_millis_(20000),
     timeout_allowed_(false),
     not_found_allowed_(false),
-    pathological_one_row_enabled_(false),
     num_replicas_(3),
     num_tablets_(1),
     table_name_(kDefaultTableName),
@@ -106,7 +105,7 @@ void TestWorkload::WriteThread() {
 
   while (should_run_.Load()) {
     for (int i = 0; i < write_batch_size_; i++) {
-      if (pathological_one_row_enabled_) {
+      if (write_pattern_ == UPDATE_ONE_ROW) {
         gscoped_ptr<KuduUpdate> update(table->NewUpdate());
         KuduPartialRow* row = update->mutable_row();
         CHECK_OK(row->SetInt32(0, 0));
@@ -115,7 +114,11 @@ void TestWorkload::WriteThread() {
       } else {
         gscoped_ptr<KuduInsert> insert(table->NewInsert());
         KuduPartialRow* row = insert->mutable_row();
-        CHECK_OK(row->SetInt32(0, r.Next()));
+        int32_t key = r.Next();
+        if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) {
+          key %= kNumRowsForDuplicateKeyWorkload;
+        }
+        CHECK_OK(row->SetInt32(0, key));
         CHECK_OK(row->SetInt32(1, r.Next()));
         string test_payload("hello world");
         if (payload_bytes_ != 11) {
@@ -205,7 +208,7 @@ void TestWorkload::Setup() {
   }
 
 
-  if (pathological_one_row_enabled_) {
+  if (write_pattern_ == UPDATE_ONE_ROW) {
     shared_ptr<KuduSession> session = client_->NewSession();
     session->SetTimeoutMillis(20000);
     CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 307ae92..0c271b9 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -97,8 +97,25 @@ class TestWorkload {
     return table_name_;
   }
 
-  void set_pathological_one_row_enabled(bool enabled) {
-    pathological_one_row_enabled_ = enabled;
+  static const int kNumRowsForDuplicateKeyWorkload = 20;
+
+  enum WritePattern {
+    // The default: insert random row keys. This may cause an occasional
+    // duplicate, but with 32-bit keys, they won't be frequent.
+    INSERT_RANDOM_ROWS,
+
+    // All threads generate updates against a single row.
+    UPDATE_ONE_ROW,
+
+    // Insert rows in random order, but restricted to only
+    // kNumRowsForDuplicateKeyWorkload unique keys. This ensures that,
+    // after a very short initial warm-up period, all inserts fail with
+    // duplicate keys.
+    INSERT_WITH_MANY_DUP_KEYS
+  };
+
+  void set_write_pattern(WritePattern pattern) {
+    write_pattern_ = pattern;
   }
 
   // Sets up the internal client and creates the table which will be used for
@@ -139,7 +156,7 @@ class TestWorkload {
   int write_timeout_millis_;
   bool timeout_allowed_;
   bool not_found_allowed_;
-  bool pathological_one_row_enabled_;
+  WritePattern write_pattern_ = INSERT_RANDOM_ROWS;
 
   int num_replicas_;
   int num_tablets_;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index d9b2b15..c0afd9a 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -86,9 +86,7 @@ TEST_F(TsRecoveryITest, TestRestartWithOrphanedReplicates) {
   work.Start();
 
   // Wait for the process to crash due to the injected fault.
-  while (cluster_->tablet_server(0)->IsProcessAlive()) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(30)));
 
   // Stop the writers.
   work.StopAndJoin();
@@ -110,6 +108,47 @@ TEST_F(TsRecoveryITest, TestRestartWithOrphanedReplicates) {
                                        MonoDelta::FromSeconds(20)));
 }
 
+// Regression test for KUDU-1477: a pending commit message would cause
+// bootstrap to fail if that message only included errors and no
+// successful operations.
+TEST_F(TsRecoveryITest, TestRestartWithPendingCommitFromFailedOp) {
+  NO_FATALS(StartCluster());
+  cluster_->SetFlag(cluster_->tablet_server(0),
+                    "fault_crash_before_append_commit", "0.01");
+
+  // Set up the workload to write many duplicate rows, and with only
+  // one operation per batch. This means that by the time we crash
+  // it's likely that most of the recently appended commit messages
+  // are for failed insertions (dup key). We also use many threads
+  // to increase the probability that commits will be written
+  // out-of-order and trigger the bug.
+  TestWorkload work(cluster_.get());
+  work.set_num_replicas(1);
+  work.set_num_write_threads(20);
+  work.set_write_timeout_millis(100);
+  work.set_timeout_allowed(true);
+  work.set_write_batch_size(1);
+  work.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
+  work.Setup();
+  work.Start();
+
+  // Wait for the process to crash due to the injected fault.
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
+
+  // Stop the writers.
+  work.StopAndJoin();
+
+  // Restart the server, and it should recover.
+  cluster_->tablet_server(0)->Shutdown();
+  ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
+  ClusterVerifier v(cluster_.get());
+  NO_FATALS(v.CheckRowCountWithRetries(work.table_name(),
+                                       ClusterVerifier::AT_LEAST,
+                                       work.rows_inserted(),
+                                       MonoDelta::FromSeconds(20)));
+}
+
 // Test that we replay from the recovery directory, if it exists.
 TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) {
   NO_FATALS(StartCluster({ "--fault_crash_during_log_replay=0.05" }));
@@ -137,10 +176,7 @@ TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) {
   ignore_result(cluster_->tablet_server(0)->Restart());
 
   // Wait for the process to crash during log replay.
-  for (int i = 0; i < 3000 && cluster_->tablet_server(0)->IsProcessAlive(); i++) {
-    SleepFor(MonoDelta::FromMilliseconds(10));
-  }
-  ASSERT_FALSE(cluster_->tablet_server(0)->IsProcessAlive()) << "TS didn't crash!";
+  ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(30)));
 
   // Now remove the crash flag, so the next replay will complete, and restart
   // the server once more.

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index f42deff..9713073 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -252,6 +252,30 @@ TEST_F(BootstrapTest, TestOrphanCommit) {
   }
 }
 
+// Regression test for KUDU-1477: we should successfully start up
+// even if a pending commit contains only failed operations.
+TEST_F(BootstrapTest, TestPendingFailedCommit) {
+  ASSERT_OK(BuildLog());
+
+  OpId opid_1 = MakeOpId(1, current_index_++);
+  OpId opid_2 = MakeOpId(1, current_index_++);
+
+  // Append two REPLICATE messages, followed by a COMMIT for the second
+  // one whose results indicate that the writes failed with 'NotFound' errors.
+  AppendReplicateBatch(opid_1);
+  AppendReplicateBatch(opid_2);
+  AppendCommitWithNotFoundOpResults(opid_2);
+
+  {
+    shared_ptr<Tablet> tablet;
+    ConsensusBootstrapInfo boot_info;
+
+    // Bootstrap the tablet from the log; this should succeed despite
+    // the pending commit whose operations all failed.
+    ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info));
+  }
+}
+
 // Tests this scenario:
 // Orphan COMMIT with id <= current mrs id, followed by a REPLICATE
 // message with mrs_id > current mrs_id, and a COMMIT message for that

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 6c26f65..3b96f11 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -257,9 +257,26 @@ class TabletBootstrap {
   Status FilterOperation(const OperationResultPB& op_result,
                          bool* already_flushed);
 
-  // Returns true if any of the memory stores referenced in 'commit' are still
-  // active, in which case the operation needs to be replayed.
-  bool AreAnyStoresActive(const CommitMsg& commit);
+  enum ActiveStores {
+    // The OperationResultPBs in the commit message do not reference any stores.
+    // This can happen in the case that the operations did not result in any mutations
+    // (e.g. because they were updates for not-found row keys).
+    NO_MUTATED_STORES,
+
+    // At least one operation resulted in a mutation to a store, but none of the
+    // mutated stores are still active. Therefore the operation does not need to
+    // be replayed.
+    NO_STORES_ACTIVE,
+
+    // At least one operation resulted in a mutation to a store, and at least
+    // one of those mutated stores is still active. This implies that the operation
+    // needs to be replayed.
+    SOME_STORES_ACTIVE
+  };
+
+  // For the given commit message, analyze which memory stores were mutated
+  // by the operation, returning one of the enum values above.
+  ActiveStores AnalyzeActiveStores(const CommitMsg& commit);
 
   void DumpReplayStateToLog(const ReplayState& state);
 
@@ -870,19 +887,27 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* comm
   return Status::OK();
 }
 
-bool TabletBootstrap::AreAnyStoresActive(const CommitMsg& commit) {
+TabletBootstrap::ActiveStores TabletBootstrap::AnalyzeActiveStores(const CommitMsg& commit) {
+  bool has_mutated_stores = false;
+  bool has_active_stores = false;
+
   for (const OperationResultPB& op_result : commit.result().ops()) {
     for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
+      has_mutated_stores = true;
       if (flushed_stores_.IsMemStoreActive(mutated_store)) {
-        return true;
+        has_active_stores = true;
       }
     }
   }
-  return false;
+
+  if (!has_mutated_stores) {
+    return NO_MUTATED_STORES;
+  }
+  return has_active_stores ? SOME_STORES_ACTIVE : NO_STORES_ACTIVE;
 }
 
 Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
-  if (AreAnyStoresActive(commit)) {
+  if (AnalyzeActiveStores(commit) == SOME_STORES_ACTIVE) {
     TabletSuperBlockPB super;
     WARN_NOT_OK(meta_->ToSuperBlock(&super), LogPrefix() + "Couldn't build TabletSuperBlockPB");
     return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to "
@@ -1085,8 +1110,9 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
         DumpReplayStateToLog(state);
         return Status::Corruption("Had orphaned commits at the end of replay.");
       }
+
       if (entry.second->commit().op_type() == WRITE_OP &&
-          !AreAnyStoresActive(entry.second->commit())) {
+          AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) {
         DumpReplayStateToLog(state);
         TabletSuperBlockPB super;
         WARN_NOT_OK(meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");