You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/04/04 20:12:07 UTC

kudu git commit: master/consensus: Code cleanup

Repository: kudu
Updated Branches:
  refs/heads/master fcb150c1a -> f4272eb5c


master/consensus: Code cleanup

No functional changes in this patch apart from minor adjustments to log
messages; it is just cleanup for follow-up patches. I split this out so
the following fixes would be easier to review.

* Catalog Manager: rearrange some code to make it easier to read and
  change in a follow-up patch.
* consensus: Add "using" declarations to kudu-admin-test so that the
  itest:: and consensus:: names can be used unqualified. Just cleanup.

Change-Id: If9c4f2702d9f255a403b6b72e54f4795fd582676
Reviewed-on: http://gerrit.cloudera.org:8080/6533
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f4272eb5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f4272eb5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f4272eb5

Branch: refs/heads/master
Commit: f4272eb5cebf0d2f5e38b457cfe536cc5b25d537
Parents: fcb150c
Author: Mike Percy <mp...@apache.org>
Authored: Thu Mar 30 18:33:54 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Apr 4 20:11:22 2017 +0000

----------------------------------------------------------------------
 src/kudu/master/catalog_manager.cc | 264 +++++++++++++++++---------------
 src/kudu/tools/kudu-admin-test.cc  | 177 +++++++++++----------
 2 files changed, 233 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/f4272eb5/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index a9d136a..39b104a 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2607,34 +2607,7 @@ class RetryingTSRpcTask : public MonitoredTask {
   }
 
   // Send the subclass RPC request.
-  Status Run() {
-    if (PREDICT_FALSE(FLAGS_catalog_manager_fail_ts_rpcs)) {
-      MarkFailed();
-      UnregisterAsyncTask(); // May delete this.
-      return Status::RuntimeError("Async RPCs configured to fail");
-    }
-
-    Status s = ResetTSProxy();
-    if (!s.ok()) {
-      LOG(WARNING) << "Unable to reset TS proxy: " << s.ToString();
-      MarkFailed();
-      UnregisterAsyncTask(); // May delete this.
-      return s.CloneAndPrepend("Failed to reset TS proxy");
-    }
-
-    // Calculate and set the timeout deadline.
-    MonoTime timeout = MonoTime::Now() +
-        MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms);
-    const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_);
-    rpc_.set_deadline(deadline);
-
-    if (!SendRequest(++attempt_)) {
-      if (!RescheduleWithBackoffDelay()) {
-        UnregisterAsyncTask();  // May call 'delete this'.
-      }
-    }
-    return Status::OK();
-  }
+  Status Run();
 
   // Abort this task.
   virtual void Abort() OVERRIDE {
@@ -2688,22 +2661,7 @@ class RetryingTSRpcTask : public MonitoredTask {
   // Callback meant to be invoked from asynchronous RPC service proxy calls.
   //
   // Runs on a reactor thread, so should not block or do any IO.
-  void RpcCallback() {
-    if (!rpc_.status().ok()) {
-      LOG(WARNING) << "TS " << target_ts_desc_->ToString() << ": "
-                   << type_name() << " RPC failed for tablet "
-                   << tablet_id() << ": " << rpc_.status().ToString();
-    } else if (state() != kStateAborted) {
-      HandleResponse(attempt_); // Modifies state_.
-    }
-
-    // Schedule a retry if the RPC call was not successful.
-    if (RescheduleWithBackoffDelay()) {
-      return;
-    }
-
-    UnregisterAsyncTask();  // May call 'delete this'.
-  }
+  void RpcCallback();
 
   Master * const master_;
   const gscoped_ptr<TSPicker> replica_picker_;
@@ -2724,95 +2682,150 @@ class RetryingTSRpcTask : public MonitoredTask {
   // Returns false if the task was not rescheduled due to reaching the maximum
   // timeout or because the task is no longer in a running state.
   // Returns true if rescheduling the task was successful.
-  bool RescheduleWithBackoffDelay() {
-    if (state() != kStateRunning) return false;
-    MonoTime now = MonoTime::Now();
-    // We assume it might take 10ms to process the request in the best case,
-    // fail if we have less than that amount of time remaining.
-    int64_t millis_remaining = (deadline_ - now).ToMilliseconds() - 10;
-    // Exponential backoff with jitter.
-    int64_t base_delay_ms;
-    if (attempt_ <= 12) {
-      base_delay_ms = 1 << (attempt_ + 3);  // 1st retry delayed 2^4 ms, 2nd 2^5, etc.
-    } else {
-      base_delay_ms = 60 * 1000; // cap at 1 minute
-    }
-    int64_t jitter_ms = rand() % 50;              // Add up to 50ms of additional random delay.
-    int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);
-
-    if (delay_millis <= 0) {
-      LOG(WARNING) << "Request timed out: " << description();
-      MarkFailed();
-    } else {
-      LOG(INFO) << "Scheduling retry of " << description() << " with a delay"
-                << " of " << delay_millis << "ms (attempt = " << attempt_ << ")...";
-      master_->messenger()->ScheduleOnReactor(
-          boost::bind(&RetryingTSRpcTask::RunDelayedTask, this, _1),
-          MonoDelta::FromMilliseconds(delay_millis));
-      return true;
-    }
-    return false;
-  }
+  bool RescheduleWithBackoffDelay();
 
   // Callback for Reactor delayed task mechanism. Called either when it is time
   // to execute the delayed task (with status == OK) or when the task
   // is cancelled, i.e. when the scheduling timer is shut down (status != OK).
-  void RunDelayedTask(const Status& status) {
-    if (!status.ok()) {
-      LOG(WARNING) << "Async tablet task " << description() << " failed or was cancelled: "
-                   << status.ToString();
-      UnregisterAsyncTask();   // May delete this.
-      return;
-    }
+  void RunDelayedTask(const Status& status);
 
-    string desc = description();  // Save in case we need to log after deletion.
-    Status s = Run();             // May delete this.
-    if (!s.ok()) {
-      LOG(WARNING) << "Async tablet task " << desc << " failed: " << s.ToString();
-    }
+  // Clean up request and release resources. May call 'delete this'.
+  void UnregisterAsyncTask();
+
+  // Find a new replica and construct the RPC proxy.
+  Status ResetTSProxy();
+
+  // Use state() and MarkX() accessors.
+  AtomicWord state_;
+};
+
+Status RetryingTSRpcTask::Run() {
+  if (PREDICT_FALSE(FLAGS_catalog_manager_fail_ts_rpcs)) {
+    MarkFailed();
+    UnregisterAsyncTask(); // May delete this.
+    return Status::RuntimeError("Async RPCs configured to fail");
   }
 
-  // Clean up request and release resources. May call 'delete this'.
-  void UnregisterAsyncTask() {
-    end_ts_ = MonoTime::Now();
-    if (table_ != nullptr) {
-      table_->RemoveTask(this);
-    } else {
-      // This is a floating task (since the table does not exist)
-      // created as response to a tablet report.
-      Release();  // May call "delete this";
-    }
+  Status s = ResetTSProxy(); // This can fail if it's a replica we don't know about yet.
+  if (!s.ok()) {
+    MarkFailed();
+    UnregisterAsyncTask(); // May delete this.
+    return s.CloneAndPrepend("Failed to reset TS proxy");
   }
 
-  Status ResetTSProxy() {
-    // TODO: if there is no replica available, should we still keep the task running?
-    string ts_uuid;
-    RETURN_NOT_OK(replica_picker_->PickReplica(&ts_uuid));
-    shared_ptr<TSDescriptor> ts_desc;
-    if (!master_->ts_manager()->LookupTSByUUID(ts_uuid, &ts_desc)) {
-      return Status::NotFound(Substitute("Could not find TS for UUID $0",
-                                         ts_uuid));
+  // Calculate and set the timeout deadline.
+  MonoTime timeout = MonoTime::Now() +
+      MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms);
+  const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_);
+  rpc_.set_deadline(deadline);
+
+  if (!SendRequest(++attempt_)) {
+    if (!RescheduleWithBackoffDelay()) {
+      UnregisterAsyncTask();  // May call 'delete this'.
     }
+  }
+  return Status::OK();
+}
 
-    // This assumes that TSDescriptors are never deleted by the master,
-    // so the task need not take ownership of the returned pointer.
-    target_ts_desc_ = ts_desc.get();
+void RetryingTSRpcTask::RpcCallback() {
+  if (!rpc_.status().ok()) {
+    LOG(WARNING) << "TS " << target_ts_desc_->ToString() << ": "
+                  << type_name() << " RPC failed for tablet "
+                  << tablet_id() << ": " << rpc_.status().ToString();
+  } else if (state() != kStateAborted) {
+    HandleResponse(attempt_); // Modifies state_.
+  }
 
-    shared_ptr<tserver::TabletServerAdminServiceProxy> ts_proxy;
-    RETURN_NOT_OK(target_ts_desc_->GetTSAdminProxy(master_->messenger(), &ts_proxy));
-    ts_proxy_.swap(ts_proxy);
+  // Schedule a retry if the RPC call was not successful.
+  if (RescheduleWithBackoffDelay()) {
+    return;
+  }
 
-    shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
-    RETURN_NOT_OK(target_ts_desc_->GetConsensusProxy(master_->messenger(), &consensus_proxy));
-    consensus_proxy_.swap(consensus_proxy);
+  UnregisterAsyncTask();  // May call 'delete this'.
+}
 
-    rpc_.Reset();
-    return Status::OK();
+bool RetryingTSRpcTask::RescheduleWithBackoffDelay() {
+  if (state() != kStateRunning) return false;
+  MonoTime now = MonoTime::Now();
+  // We assume it might take 10ms to process the request in the best case,
+  // fail if we have less than that amount of time remaining.
+  int64_t millis_remaining = (deadline_ - now).ToMilliseconds() - 10;
+  // Exponential backoff with jitter.
+  int64_t base_delay_ms;
+  if (attempt_ <= 12) {
+    base_delay_ms = 1 << (attempt_ + 3);  // 1st retry delayed 2^4 ms, 2nd 2^5, etc.
+  } else {
+    base_delay_ms = 60 * 1000; // cap at 1 minute
   }
+  int64_t jitter_ms = rand() % 50;              // Add up to 50ms of additional random delay.
+  int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);
 
-  // Use state() and MarkX() accessors.
-  AtomicWord state_;
-};
+  if (delay_millis <= 0) {
+    LOG(WARNING) << "Request timed out: " << description();
+    MarkFailed();
+  } else {
+    LOG(INFO) << "Scheduling retry of " << description() << " with a delay"
+              << " of " << delay_millis << "ms (attempt = " << attempt_ << ")...";
+    master_->messenger()->ScheduleOnReactor(
+        boost::bind(&RetryingTSRpcTask::RunDelayedTask, this, _1),
+        MonoDelta::FromMilliseconds(delay_millis));
+    return true;
+  }
+  return false;
+}
+
+void RetryingTSRpcTask::RunDelayedTask(const Status& status) {
+  if (!status.ok()) {
+    LOG(WARNING) << "Async tablet task " << description() << " failed or was cancelled: "
+                  << status.ToString();
+    UnregisterAsyncTask();   // May delete this.
+    return;
+  }
+
+  string desc = description();  // Save in case we need to log after deletion.
+  Status s = Run();             // May delete this.
+  if (!s.ok()) {
+    LOG(WARNING) << "Async tablet task " << desc << " failed: " << s.ToString();
+  }
+}
+
+void RetryingTSRpcTask::UnregisterAsyncTask() {
+  end_ts_ = MonoTime::Now();
+  if (table_ != nullptr) {
+    table_->RemoveTask(this);
+  } else {
+    // This is a floating task (since the table does not exist)
+    // created as response to a tablet report.
+    Release();  // May call "delete this";
+  }
+}
+
+Status RetryingTSRpcTask::ResetTSProxy() {
+  // TODO: if there is no replica available, should we still keep the task running?
+  string ts_uuid;
+  // TODO: don't pick replica we can't lookup???
+  RETURN_NOT_OK(replica_picker_->PickReplica(&ts_uuid));
+  shared_ptr<TSDescriptor> ts_desc;
+  if (!master_->ts_manager()->LookupTSByUUID(ts_uuid, &ts_desc)) {
+    return Status::NotFound(Substitute("Could not find TS for UUID $0",
+                                        ts_uuid));
+  }
+
+  // This assumes that TSDescriptors are never deleted by the master,
+  // so the task need not take ownership of the returned pointer.
+  target_ts_desc_ = ts_desc.get();
+
+  shared_ptr<tserver::TabletServerAdminServiceProxy> ts_proxy;
+  RETURN_NOT_OK(target_ts_desc_->GetTSAdminProxy(master_->messenger(), &ts_proxy));
+  ts_proxy_.swap(ts_proxy);
+
+  shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
+  RETURN_NOT_OK(target_ts_desc_->GetConsensusProxy(master_->messenger(), &consensus_proxy));
+  consensus_proxy_.swap(consensus_proxy);
+
+  rpc_.Reset();
+  return Status::OK();
+}
 
 // RetryingTSRpcTask subclass which always retries the same tablet server,
 // identified by its UUID.
@@ -3138,6 +3151,9 @@ class AsyncAddServerTask : public RetryingTSRpcTask {
 };
 
 bool AsyncAddServerTask::SendRequest(int attempt) {
+  LOG(INFO) << "Sending request for AddServer on tablet " << tablet_->tablet_id()
+            << " (attempt " << attempt << ")";
+
   // Bail if we're retrying in vain.
   int64_t latest_index;
   {
@@ -3189,7 +3205,7 @@ bool AsyncAddServerTask::SendRequest(int attempt) {
 void AsyncAddServerTask::HandleResponse(int attempt) {
   if (!resp_.has_error()) {
     MarkComplete();
-    LOG_WITH_PREFIX(INFO) << "Change config succeeded";
+    LOG_WITH_PREFIX(INFO) << "Config change to add server succeeded";
     return;
   }
 
@@ -3280,18 +3296,18 @@ void CatalogManager::SendDeleteReplicaRequest(
     // created as response to a tablet report.
     call->AddRef();
   }
-  WARN_NOT_OK(call->Run(), "Failed to send delete tablet request");
+  WARN_NOT_OK(call->Run(),
+              Substitute("Failed to send DeleteReplica request for tablet $0", tablet_id));
 }
 
 void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
                                           const ConsensusStatePB& cstate) {
   auto task = new AsyncAddServerTask(master_, tablet, cstate);
   tablet->table()->AddTask(task);
-  WARN_NOT_OK(task->Run(), "Failed to send new AddServer request");
-
-  // We can't access 'task' here because it may delete itself inside Run() in the
-  // case that the tablet has no known leader.
-  LOG(INFO) << "Started AddServer task for tablet " << tablet->tablet_id();
+  WARN_NOT_OK(task->Run(),
+              Substitute("Failed to send AddServer request for tablet $0", tablet->tablet_id()));
+  // We can't access 'task' after calling Run() because it may delete itself
+  // inside Run() in the case that the tablet has no known leader.
 }
 
 void CatalogManager::ExtractTabletsToProcess(

http://git-wip-us.apache.org/repos/asf/kudu/blob/f4272eb5/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index a414c25..04573d6 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -39,10 +39,19 @@ using client::KuduClientBuilder;
 using client::KuduSchema;
 using client::KuduTableCreator;
 using client::sp::shared_ptr;
+using consensus::CONSENSUS_CONFIG_COMMITTED;
 using consensus::ConsensusStatePB;
 using consensus::OpId;
+using consensus::RECEIVED_OPID;
+using itest::GetConsensusState;
+using itest::GetLastOpIdForReplica;
 using itest::TabletServerMap;
 using itest::TServerDetails;
+using itest::WAIT_FOR_LEADER;
+using itest::WaitForReplicasReportedToMaster;
+using itest::WaitUntilCommittedOpIdIndexIs;
+using itest::WaitUntilTabletInState;
+using itest::WaitUntilTabletRunning;
 using std::string;
 using std::vector;
 using strings::Substitute;
@@ -182,7 +191,7 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
   FLAGS_num_replicas = 3;
   // tserver_unresponsive_timeout_ms is useful so that master considers
   // the live tservers for tablet re-replication.
-  NO_FATALS(BuildAndStart({}, {}));
+  NO_FATALS(BuildAndStart());
 
   LOG(INFO) << "Finding tablet leader and waiting for things to start...";
   string tablet_id = tablet_replicas_.begin()->first;
@@ -198,17 +207,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
 
@@ -246,10 +255,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
 
   // Wait for the master to be notified of the config change.
   LOG(INFO) << "Waiting for Master to see new config...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
 
   // Verify that two new servers are part of new config and old
@@ -264,8 +273,8 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
   // we should see the tablet in TOMBSTONED state on these servers.
   ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
   ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
-  ASSERT_OK(itest::WaitUntilTabletInState(leader_ts, tablet_id, tablet::SHUTDOWN, kTimeout));
-  ASSERT_OK(itest::WaitUntilTabletInState(followers[1], tablet_id, tablet::SHUTDOWN, kTimeout));
+  ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::SHUTDOWN, kTimeout));
+  ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::SHUTDOWN, kTimeout));
 }
 
 // Test unsafe config change when there is one leader survivor in the cluster.
@@ -291,17 +300,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
 
@@ -340,10 +349,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
 
   // Wait for the master to be notified of the config change.
   LOG(INFO) << "Waiting for Master to see new config...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   for (const master::TabletLocationsPB_ReplicaPB& replica :
       tablet_locations.replicas()) {
@@ -375,17 +384,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
 
@@ -423,10 +432,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
 
   // Wait for the master to be notified of the config change.
   LOG(INFO) << "Waiting for Master to see new config...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   for (const master::TabletLocationsPB_ReplicaPB& replica :
       tablet_locations.replicas()) {
@@ -466,17 +475,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   5, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            5, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
   ASSERT_EQ(followers.size(), 4);
@@ -515,10 +524,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) {
 
   // Wait for the master to be notified of the config change.
   LOG(INFO) << "Waiting for Master to see new config...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   5, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            5, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   for (const master::TabletLocationsPB_ReplicaPB& replica :
       tablet_locations.replicas()) {
@@ -553,17 +562,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
   ASSERT_EQ(followers.size(), 2);
@@ -612,10 +621,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
 
   // Wait for the master to be notified of the config change.
   LOG(INFO) << "Waiting for Master to see new config...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   for (const master::TabletLocationsPB_ReplicaPB& replica :
       tablet_locations.replicas()) {
@@ -650,17 +659,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
 
@@ -718,10 +727,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) {
 
   // Wait for the master to be notified of the config change.
   LOG(INFO) << "Waiting for Master to see new config...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   for (const master::TabletLocationsPB_ReplicaPB& replica :
       tablet_locations.replicas()) {
@@ -756,17 +765,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
 
@@ -824,10 +833,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) {
   ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
 
   // Wait for the master to be notified of the config change.
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   3, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            3, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   for (const master::TabletLocationsPB_ReplicaPB& replica :
       tablet_locations.replicas()) {
@@ -869,17 +878,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) {
   LOG(INFO) << "Waiting for Master to see the current replicas...";
   master::TabletLocationsPB tablet_locations;
   bool has_leader;
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   5, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            5, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
 
   // Wait for initial NO_OP to be committed by the leader.
   TServerDetails* leader_ts;
   ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
-  ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+  ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
   vector<TServerDetails*> followers;
   ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
   ASSERT_EQ(followers.size(), 4);
@@ -931,10 +940,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) {
 
   // Wait for the master to be notified of the config change.
   LOG(INFO) << "Waiting for Master to see new config...";
-  ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
-                                                   5, tablet_id_, kTimeout,
-                                                   itest::WAIT_FOR_LEADER,
-                                                   &has_leader, &tablet_locations));
+  ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+                                            5, tablet_id_, kTimeout,
+                                            WAIT_FOR_LEADER,
+                                            &has_leader, &tablet_locations));
   LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
   for (const master::TabletLocationsPB_ReplicaPB& replica :
       tablet_locations.replicas()) {
@@ -951,9 +960,9 @@ Status GetTermFromConsensus(const vector<TServerDetails*>& tservers,
   ConsensusStatePB cstate;
   for (auto& ts : tservers) {
     RETURN_NOT_OK(
-        itest::GetConsensusState(ts, tablet_id,
-                                 consensus::CONSENSUS_CONFIG_COMMITTED,
-                                 MonoDelta::FromSeconds(10), &cstate));
+        GetConsensusState(ts, tablet_id,
+                          CONSENSUS_CONFIG_COMMITTED,
+                          MonoDelta::FromSeconds(10), &cstate));
     if (cstate.has_leader_uuid() && cstate.has_current_term()) {
       *current_term = cstate.current_term();
       return Status::OK();
@@ -972,9 +981,9 @@ TEST_F(AdminCliTest, TestLeaderStepDown) {
   AppendValuesFromMap(tablet_servers_, &tservers);
   ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
   for (auto& ts : tservers) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts,
-                                            tablet_id_,
-                                            MonoDelta::FromSeconds(10)));
+    ASSERT_OK(WaitUntilTabletRunning(ts,
+                                     tablet_id_,
+                                     MonoDelta::FromSeconds(10)));
   }
 
   int64 current_term;
@@ -1014,9 +1023,9 @@ TEST_F(AdminCliTest, TestLeaderStepDownWhenNotPresent) {
   AppendValuesFromMap(tablet_servers_, &tservers);
   ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
   for (auto& ts : tservers) {
-    ASSERT_OK(itest::WaitUntilTabletRunning(ts,
-                                            tablet_id_,
-                                            MonoDelta::FromSeconds(10)));
+    ASSERT_OK(WaitUntilTabletRunning(ts,
+                                     tablet_id_,
+                                     MonoDelta::FromSeconds(10)));
   }
 
   int64 current_term;