You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/05/05 23:23:10 UTC

kudu git commit: KUDU-1991. master: Retry background tasks even if TS UUID not registered

Repository: kudu
Updated Branches:
  refs/heads/master 4df9f5969 -> d0ffa4ac6


KUDU-1991. master: Retry background tasks even if TS UUID not registered

Before this patch, if a master had just started up and received a
message from a tablet server that required some action in response (for
example, deleting an evicted replica or replacing a replica in an
under-replicated tablet), and an affected tablet server had not yet
registered with the master, the resulting task would fail and would not
be retried. This patch causes the task to be retried even if the initial
tablet server UUID lookup fails.

This change is tested by
AdminCliTest.TestUnsafeChangeConfigOnSingleFollower, which was flaky
prior to this patch (7/200 runs failed on dist-test). With the changes
made to that test here, the test requires the above master fix in order
to pass, and it is no longer flaky (200/200 runs passed on dist-test).

Change-Id: I3a3de7fe87266f11392fd3bb0c74f19ad803de9d
Reviewed-on: http://gerrit.cloudera.org:8080/6534
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Tested-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/d0ffa4ac
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/d0ffa4ac
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/d0ffa4ac

Branch: refs/heads/master
Commit: d0ffa4ac6e2b383832ebe1ca377081aa93b01d77
Parents: 4df9f59
Author: Mike Percy <mp...@apache.org>
Authored: Thu Mar 30 23:32:09 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Fri May 5 22:11:52 2017 +0000

----------------------------------------------------------------------
 .../integration-tests/cluster_itest_util.cc     | 47 ++++++++++++--
 src/kudu/integration-tests/cluster_itest_util.h |  7 ++
 src/kudu/master/catalog_manager.cc              | 42 ++++++------
 src/kudu/tools/kudu-admin-test.cc               | 67 +++++++++-----------
 4 files changed, 102 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/d0ffa4ac/src/kudu/integration-tests/cluster_itest_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.cc b/src/kudu/integration-tests/cluster_itest_util.cc
index 31c6bbe..9bc945e 100644
--- a/src/kudu/integration-tests/cluster_itest_util.cc
+++ b/src/kudu/integration-tests/cluster_itest_util.cc
@@ -63,6 +63,7 @@ using consensus::GetLastOpIdResponsePB;
 using consensus::LeaderStepDownRequestPB;
 using consensus::LeaderStepDownResponsePB;
 using consensus::OpId;
+using consensus::OpIdType;
 using consensus::RaftPeerPB;
 using consensus::RunLeaderElectionResponsePB;
 using consensus::RunLeaderElectionRequestPB;
@@ -110,7 +111,7 @@ client::KuduSchema SimpleIntKeyKuduSchema() {
 
 Status GetLastOpIdForEachReplica(const string& tablet_id,
                                  const vector<TServerDetails*>& replicas,
-                                 consensus::OpIdType opid_type,
+                                 OpIdType opid_type,
                                  const MonoDelta& timeout,
                                  vector<OpId>* op_ids) {
   GetLastOpIdRequestPB opid_req;
@@ -138,7 +139,7 @@ Status GetLastOpIdForEachReplica(const string& tablet_id,
 
 Status GetLastOpIdForReplica(const std::string& tablet_id,
                              TServerDetails* replica,
-                             consensus::OpIdType opid_type,
+                             OpIdType opid_type,
                              const MonoDelta& timeout,
                              consensus::OpId* op_id) {
   vector<OpId> op_ids;
@@ -148,6 +149,42 @@ Status GetLastOpIdForReplica(const std::string& tablet_id,
   return Status::OK();
 }
 
+Status WaitForOpFromCurrentTerm(TServerDetails* replica,
+                                const string& tablet_id,
+                                OpIdType opid_type,
+                                const MonoDelta& timeout,
+                                OpId* opid) {
+  const MonoTime kStart = MonoTime::Now();
+  const MonoTime kDeadline = kStart + timeout;
+
+  Status s;
+  while (MonoTime::Now() < kDeadline) {
+    ConsensusStatePB cstate;
+    s = GetConsensusState(replica, tablet_id, CONSENSUS_CONFIG_ACTIVE, kDeadline - MonoTime::Now(),
+                          &cstate);
+    if (s.ok()) {
+      OpId tmp_opid;
+      s = GetLastOpIdForReplica(tablet_id, replica, opid_type, kDeadline - MonoTime::Now(),
+                                &tmp_opid);
+      if (s.ok()) {
+        if (tmp_opid.term() == cstate.current_term()) {
+          if (opid) {
+            opid->Swap(&tmp_opid);
+          }
+          return Status::OK();
+        }
+        s = Status::IllegalState(Substitute("Terms don't match. Current term: $0. Latest OpId: $1",
+                                 cstate.current_term(), OpIdToString(tmp_opid)));
+      }
+    }
+    SleepFor(MonoDelta::FromMilliseconds(10));
+  }
+
+  return Status::TimedOut(Substitute("Timed out after $0 waiting for op from current term: $1",
+                                     (MonoTime::Now() - kStart).ToString(),
+                                     s.ToString()));
+}
+
 Status WaitForServersToAgree(const MonoDelta& timeout,
                              const TabletServerMap& tablet_servers,
                              const string& tablet_id,
@@ -306,10 +343,10 @@ Status WaitUntilCommittedConfigNumVotersIs(int config_size,
     if (MonoTime::Now() > start + timeout) {
       break;
     }
-    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
+    SleepFor(MonoDelta::FromMilliseconds(1LLU << backoff_exp));
     backoff_exp = min(backoff_exp + 1, kMaxBackoffExp);
   }
-  return Status::TimedOut(Substitute("Number of voters does not equal $0 after waiting for $1."
+  return Status::TimedOut(Substitute("Number of voters does not equal $0 after waiting for $1. "
                                      "Last consensus state: $2. Last status: $3",
                                      config_size, timeout.ToString(),
                                      SecureShortDebugString(cstate), s.ToString()));
@@ -487,7 +524,7 @@ Status WaitUntilLeader(const TServerDetails* replica,
     if (MonoTime::Now() > deadline) {
       break;
     }
-    SleepFor(MonoDelta::FromMilliseconds(1 << backoff_exp));
+    SleepFor(MonoDelta::FromMilliseconds(1LLU << backoff_exp));
     backoff_exp = min(backoff_exp + 1, kMaxBackoffExp);
   }
   return Status::TimedOut(Substitute("Replica $0 is not leader after waiting for $1: $2",

http://git-wip-us.apache.org/repos/asf/kudu/blob/d0ffa4ac/src/kudu/integration-tests/cluster_itest_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/cluster_itest_util.h b/src/kudu/integration-tests/cluster_itest_util.h
index 883ea8d..6474ae2 100644
--- a/src/kudu/integration-tests/cluster_itest_util.h
+++ b/src/kudu/integration-tests/cluster_itest_util.h
@@ -117,6 +117,13 @@ Status GetLastOpIdForReplica(const std::string& tablet_id,
                              const MonoDelta& timeout,
                              consensus::OpId* op_id);
 
+// Wait until the latest op on the target replica is from the current term.
+Status WaitForOpFromCurrentTerm(TServerDetails* replica,
+                                const std::string& tablet_id,
+                                consensus::OpIdType opid_type,
+                                const MonoDelta& timeout,
+                                consensus::OpId* opid = nullptr);
+
 // Wait until all of the servers have converged on the same log index.
 // The converged index must be at least equal to 'minimum_index'.
 //

http://git-wip-us.apache.org/repos/asf/kudu/blob/d0ffa4ac/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index 3c61f84..25aff82 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2712,25 +2712,26 @@ Status RetryingTSRpcTask::Run() {
     return Status::RuntimeError("Async RPCs configured to fail");
   }
 
-  Status s = ResetTSProxy(); // This can fail if it's a replica we don't know about yet.
-  if (!s.ok()) {
-    MarkFailed();
-    UnregisterAsyncTask(); // May delete this.
-    return s.CloneAndPrepend("Failed to reset TS proxy");
-  }
-
   // Calculate and set the timeout deadline.
-  MonoTime timeout = MonoTime::Now() +
-      MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms);
+  MonoTime timeout = MonoTime::Now() + MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms);
   const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_);
+  rpc_.Reset();
   rpc_.set_deadline(deadline);
 
-  if (!SendRequest(++attempt_)) {
-    if (!RescheduleWithBackoffDelay()) {
-      UnregisterAsyncTask();  // May call 'delete this'.
+  Status s = ResetTSProxy();
+  if (s.ok()) {
+    if (SendRequest(++attempt_)) {
+      return Status::OK();
     }
+  } else {
+    s = s.CloneAndPrepend("Failed to reset TS proxy");
   }
-  return Status::OK();
+
+  if (!RescheduleWithBackoffDelay()) {
+    MarkFailed();
+    UnregisterAsyncTask();  // May call 'delete this'.
+  }
+  return s;
 }
 
 void RetryingTSRpcTask::RpcCallback() {
@@ -3134,13 +3135,7 @@ class AsyncAddServerTask : public RetryingTSRpcTask {
 
   virtual string type_name() const OVERRIDE { return "AddServer ChangeConfig"; }
 
-  virtual string description() const OVERRIDE {
-    return Substitute("AddServer ChangeConfig RPC for tablet $0 on TS $1 "
-                      "with cas_config_opid_index $2",
-                      tablet_->tablet_id(),
-                      target_ts_desc_->ToString(),
-                      cstate_.config().opid_index());
-  }
+  virtual string description() const OVERRIDE;
 
  protected:
   virtual bool SendRequest(int attempt) OVERRIDE;
@@ -3156,6 +3151,13 @@ class AsyncAddServerTask : public RetryingTSRpcTask {
   consensus::ChangeConfigResponsePB resp_;
 };
 
+string AsyncAddServerTask::description() const {
+  return Substitute("AddServer ChangeConfig RPC for tablet $0 "
+                    "with cas_config_opid_index $1",
+                    tablet_->tablet_id(),
+                    cstate_.config().opid_index());
+}
+
 bool AsyncAddServerTask::SendRequest(int attempt) {
   LOG(INFO) << "Sending request for AddServer on tablet " << tablet_->tablet_id()
             << " (attempt " << attempt << ")";

http://git-wip-us.apache.org/repos/asf/kudu/blob/d0ffa4ac/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index ba27ed2..497857b 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -42,9 +42,8 @@ using client::sp::shared_ptr;
 using consensus::CONSENSUS_CONFIG_COMMITTED;
 using consensus::ConsensusStatePB;
 using consensus::OpId;
-using consensus::RECEIVED_OPID;
+using consensus::COMMITTED_OPID;
 using itest::GetConsensusState;
-using itest::GetLastOpIdForReplica;
 using itest::TabletServerMap;
 using itest::TServerDetails;
 using itest::WAIT_FOR_LEADER;
@@ -198,7 +197,7 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
 
   // Determine the list of tablet servers currently in the config.
   TabletServerMap active_tablet_servers;
-  auto iter = tablet_replicas_.equal_range(tablet_id_);
+  auto iter = tablet_replicas_.equal_range(tablet_id);
   for (auto it = iter.first; it != iter.second; ++it) {
     InsertOrDie(&active_tablet_servers, it->second->uuid(), it->second);
   }
@@ -208,7 +207,7 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
   ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                            3, tablet_id_, kTimeout,
+                                            3, tablet_id, kTimeout,
                                             WAIT_FOR_LEADER,
                                             &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
@@ -216,50 +215,46 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
-  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
-  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
+  ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id, kTimeout, &leader_ts));
+  ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id, kTimeout, &followers));
+  OpId opid;
+  ASSERT_OK(WaitForOpFromCurrentTerm(leader_ts, tablet_id, COMMITTED_OPID, kTimeout, &opid));
 
-  // Replace the config on follower1 after shutting down follower2 and leader.
+  // Shut down master so it doesn't interfere while we shut down the leader and
+  // one of the other followers.
+  cluster_->master()->Shutdown();
   cluster_->tablet_server_by_uuid(leader_ts->uuid())->Shutdown();
   cluster_->tablet_server_by_uuid(followers[1]->uuid())->Shutdown();
-  // Restart master to cleanup cache of dead servers from its list of candidate
-  // servers to trigger placement of new replicas on healthy servers.
-  cluster_->master()->Shutdown();
-  ASSERT_OK(cluster_->master()->Restart());
+
 
   LOG(INFO) << "Forcing unsafe config change on remaining follower " << followers[0]->uuid();
-  const string& follower1_addr = Substitute("$0:$1",
-                                            followers[0]->registration.rpc_addresses(0).host(),
-                                            followers[0]->registration.rpc_addresses(0).port());
-  vector<string> peer_uuid_list;
-  peer_uuid_list.push_back(followers[0]->uuid());
-  ASSERT_OK(RunUnsafeChangeConfig(tablet_id_, follower1_addr, peer_uuid_list));
+  const string& follower0_addr =
+      cluster_->tablet_server_by_uuid(followers[0]->uuid())->bound_rpc_addr().ToString();
+  ASSERT_OK(RunUnsafeChangeConfig(tablet_id, follower0_addr, { followers[0]->uuid() }));
+  ASSERT_OK(WaitUntilLeader(followers[0], tablet_id, kTimeout));
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(1, followers[0], tablet_id, kTimeout));
 
-  // Check that new config is populated to a new follower.
-  vector<TServerDetails*> all_tservers;
-  TServerDetails *new_follower = nullptr;
-  AppendValuesFromMap(tablet_servers_, &all_tservers);
-  for (const auto& ts :all_tservers) {
-    if (!ContainsKey(active_tablet_servers, ts->uuid())) {
-      new_follower = ts;
-      break;
-    }
-  }
-  ASSERT_TRUE(new_follower != nullptr);
+  LOG(INFO) << "Restarting master...";
 
-  // Master may try to add the servers which are down until tserver_unresponsive_timeout_ms,
-  // so it is safer to wait until consensus metadata has 3 voters on follower1.
-  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_follower, tablet_id_, kTimeout));
+  // Restart master so it can re-replicate the tablet to remaining tablet servers.
+  ASSERT_OK(cluster_->master()->Restart());
 
-  // Wait for the master to be notified of the config change.
-  LOG(INFO) << "Waiting for Master to see new config...";
+  // Wait for master to re-replicate.
+  ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, followers[0], tablet_id, kTimeout));
   ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                            3, tablet_id_, kTimeout,
+                                            3, tablet_id, kTimeout,
                                             WAIT_FOR_LEADER,
                                             &has_leader, &tablet_locations));
-  LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
+  ASSERT_OK(WaitForOpFromCurrentTerm(followers[0], tablet_id, COMMITTED_OPID, kTimeout, &opid));
+
+  active_tablet_servers.clear();
+  unordered_set<string> replica_uuids;
+  for (const auto& loc : tablet_locations.replicas()) {
+    const string& uuid = loc.ts_info().permanent_uuid();
+    InsertOrDie(&active_tablet_servers, uuid, tablet_servers_[uuid]);
+  }
+  ASSERT_OK(WaitForServersToAgree(kTimeout, active_tablet_servers, tablet_id, opid.index()));
 
   // Verify that two new servers are part of new config and old
   // servers are gone.