You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by jd...@apache.org on 2016/02/23 00:45:17 UTC

[3/3] incubator-kudu git commit: KUDU-1322. Failover client writes on TABLET_NOT_FOUND

KUDU-1322. Failover client writes on TABLET_NOT_FOUND

Follow-up to KUDU-1054.

If a tablet is deleted from a tablet server, a client that is writing to
the tablet should re-fetch the metadata from the master and connect to
the new leader. If the table has been deleted, the client should get a
Status::NotFound error.

Also grepped for and removed TODOs mentioning KUDU-1054 and KUDU-1322.

Change-Id: I037608977c890cb5099fc74c1be741c93d99ae7a
Reviewed-on: http://gerrit.cloudera.org:8080/978
Tested-by: Kudu Jenkins
Reviewed-by: Jean-Daniel Cryans
Reviewed-by: Todd Lipcon <to...@apache.org>
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kudu/commit/2878c586
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kudu/tree/2878c586
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kudu/diff/2878c586

Branch: refs/heads/master
Commit: 2878c586259ecf00d7f2623c27b27e640cc799bd
Parents: b3efaf0
Author: Mike Percy <mp...@apache.org>
Authored: Tue Jan 26 15:39:41 2016 -0800
Committer: Jean-Daniel Cryans <jd...@gerrit.cloudera.org>
Committed: Mon Feb 22 23:44:39 2016 +0000

----------------------------------------------------------------------
 src/kudu/client/batcher.cc                      | 115 +++++++++++--------
 src/kudu/client/meta_cache.cc                   |  13 ++-
 .../integration-tests/client_failover-itest.cc  |  72 +++++++++---
 .../integration-tests/remote_bootstrap-itest.cc |   6 -
 4 files changed, 138 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/client/batcher.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/batcher.cc b/src/kudu/client/batcher.cc
index f5ad0bb..2b3e583 100644
--- a/src/kudu/client/batcher.cc
+++ b/src/kudu/client/batcher.cc
@@ -307,54 +307,58 @@ WriteRpc::~WriteRpc() {
 
 void WriteRpc::SendRpc() {
   // Choose a destination TS according to the following algorithm:
-  // 1. Select the leader, provided:
-  //    a. One exists, and
+  // 1. If the tablet metadata is stale, refresh it (goto step 5).
+  // 2. Select the leader, provided:
+  //    a. The current leader is known,
   //    b. It hasn't failed, and
   //    c. It isn't currently marked as a follower.
-  // 2. If there's no good leader select another replica, provided:
+  // 3. If there's no good leader select another replica, provided:
   //    a. It hasn't failed, and
   //    b. It hasn't rejected our write due to being a follower.
-  // 3. Preemptively mark the replica we selected in step 2 as "leader" in the
+  // 4. Preemptively mark the replica we selected in step 3 as "leader" in the
   //    meta cache, so that our selection remains sticky until the next Master
   //    metadata refresh.
-  // 4. If we're out of appropriate replicas, force a lookup to the master
+  // 5. If we're out of appropriate replicas, force a lookup to the master
   //    to fetch new consensus configuration information.
-  // 5. When the lookup finishes, forget which replicas were followers and
-  //    retry the write (i.e. goto 1).
-  // 6. If we issue the write and it fails because the destination was a
-  //    follower, remember that fact and retry the write (i.e. goto 1).
-  // 7. Repeat steps 1-6 until the write succeeds, fails for other reasons,
+  // 6. When the lookup finishes, forget which replicas were followers and
+  //    retry the write (i.e. goto 2).
+  // 7. If we issue the write and it fails because the destination was a
+  //    follower, remember that fact and retry the write (i.e. goto 2).
+  // 8. Repeat steps 1-7 until the write succeeds, fails for other reasons,
   //    or the write's deadline expires.
-  current_ts_ = tablet_->LeaderTServer();
-  if (current_ts_ && ContainsKey(followers_, current_ts_)) {
-    VLOG(2) << "Tablet " << tablet_->tablet_id() << ": We have a follower for a leader: "
-            << current_ts_->ToString();
-
-    // Mark the node as a follower in the cache so that on the next go-round,
-    // LeaderTServer() will not return it as a leader unless a full metadata
-    // refresh has occurred. This also avoids LookupTabletByKey() going into
-    // "fast path" mode and not actually performing a metadata refresh from the
-    // Master when it needs to.
-    tablet_->MarkTServerAsFollower(current_ts_);
-    current_ts_ = NULL;
-  }
-  if (!current_ts_) {
-    // Try to "guess" the next leader.
-    vector<RemoteTabletServer*> replicas;
-    tablet_->GetRemoteTabletServers(&replicas);
-    for (RemoteTabletServer* ts : replicas) {
-      if (!ContainsKey(followers_, ts)) {
-        current_ts_ = ts;
-        break;
-      }
+  current_ts_ = nullptr;
+  if (!tablet_->stale()) {
+    current_ts_ = tablet_->LeaderTServer();
+    if (current_ts_ && ContainsKey(followers_, current_ts_)) {
+      VLOG(2) << "Tablet " << tablet_->tablet_id() << ": We have a follower for a leader: "
+              << current_ts_->ToString();
+
+      // Mark the node as a follower in the cache so that on the next go-round,
+      // LeaderTServer() will not return it as a leader unless a full metadata
+      // refresh has occurred. This also avoids LookupTabletByKey() going into
+      // "fast path" mode and not actually performing a metadata refresh from the
+      // Master when it needs to.
+      tablet_->MarkTServerAsFollower(current_ts_);
+      current_ts_ = NULL;
     }
-    if (current_ts_) {
-      // Mark this next replica "preemptively" as the leader in the meta cache,
-      // so we go to it first on the next write if writing was successful.
-      VLOG(1) << "Tablet " << tablet_->tablet_id() << ": Previous leader failed. "
-              << "Preemptively marking tserver " << current_ts_->ToString()
-              << " as leader in the meta cache.";
-      tablet_->MarkTServerAsLeader(current_ts_);
+    if (!current_ts_) {
+      // Try to "guess" the next leader.
+      vector<RemoteTabletServer*> replicas;
+      tablet_->GetRemoteTabletServers(&replicas);
+      for (RemoteTabletServer* ts : replicas) {
+        if (!ContainsKey(followers_, ts)) {
+          current_ts_ = ts;
+          break;
+        }
+      }
+      if (current_ts_) {
+        // Mark this next replica "preemptively" as the leader in the meta cache,
+        // so we go to it first on the next write if writing was successful.
+        VLOG(1) << "Tablet " << tablet_->tablet_id() << ": Previous leader failed. "
+                << "Preemptively marking tserver " << current_ts_->ToString()
+                << " as leader in the meta cache.";
+        tablet_->MarkTServerAsLeader(current_ts_);
+      }
     }
   }
 
@@ -391,6 +395,14 @@ string WriteRpc::ToString() const {
 }
 
 void WriteRpc::LookupTabletCb(const Status& status) {
+  // If the table was deleted, the master will return TABLE_DELETED and the
+  // meta cache will return Status::NotFound.
+  if (status.IsNotFound()) {
+    batcher_->ProcessWriteResponse(*this, status);
+    delete this;
+    return;
+  }
+
   // We should retry the RPC regardless of the outcome of the lookup, as
   // leader election doesn't depend on the existence of a master at all.
   //
@@ -447,13 +459,24 @@ void WriteRpc::SendRpcCb(const Status& status) {
     new_status = StatusFromPB(resp_.error().status());
   }
 
-  // Oops, we failed over to a replica that wasn't a LEADER. Unlikely as
-  // we're using consensus configuration information from the master, but still possible
-  // (e.g. leader restarted and became a FOLLOWER). Try again.
-  //
-  // TODO: IllegalState is obviously way too broad an error category for
-  // this case.
-  if (new_status.IsIllegalState() || new_status.IsAborted()) {
+  if (resp_.has_error() && resp_.error().code() == tserver::TabletServerErrorPB::TABLET_NOT_FOUND) {
+    // If we get TABLET_NOT_FOUND, the replica we thought was leader has been
+    // deleted. We mark our tablet cache as stale, forcing a master lookup on
+    // the next attempt.
+    tablet_->MarkStale();
+    // TODO: Don't backoff the first time we hit this error (see KUDU-1314).
+    mutable_retrier()->DelayedRetry(this, StatusFromPB(resp_.error().status()));
+    return;
+  } else if (new_status.IsIllegalState() || new_status.IsAborted()) {
+    // Alternatively, when we get a status code of IllegalState or Aborted, we
+    // assume this means that the replica we attempted to write to is not the
+    // current leader (maybe it got partitioned or slow and another node took
+    // over). We attempt to fail over to another replica in the config.
+    //
+    // TODO: This error handling block should really be rewritten to handle
+    // specific error codes exclusively instead of Status codes (this may
+    // require some server-side changes). For example, IllegalState is
+    // obviously way too broad an error category for this case.
     followers_.insert(current_ts_);
     mutable_retrier()->DelayedRetry(this, new_status);
     return;

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/client/meta_cache.cc
----------------------------------------------------------------------
diff --git a/src/kudu/client/meta_cache.cc b/src/kudu/client/meta_cache.cc
index 61ae145..80ad940 100644
--- a/src/kudu/client/meta_cache.cc
+++ b/src/kudu/client/meta_cache.cc
@@ -477,14 +477,19 @@ void LookupRpc::NewLeaderMasterDeterminedCb(const Status& status) {
 void LookupRpc::SendRpcCb(const Status& status) {
   gscoped_ptr<LookupRpc> delete_me(this); // delete on scope exit
 
-  // Prefer early failures over controller failures.
+  if (!status.ok()) {
+    // Non-RPC failure. We only support TimedOut for LookupRpc.
+    CHECK(status.IsTimedOut()) << status.ToString();
+  }
+
   Status new_status = status;
+  // Check for generic RPC errors.
   if (new_status.ok() && mutable_retrier()->HandleResponse(this, &new_status)) {
     ignore_result(delete_me.release());
     return;
   }
 
-  // Prefer controller failures over response failures.
+  // Check for specific application response errors.
   if (new_status.ok() && resp_.has_error()) {
     if (resp_.error().code() == master::MasterErrorPB::NOT_THE_LEADER ||
         resp_.error().code() == master::MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
@@ -498,6 +503,7 @@ void LookupRpc::SendRpcCb(const Status& status) {
     new_status = StatusFromPB(resp_.error().status());
   }
 
+  // Check for more generic errors (TimedOut can come from multiple places).
   if (new_status.IsTimedOut()) {
     if (MonoTime::Now(MonoTime::FINE).ComesBefore(retrier().deadline())) {
       if (meta_cache_->client_->IsMultiMaster()) {
@@ -523,7 +529,8 @@ void LookupRpc::SendRpcCb(const Status& status) {
     }
   }
 
-  // Prefer response failures over no tablets found.
+  // Finally, ensure that there were tablet replicas found. If not, consider
+  // that an error.
   if (new_status.ok() && resp_.tablet_locations_size() == 0) {
     new_status = Status::NotFound("No such tablet found");
   }

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/integration-tests/client_failover-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/client_failover-itest.cc b/src/kudu/integration-tests/client_failover-itest.cc
index 535d394..07b547a 100644
--- a/src/kudu/integration-tests/client_failover-itest.cc
+++ b/src/kudu/integration-tests/client_failover-itest.cc
@@ -27,7 +27,10 @@
 #include "kudu/integration-tests/test_workload.h"
 
 using kudu::client::CountTableRows;
+using kudu::client::KuduInsert;
+using kudu::client::KuduSession;
 using kudu::client::KuduTable;
+using kudu::client::KuduUpdate;
 using kudu::client::sp::shared_ptr;
 using kudu::itest::TServerDetails;
 using kudu::tablet::TABLET_DATA_TOMBSTONED;
@@ -38,14 +41,22 @@ using std::unordered_map;
 
 namespace kudu {
 
+enum ClientTestBehavior {
+  kWrite,
+  kRead,
+  kReadWrite
+};
+
 // Integration test for client failover behavior.
-class ClientFailoverITest : public ExternalMiniClusterITestBase {
+class ClientFailoverParamITest : public ExternalMiniClusterITestBase,
+                                 public ::testing::WithParamInterface<ClientTestBehavior> {
 };
 
 // Test that we can delete the leader replica while scanning it and still get
 // results back.
-TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
-  const MonoDelta kTimeout = MonoDelta::FromSeconds(30);
+TEST_P(ClientFailoverParamITest, TestDeleteLeaderWhileScanning) {
+  ClientTestBehavior test_type = GetParam();
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(20);
 
   vector<string> ts_flags = { "--enable_leader_failure_detection=false",
                               "--enable_remote_bootstrap=false" };
@@ -84,6 +95,24 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
   int leader_index = *replica_indexes.begin();
   TServerDetails* leader = ts_map_[cluster_->tablet_server(leader_index)->uuid()];
   ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
+                                  workload.batches_completed() + 1));
+
+  shared_ptr<KuduTable> table;
+  ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table));
+  shared_ptr<KuduSession> session = client_->NewSession();
+  ASSERT_OK(session->SetFlushMode(KuduSession::MANUAL_FLUSH));
+  session->SetTimeoutMillis(kTimeout.ToMilliseconds());
+
+  // The row we will update later when testing writes.
+  // Note that this counts as an OpId.
+  KuduInsert* insert = table->NewInsert();
+  ASSERT_OK(insert->mutable_row()->SetInt32(0, 0));
+  ASSERT_OK(insert->mutable_row()->SetInt32(1, 1));
+  ASSERT_OK(insert->mutable_row()->SetString(2, "a"));
+  ASSERT_OK(session->Apply(insert));
+  ASSERT_OK(session->Flush());
+  ASSERT_EQ(1, CountTableRows(table.get()));
 
   // Write data to a tablet.
   workload.Start();
@@ -95,14 +124,15 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
   // We don't want the leader that takes over after we kill the first leader to
   // be unsure whether the writes have been committed, so wait until all
   // replicas have all of the writes.
+  //
+  // We should have # opids equal to number of batches written by the workload,
+  // plus the initial "manual" write we did, plus the no-op written when the
+  // first leader was elected.
   ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
-                                  workload.batches_completed() + 1));
+                                  workload.batches_completed() + 2));
 
   // Open the scanner and count the rows.
-  shared_ptr<KuduTable> table;
-  ASSERT_OK(client_->OpenTable(TestWorkload::kDefaultTableName, &table));
-  ASSERT_EQ(workload.rows_inserted(), CountTableRows(table.get()));
-  LOG(INFO) << "Number of rows: " << workload.rows_inserted();
+  ASSERT_EQ(workload.rows_inserted() + 1, CountTableRows(table.get()));
 
   // Delete the leader replica. This will cause the next scan to the same
   // leader to get a TABLET_NOT_FOUND error.
@@ -119,14 +149,14 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
 
   // We need to elect a new leader to remove the old node.
   ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
-  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 2, leader, tablet_id,
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 3, leader, tablet_id,
                                           kTimeout));
 
   // Do a config change to remove the old replica and add a new one.
   // Cause the new replica to become leader, then do the scan again.
   ASSERT_OK(RemoveServer(leader, tablet_id, old_leader, boost::none, kTimeout));
   // Wait until the config is committed, otherwise AddServer() will fail.
-  ASSERT_OK(WaitUntilCommittedConfigOpIdIndexIs(workload.batches_completed() + 3, leader, tablet_id,
+  ASSERT_OK(WaitUntilCommittedConfigOpIdIndexIs(workload.batches_completed() + 4, leader, tablet_id,
                                                 kTimeout));
 
   TServerDetails* to_add = ts_map_[cluster_->tablet_server(missing_replica_index)->uuid()];
@@ -142,14 +172,30 @@ TEST_F(ClientFailoverITest, TestDeleteLeaderWhileScanning) {
 
   // Wait for remote bootstrap to complete. Then elect the new node.
   ASSERT_OK(WaitForServersToAgree(kTimeout, active_ts_map, tablet_id,
-                                  workload.batches_completed() + 4));
+                                  workload.batches_completed() + 5));
   leader_index = missing_replica_index;
   leader = ts_map_[cluster_->tablet_server(leader_index)->uuid()];
   ASSERT_OK(itest::StartElection(leader, tablet_id, kTimeout));
-  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 5, leader, tablet_id,
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(workload.batches_completed() + 6, leader, tablet_id,
                                           kTimeout));
 
-  ASSERT_EQ(workload.rows_inserted(), CountTableRows(table.get()));
+  if (test_type == kWrite || test_type == kReadWrite) {
+    KuduUpdate* update = table->NewUpdate();
+    ASSERT_OK(update->mutable_row()->SetInt32(0, 0));
+    ASSERT_OK(update->mutable_row()->SetInt32(1, 2));
+    ASSERT_OK(update->mutable_row()->SetString(2, "b"));
+    ASSERT_OK(session->Apply(update));
+    ASSERT_OK(session->Flush());
+  }
+
+  if (test_type == kRead || test_type == kReadWrite) {
+    ASSERT_EQ(workload.rows_inserted() + 1, CountTableRows(table.get()));
+  }
 }
 
+ClientTestBehavior test_type[] = { kWrite, kRead, kReadWrite };
+
+INSTANTIATE_TEST_CASE_P(ClientBehavior, ClientFailoverParamITest,
+                        ::testing::ValuesIn(test_type));
+
 } // namespace kudu

http://git-wip-us.apache.org/repos/asf/incubator-kudu/blob/2878c586/src/kudu/integration-tests/remote_bootstrap-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/remote_bootstrap-itest.cc b/src/kudu/integration-tests/remote_bootstrap-itest.cc
index 305d2c8..c058e70 100644
--- a/src/kudu/integration-tests/remote_bootstrap-itest.cc
+++ b/src/kudu/integration-tests/remote_bootstrap-itest.cc
@@ -633,9 +633,6 @@ TEST_F(RemoteBootstrapITest, TestDisableRemoteBootstrap_NoTightLoopWhenTabletDel
   NO_FATALS(StartCluster(ts_flags, master_flags));
 
   TestWorkload workload(cluster_.get());
-  // TODO(KUDU-1054): the client should handle retrying on different replicas
-  // if the tablet isn't found, rather than giving us this error.
-  workload.set_not_found_allowed(true);
   workload.set_write_batch_size(1);
   workload.Setup();
 
@@ -695,9 +692,6 @@ TEST_F(RemoteBootstrapITest, TestSlowBootstrapDoesntFail) {
   NO_FATALS(StartCluster(ts_flags, master_flags));
 
   TestWorkload workload(cluster_.get());
-  // TODO(KUDU-1322): the client should handle retrying on different replicas
-  // if the tablet isn't found, rather than giving us this error.
-  workload.set_not_found_allowed(true);
   workload.set_write_batch_size(1);
   workload.Setup();