You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by ad...@apache.org on 2019/10/22 20:35:29 UTC

[kudu] branch master updated: consensus_peers: make RpcPeerProxy::StartElection async and other cleanup

This is an automated email from the ASF dual-hosted git repository.

adar pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new de74056  consensus_peers: make RpcPeerProxy::StartElection async and other cleanup
de74056 is described below

commit de74056dd4d5d01e86a56958623e0362d97d0ab7
Author: Adar Dembo <ad...@cloudera.com>
AuthorDate: Fri Oct 18 15:49:37 2019 -0700

    consensus_peers: make RpcPeerProxy::StartElection async and other cleanup
    
    RpcPeerProxy::StartElection was the only such proxy call that was
    synchronous. Apart from violating POLA (the principle of least
    astonishment), the synchronicity meant that the graceful leadership
    transfer code path occupied a Raft threadpool slot for longer in order to
    receive the RPC response. It's not a hot path so this is by no means a perf
    improvement, but blocking just seemed unnecessary given that the result was
    only used to log a warning in the rare event that the remote couldn't start
    an election.
    
    Changing this meant adding some extra lifecycle code to Peer::StartElection,
    but I think it's still a net improvement in clarity.
    
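    Along the way I also did some other C++11 cleanup: replaced the OVERRIDE
    macro with the 'override' keyword, dropped redundant 'virtual' specifiers,
    and changed the PeerProxy methods to take requests by const reference
    instead of by const pointer.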
    
    Change-Id: I6a6df610f1c07adae5a85534d8c6dec324801042
    Reviewed-on: http://gerrit.cloudera.org:8080/14512
    Tested-by: Kudu Jenkins
    Reviewed-by: Andrew Wong <aw...@cloudera.com>
---
 src/kudu/consensus/consensus-test-util.h | 146 +++++++++++++++----------------
 src/kudu/consensus/consensus_peers.cc    |  60 ++++++++-----
 src/kudu/consensus/consensus_peers.h     |  35 ++++----
 src/kudu/consensus/leader_election.cc    |   2 +-
 src/kudu/consensus/peer_manager.cc       |   3 +-
 5 files changed, 134 insertions(+), 112 deletions(-)

diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index cc799c1..ff5aa87 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -150,7 +150,7 @@ class TestPeerProxy : public PeerProxy {
  protected:
   // Register the RPC callback in order to call later.
   // We currently only support one request of each method being in flight at a time.
-  virtual void RegisterCallback(Method method, const rpc::ResponseCallback& callback) {
+  void RegisterCallback(Method method, const rpc::ResponseCallback& callback) {
     std::lock_guard<simple_spinlock> lock(lock_);
     InsertOrDie(&callbacks_, method, callback);
   }
@@ -170,7 +170,7 @@ class TestPeerProxy : public PeerProxy {
     ignore_result(pool_->SubmitFunc(callback));
   }
 
-  virtual void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
+  void RegisterCallbackAndRespond(Method method, const rpc::ResponseCallback& callback) {
     RegisterCallback(method, callback);
     Respond(method);
   }
@@ -200,7 +200,7 @@ class DelayablePeerProxy : public TestPeerProxy {
     latch_.Reset(1); // Reset for the next time.
   }
 
-  virtual void RespondUnlessDelayed(Method method) {
+  void RespondUnlessDelayed(Method method) {
     {
       std::lock_guard<simple_spinlock> l(lock_);
       if (delay_response_) {
@@ -212,31 +212,32 @@ class DelayablePeerProxy : public TestPeerProxy {
     TestPeerProxy::Respond(method);
   }
 
-  virtual void Respond(Method method) OVERRIDE {
+  void Respond(Method method) override {
     latch_.Wait();   // Wait until strictly after peer would have responded.
     return TestPeerProxy::Respond(method);
   }
 
-  virtual void UpdateAsync(const ConsensusRequestPB* request,
-                           ConsensusResponsePB* response,
-                           rpc::RpcController* controller,
-                           const rpc::ResponseCallback& callback) OVERRIDE {
+  void UpdateAsync(const ConsensusRequestPB& request,
+                   ConsensusResponsePB* response,
+                   rpc::RpcController* controller,
+                   const rpc::ResponseCallback& callback) override {
     RegisterCallback(kUpdate, callback);
     return proxy_->UpdateAsync(request, response, controller,
                                boost::bind(&DelayablePeerProxy::RespondUnlessDelayed,
                                            this, kUpdate));
   }
 
-  virtual Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
-                               RunLeaderElectionResponsePB* /*response*/,
-                               rpc::RpcController* /*controller*/) override {
-    return Status::OK();
+  void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+                          RunLeaderElectionResponsePB* /*response*/,
+                          rpc::RpcController* /*controller*/,
+                          const rpc::ResponseCallback& callback) override {
+    callback();
   }
 
-  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
-                                         VoteResponsePB* response,
-                                         rpc::RpcController* controller,
-                                         const rpc::ResponseCallback& callback) OVERRIDE {
+  void RequestConsensusVoteAsync(const VoteRequestPB& request,
+                                 VoteResponsePB* response,
+                                 rpc::RpcController* controller,
+                                 const rpc::ResponseCallback& callback) override {
     RegisterCallback(kRequestVote, callback);
     return proxy_->RequestConsensusVoteAsync(request, response, controller,
                                              boost::bind(&DelayablePeerProxy::RespondUnlessDelayed,
@@ -262,25 +263,21 @@ class MockedPeerProxy : public TestPeerProxy {
     update_count_(0) {
   }
 
-  virtual void set_update_response(const ConsensusResponsePB& update_response) {
+  void set_update_response(const ConsensusResponsePB& update_response) {
     CHECK(update_response.IsInitialized()) << pb_util::SecureShortDebugString(update_response);
-    {
-      std::lock_guard<simple_spinlock> l(lock_);
-      update_response_ = update_response;
-    }
+    std::lock_guard<simple_spinlock> l(lock_);
+    update_response_ = update_response;
   }
 
-  virtual void set_vote_response(const VoteResponsePB& vote_response) {
-    {
-      std::lock_guard<simple_spinlock> l(lock_);
-      vote_response_ = vote_response;
-    }
+  void set_vote_response(const VoteResponsePB& vote_response) {
+    std::lock_guard<simple_spinlock> l(lock_);
+    vote_response_ = vote_response;
   }
 
-  virtual void UpdateAsync(const ConsensusRequestPB* request,
-                           ConsensusResponsePB* response,
-                           rpc::RpcController* controller,
-                           const rpc::ResponseCallback& callback) OVERRIDE {
+  void UpdateAsync(const ConsensusRequestPB& /*request*/,
+                   ConsensusResponsePB* response,
+                   rpc::RpcController* /*controller*/,
+                   const rpc::ResponseCallback& callback) override {
     {
       std::lock_guard<simple_spinlock> l(lock_);
       update_count_++;
@@ -289,18 +286,19 @@ class MockedPeerProxy : public TestPeerProxy {
     return RegisterCallbackAndRespond(kUpdate, callback);
   }
 
-  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
-                                         VoteResponsePB* response,
-                                         rpc::RpcController* controller,
-                                         const rpc::ResponseCallback& callback) OVERRIDE {
+  void RequestConsensusVoteAsync(const VoteRequestPB& /*request*/,
+                                 VoteResponsePB* response,
+                                 rpc::RpcController* /*controller*/,
+                                 const rpc::ResponseCallback& callback) override {
     *response = vote_response_;
     return RegisterCallbackAndRespond(kRequestVote, callback);
   }
 
-  Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
-                     RunLeaderElectionResponsePB* /*response*/,
-                     rpc::RpcController* /*controller*/) override {
-    return Status::OK();
+  void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+                          RunLeaderElectionResponsePB* /*response*/,
+                          rpc::RpcController* /*controller*/,
+                          const rpc::ResponseCallback& callback) override {
+    callback();
   }
 
   // Return the number of times that UpdateAsync() has been called.
@@ -326,24 +324,24 @@ class NoOpTestPeerProxy : public TestPeerProxy {
     last_received_.CopyFrom(MinimumOpId());
   }
 
-  virtual void UpdateAsync(const ConsensusRequestPB* request,
-                           ConsensusResponsePB* response,
-                           rpc::RpcController* controller,
-                           const rpc::ResponseCallback& callback) OVERRIDE {
+  void UpdateAsync(const ConsensusRequestPB& request,
+                   ConsensusResponsePB* response,
+                   rpc::RpcController* /*controller*/,
+                   const rpc::ResponseCallback& callback) override {
 
     response->Clear();
     {
       std::lock_guard<simple_spinlock> lock(lock_);
-      if (OpIdLessThan(last_received_, request->preceding_id())) {
+      if (OpIdLessThan(last_received_, request.preceding_id())) {
         ConsensusErrorPB* error = response->mutable_status()->mutable_error();
         error->set_code(ConsensusErrorPB::PRECEDING_ENTRY_DIDNT_MATCH);
         StatusToPB(Status::IllegalState(""), error->mutable_status());
-      } else if (request->ops_size() > 0) {
-        last_received_.CopyFrom(request->ops(request->ops_size() - 1).id());
+      } else if (request.ops_size() > 0) {
+        last_received_.CopyFrom(request.ops(request.ops_size() - 1).id());
       }
 
       response->set_responder_uuid(peer_pb_.permanent_uuid());
-      response->set_responder_term(request->caller_term());
+      response->set_responder_term(request.caller_term());
       response->mutable_status()->mutable_last_received()->CopyFrom(last_received_);
       response->mutable_status()->mutable_last_received_current_leader()->CopyFrom(last_received_);
       // We set the last committed index to be the same index as the last received. While
@@ -354,20 +352,21 @@ class NoOpTestPeerProxy : public TestPeerProxy {
     return RegisterCallbackAndRespond(kUpdate, callback);
   }
 
-  virtual Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
-                             RunLeaderElectionResponsePB* /*response*/,
-                             rpc::RpcController* /*controller*/) override {
-    return Status::OK();
+  void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+                          RunLeaderElectionResponsePB* /*response*/,
+                          rpc::RpcController* /*controller*/,
+                          const rpc::ResponseCallback& callback) override {
+    callback();
   }
 
-  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
-                                         VoteResponsePB* response,
-                                         rpc::RpcController* /*controller*/,
-                                         const rpc::ResponseCallback& callback) OVERRIDE {
+  void RequestConsensusVoteAsync(const VoteRequestPB& request,
+                                 VoteResponsePB* response,
+                                 rpc::RpcController* /*controller*/,
+                                 const rpc::ResponseCallback& callback) override {
     {
       std::lock_guard<simple_spinlock> lock(lock_);
       response->set_responder_uuid(peer_pb_.permanent_uuid());
-      response->set_responder_term(request->candidate_term());
+      response->set_responder_term(request.candidate_term());
       response->set_vote_granted(true);
     }
     return RegisterCallbackAndRespond(kRequestVote, callback);
@@ -473,25 +472,26 @@ class LocalTestPeerProxy : public TestPeerProxy {
         peers_(peers),
         miss_comm_(false) {}
 
-  virtual void UpdateAsync(const ConsensusRequestPB* request,
-                           ConsensusResponsePB* response,
-                           rpc::RpcController* controller,
-                           const rpc::ResponseCallback& callback) OVERRIDE {
+  void UpdateAsync(const ConsensusRequestPB& request,
+                   ConsensusResponsePB* response,
+                   rpc::RpcController* /*controller*/,
+                   const rpc::ResponseCallback& callback) override {
     RegisterCallback(kUpdate, callback);
     CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendUpdateRequest,
                                            this, request, response)));
   }
 
-  Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
-                       RunLeaderElectionResponsePB* /*response*/,
-                       rpc::RpcController* /*controller*/) override {
-    return Status::OK();
+  void StartElectionAsync(const RunLeaderElectionRequestPB& /*request*/,
+                          RunLeaderElectionResponsePB* /*response*/,
+                          rpc::RpcController* /*controller*/,
+                          const rpc::ResponseCallback& callback) override {
+    callback();
   }
 
-  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
-                                         VoteResponsePB* response,
-                                         rpc::RpcController* /*controller*/,
-                                         const rpc::ResponseCallback& callback) OVERRIDE {
+  void RequestConsensusVoteAsync(const VoteRequestPB& request,
+                                 VoteResponsePB* response,
+                                 rpc::RpcController* /*controller*/,
+                                 const rpc::ResponseCallback& callback) override {
     RegisterCallback(kRequestVote, callback);
     CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendVoteRequest,
                                            this, request, response)));
@@ -505,7 +505,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
   }
 
   template<class Request, class Response>
-  void RespondOrMissResponse(Request* request,
+  void RespondOrMissResponse(const Request& request,
                              const Response& response_temp,
                              Response* final_response,
                              Method method) {
@@ -517,7 +517,7 @@ class LocalTestPeerProxy : public TestPeerProxy {
       miss_comm_ = false;
     }
     if (PREDICT_FALSE(miss_comm_copy)) {
-      VLOG(2) << this << ": injecting fault on " << pb_util::SecureShortDebugString(*request);
+      VLOG(2) << this << ": injecting fault on " << pb_util::SecureShortDebugString(request);
       SetResponseError(Status::IOError("Artificial error caused by communication "
           "failure injection."), final_response);
     } else {
@@ -526,12 +526,12 @@ class LocalTestPeerProxy : public TestPeerProxy {
     Respond(method);
   }
 
-  void SendUpdateRequest(const ConsensusRequestPB* request,
+  void SendUpdateRequest(const ConsensusRequestPB& request,
                          ConsensusResponsePB* response) {
     // Copy the request and the response for the other peer so that ownership
     // remains as close to the dist. impl. as possible.
     ConsensusRequestPB other_peer_req;
-    other_peer_req.CopyFrom(*request);
+    other_peer_req.CopyFrom(request);
 
     // Give the other peer a clean response object to write to.
     ConsensusResponsePB other_peer_resp;
@@ -558,13 +558,13 @@ class LocalTestPeerProxy : public TestPeerProxy {
 
 
 
-  void SendVoteRequest(const VoteRequestPB* request,
+  void SendVoteRequest(const VoteRequestPB& request,
                        VoteResponsePB* response) {
 
     // Copy the request and the response for the other peer so that ownership
     // remains as close to the dist. impl. as possible.
     VoteRequestPB other_peer_req;
-    other_peer_req.CopyFrom(*request);
+    other_peer_req.CopyFrom(request);
     VoteResponsePB other_peer_resp;
     other_peer_resp.CopyFrom(*response);
 
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index e6fd67a..5cd12f0 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -93,6 +93,7 @@ using kudu::rpc::RpcController;
 using kudu::tserver::TabletServerErrorPB;
 using std::shared_ptr;
 using std::string;
+using std::unique_ptr;
 using std::vector;
 using std::weak_ptr;
 using strings::Substitute;
@@ -243,7 +244,7 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
       // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
       // that this object outlives the RPC.
       shared_ptr<Peer> s_this = shared_from_this();
-      proxy_->StartTabletCopyAsync(&tc_request_, &tc_response_, &controller_,
+      proxy_->StartTabletCopyAsync(tc_request_, &tc_response_, &controller_,
                                    [s_this]() {
                                      s_this->ProcessTabletCopyResponse();
                                    });
@@ -282,24 +283,38 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
   // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
   // that this object outlives the RPC.
   shared_ptr<Peer> s_this = shared_from_this();
-  proxy_->UpdateAsync(&request_, &response_, &controller_,
+  proxy_->UpdateAsync(request_, &response_, &controller_,
                       [s_this]() {
                         s_this->ProcessResponse();
                       });
 }
 
-Status Peer::StartElection() {
+void Peer::StartElection() {
+  // The async proxy contract is such that the response and RPC controller must
+  // stay in scope until the callback is invoked. Unlike other Peer methods, we
+  // can't guarantee that there's only one outstanding StartElection call at a
+  // time, so we can't store the response and controller as a class member.
+  // Instead, we have to pass them into the callback and free them there.
   RunLeaderElectionRequestPB req;
-  RunLeaderElectionResponsePB resp;
-  RpcController controller;
-  req.set_dest_uuid(peer_pb().permanent_uuid());
+  unique_ptr<RunLeaderElectionResponsePB> resp_uniq(new RunLeaderElectionResponsePB());
+  unique_ptr<RpcController> controller_uniq(new RpcController());
+  string peer_uuid = peer_pb().permanent_uuid();
+  req.set_dest_uuid(peer_uuid);
   req.set_tablet_id(tablet_id_);
-  RETURN_NOT_OK(proxy_->StartElection(&req, &resp, &controller));
-  RETURN_NOT_OK(controller.status());
-  if (resp.has_error()) {
-    return StatusFromPB(response_.error().status());
-  }
-  return Status::OK();
+
+  // TODO(adar): lack of C++14 move capture makes for ugly code.
+  RunLeaderElectionResponsePB* resp = resp_uniq.release();
+  RpcController* controller = controller_uniq.release();
+  proxy_->StartElectionAsync(req, resp, controller, [resp, controller, peer_uuid]() {
+      unique_ptr<RunLeaderElectionResponsePB> r(resp);
+      unique_ptr<RpcController> c(controller);
+      string error_msg = Substitute("unable to start election on peer $0", peer_uuid);
+      if (!c->status().ok()) {
+        WARN_NOT_OK(c->status(), error_msg);
+      } else if (r->has_error()) {
+        WARN_NOT_OK(StatusFromPB(r->error().status()), error_msg);
+      }
+    });
 }
 
 void Peer::ProcessResponse() {
@@ -501,34 +516,35 @@ RpcPeerProxy::RpcPeerProxy(gscoped_ptr<HostPort> hostport,
       consensus_proxy_(std::move(DCHECK_NOTNULL(consensus_proxy))) {
 }
 
-void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request,
+void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB& request,
                                ConsensusResponsePB* response,
                                rpc::RpcController* controller,
                                const rpc::ResponseCallback& callback) {
   controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
-  consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback);
+  consensus_proxy_->UpdateConsensusAsync(request, response, controller, callback);
 }
 
-Status RpcPeerProxy::StartElection(const RunLeaderElectionRequestPB* request,
-                                 RunLeaderElectionResponsePB* response,
-                                 rpc::RpcController* controller) {
+void RpcPeerProxy::StartElectionAsync(const RunLeaderElectionRequestPB& request,
+                                      RunLeaderElectionResponsePB* response,
+                                      rpc::RpcController* controller,
+                                      const rpc::ResponseCallback& callback) {
   controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
-  return consensus_proxy_->RunLeaderElection(*request, response, controller);
+  consensus_proxy_->RunLeaderElectionAsync(request, response, controller, callback);
 }
 
-void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
+void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB& request,
                                              VoteResponsePB* response,
                                              rpc::RpcController* controller,
                                              const rpc::ResponseCallback& callback) {
-  consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback);
+  consensus_proxy_->RequestConsensusVoteAsync(request, response, controller, callback);
 }
 
-void RpcPeerProxy::StartTabletCopyAsync(const StartTabletCopyRequestPB* request,
+void RpcPeerProxy::StartTabletCopyAsync(const StartTabletCopyRequestPB& request,
                                         StartTabletCopyResponsePB* response,
                                         rpc::RpcController* controller,
                                         const rpc::ResponseCallback& callback) {
   controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
-  consensus_proxy_->StartTabletCopyAsync(*request, response, controller, callback);
+  consensus_proxy_->StartTabletCopyAsync(request, response, controller, callback);
 }
 
 string RpcPeerProxy::PeerName() const {
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 7379512..acfdd8a 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -79,12 +79,14 @@ class Peer : public std::enable_shared_from_this<Peer> {
   // status-only requests.
   Status SignalRequest(bool even_if_queue_empty = false);
 
-  // Synchronously starts a leader election on this peer.
+  // Starts a leader election on this peer.
+  //
   // This method is ad hoc, using this instance's PeerProxy to send the
   // StartElection request.
+  //
   // The StartElection RPC does not count as the single outstanding request
   // that this class tracks.
-  Status StartElection();
+  void StartElection();
 
   const RaftPeerPB& peer_pb() const { return peer_pb_; }
 
@@ -204,23 +206,25 @@ class PeerProxy {
   virtual ~PeerProxy() {}
 
   // Sends a request, asynchronously, to a remote peer.
-  virtual void UpdateAsync(const ConsensusRequestPB* request,
+  virtual void UpdateAsync(const ConsensusRequestPB& request,
                            ConsensusResponsePB* response,
                            rpc::RpcController* controller,
                            const rpc::ResponseCallback& callback) = 0;
 
-  // Sends a RequestConsensusVote to a remote peer.
-  virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
+  // Asks a peer to vote for a candidate.
+  virtual void RequestConsensusVoteAsync(const VoteRequestPB& request,
                                          VoteResponsePB* response,
                                          rpc::RpcController* controller,
                                          const rpc::ResponseCallback& callback) = 0;
 
-  virtual Status StartElection(const RunLeaderElectionRequestPB* request,
-                               RunLeaderElectionResponsePB* response,
-                               rpc::RpcController* controller) = 0;
+  // Instructs a peer to kick off an election to elect itself leader.
+  virtual void StartElectionAsync(const RunLeaderElectionRequestPB& request,
+                                  RunLeaderElectionResponsePB* response,
+                                  rpc::RpcController* controller,
+                                  const rpc::ResponseCallback& callback) = 0;
 
   // Instructs a peer to begin a tablet copy session.
-  virtual void StartTabletCopyAsync(const StartTabletCopyRequestPB* /*request*/,
+  virtual void StartTabletCopyAsync(const StartTabletCopyRequestPB& /*request*/,
                                     StartTabletCopyResponsePB* /*response*/,
                                     rpc::RpcController* /*controller*/,
                                     const rpc::ResponseCallback& /*callback*/) {
@@ -250,21 +254,22 @@ class RpcPeerProxy : public PeerProxy {
   RpcPeerProxy(gscoped_ptr<HostPort> hostport,
                gscoped_ptr<ConsensusServiceProxy> consensus_proxy);
 
-  void UpdateAsync(const ConsensusRequestPB* request,
+  void UpdateAsync(const ConsensusRequestPB& request,
                    ConsensusResponsePB* response,
                    rpc::RpcController* controller,
                    const rpc::ResponseCallback& callback) override;
 
-  void RequestConsensusVoteAsync(const VoteRequestPB* request,
+  void RequestConsensusVoteAsync(const VoteRequestPB& request,
                                  VoteResponsePB* response,
                                  rpc::RpcController* controller,
                                  const rpc::ResponseCallback& callback) override;
 
-  Status StartElection(const RunLeaderElectionRequestPB* request,
-                       RunLeaderElectionResponsePB* response,
-                       rpc::RpcController* controller) override;
+  void StartElectionAsync(const RunLeaderElectionRequestPB& request,
+                          RunLeaderElectionResponsePB* response,
+                          rpc::RpcController* controller,
+                          const rpc::ResponseCallback& callback) override;
 
-  void StartTabletCopyAsync(const StartTabletCopyRequestPB* request,
+  void StartTabletCopyAsync(const StartTabletCopyRequestPB& request,
                             StartTabletCopyResponsePB* response,
                             rpc::RpcController* controller,
                             const rpc::ResponseCallback& callback) override;
diff --git a/src/kudu/consensus/leader_election.cc b/src/kudu/consensus/leader_election.cc
index 830c11a..46e85eb 100644
--- a/src/kudu/consensus/leader_election.cc
+++ b/src/kudu/consensus/leader_election.cc
@@ -285,7 +285,7 @@ void LeaderElection::Run() {
     state->request.set_dest_uuid(voter_uuid);
 
     state->proxy->RequestConsensusVoteAsync(
-        &state->request,
+        state->request,
         &state->response,
         &state->rpc,
         // We use gutil Bind() for the refcounting and boost::bind to adapt the
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index 2ec49f0..370f130 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -116,7 +116,8 @@ Status PeerManager::StartElection(const std::string& uuid) {
   if (!peer) {
     return Status::NotFound("unknown peer");
   }
-  return peer->StartElection();
+  peer->StartElection();
+  return Status::OK();
 }
 
 void PeerManager::Close() {