You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/02/23 00:45:17 UTC
[3/3] incubator-kudu git commit: KUDU-1322. Failover client writes on TABLET_NOT_FOUND
KUDU-1322. Failover client writes on TABLET_NOT_FOUND
Follow-up to KUDU-1054.
If a tablet replica is deleted from a tablet server, a client that is writing
to that tablet should re-fetch the tablet's metadata from the master and
connect to the new leader. If the table itself has been deleted, the client
should instead get a Status::NotFound error.
Also grepped for and removed TODOs mentioning KUDU-1054 and KUDU-1322.
Change-Id: I037608977c890cb5099fc74c1be741c93d99ae7a
Reviewed-on: http://gerrit.cloudera.org:8080/978
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/2878c586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/2878c586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/2878c586
Branch: refs/heads/master
Commit: 2878c586259ecf00d7f2623c27b27e640cc799bd
Parents: b3efaf0
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jan 26 15:39:41 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Mon Feb 22 23:44:39 2016 +0000
----------------------------------------------------------------------
src/kudu/client/batcher.cc | 115 +++++++++++--------
src/kudu/client/meta_cache.cc | 13 ++-
.../integration-tests/client_failover-itest.cc | 72 +++++++++---
.../integration-tests/remote_bootstrap-itest.cc | 6 -
4 files changed, 138 insertions(+), 68 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index f5ad0bb..2b3e583 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -307,54 +307,58 @@ WriteRpc::~WriteRpc() {
void WriteRpc::SendRpc() {
// Choose a destination TS according to the following algorithm:
- // 1. Select the leader, provided:
- // a. One exists, and
+ // 1. If the tablet metadata is stale, refresh it (goto step 5).
+ // 2. Select the leader, provided:
+ // a. The current leader is known,
// b. It hasn't failed, and
// c. It isn't currently marked as a follower.
- // 2. If there's no good leader select another replica, provided:
+ // 3. If there's no good leader select another replica, provided:
// a. It hasn't failed, and
// b. It hasn't rejected our write due to being a follower.
- // 3. Preemptively mark the replica we selected in step 2 as "leader" in the
+ // 4. Preemptively mark the replica we selected in step 3 as "leader" in the
// meta cache, so that our selection remains sticky until the next Master
// metadata refresh.
- // 4. If we're out of appropriate replicas, force a lookup to the master
+ // 5. If we're out of appropriate replicas, force a lookup to the master
// to fetch new consensus configuration information.
- // 5. When the lookup finishes, forget which replicas were followers and
- // retry the write (i.e. goto 1).
- // 6. If we issue the write and it fails because the destination was a
- // follower, remember that fact and retry the write (i.e. goto 1).
- // 7. Repeat steps 1-6 until the write succeeds, fails for other reasons,
+ // 6. When the lookup finishes, forget which replicas were followers and
+ // retry the write (i.e. goto 2).
+ // 7. If we issue the write and it fails because the destination was a
+ // follower, remember that fact and retry the write (i.e. goto 2).
+ // 8. Repeat steps 1-7 until the write succeeds, fails for other reasons,
// or the write's deadline expires.
- current_ts_ = tablet_->LeaderTServer();
- if (current_ts_ && ContainsKey(followers_, current_ts_)) {
- VLOG(2) << "Tablet " << tablet_->tablet_id() << ": We have a follower for a leader: "
- << current_ts_->ToString();
-
- // Mark the node as a follower in the cache so that on the next go-round,
- // LeaderTServer() will not return it as a leader unless a full metadata
- // refresh has occurred. This also avoids LookupTabletByKey() going into
- // "fast path" mode and not actually performing a metadata refresh from the
- // Master when it needs to.
- tablet_->MarkTServerAsFollower(current_ts_);
- current_ts_ = NULL;
- }
- if (!current_ts_) {
- // Try to "guess" the next leader.
- vector<RemoteTabletServer*> replicas;
- tablet_->GetRemoteTabletServers(&replicas);
- for (RemoteTabletServer* ts : replicas) {
- if (!ContainsKey(followers_, ts)) {
- current_ts_ = ts;
- break;
- }
+ current_ts_ = nullptr;
+ if (!tablet_->stale()) {
+ current_ts_ = tablet_->LeaderTServer();
+ if (current_ts_ && ContainsKey(followers_, current_ts_)) {
+ VLOG(2) << "Tablet " << tablet_->tablet_id() << ": We have a follower for a leader: "
+ << current_ts_->ToString();
+
+ // Mark the node as a follower in the cache so that on the next go-round,
+ // LeaderTServer() will not return it as a leader unless a full metadata
+ // refresh has occurred. This also avoids LookupTabletByKey() going into
+ // "fast path" mode and not actually performing a metadata refresh from the
+ // Master when it needs to.
+ tablet_->MarkTServerAsFollower(current_ts_);
+ current_ts_ = NULL;
}
- if (current_ts_) {
- // Mark this next replica "preemptively" as the leader in the meta cache,
- // so we go to it first on the next write if writing was successful.
- VLOG(1) << "Tablet " << tablet_->tablet_id() << ": Previous leader failed. "
- << "Preemptively marking tserver " << current_ts_->ToString()
- << " as leader in the meta cache.";
- tablet_->MarkTServerAsLeader(current_ts_);
+ if (!current_ts_) {
+ // Try to "guess" the next leader.
+ vector<RemoteTabletServer*> replicas;
+ tablet_->GetRemoteTabletServers(&replicas);
+ for (RemoteTabletServer* ts : replicas) {
+ if (!ContainsKey(followers_, ts)) {
+ current_ts_ = ts;
+ break;
+ }
+ }
+ if (current_ts_) {
+ // Mark this next replica "preemptively" as the leader in the meta cache,
+ // so we go to it first on the next write if writing was successful.
+ VLOG(1) << "Tablet " << tablet_->tablet_id() << ": Previous leader failed. "
+ << "Preemptively marking tserver " << current_ts_->ToString()
+ << " as leader in the meta cache.";
+ tablet_->MarkTServerAsLeader(current_ts_);
+ }
}
}
@@ -391,6 +395,14 @@ string WriteRpc::ToString() const {
}
void WriteRpc::LookupTabletCb(const Status& status) {
+ // If the table was deleted, the master will return TABLE_DELETED and the
+ // meta cache will return Status::NotFound.
+ if (status.IsNotFound()) {
+ batcher_->ProcessWriteResponse(*this, status);
+ delete this;
+ return;
+ }
+
// We should retry the RPC regardless of the outcome of the lookup, as
// leader election doesn't depend on the existence of a master at all.
//
@@ -447,13 +459,24 @@ void WriteRpc::SendRpcCb(const Status& status) {
new_status = StatusFromPB(resp_.error().status());
}
- // Oops, we failed over to a replica that wasn't a LEADER. Unlikely as
- // we're using consensus configuration information from the master, but still possible
- // (e.g. leader restarted and became a FOLLOWER). Try again.
- //
- // TODO: IllegalState is obviously way too broad an error category for
- // this case.
- if (new_status.IsIllegalState() || new_status.IsAborted()) {
+ if (resp_.has_error() && resp_.error().code() == tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
+ // If we get TABLET_NOT_FOUND, the replica we thought was leader has been
+ // deleted. We mark our tablet cache as stale, forcing a master lookup on
+ // the next attempt.
+ tablet_->MarkStale();
+ // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
+ mutable_retrier()->DelayedRetry(this, StatusFromPB(resp_.error().status()));
+ return;
+ } else if (new_status.IsIllegalState() || new_status.IsAborted()) {
+ // Alternatively, when we get a status code of IllegalState or Aborted, we
+ // assume this means that the replica we attempted to write to is not the
+ // current leader (maybe it got partitioned or slow and another node took
+ // over). We attempt to fail over to another replica in the config.
+ //
+ // TODO: This error handling block should really be rewritten to handle
+ // specific error codes exclusively instead of Status codes (this may
+ // require some server-side changes). For example, IllegalState is
+ // obviously way too broad an error category for this case.
followers_.insert(current_ts_);
mutable_retrier()->DelayedRetry(this, new_status);
return;
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/client/meta_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 61ae145..80ad940 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -477,14 +477,19 @@ void LookupRpc::NewLeaderMasterDeterminedCb(const Status& status) {
void LookupRpc::SendRpcCb(const Status& status) {
gscoped_ptr<LookupRpc> delete_me(this); // delete on scope exit
- // Prefer early failures over controller failures.
+ if (!status.ok()) {
+ // Non-RPC failure. We only support TimedOut for LookupRpc.
+ CHECK(status.IsTimedOut()) << status.ToString();
+ }
+
Status new_status = status;
+ // Check for generic RPC errors.
if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
ignore_result(delete_me.release());
return;
}
- // Prefer controller failures over response failures.
+ // Check for specific application response errors.
if (new_status.ok() && resp_.has_error()) {
if (resp_.error().code() == master::MasterErrorPB::NOT_THE_LEADER ||
resp_.error().code() == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
@@ -498,6 +503,7 @@ void LookupRpc::SendRpcCb(const Status& status) {
new_status = StatusFromPB(resp_.error().status());
}
+ // Check for more generic errors (TimedOut can come from multiple places).
if (new_status.IsTimedOut()) {
if (MonoTime::Now(MonoTime::FINE).ComesBefore(retrier().deadline())) {
if (meta_cache_->client_->IsMultiMaster()) {
@@ -523,7 +529,8 @@ void LookupRpc::SendRpcCb(const Status& status) {
}
}
- // Prefer response failures over no tablets found.
+ // Finally, ensure that there were tablet replicas found. If not, consider
+ // that an error.
if (new_status.ok() && resp_.tablet_locations_size() == 0) {
new_status = Status::NotFound("No such tablet found");
}
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/integration-tests/client_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc
index 535d394..07b547a 100644
--- a/src/kudu/integration-tests/client_failover-itest.cc
+++ b/src/kudu/integration-tests/client_failover-itest.cc
@@ -27,7 +27,10 @@
#include "kudu/integration-tests/test_workload.h"
using kudu::client::CountTableRows;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
using kudu::client::KuduTable;
+using kudu::client::KuduUpdate;
using kudu::client::sp::shared_ptr;
using kudu::itest::TServerDetails;
using kudu::tablet::TABLET_DATA_TOMBSTONED;
@@ -38,14 +41,22 @@ using std::unordered_map;
namespace kudu {
+enum ClientTestBehavior {
+ kWrite,
+ kRead,
+ kReadWrite
+};
+
// Integration test for client failover behavior.
-class ClientFailoverITest : public ExternalMiniClusterITestBase {
+class ClientFailoverParamITest : public ExternalMiniClusterITestBase,
+ public ::testing::WithParamInterface<ClientTestBehavior> {
};
// Test that we can delete the leader replica while scanning it and still get
// results back.
-TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
- const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+TEST_P(ClientFailoverParamITest, TestDeleteLeaderWhileScanning) {
+ ClientTestBehavior test_type = GetParam();
+ const MonoDelta kTimeout = MonoDelta::FromSeconds(20);
vector<string> ts_flags = { "--enable_leader_failure_detection=false",
"--enable_remote_bootstrap=false" };
@@ -84,6 +95,24 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
int leader_index = *replica_indexes.begin();
TServerDetails* leader = ts_map_[cluster_->tablet_server(leader_index)->uuid()];
ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
+ ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
+ workload.batches_completed() + 1));
+
+ shared_ptr<KuduTable> table;
+ ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table));
+ shared_ptr<KuduSession> session = client_->NewSession();
+ ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+ session->SetTimeoutMillis(kTimeout.ToMilliseconds());
+
+ // The row we will update later when testing writes.
+ // Note that this counts as an OpId.
+ KuduInsert* insert = table->NewInsert();
+ ASSERT_OK(insert->mutable_row()->SetInt32(0, 0));
+ ASSERT_OK(insert->mutable_row()->SetInt32(1, 1));
+ ASSERT_OK(insert->mutable_row()->SetString(2, "a"));
+ ASSERT_OK(session->Apply(insert));
+ ASSERT_OK(session->Flush());
+ ASSERT_EQ(1, CountTableRows(table.get()));
// Write data to a tablet.
workload.Start();
@@ -95,14 +124,15 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
// We don't want the leader that takes over after we kill the first leader to
// be unsure whether the writes have been committed, so wait until all
// replicas have all of the writes.
+ //
+ // We should have # opids equal to number of batches written by the workload,
+ // plus the initial "manual" write we did, plus the no-op written when the
+ // first leader was elected.
ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
- workload.batches_completed() + 1));
+ workload.batches_completed() + 2));
// Open the scanner and count the rows.
- shared_ptr<KuduTable> table;
- ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table));
- ASSERT_EQ(workload.rows_inserted(), CountTableRows(table.get()));
- LOG(INFO) << "Number of rows: " << workload.rows_inserted();
+ ASSERT_EQ(workload.rows_inserted() + 1, CountTableRows(table.get()));
// Delete the leader replica. This will cause the next scan to the same
// leader to get a TABLET_NOT_FOUND error.
@@ -119,14 +149,14 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
// We need to elect a new leader to remove the old node.
ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
- ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 2, leader, tablet_id,
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 3, leader, tablet_id,
kTimeout));
// Do a config change to remove the old replica and add a new one.
// Cause the new replica to become leader, then do the scan again.
ASSERT_OK(RemoveServer(leader, tablet_id, old_leader, boost::none, kTimeout));
// Wait until the config is committed, otherwise AddServer() will fail.
- ASSERT_OK(WaitUntilCommittedConfigOpIdIndexIs(workload.batches_completed() + 3, leader, tablet_id,
+ ASSERT_OK(WaitUntilCommittedConfigOpIdIndexIs(workload.batches_completed() + 4, leader, tablet_id,
kTimeout));
TServerDetails* to_add = ts_map_[cluster_->tablet_server(missing_replica_index)->uuid()];
@@ -142,14 +172,30 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
// Wait for remote bootstrap to complete. Then elect the new node.
ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
- workload.batches_completed() + 4));
+ workload.batches_completed() + 5));
leader_index = missing_replica_index;
leader = ts_map_[cluster_->tablet_server(leader_index)->uuid()];
ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
- ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 5, leader, tablet_id,
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 6, leader, tablet_id,
kTimeout));
- ASSERT_EQ(workload.rows_inserted(), CountTableRows(table.get()));
+ if (test_type == kWrite || test_type == kReadWrite) {
+ KuduUpdate* update = table->NewUpdate();
+ ASSERT_OK(update->mutable_row()->SetInt32(0, 0));
+ ASSERT_OK(update->mutable_row()->SetInt32(1, 2));
+ ASSERT_OK(update->mutable_row()->SetString(2, "b"));
+ ASSERT_OK(session->Apply(update));
+ ASSERT_OK(session->Flush());
+ }
+
+ if (test_type == kRead || test_type == kReadWrite) {
+ ASSERT_EQ(workload.rows_inserted() + 1, CountTableRows(table.get()));
+ }
}
+ClientTestBehavior test_type[] = { kWrite, kRead, kReadWrite };
+
+INSTANTIATE_TEST_CASE_P(ClientBehavior, ClientFailoverParamITest,
+ ::testing::ValuesIn(test_type));
+
} // namespace kudu
http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/integration-tests/remote_bootstrap-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/remote_bootstrap-itest.cc b/src/kudu/integration-tests/remote_bootstrap-itest.cc
index 305d2c8..c058e70 100644
--- a/src/kudu/integration-tests/remote_bootstrap-itest.cc
+++ b/src/kudu/integration-tests/remote_bootstrap-itest.cc
@@ -633,9 +633,6 @@ TEST_F(RemoteBootstrapITest, TestDisableRemoteBootstrap_NoTightLoopWhenTabletDel
NO_FATALS(StartCluster(ts_flags, master_flags));
TestWorkload workload(cluster_.get());
- // TODO(KUDU-1054): the client should handle retrying on different replicas
- // if the tablet isn't found, rather than giving us this error.
- workload.set_not_found_allowed(true);
workload.set_write_batch_size(1);
workload.Setup();
@@ -695,9 +692,6 @@ TEST_F(RemoteBootstrapITest, TestSlowBootstrapDoesntFail) {
NO_FATALS(StartCluster(ts_flags, master_flags));
TestWorkload workload(cluster_.get());
- // TODO(KUDU-1322): the client should handle retrying on different replicas
- // if the tablet isn't found, rather than giving us this error.
- workload.set_not_found_allowed(true);
workload.set_write_batch_size(1);
workload.Setup();