You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by to...@apache.org on 2016/06/23 02:56:38 UTC
incubator-kudu git commit: KUDU-1477. Pending COMMIT message for
failed write operation can prevent tablet startup
Repository: incubator-kudu
Updated Branches:
refs/heads/master dceb0015d -> 6894438a4
KUDU-1477. Pending COMMIT message for failed write operation can prevent tablet startup
This fixes a bug seen in a recent YCSB stress test I ran, in which I was
accidentally writing tens of thousands of duplicate keys per second. After a
tablet server restarted, it failed to come up because of a pending commit that
referred to no mutated stores (e.g., because all of its operations were
duplicate-key inserts).
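This patch tweaks the logic of this safety check. A commit with no mutated
stores trivially has "no active stores", but that is not the same as having
"only inactive stores": the subtlety lies in how a commit that touched no
stores at all is treated. Such a commit (for example, one whose operations all
failed) must not trip the check that is meant for commits whose mutated stores
have all been flushed.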
The patch adds a new targeted test in tablet_bootstrap-test as well as
a more end-to-end test in ts_recovery-itest. Both reproduced the bug
reliably before this patch.
Change-Id: I8ecf8d780de1aa89fae4e0510d8291eb1f1cee11
Reviewed-on: http://gerrit.cloudera.org:8080/3321
Tested-by: Kudu Jenkins
Reviewed-by: David Ribeiro Alves <dr...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/6894438a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/6894438a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/6894438a
Branch: refs/heads/master
Commit: 6894438a406a635dc8a8f3bd77862294163cc7fb
Parents: dceb001
Author: Todd Lipcon <to...@apache.org>
Authored: Tue Jun 7 00:41:31 2016 +0200
Committer: Todd Lipcon <to...@apache.org>
Committed: Thu Jun 23 02:54:52 2016 +0000
----------------------------------------------------------------------
src/kudu/consensus/log-test-base.h | 19 ++++++++
.../integration-tests/raft_consensus-itest.cc | 2 +-
src/kudu/integration-tests/test_workload.cc | 11 +++--
src/kudu/integration-tests/test_workload.h | 23 +++++++--
src/kudu/integration-tests/ts_recovery-itest.cc | 50 +++++++++++++++++---
src/kudu/tablet/tablet_bootstrap-test.cc | 24 ++++++++++
src/kudu/tablet/tablet_bootstrap.cc | 42 ++++++++++++----
7 files changed, 148 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/consensus/log-test-base.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/log-test-base.h b/src/kudu/consensus/log-test-base.h
index 05223ba..8c0b0b2 100644
--- a/src/kudu/consensus/log-test-base.h
+++ b/src/kudu/consensus/log-test-base.h
@@ -267,6 +267,25 @@ class LogTestBase : public KuduTest {
AppendCommit(std::move(commit), sync);
}
+ // Append a COMMIT message for 'original_opid', but with results
+ // indicating that the associated writes failed due to
+ // "NotFound" errors.
+ void AppendCommitWithNotFoundOpResults(const OpId& original_opid) {
+ gscoped_ptr<CommitMsg> commit(new CommitMsg);
+ commit->set_op_type(WRITE_OP);
+
+ commit->mutable_commited_op_id()->CopyFrom(original_opid);
+
+ TxResultPB* result = commit->mutable_result();
+
+ OperationResultPB* insert = result->add_ops();
+ StatusToPB(Status::NotFound("fake failed write"), insert->mutable_failed_status());
+ OperationResultPB* mutate = result->add_ops();
+ StatusToPB(Status::NotFound("fake failed write"), mutate->mutable_failed_status());
+
+ AppendCommit(std::move(commit));
+ }
+
void AppendCommit(gscoped_ptr<CommitMsg> commit, bool sync = APPEND_SYNC) {
if (sync) {
Synchronizer s;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 36e2b6e..e112b82 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -2461,7 +2461,7 @@ TEST_F(RaftConsensusITest, TestHammerOneRow) {
TestWorkload workload(cluster_.get());
workload.set_table_name(kTableId);
- workload.set_pathological_one_row_enabled(true);
+ workload.set_write_pattern(TestWorkload::UPDATE_ONE_ROW);
workload.set_num_write_threads(20);
workload.Setup();
workload.Start();
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/test_workload.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.cc b/src/kudu/integration-tests/test_workload.cc
index 44c2039..541647a 100644
--- a/src/kudu/integration-tests/test_workload.cc
+++ b/src/kudu/integration-tests/test_workload.cc
@@ -58,7 +58,6 @@ TestWorkload::TestWorkload(ExternalMiniCluster* cluster)
write_timeout_millis_(20000),
timeout_allowed_(false),
not_found_allowed_(false),
- pathological_one_row_enabled_(false),
num_replicas_(3),
num_tablets_(1),
table_name_(kDefaultTableName),
@@ -106,7 +105,7 @@ void TestWorkload::WriteThread() {
while (should_run_.Load()) {
for (int i = 0; i < write_batch_size_; i++) {
- if (pathological_one_row_enabled_) {
+ if (write_pattern_ == UPDATE_ONE_ROW) {
gscoped_ptr<KuduUpdate> update(table->NewUpdate());
KuduPartialRow* row = update->mutable_row();
CHECK_OK(row->SetInt32(0, 0));
@@ -115,7 +114,11 @@ void TestWorkload::WriteThread() {
} else {
gscoped_ptr<KuduInsert> insert(table->NewInsert());
KuduPartialRow* row = insert->mutable_row();
- CHECK_OK(row->SetInt32(0, r.Next()));
+ int32_t key = r.Next();
+ if (write_pattern_ == INSERT_WITH_MANY_DUP_KEYS) {
+ key %= kNumRowsForDuplicateKeyWorkload;
+ }
+ CHECK_OK(row->SetInt32(0, key));
CHECK_OK(row->SetInt32(1, r.Next()));
string test_payload("hello world");
if (payload_bytes_ != 11) {
@@ -205,7 +208,7 @@ void TestWorkload::Setup() {
}
- if (pathological_one_row_enabled_) {
+ if (write_pattern_ == UPDATE_ONE_ROW) {
shared_ptr<KuduSession> session = client_->NewSession();
session->SetTimeoutMillis(20000);
CHECK_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/test_workload.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/test_workload.h b/src/kudu/integration-tests/test_workload.h
index 307ae92..0c271b9 100644
--- a/src/kudu/integration-tests/test_workload.h
+++ b/src/kudu/integration-tests/test_workload.h
@@ -97,8 +97,25 @@ class TestWorkload {
return table_name_;
}
- void set_pathological_one_row_enabled(bool enabled) {
- pathological_one_row_enabled_ = enabled;
+ static const int kNumRowsForDuplicateKeyWorkload = 20;
+
+ enum WritePattern {
+ // The default: insert random row keys. This may cause an occasional
+ // duplicate, but with 32-bit keys, they won't be frequent.
+ INSERT_RANDOM_ROWS,
+
+ // All threads generate updates against a single row.
+ UPDATE_ONE_ROW,
+
+ // Insert rows in random order, but restricted to only
+ // kNumRowsForDuplicateKeyWorkload unique keys. This ensures that,
+ // after a very short initial warm-up period, all inserts fail with
+ // duplicate keys.
+ INSERT_WITH_MANY_DUP_KEYS
+ };
+
+ void set_write_pattern(WritePattern pattern) {
+ write_pattern_ = pattern;
}
// Sets up the internal client and creates the table which will be used for
@@ -139,7 +156,7 @@ class TestWorkload {
int write_timeout_millis_;
bool timeout_allowed_;
bool not_found_allowed_;
- bool pathological_one_row_enabled_;
+ WritePattern write_pattern_ = INSERT_RANDOM_ROWS;
int num_replicas_;
int num_tablets_;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/integration-tests/ts_recovery-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/ts_recovery-itest.cc b/src/kudu/integration-tests/ts_recovery-itest.cc
index d9b2b15..c0afd9a 100644
--- a/src/kudu/integration-tests/ts_recovery-itest.cc
+++ b/src/kudu/integration-tests/ts_recovery-itest.cc
@@ -86,9 +86,7 @@ TEST_F(TsRecoveryITest, TestRestartWithOrphanedReplicates) {
work.Start();
// Wait for the process to crash due to the injected fault.
- while (cluster_->tablet_server(0)->IsProcessAlive()) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
+ ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(30)));
// Stop the writers.
work.StopAndJoin();
@@ -110,6 +108,47 @@ TEST_F(TsRecoveryITest, TestRestartWithOrphanedReplicates) {
MonoDelta::FromSeconds(20)));
}
+// Regression test for KUDU-1477: a pending commit message would cause
+// bootstrap to fail if that message only included errors and no
+// successful operations.
+TEST_F(TsRecoveryITest, TestRestartWithPendingCommitFromFailedOp) {
+ NO_FATALS(StartCluster());
+ cluster_->SetFlag(cluster_->tablet_server(0),
+ "fault_crash_before_append_commit", "0.01");
+
+ // Set up the workload to write many duplicate rows, and with only
+ // one operation per batch. This means that by the time we crash
+ // it's likely that most of the recently appended commit messages
+ // are for failed insertions (dup key). We also use many threads
+ // to increase the probability that commits will be written
+ // out-of-order and trigger the bug.
+ TestWorkload work(cluster_.get());
+ work.set_num_replicas(1);
+ work.set_num_write_threads(20);
+ work.set_write_timeout_millis(100);
+ work.set_timeout_allowed(true);
+ work.set_write_batch_size(1);
+ work.set_write_pattern(TestWorkload::INSERT_WITH_MANY_DUP_KEYS);
+ work.Setup();
+ work.Start();
+
+ // Wait for the process to crash due to the injected fault.
+ ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(10)));
+
+ // Stop the writers.
+ work.StopAndJoin();
+
+ // Restart the server, and it should recover.
+ cluster_->tablet_server(0)->Shutdown();
+ ASSERT_OK(cluster_->tablet_server(0)->Restart());
+
+ ClusterVerifier v(cluster_.get());
+ NO_FATALS(v.CheckRowCountWithRetries(work.table_name(),
+ ClusterVerifier::AT_LEAST,
+ work.rows_inserted(),
+ MonoDelta::FromSeconds(20)));
+}
+
// Test that we replay from the recovery directory, if it exists.
TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) {
NO_FATALS(StartCluster({ "--fault_crash_during_log_replay=0.05" }));
@@ -137,10 +176,7 @@ TEST_F(TsRecoveryITest, TestCrashDuringLogReplay) {
ignore_result(cluster_->tablet_server(0)->Restart());
// Wait for the process to crash during log replay.
- for (int i = 0; i < 3000 && cluster_->tablet_server(0)->IsProcessAlive(); i++) {
- SleepFor(MonoDelta::FromMilliseconds(10));
- }
- ASSERT_FALSE(cluster_->tablet_server(0)->IsProcessAlive()) << "TS didn't crash!";
+ ASSERT_OK(cluster_->tablet_server(0)->WaitForCrash(MonoDelta::FromSeconds(30)));
// Now remove the crash flag, so the next replay will complete, and restart
// the server once more.
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/tablet/tablet_bootstrap-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap-test.cc b/src/kudu/tablet/tablet_bootstrap-test.cc
index f42deff..9713073 100644
--- a/src/kudu/tablet/tablet_bootstrap-test.cc
+++ b/src/kudu/tablet/tablet_bootstrap-test.cc
@@ -252,6 +252,30 @@ TEST_F(BootstrapTest, TestOrphanCommit) {
}
}
+// Regression test for KUDU-1477: we should successfully start up
+// even if a pending commit contains only failed operations.
+TEST_F(BootstrapTest, TestPendingFailedCommit) {
+ ASSERT_OK(BuildLog());
+
+ OpId opid_1 = MakeOpId(1, current_index_++);
+ OpId opid_2 = MakeOpId(1, current_index_++);
+
+ // Write REPLICATEs for two ops but a COMMIT only for the second, so that
+ // COMMIT stays pending; its op results are all 'NotFound' failures.
+ AppendReplicateBatch(opid_1);
+ AppendReplicateBatch(opid_2);
+ AppendCommitWithNotFoundOpResults(opid_2);
+
+ {
+ shared_ptr<Tablet> tablet;
+ ConsensusBootstrapInfo boot_info;
+
+ // Bootstrap from the log. This must succeed even though the pending
+ // COMMIT references no mutated stores (KUDU-1477).
+ ASSERT_OK(BootstrapTestTablet(-1, -1, &tablet, &boot_info));
+ }
+}
+
// Tests this scenario:
// Orphan COMMIT with id <= current mrs id, followed by a REPLICATE
// message with mrs_id > current mrs_id, and a COMMIT message for that
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/6894438a/src/kudu/tablet/tablet_bootstrap.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tablet/tablet_bootstrap.cc b/src/kudu/tablet/tablet_bootstrap.cc
index 6c26f65..3b96f11 100644
--- a/src/kudu/tablet/tablet_bootstrap.cc
+++ b/src/kudu/tablet/tablet_bootstrap.cc
@@ -257,9 +257,26 @@ class TabletBootstrap {
Status FilterOperation(const OperationResultPB& op_result,
bool* already_flushed);
- // Returns true if any of the memory stores referenced in 'commit' are still
- // active, in which case the operation needs to be replayed.
- bool AreAnyStoresActive(const CommitMsg& commit);
+ enum ActiveStores {
+ // The OperationResultPBs in the commit message do not reference any stores.
+ // This can happen in the case that the operations did not result in any mutations
+ // (e.g. because they were updates for not-found row keys).
+ NO_MUTATED_STORES,
+
+ // At least one operation resulted in a mutation to a store, but none of the
+ // mutated stores are still active. Therefore the operation does not need to
+ // be replayed.
+ NO_STORES_ACTIVE,
+
+ // At least one operation resulted in a mutation to a store, and at least
+ // one of those mutated stores is still active. This implies that the operation
+ // needs to be replayed.
+ SOME_STORES_ACTIVE
+ };
+
+ // For the given commit message, analyze which memory stores were mutated
+ // by the operation, returning one of the enum values above.
+ ActiveStores AnalyzeActiveStores(const CommitMsg& commit);
void DumpReplayStateToLog(const ReplayState& state);
@@ -870,19 +887,27 @@ Status TabletBootstrap::HandleCommitMessage(ReplayState* state, LogEntryPB* comm
return Status::OK();
}
-bool TabletBootstrap::AreAnyStoresActive(const CommitMsg& commit) {
+TabletBootstrap::ActiveStores TabletBootstrap::AnalyzeActiveStores(const CommitMsg& commit) {
+ bool has_mutated_stores = false;
+ bool has_active_stores = false;
+
for (const OperationResultPB& op_result : commit.result().ops()) {
for (const MemStoreTargetPB& mutated_store : op_result.mutated_stores()) {
+ has_mutated_stores = true;
if (flushed_stores_.IsMemStoreActive(mutated_store)) {
- return true;
+ has_active_stores = true;
}
}
}
- return false;
+
+ if (!has_mutated_stores) {
+ return NO_MUTATED_STORES;
+ }
+ return has_active_stores ? SOME_STORES_ACTIVE : NO_STORES_ACTIVE;
}
Status TabletBootstrap::CheckOrphanedCommitDoesntNeedReplay(const CommitMsg& commit) {
- if (AreAnyStoresActive(commit)) {
+ if (AnalyzeActiveStores(commit) == SOME_STORES_ACTIVE) {
TabletSuperBlockPB super;
WARN_NOT_OK(meta_->ToSuperBlock(&super), LogPrefix() + "Couldn't build TabletSuperBlockPB");
return Status::Corruption(Substitute("CommitMsg was orphaned but it referred to "
@@ -1085,8 +1110,9 @@ Status TabletBootstrap::PlaySegments(ConsensusBootstrapInfo* consensus_info) {
DumpReplayStateToLog(state);
return Status::Corruption("Had orphaned commits at the end of replay.");
}
+
if (entry.second->commit().op_type() == WRITE_OP &&
- !AreAnyStoresActive(entry.second->commit())) {
+ AnalyzeActiveStores(entry.second->commit()) == NO_STORES_ACTIVE) {
DumpReplayStateToLog(state);
TabletSuperBlockPB super;
WARN_NOT_OK(meta_->ToSuperBlock(&super), "Couldn't build TabletSuperBlockPB.");