You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/10/22 20:35:29 UTC
[kudu] branch master updated: consensus_peers: make
RpcPeerProxy::StartElection async and other cleanup
This is an automated email from the ASF dual-hosted git repository.
adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git
The following commit(s) were added to refs/heads/master by this push:
new de74056 consensus_peers: make RpcPeerProxy::StartElection async and other cleanup
de74056 is described below
commit de74056dd4d5d01e86a56958623e0362d97d0ab7
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Fri Oct 18 15:49:37 2019 -0700
consensus_peers: make RpcPeerProxy::StartElection async and other cleanup
RpcPeerProxy::StartElection was the only PeerProxy call that was synchronous.
Apart from violating POLA (the principle of least astonishment), the
synchronicity meant that the graceful leadership transfer code path occupied
a Raft threadpool slot for longer in order to receive the RPC response. It's
not a hot path so this is by no means a perf improvement, but it just seemed
unnecessary given that the response was only used to log a warning in the
rare event that the remote couldn't start an election.
Changing this meant adding some extra lifecycle code to Peer::StartElection,
but I think it's still a net improvement in clarity.
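Along the way I also did some other C++11 cleanup: dropping redundant
'virtual' keywords, replacing the OVERRIDE macro with 'override', and passing
request protobufs to the proxy methods by const reference instead of by
pointer.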
Change-Id: I6a6df610f1c07adae5a85534d8c6dec324801042
Reviewed-on: http://gerrit.cloudera.org:8080/14512
Tested-by: Kudu Jenkins
Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
src/kudu/consensus/consensus-test-util.h | 146 +++++++++++++++----------------
src/kudu/consensus/consensus_peers.cc | 60 ++++++++-----
src/kudu/consensus/consensus_peers.h | 35 ++++----
src/kudu/consensus/leader_election.cc | 2 +-
src/kudu/consensus/peer_manager.cc | 3 +-
5 files changed, 134 insertions(+), 112 deletions(-)
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index cc799c1..ff5aa87 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -150,7 +150,7 @@ class TestPeerProxy : public PeerProxy {
protected:
// Register the RPC callback in order to call later.
// We currently only support one request of each method being in flight at a time.
- virtual void RegisterCallback(Method method, const rpc::ResponseCallback& callback) {
+ void RegisterCallback(Method method, const rpc::ResponseCallback& callback) {
std::lock_guard<simple_spinlock> lock(lock_);
InsertOrDie(&callbacks_, method, callback);
}
@@ -170,7 +170,7 @@ class TestPeerProxy : public PeerProxy {
ignore_result(pool_->SubmitFunc(callback));
}
- virtual void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
+ void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
RegisterCallback(method, callback);
Respond(method);
}
@@ -200,7 +200,7 @@ class DelayablePeerProxy : public TestPeerProxy {
latch_.Reset(1); // Reset for the next time.
}
- virtual void RespondUnlessDelayed(Method method) {
+ void RespondUnlessDelayed(Method method) {
{
std::lock_guard<simple_spinlock> l(lock_);
if (delay_response_) {
@@ -212,31 +212,32 @@ class DelayablePeerProxy : public TestPeerProxy {
TestPeerProxy::Respond(method);
}
- virtual void Respond(Method method) OVERRIDE {
+ void Respond(Method method) override {
latch_.Wait(); // Wait until strictly after peer would have responded.
return TestPeerProxy::Respond(method);
}
- virtual void UpdateAsync(const ConsensusRequestPB* request,
- ConsensusResponsePB* response,
- rpc::RpcController* controller,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void UpdateAsync(const ConsensusRequestPB& request,
+ ConsensusResponsePB* response,
+ rpc::RpcController* controller,
+ const rpc::ResponseCallback& callback) override {
RegisterCallback(kUpdate, callback);
return proxy_->UpdateAsync(request, response, controller,
boost::bind(&DelayablePeerProxy::RespondUnlessDelayed,
this, kUpdate));
}
- virtual Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
- RunLeaderElectionResponsePB* /*response*/,
- rpc::RpcController* /*controller*/) override {
- return Status::OK();
+ void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+ RunLeaderElectionResponsePB* /*response*/,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
+ callback();
}
- virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
- VoteResponsePB* response,
- rpc::RpcController* controller,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void RequestConsensusVoteAsync(const VoteRequestPB& request,
+ VoteResponsePB* response,
+ rpc::RpcController* controller,
+ const rpc::ResponseCallback& callback) override {
RegisterCallback(kRequestVote, callback);
return proxy_->RequestConsensusVoteAsync(request, response, controller,
boost::bind(&DelayablePeerProxy::RespondUnlessDelayed,
@@ -262,25 +263,21 @@ class MockedPeerProxy : public TestPeerProxy {
update_count_(0) {
}
- virtual void set_update_response(const ConsensusResponsePB& update_response) {
+ void set_update_response(const ConsensusResponsePB& update_response) {
CHECK(update_response.IsInitialized()) << pb_util::SecureShortDebugString(update_response);
- {
- std::lock_guard<simple_spinlock> l(lock_);
- update_response_ = update_response;
- }
+ std::lock_guard<simple_spinlock> l(lock_);
+ update_response_ = update_response;
}
- virtual void set_vote_response(const VoteResponsePB& vote_response) {
- {
- std::lock_guard<simple_spinlock> l(lock_);
- vote_response_ = vote_response;
- }
+ void set_vote_response(const VoteResponsePB& vote_response) {
+ std::lock_guard<simple_spinlock> l(lock_);
+ vote_response_ = vote_response;
}
- virtual void UpdateAsync(const ConsensusRequestPB* request,
- ConsensusResponsePB* response,
- rpc::RpcController* controller,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void UpdateAsync(const ConsensusRequestPB& /*request*/,
+ ConsensusResponsePB* response,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
{
std::lock_guard<simple_spinlock> l(lock_);
update_count_++;
@@ -289,18 +286,19 @@ class MockedPeerProxy : public TestPeerProxy {
return RegisterCallbackAndRespond(kUpdate, callback);
}
- virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
- VoteResponsePB* response,
- rpc::RpcController* controller,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void RequestConsensusVoteAsync(const VoteRequestPB& /*request*/,
+ VoteResponsePB* response,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
*response = vote_response_;
return RegisterCallbackAndRespond(kRequestVote, callback);
}
- Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
- RunLeaderElectionResponsePB* /*response*/,
- rpc::RpcController* /*controller*/) override {
- return Status::OK();
+ void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+ RunLeaderElectionResponsePB* /*response*/,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
+ callback();
}
// Return the number of times that UpdateAsync() has been called.
@@ -326,24 +324,24 @@ class NoOpTestPeerProxy : public TestPeerProxy {
last_received_.CopyFrom(MinimumOpId());
}
- virtual void UpdateAsync(const ConsensusRequestPB* request,
- ConsensusResponsePB* response,
- rpc::RpcController* controller,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void UpdateAsync(const ConsensusRequestPB& request,
+ ConsensusResponsePB* response,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
response->Clear();
{
std::lock_guard<simple_spinlock> lock(lock_);
- if (OpIdLessThan(last_received_, request->preceding_id())) {
+ if (OpIdLessThan(last_received_, request.preceding_id())) {
ConsensusErrorPB* error = response->mutable_status()->mutable_error();
error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
StatusToPB(Status::IllegalState(""), error->mutable_status());
- } else if (request->ops_size() > 0) {
- last_received_.CopyFrom(request->ops(request->ops_size() - 1).id());
+ } else if (request.ops_size() > 0) {
+ last_received_.CopyFrom(request.ops(request.ops_size() - 1).id());
}
response->set_responder_uuid(peer_pb_.permanent_uuid());
- response->set_responder_term(request->caller_term());
+ response->set_responder_term(request.caller_term());
response->mutable_status()->mutable_last_received()->CopyFrom(last_received_);
response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(last_received_);
// We set the last committed index to be the same index as the last received. While
@@ -354,20 +352,21 @@ class NoOpTestPeerProxy : public TestPeerProxy {
return RegisterCallbackAndRespond(kUpdate, callback);
}
- virtual Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
- RunLeaderElectionResponsePB* /*response*/,
- rpc::RpcController* /*controller*/) override {
- return Status::OK();
+ void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+ RunLeaderElectionResponsePB* /*response*/,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
+ callback();
}
- virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
- VoteResponsePB* response,
- rpc::RpcController* /*controller*/,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void RequestConsensusVoteAsync(const VoteRequestPB& request,
+ VoteResponsePB* response,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
{
std::lock_guard<simple_spinlock> lock(lock_);
response->set_responder_uuid(peer_pb_.permanent_uuid());
- response->set_responder_term(request->candidate_term());
+ response->set_responder_term(request.candidate_term());
response->set_vote_granted(true);
}
return RegisterCallbackAndRespond(kRequestVote, callback);
@@ -473,25 +472,26 @@ class LocalTestPeerProxy : public TestPeerProxy {
peers_(peers),
miss_comm_(false) {}
- virtual void UpdateAsync(const ConsensusRequestPB* request,
- ConsensusResponsePB* response,
- rpc::RpcController* controller,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void UpdateAsync(const ConsensusRequestPB& request,
+ ConsensusResponsePB* response,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
RegisterCallback(kUpdate, callback);
CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendUpdateRequest,
this, request, response)));
}
- Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
- RunLeaderElectionResponsePB* /*response*/,
- rpc::RpcController* /*controller*/) override {
- return Status::OK();
+ void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+ RunLeaderElectionResponsePB* /*response*/,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
+ callback();
}
- virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
- VoteResponsePB* response,
- rpc::RpcController* /*controller*/,
- const rpc::ResponseCallback& callback) OVERRIDE {
+ void RequestConsensusVoteAsync(const VoteRequestPB& request,
+ VoteResponsePB* response,
+ rpc::RpcController* /*controller*/,
+ const rpc::ResponseCallback& callback) override {
RegisterCallback(kRequestVote, callback);
CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendVoteRequest,
this, request, response)));
@@ -505,7 +505,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
}
template<class Request, class Response>
- void RespondOrMissResponse(Request* request,
+ void RespondOrMissResponse(const Request& request,
const Response& response_temp,
Response* final_response,
Method method) {
@@ -517,7 +517,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
miss_comm_ = false;
}
if (PREDICT_FALSE(miss_comm_copy)) {
- VLOG(2) << this << ": injecting fault on " << pb_util::SecureShortDebugString(*request);
+ VLOG(2) << this << ": injecting fault on " << pb_util::SecureShortDebugString(request);
SetResponseError(Status::IOError("Artificial error caused by communication "
"failure injection."), final_response);
} else {
@@ -526,12 +526,12 @@ class LocalTestPeerProxy : public TestPeerProxy {
Respond(method);
}
- void SendUpdateRequest(const ConsensusRequestPB* request,
+ void SendUpdateRequest(const ConsensusRequestPB& request,
ConsensusResponsePB* response) {
// Copy the request and the response for the other peer so that ownership
// remains as close to the dist. impl. as possible.
ConsensusRequestPB other_peer_req;
- other_peer_req.CopyFrom(*request);
+ other_peer_req.CopyFrom(request);
// Give the other peer a clean response object to write to.
ConsensusResponsePB other_peer_resp;
@@ -558,13 +558,13 @@ class LocalTestPeerProxy : public TestPeerProxy {
- void SendVoteRequest(const VoteRequestPB* request,
+ void SendVoteRequest(const VoteRequestPB& request,
VoteResponsePB* response) {
// Copy the request and the response for the other peer so that ownership
// remains as close to the dist. impl. as possible.
VoteRequestPB other_peer_req;
- other_peer_req.CopyFrom(*request);
+ other_peer_req.CopyFrom(request);
VoteResponsePB other_peer_resp;
other_peer_resp.CopyFrom(*response);
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index e6fd67a..5cd12f0 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -93,6 +93,7 @@ using kudu::rpc::RpcController;
using kudu::tserver::TabletServerErrorPB;
using std::shared_ptr;
using std::string;
+using std::unique_ptr;
using std::vector;
using std::weak_ptr;
using strings::Substitute;
@@ -243,7 +244,7 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
// Capture a shared_ptr reference into the RPC callback so that we're guaranteed
// that this object outlives the RPC.
shared_ptr<Peer> s_this = shared_from_this();
- proxy_->StartTabletCopyAsync(&tc_request_, &tc_response_, &controller_,
+ proxy_->StartTabletCopyAsync(tc_request_, &tc_response_, &controller_,
[s_this]() {
s_this->ProcessTabletCopyResponse();
});
@@ -282,24 +283,38 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
// Capture a shared_ptr reference into the RPC callback so that we're guaranteed
// that this object outlives the RPC.
shared_ptr<Peer> s_this = shared_from_this();
- proxy_->UpdateAsync(&request_, &response_, &controller_,
+ proxy_->UpdateAsync(request_, &response_, &controller_,
[s_this]() {
s_this->ProcessResponse();
});
}
-Status Peer::StartElection() {
+void Peer::StartElection() {
+ // The async proxy contract is such that the response and RPC controller must
+ // stay in scope until the callback is invoked. Unlike other Peer methods, we
+ // can't guarantee that there's only one outstanding StartElection call at a
+ // time, so we can't store the response and controller as a class member.
+ // Instead, we have to pass them into the callback and free them there.
RunLeaderElectionRequestPB req;
- RunLeaderElectionResponsePB resp;
- RpcController controller;
- req.set_dest_uuid(peer_pb().permanent_uuid());
+ unique_ptr<RunLeaderElectionResponsePB> resp_uniq(new RunLeaderElectionResponsePB());
+ unique_ptr<RpcController> controller_uniq(new RpcController());
+ string peer_uuid = peer_pb().permanent_uuid();
+ req.set_dest_uuid(peer_uuid);
req.set_tablet_id(tablet_id_);
- RETURN_NOT_OK(proxy_->StartElection(&req, &resp, &controller));
- RETURN_NOT_OK(controller.status());
- if (resp.has_error()) {
- return StatusFromPB(response_.error().status());
- }
- return Status::OK();
+
+ // TODO(adar): lack of C++14 move capture makes for ugly code.
+ RunLeaderElectionResponsePB* resp = resp_uniq.release();
+ RpcController* controller = controller_uniq.release();
+ proxy_->StartElectionAsync(req, resp, controller, [resp, controller, peer_uuid]() {
+ unique_ptr<RunLeaderElectionResponsePB> r(resp);
+ unique_ptr<RpcController> c(controller);
+ string error_msg = Substitute("unable to start election on peer $0", peer_uuid);
+ if (!c->status().ok()) {
+ WARN_NOT_OK(c->status(), error_msg);
+ } else if (r->has_error()) {
+ WARN_NOT_OK(StatusFromPB(r->error().status()), error_msg);
+ }
+ });
}
void Peer::ProcessResponse() {
@@ -501,34 +516,35 @@ RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport,
consensus_proxy_(std::move(DCHECK_NOTNULL(consensus_proxy))) {
}
-void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request,
+void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB& request,
ConsensusResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) {
controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
- consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback);
+ consensus_proxy_->UpdateConsensusAsync(request, response, controller, callback);
}
-Status RpcPeerProxy::StartElection(const RunLeaderElectionRequestPB* request,
- RunLeaderElectionResponsePB* response,
- rpc::RpcController* controller) {
+void RpcPeerProxy::StartElectionAsync(const RunLeaderElectionRequestPB& request,
+ RunLeaderElectionResponsePB* response,
+ rpc::RpcController* controller,
+ const rpc::ResponseCallback& callback) {
controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
- return consensus_proxy_->RunLeaderElection(*request, response, controller);
+ consensus_proxy_->RunLeaderElectionAsync(request, response, controller, callback);
}
-void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
+void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB& request,
VoteResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) {
- consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback);
+ consensus_proxy_->RequestConsensusVoteAsync(request, response, controller, callback);
}
-void RpcPeerProxy::StartTabletCopyAsync(const StartTabletCopyRequestPB* request,
+void RpcPeerProxy::StartTabletCopyAsync(const StartTabletCopyRequestPB& request,
StartTabletCopyResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) {
controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
- consensus_proxy_->StartTabletCopyAsync(*request, response, controller, callback);
+ consensus_proxy_->StartTabletCopyAsync(request, response, controller, callback);
}
string RpcPeerProxy::PeerName() const {
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 7379512..acfdd8a 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -79,12 +79,14 @@ class Peer : public std::enable_shared_from_this<Peer> {
// status-only requests.
Status SignalRequest(bool even_if_queue_empty = false);
- // Synchronously starts a leader election on this peer.
+ // Starts a leader election on this peer.
+ //
// This method is ad hoc, using this instance's PeerProxy to send the
// StartElection request.
+ //
// The StartElection RPC does not count as the single outstanding request
// that this class tracks.
- Status StartElection();
+ void StartElection();
const RaftPeerPB& peer_pb() const { return peer_pb_; }
@@ -204,23 +206,25 @@ class PeerProxy {
virtual ~PeerProxy() {}
// Sends a request, asynchronously, to a remote peer.
- virtual void UpdateAsync(const ConsensusRequestPB* request,
+ virtual void UpdateAsync(const ConsensusRequestPB& request,
ConsensusResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) = 0;
- // Sends a RequestConsensusVote to a remote peer.
- virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
+ // Asks a peer to vote for a candidate.
+ virtual void RequestConsensusVoteAsync(const VoteRequestPB& request,
VoteResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) = 0;
- virtual Status StartElection(const RunLeaderElectionRequestPB* request,
- RunLeaderElectionResponsePB* response,
- rpc::RpcController* controller) = 0;
+ // Instructs a peer to kick off an election to elect itself leader.
+ virtual void StartElectionAsync(const RunLeaderElectionRequestPB& request,
+ RunLeaderElectionResponsePB* response,
+ rpc::RpcController* controller,
+ const rpc::ResponseCallback& callback) = 0;
// Instructs a peer to begin a tablet copy session.
- virtual void StartTabletCopyAsync(const StartTabletCopyRequestPB* /*request*/,
+ virtual void StartTabletCopyAsync(const StartTabletCopyRequestPB& /*request*/,
StartTabletCopyResponsePB* /*response*/,
rpc::RpcController* /*controller*/,
const rpc::ResponseCallback& /*callback*/) {
@@ -250,21 +254,22 @@ class RpcPeerProxy : public PeerProxy {
RpcPeerProxy(gscoped_ptr<HostPort> hostport,
gscoped_ptr<ConsensusServiceProxy> consensus_proxy);
- void UpdateAsync(const ConsensusRequestPB* request,
+ void UpdateAsync(const ConsensusRequestPB& request,
ConsensusResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) override;
- void RequestConsensusVoteAsync(const VoteRequestPB* request,
+ void RequestConsensusVoteAsync(const VoteRequestPB& request,
VoteResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) override;
- Status StartElection(const RunLeaderElectionRequestPB* request,
- RunLeaderElectionResponsePB* response,
- rpc::RpcController* controller) override;
+ void StartElectionAsync(const RunLeaderElectionRequestPB& request,
+ RunLeaderElectionResponsePB* response,
+ rpc::RpcController* controller,
+ const rpc::ResponseCallback& callback) override;
- void StartTabletCopyAsync(const StartTabletCopyRequestPB* request,
+ void StartTabletCopyAsync(const StartTabletCopyRequestPB& request,
StartTabletCopyResponsePB* response,
rpc::RpcController* controller,
const rpc::ResponseCallback& callback) override;
diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc
index 830c11a..46e85eb 100644
--- a/src/kudu/consensus/leader_election.cc
+++ b/src/kudu/consensus/leader_election.cc
@@ -285,7 +285,7 @@ void LeaderElection::Run() {
state->request.set_dest_uuid(voter_uuid);
state->proxy->RequestConsensusVoteAsync(
- &state->request,
+ state->request,
&state->response,
&state->rpc,
// We use gutil Bind() for the refcounting and boost::bind to adapt the
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index 2ec49f0..370f130 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -116,7 +116,8 @@ Status PeerManager::StartElection(const std::string& uuid) {
if (!peer) {
return Status::NotFound("unknown peer");
}
- return peer->StartElection();
+ peer->StartElection();
+ return Status::OK();
}
void PeerManager::Close() {