You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2017/04/04 20:12:07 UTC
kudu git commit: master/consensus: Code cleanup
Repository: kudu
Updated Branches:
refs/heads/master fcb150c1a -> f4272eb5c
master/consensus: Code cleanup
No functional changes in this patch (only some log messages are added, removed, or reworded); it is cleanup in preparation for follow-up patches.
I split this out so the following fixes would be easier to review.
* Catalog Manager: move the RetryingTSRpcTask method bodies out of the
class definition to make the code easier to read and to change in a
follow-up patch.
* consensus: Add "using" declarations to kudu-admin-test so the itest:: and consensus:: helpers can be called unqualified. Just cleanup.
Change-Id: If9c4f2702d9f255a403b6b72e54f4795fd582676
Reviewed-on: http://gerrit.cloudera.org:8080/6533
Tested-by: Kudu Jenkins
Reviewed-by: Adar Dembo <ad...@cloudera.com>
Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/f4272eb5
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/f4272eb5
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/f4272eb5
Branch: refs/heads/master
Commit: f4272eb5cebf0d2f5e38b457cfe536cc5b25d537
Parents: fcb150c
Author: Mike Percy <mp...@apache.org>
Authored: Thu Mar 30 18:33:54 2017 -0700
Committer: Mike Percy <mp...@apache.org>
Committed: Tue Apr 4 20:11:22 2017 +0000
----------------------------------------------------------------------
src/kudu/master/catalog_manager.cc | 264 +++++++++++++++++---------------
src/kudu/tools/kudu-admin-test.cc | 177 +++++++++++----------
2 files changed, 233 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kudu/blob/f4272eb5/src/kudu/master/catalog_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/master/catalog_manager.cc b/src/kudu/master/catalog_manager.cc
index a9d136a..39b104a 100644
--- a/src/kudu/master/catalog_manager.cc
+++ b/src/kudu/master/catalog_manager.cc
@@ -2607,34 +2607,7 @@ class RetryingTSRpcTask : public MonitoredTask {
}
// Send the subclass RPC request.
- Status Run() {
- if (PREDICT_FALSE(FLAGS_catalog_manager_fail_ts_rpcs)) {
- MarkFailed();
- UnregisterAsyncTask(); // May delete this.
- return Status::RuntimeError("Async RPCs configured to fail");
- }
-
- Status s = ResetTSProxy();
- if (!s.ok()) {
- LOG(WARNING) << "Unable to reset TS proxy: " << s.ToString();
- MarkFailed();
- UnregisterAsyncTask(); // May delete this.
- return s.CloneAndPrepend("Failed to reset TS proxy");
- }
-
- // Calculate and set the timeout deadline.
- MonoTime timeout = MonoTime::Now() +
- MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms);
- const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_);
- rpc_.set_deadline(deadline);
-
- if (!SendRequest(++attempt_)) {
- if (!RescheduleWithBackoffDelay()) {
- UnregisterAsyncTask(); // May call 'delete this'.
- }
- }
- return Status::OK();
- }
+ Status Run();
// Abort this task.
virtual void Abort() OVERRIDE {
@@ -2688,22 +2661,7 @@ class RetryingTSRpcTask : public MonitoredTask {
// Callback meant to be invoked from asynchronous RPC service proxy calls.
//
// Runs on a reactor thread, so should not block or do any IO.
- void RpcCallback() {
- if (!rpc_.status().ok()) {
- LOG(WARNING) << "TS " << target_ts_desc_->ToString() << ": "
- << type_name() << " RPC failed for tablet "
- << tablet_id() << ": " << rpc_.status().ToString();
- } else if (state() != kStateAborted) {
- HandleResponse(attempt_); // Modifies state_.
- }
-
- // Schedule a retry if the RPC call was not successful.
- if (RescheduleWithBackoffDelay()) {
- return;
- }
-
- UnregisterAsyncTask(); // May call 'delete this'.
- }
+ void RpcCallback();
Master * const master_;
const gscoped_ptr<TSPicker> replica_picker_;
@@ -2724,95 +2682,150 @@ class RetryingTSRpcTask : public MonitoredTask {
// Returns false if the task was not rescheduled due to reaching the maximum
// timeout or because the task is no longer in a running state.
// Returns true if rescheduling the task was successful.
- bool RescheduleWithBackoffDelay() {
- if (state() != kStateRunning) return false;
- MonoTime now = MonoTime::Now();
- // We assume it might take 10ms to process the request in the best case,
- // fail if we have less than that amount of time remaining.
- int64_t millis_remaining = (deadline_ - now).ToMilliseconds() - 10;
- // Exponential backoff with jitter.
- int64_t base_delay_ms;
- if (attempt_ <= 12) {
- base_delay_ms = 1 << (attempt_ + 3); // 1st retry delayed 2^4 ms, 2nd 2^5, etc.
- } else {
- base_delay_ms = 60 * 1000; // cap at 1 minute
- }
- int64_t jitter_ms = rand() % 50; // Add up to 50ms of additional random delay.
- int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);
-
- if (delay_millis <= 0) {
- LOG(WARNING) << "Request timed out: " << description();
- MarkFailed();
- } else {
- LOG(INFO) << "Scheduling retry of " << description() << " with a delay"
- << " of " << delay_millis << "ms (attempt = " << attempt_ << ")...";
- master_->messenger()->ScheduleOnReactor(
- boost::bind(&RetryingTSRpcTask::RunDelayedTask, this, _1),
- MonoDelta::FromMilliseconds(delay_millis));
- return true;
- }
- return false;
- }
+ bool RescheduleWithBackoffDelay();
// Callback for Reactor delayed task mechanism. Called either when it is time
// to execute the delayed task (with status == OK) or when the task
// is cancelled, i.e. when the scheduling timer is shut down (status != OK).
- void RunDelayedTask(const Status& status) {
- if (!status.ok()) {
- LOG(WARNING) << "Async tablet task " << description() << " failed or was cancelled: "
- << status.ToString();
- UnregisterAsyncTask(); // May delete this.
- return;
- }
+ void RunDelayedTask(const Status& status);
- string desc = description(); // Save in case we need to log after deletion.
- Status s = Run(); // May delete this.
- if (!s.ok()) {
- LOG(WARNING) << "Async tablet task " << desc << " failed: " << s.ToString();
- }
+ // Clean up request and release resources. May call 'delete this'.
+ void UnregisterAsyncTask();
+
+ // Find a new replica and construct the RPC proxy.
+ Status ResetTSProxy();
+
+ // Use state() and MarkX() accessors.
+ AtomicWord state_;
+};
+
+Status RetryingTSRpcTask::Run() {
+ if (PREDICT_FALSE(FLAGS_catalog_manager_fail_ts_rpcs)) {
+ MarkFailed();
+ UnregisterAsyncTask(); // May delete this.
+ return Status::RuntimeError("Async RPCs configured to fail");
}
- // Clean up request and release resources. May call 'delete this'.
- void UnregisterAsyncTask() {
- end_ts_ = MonoTime::Now();
- if (table_ != nullptr) {
- table_->RemoveTask(this);
- } else {
- // This is a floating task (since the table does not exist)
- // created as response to a tablet report.
- Release(); // May call "delete this";
- }
+ Status s = ResetTSProxy(); // This can fail if it's a replica we don't know about yet.
+ if (!s.ok()) {
+ MarkFailed();
+ UnregisterAsyncTask(); // May delete this.
+ return s.CloneAndPrepend("Failed to reset TS proxy");
}
- Status ResetTSProxy() {
- // TODO: if there is no replica available, should we still keep the task running?
- string ts_uuid;
- RETURN_NOT_OK(replica_picker_->PickReplica(&ts_uuid));
- shared_ptr<TSDescriptor> ts_desc;
- if (!master_->ts_manager()->LookupTSByUUID(ts_uuid, &ts_desc)) {
- return Status::NotFound(Substitute("Could not find TS for UUID $0",
- ts_uuid));
+ // Calculate and set the timeout deadline.
+ MonoTime timeout = MonoTime::Now() +
+ MonoDelta::FromMilliseconds(FLAGS_master_ts_rpc_timeout_ms);
+ const MonoTime& deadline = MonoTime::Earliest(timeout, deadline_);
+ rpc_.set_deadline(deadline);
+
+ if (!SendRequest(++attempt_)) {
+ if (!RescheduleWithBackoffDelay()) {
+ UnregisterAsyncTask(); // May call 'delete this'.
}
+ }
+ return Status::OK();
+}
- // This assumes that TSDescriptors are never deleted by the master,
- // so the task need not take ownership of the returned pointer.
- target_ts_desc_ = ts_desc.get();
+void RetryingTSRpcTask::RpcCallback() {
+ if (!rpc_.status().ok()) {
+ LOG(WARNING) << "TS " << target_ts_desc_->ToString() << ": "
+ << type_name() << " RPC failed for tablet "
+ << tablet_id() << ": " << rpc_.status().ToString();
+ } else if (state() != kStateAborted) {
+ HandleResponse(attempt_); // Modifies state_.
+ }
- shared_ptr<tserver::TabletServerAdminServiceProxy> ts_proxy;
- RETURN_NOT_OK(target_ts_desc_->GetTSAdminProxy(master_->messenger(), &ts_proxy));
- ts_proxy_.swap(ts_proxy);
+ // Schedule a retry if the RPC call was not successful.
+ if (RescheduleWithBackoffDelay()) {
+ return;
+ }
- shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
- RETURN_NOT_OK(target_ts_desc_->GetConsensusProxy(master_->messenger(), &consensus_proxy));
- consensus_proxy_.swap(consensus_proxy);
+ UnregisterAsyncTask(); // May call 'delete this'.
+}
- rpc_.Reset();
- return Status::OK();
+bool RetryingTSRpcTask::RescheduleWithBackoffDelay() {
+ if (state() != kStateRunning) return false;
+ MonoTime now = MonoTime::Now();
+ // We assume it might take 10ms to process the request in the best case,
+ // fail if we have less than that amount of time remaining.
+ int64_t millis_remaining = (deadline_ - now).ToMilliseconds() - 10;
+ // Exponential backoff with jitter.
+ int64_t base_delay_ms;
+ if (attempt_ <= 12) {
+ base_delay_ms = 1 << (attempt_ + 3); // 1st retry delayed 2^4 ms, 2nd 2^5, etc.
+ } else {
+ base_delay_ms = 60 * 1000; // cap at 1 minute
}
+ int64_t jitter_ms = rand() % 50; // Add up to 50ms of additional random delay.
+ int64_t delay_millis = std::min<int64_t>(base_delay_ms + jitter_ms, millis_remaining);
- // Use state() and MarkX() accessors.
- AtomicWord state_;
-};
+ if (delay_millis <= 0) {
+ LOG(WARNING) << "Request timed out: " << description();
+ MarkFailed();
+ } else {
+ LOG(INFO) << "Scheduling retry of " << description() << " with a delay"
+ << " of " << delay_millis << "ms (attempt = " << attempt_ << ")...";
+ master_->messenger()->ScheduleOnReactor(
+ boost::bind(&RetryingTSRpcTask::RunDelayedTask, this, _1),
+ MonoDelta::FromMilliseconds(delay_millis));
+ return true;
+ }
+ return false;
+}
+
+void RetryingTSRpcTask::RunDelayedTask(const Status& status) {
+ if (!status.ok()) {
+ LOG(WARNING) << "Async tablet task " << description() << " failed or was cancelled: "
+ << status.ToString();
+ UnregisterAsyncTask(); // May delete this.
+ return;
+ }
+
+ string desc = description(); // Save in case we need to log after deletion.
+ Status s = Run(); // May delete this.
+ if (!s.ok()) {
+ LOG(WARNING) << "Async tablet task " << desc << " failed: " << s.ToString();
+ }
+}
+
+void RetryingTSRpcTask::UnregisterAsyncTask() {
+ end_ts_ = MonoTime::Now();
+ if (table_ != nullptr) {
+ table_->RemoveTask(this);
+ } else {
+ // This is a floating task (since the table does not exist)
+ // created as response to a tablet report.
+ Release(); // May call "delete this";
+ }
+}
+
+Status RetryingTSRpcTask::ResetTSProxy() {
+ // TODO: if there is no replica available, should we still keep the task running?
+ string ts_uuid;
+ // TODO: don't pick replica we can't lookup???
+ RETURN_NOT_OK(replica_picker_->PickReplica(&ts_uuid));
+ shared_ptr<TSDescriptor> ts_desc;
+ if (!master_->ts_manager()->LookupTSByUUID(ts_uuid, &ts_desc)) {
+ return Status::NotFound(Substitute("Could not find TS for UUID $0",
+ ts_uuid));
+ }
+
+ // This assumes that TSDescriptors are never deleted by the master,
+ // so the task need not take ownership of the returned pointer.
+ target_ts_desc_ = ts_desc.get();
+
+ shared_ptr<tserver::TabletServerAdminServiceProxy> ts_proxy;
+ RETURN_NOT_OK(target_ts_desc_->GetTSAdminProxy(master_->messenger(), &ts_proxy));
+ ts_proxy_.swap(ts_proxy);
+
+ shared_ptr<consensus::ConsensusServiceProxy> consensus_proxy;
+ RETURN_NOT_OK(target_ts_desc_->GetConsensusProxy(master_->messenger(), &consensus_proxy));
+ consensus_proxy_.swap(consensus_proxy);
+
+ rpc_.Reset();
+ return Status::OK();
+}
// RetryingTSRpcTask subclass which always retries the same tablet server,
// identified by its UUID.
@@ -3138,6 +3151,9 @@ class AsyncAddServerTask : public RetryingTSRpcTask {
};
bool AsyncAddServerTask::SendRequest(int attempt) {
+ LOG(INFO) << "Sending request for AddServer on tablet " << tablet_->tablet_id()
+ << " (attempt " << attempt << ")";
+
// Bail if we're retrying in vain.
int64_t latest_index;
{
@@ -3189,7 +3205,7 @@ bool AsyncAddServerTask::SendRequest(int attempt) {
void AsyncAddServerTask::HandleResponse(int attempt) {
if (!resp_.has_error()) {
MarkComplete();
- LOG_WITH_PREFIX(INFO) << "Change config succeeded";
+ LOG_WITH_PREFIX(INFO) << "Config change to add server succeeded";
return;
}
@@ -3280,18 +3296,18 @@ void CatalogManager::SendDeleteReplicaRequest(
// created as response to a tablet report.
call->AddRef();
}
- WARN_NOT_OK(call->Run(), "Failed to send delete tablet request");
+ WARN_NOT_OK(call->Run(),
+ Substitute("Failed to send DeleteReplica request for tablet $0", tablet_id));
}
void CatalogManager::SendAddServerRequest(const scoped_refptr<TabletInfo>& tablet,
const ConsensusStatePB& cstate) {
auto task = new AsyncAddServerTask(master_, tablet, cstate);
tablet->table()->AddTask(task);
- WARN_NOT_OK(task->Run(), "Failed to send new AddServer request");
-
- // We can't access 'task' here because it may delete itself inside Run() in the
- // case that the tablet has no known leader.
- LOG(INFO) << "Started AddServer task for tablet " << tablet->tablet_id();
+ WARN_NOT_OK(task->Run(),
+ Substitute("Failed to send AddServer request for tablet $0", tablet->tablet_id()));
+ // We can't access 'task' after calling Run() because it may delete itself
+ // inside Run() in the case that the tablet has no known leader.
}
void CatalogManager::ExtractTabletsToProcess(
http://git-wip-us.apache.org/repos/asf/kudu/blob/f4272eb5/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index a414c25..04573d6 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -39,10 +39,19 @@ using client::KuduClientBuilder;
using client::KuduSchema;
using client::KuduTableCreator;
using client::sp::shared_ptr;
+using consensus::CONSENSUS_CONFIG_COMMITTED;
using consensus::ConsensusStatePB;
using consensus::OpId;
+using consensus::RECEIVED_OPID;
+using itest::GetConsensusState;
+using itest::GetLastOpIdForReplica;
using itest::TabletServerMap;
using itest::TServerDetails;
+using itest::WAIT_FOR_LEADER;
+using itest::WaitForReplicasReportedToMaster;
+using itest::WaitUntilCommittedOpIdIndexIs;
+using itest::WaitUntilTabletInState;
+using itest::WaitUntilTabletRunning;
using std::string;
using std::vector;
using strings::Substitute;
@@ -182,7 +191,7 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
FLAGS_num_replicas = 3;
// tserver_unresponsive_timeout_ms is useful so that master considers
// the live tservers for tablet re-replication.
- NO_FATALS(BuildAndStart({}, {}));
+ NO_FATALS(BuildAndStart());
LOG(INFO) << "Finding tablet leader and waiting for things to start...";
string tablet_id = tablet_replicas_.begin()->first;
@@ -198,17 +207,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
@@ -246,10 +255,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
// Verify that two new servers are part of new config and old
@@ -264,8 +273,8 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleFollower) {
// we should see the tablet in TOMBSTONED state on these servers.
ASSERT_OK(cluster_->tablet_server_by_uuid(leader_ts->uuid())->Restart());
ASSERT_OK(cluster_->tablet_server_by_uuid(followers[1]->uuid())->Restart());
- ASSERT_OK(itest::WaitUntilTabletInState(leader_ts, tablet_id, tablet::SHUTDOWN, kTimeout));
- ASSERT_OK(itest::WaitUntilTabletInState(followers[1], tablet_id, tablet::SHUTDOWN, kTimeout));
+ ASSERT_OK(WaitUntilTabletInState(leader_ts, tablet_id, tablet::SHUTDOWN, kTimeout));
+ ASSERT_OK(WaitUntilTabletInState(followers[1], tablet_id, tablet::SHUTDOWN, kTimeout));
}
// Test unsafe config change when there is one leader survivor in the cluster.
@@ -291,17 +300,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
@@ -340,10 +349,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigOnSingleLeader) {
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
@@ -375,17 +384,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
@@ -423,10 +432,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigForConfigWithTwoNodes) {
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
@@ -466,17 +475,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 5, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 5, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
ASSERT_EQ(followers.size(), 4);
@@ -515,10 +524,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithFiveReplicaConfig) {
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 5, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 5, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
@@ -553,17 +562,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
ASSERT_EQ(followers.size(), 2);
@@ -612,10 +621,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigLeaderWithPendingConfig) {
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
@@ -650,17 +659,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
@@ -718,10 +727,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigFollowerWithPendingConfig) {
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
@@ -756,17 +765,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
@@ -824,10 +833,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithPendingConfigsOnWAL) {
ASSERT_OK(WaitUntilCommittedConfigNumVotersIs(3, new_node, tablet_id_, kTimeout));
// Wait for the master to be notified of the config change.
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 3, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 3, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
@@ -869,17 +878,17 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) {
LOG(INFO) << "Waiting for Master to see the current replicas...";
master::TabletLocationsPB tablet_locations;
bool has_leader;
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 5, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 5, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
ASSERT_TRUE(has_leader) << SecureDebugString(tablet_locations);
// Wait for initial NO_OP to be committed by the leader.
TServerDetails* leader_ts;
ASSERT_OK(FindTabletLeader(active_tablet_servers, tablet_id_, kTimeout, &leader_ts));
- ASSERT_OK(itest::WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
+ ASSERT_OK(WaitUntilCommittedOpIdIndexIs(1, leader_ts, tablet_id_, kTimeout));
vector<TServerDetails*> followers;
ASSERT_OK(FindTabletFollowers(active_tablet_servers, tablet_id_, kTimeout, &followers));
ASSERT_EQ(followers.size(), 4);
@@ -931,10 +940,10 @@ TEST_F(AdminCliTest, TestUnsafeChangeConfigWithMultiplePendingConfigs) {
// Wait for the master to be notified of the config change.
LOG(INFO) << "Waiting for Master to see new config...";
- ASSERT_OK(itest::WaitForReplicasReportedToMaster(cluster_->master_proxy(),
- 5, tablet_id_, kTimeout,
- itest::WAIT_FOR_LEADER,
- &has_leader, &tablet_locations));
+ ASSERT_OK(WaitForReplicasReportedToMaster(cluster_->master_proxy(),
+ 5, tablet_id_, kTimeout,
+ WAIT_FOR_LEADER,
+ &has_leader, &tablet_locations));
LOG(INFO) << "Tablet locations:\n" << SecureDebugString(tablet_locations);
for (const master::TabletLocationsPB_ReplicaPB& replica :
tablet_locations.replicas()) {
@@ -951,9 +960,9 @@ Status GetTermFromConsensus(const vector<TServerDetails*>& tservers,
ConsensusStatePB cstate;
for (auto& ts : tservers) {
RETURN_NOT_OK(
- itest::GetConsensusState(ts, tablet_id,
- consensus::CONSENSUS_CONFIG_COMMITTED,
- MonoDelta::FromSeconds(10), &cstate));
+ GetConsensusState(ts, tablet_id,
+ CONSENSUS_CONFIG_COMMITTED,
+ MonoDelta::FromSeconds(10), &cstate));
if (cstate.has_leader_uuid() && cstate.has_current_term()) {
*current_term = cstate.current_term();
return Status::OK();
@@ -972,9 +981,9 @@ TEST_F(AdminCliTest, TestLeaderStepDown) {
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
for (auto& ts : tservers) {
- ASSERT_OK(itest::WaitUntilTabletRunning(ts,
- tablet_id_,
- MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitUntilTabletRunning(ts,
+ tablet_id_,
+ MonoDelta::FromSeconds(10)));
}
int64 current_term;
@@ -1014,9 +1023,9 @@ TEST_F(AdminCliTest, TestLeaderStepDownWhenNotPresent) {
AppendValuesFromMap(tablet_servers_, &tservers);
ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
for (auto& ts : tservers) {
- ASSERT_OK(itest::WaitUntilTabletRunning(ts,
- tablet_id_,
- MonoDelta::FromSeconds(10)));
+ ASSERT_OK(WaitUntilTabletRunning(ts,
+ tablet_id_,
+ MonoDelta::FromSeconds(10)));
}
int64 current_term;