You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kudu.apache.org by mp...@apache.org on 2018/10/18 02:09:12 UTC

[2/3] kudu git commit: KUDU-2245 Graceful leadership transfer

KUDU-2245 Graceful leadership transfer

This patch implements graceful leadership transfer, as described in the
original Raft thesis. It has the following steps:

1. An admin client sends a request to the tablet leader for it to
   transfer leadership. The client can indicate a specific voter that it
   wants to become the leader, or it can allow the current leader to
   choose its successor.
2. The leader receives the request and begins a leader transfer period.
   During a leader transfer period, the leader does not accept writes or
   config change requests. This allows followers to catch up to the
   leader. A background timer expires the transfer period after one
   election timeout, since clients should be able to ride over
   interruptions in service lasting at least that long. If another
   request to transfer leadership is received during a transfer period,
   it will be rejected.
3. During the transfer period, the leader continues to update peers.
   When it receives a response from a peer, it checks if that peer is
   a voter and fully caught up to the leader's log. If it is, and if it
   is the designated successor if one was provided, the leader signals
   the peer to start an election, which it should win. If no eligible
   successor appears, the transfer period expires and the leader resumes
   normal operation.

This is an improvement over the current leader step down method, which
causes the leader to simply relinquish leadership and snooze its
election timer for an extra long period, so another voter will likely
become leader. Leadership transfer should usually be much faster and it
allows the client to select the new leader among current voters.
However, note that it does not provide strictly better guarantees: it is
still possible that leadership will not be transferred.

I ran TestRepeatLeaderStepDown and TestGracefulLeaderStepDown 1000 times
and 200 times each, in debug and TSAN modes, with 4 stress threads, and
saw no failures.

Change-Id: Ic97343af9eb349556424c999799ed5e2941f0083
Reviewed-on: http://gerrit.cloudera.org:8080/11251
Tested-by: Kudu Jenkins
Reviewed-by: Mike Percy <mp...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kudu/repo
Commit: http://git-wip-us.apache.org/repos/asf/kudu/commit/e61de497
Tree: http://git-wip-us.apache.org/repos/asf/kudu/tree/e61de497
Diff: http://git-wip-us.apache.org/repos/asf/kudu/diff/e61de497

Branch: refs/heads/master
Commit: e61de497c487f33fe1a99c9df6dfb697e8efb757
Parents: f0ed88b
Author: Will Berkeley <wd...@gmail.com>
Authored: Thu Aug 9 13:44:00 2018 -0700
Committer: Will Berkeley <wd...@gmail.com>
Committed: Thu Oct 18 00:44:53 2018 +0000

----------------------------------------------------------------------
 src/kudu/consensus/consensus-test-util.h        |  28 +-
 src/kudu/consensus/consensus.proto              |  18 +-
 src/kudu/consensus/consensus_peers.cc           |  37 +-
 src/kudu/consensus/consensus_peers.h            |  31 +-
 src/kudu/consensus/consensus_queue.cc           |  60 +++
 src/kudu/consensus/consensus_queue.h            |  25 +-
 src/kudu/consensus/peer_manager.cc              |  12 +
 src/kudu/consensus/peer_manager.h               |   3 +
 src/kudu/consensus/raft_consensus.cc            | 100 +++++
 src/kudu/consensus/raft_consensus.h             |  33 +-
 .../integration-tests/raft_consensus-itest.cc   |  68 ++++
 src/kudu/tools/kudu-admin-test.cc               | 365 +++++++++++++++++--
 src/kudu/tools/kudu-tool-test.cc                |   2 +-
 src/kudu/tools/tool_action_tablet.cc            |  57 ++-
 src/kudu/tools/tool_replica_util.cc             |  14 +
 src/kudu/tools/tool_replica_util.h              |  22 +-
 src/kudu/tserver/tablet_service.cc              |  33 +-
 17 files changed, 840 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/consensus-test-util.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus-test-util.h b/src/kudu/consensus/consensus-test-util.h
index d6dc3f0..a0a0e42 100644
--- a/src/kudu/consensus/consensus-test-util.h
+++ b/src/kudu/consensus/consensus-test-util.h
@@ -227,6 +227,12 @@ class DelayablePeerProxy : public TestPeerProxy {
                                            this, kUpdate));
   }
 
+  virtual Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
+                               RunLeaderElectionResponsePB* /*response*/,
+                               rpc::RpcController* /*controller*/) override {
+    return Status::OK();
+  }
+
   virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
                                          VoteResponsePB* response,
                                          rpc::RpcController* controller,
@@ -291,6 +297,12 @@ class MockedPeerProxy : public TestPeerProxy {
     return RegisterCallbackAndRespond(kRequestVote, callback);
   }
 
+  Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
+                     RunLeaderElectionResponsePB* /*response*/,
+                     rpc::RpcController* /*controller*/) override {
+    return Status::OK();
+  }
+
   // Return the number of times that UpdateAsync() has been called.
   int update_count() const {
     std::lock_guard<simple_spinlock> l(lock_);
@@ -342,9 +354,15 @@ class NoOpTestPeerProxy : public TestPeerProxy {
     return RegisterCallbackAndRespond(kUpdate, callback);
   }
 
+  virtual Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
+                             RunLeaderElectionResponsePB* /*response*/,
+                             rpc::RpcController* /*controller*/) override {
+    return Status::OK();
+  }
+
   virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
                                          VoteResponsePB* response,
-                                         rpc::RpcController* controller,
+                                         rpc::RpcController* /*controller*/,
                                          const rpc::ResponseCallback& callback) OVERRIDE {
     {
       std::lock_guard<simple_spinlock> lock(lock_);
@@ -464,9 +482,15 @@ class LocalTestPeerProxy : public TestPeerProxy {
                                            this, request, response)));
   }
 
+  Status StartElection(const RunLeaderElectionRequestPB* /*request*/,
+                       RunLeaderElectionResponsePB* /*response*/,
+                       rpc::RpcController* /*controller*/) override {
+    return Status::OK();
+  }
+
   virtual void RequestConsensusVoteAsync(const VoteRequestPB* request,
                                          VoteResponsePB* response,
-                                         rpc::RpcController* controller,
+                                         rpc::RpcController* /*controller*/,
                                          const rpc::ResponseCallback& callback) OVERRIDE {
     RegisterCallback(kRequestVote, callback);
     CHECK_OK(pool_->SubmitFunc(boost::bind(&LocalTestPeerProxy::SendVoteRequest,

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/consensus.proto
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus.proto b/src/kudu/consensus/consensus.proto
index 68877ca..b6a2d9a 100644
--- a/src/kudu/consensus/consensus.proto
+++ b/src/kudu/consensus/consensus.proto
@@ -445,12 +445,28 @@ message RunLeaderElectionResponsePB {
   optional tserver.TabletServerErrorPB error = 1;
 }
 
+enum LeaderStepDownMode {
+  // The leader will immediately step down.
+  ABRUPT = 1;
+  // The leader will attempt to arrange for a successor to be elected ASAP.
+  // If it cannot do so, it remains leader.
+  GRACEFUL = 2;
+}
+
 message LeaderStepDownRequestPB {
-  // UUID of server this request is addressed to.
+  // UUID of the server this request is addressed to.
   optional bytes dest_uuid = 2;
 
   // The id of the tablet.
   required bytes tablet_id = 1;
+
+  // How the leader will attempt to relinquish its leadership.
+  optional LeaderStepDownMode mode = 3;
+
+  // The UUID of the peer that should be promoted to leader in GRACEFUL mode.
+  // If unset, the leader will select a successor.
+  // In ABRUPT mode, it is illegal to set this field.
+  optional bytes new_leader_uuid = 4;
 }
 
 message LeaderStepDownResponsePB {

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/consensus_peers.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.cc b/src/kudu/consensus/consensus_peers.cc
index c7cf2fe..a99b2c0 100644
--- a/src/kudu/consensus/consensus_peers.cc
+++ b/src/kudu/consensus/consensus_peers.cc
@@ -239,10 +239,10 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
       // Capture a shared_ptr reference into the RPC callback so that we're guaranteed
       // that this object outlives the RPC.
       shared_ptr<Peer> s_this = shared_from_this();
-      proxy_->StartTabletCopy(&tc_request_, &tc_response_, &controller_,
-                              [s_this]() {
-                                s_this->ProcessTabletCopyResponse();
-                              });
+      proxy_->StartTabletCopyAsync(&tc_request_, &tc_response_, &controller_,
+                                   [s_this]() {
+                                     s_this->ProcessTabletCopyResponse();
+                                   });
     } else {
       LOG_WITH_PREFIX_UNLOCKED(WARNING) << "Unable to generate Tablet Copy request for peer: "
                                         << s.ToString();
@@ -284,6 +284,20 @@ void Peer::SendNextRequest(bool even_if_queue_empty) {
                       });
 }
 
+Status Peer::StartElection() {
+  RunLeaderElectionRequestPB req;
+  RunLeaderElectionResponsePB resp;
+  RpcController controller;
+  req.set_dest_uuid(peer_pb().permanent_uuid());
+  req.set_tablet_id(tablet_id_);
+  RETURN_NOT_OK(proxy_->StartElection(&req, &resp, &controller));
+  RETURN_NOT_OK(controller.status());
+  if (resp.has_error()) {  // Error carried in this RPC's own response.
+    return StatusFromPB(resp.error().status());
+  }
+  return Status::OK();
+}
+
 void Peer::ProcessResponse() {
   // Note: This method runs on the reactor thread.
   std::unique_lock<simple_spinlock> lock(peer_lock_);
@@ -481,6 +495,13 @@ void RpcPeerProxy::UpdateAsync(const ConsensusRequestPB* request,
   consensus_proxy_->UpdateConsensusAsync(*request, response, controller, callback);
 }
 
+Status RpcPeerProxy::StartElection(const RunLeaderElectionRequestPB* request,
+                                 RunLeaderElectionResponsePB* response,
+                                 rpc::RpcController* controller) {
+  controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
+  return consensus_proxy_->RunLeaderElection(*request, response, controller);
+}
+
 void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
                                              VoteResponsePB* response,
                                              rpc::RpcController* controller,
@@ -488,10 +509,10 @@ void RpcPeerProxy::RequestConsensusVoteAsync(const VoteRequestPB* request,
   consensus_proxy_->RequestConsensusVoteAsync(*request, response, controller, callback);
 }
 
-void RpcPeerProxy::StartTabletCopy(const StartTabletCopyRequestPB* request,
-                                   StartTabletCopyResponsePB* response,
-                                   rpc::RpcController* controller,
-                                   const rpc::ResponseCallback& callback) {
+void RpcPeerProxy::StartTabletCopyAsync(const StartTabletCopyRequestPB* request,
+                                        StartTabletCopyResponsePB* response,
+                                        rpc::RpcController* controller,
+                                        const rpc::ResponseCallback& callback) {
   controller->set_timeout(MonoDelta::FromMilliseconds(FLAGS_consensus_rpc_timeout_ms));
   consensus_proxy_->StartTabletCopyAsync(*request, response, controller, callback);
 }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/consensus_peers.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_peers.h b/src/kudu/consensus/consensus_peers.h
index 7812687..b111fe9 100644
--- a/src/kudu/consensus/consensus_peers.h
+++ b/src/kudu/consensus/consensus_peers.h
@@ -78,6 +78,13 @@ class Peer : public std::enable_shared_from_this<Peer> {
   // status-only requests.
   Status SignalRequest(bool even_if_queue_empty = false);
 
+  // Synchronously starts a leader election on this peer.
+  // This method is ad hoc, using this instance's PeerProxy to send the
+  // StartElection request.
+  // The StartElection RPC does not count as the single outstanding request
+  // that this class tracks.
+  Status StartElection();
+
   const RaftPeerPB& peer_pb() const { return peer_pb_; }
 
   // Stop sending requests and periodic heartbeats.
@@ -208,11 +215,15 @@ class PeerProxy {
                                          rpc::RpcController* controller,
                                          const rpc::ResponseCallback& callback) = 0;
 
+  virtual Status StartElection(const RunLeaderElectionRequestPB* request,
+                               RunLeaderElectionResponsePB* response,
+                               rpc::RpcController* controller) = 0;
+
   // Instructs a peer to begin a tablet copy session.
-  virtual void StartTabletCopy(const StartTabletCopyRequestPB* request,
-                                    StartTabletCopyResponsePB* response,
-                                    rpc::RpcController* controller,
-                                    const rpc::ResponseCallback& callback) {
+  virtual void StartTabletCopyAsync(const StartTabletCopyRequestPB* /*request*/,
+                                    StartTabletCopyResponsePB* /*response*/,
+                                    rpc::RpcController* /*controller*/,
+                                    const rpc::ResponseCallback& /*callback*/) {
     LOG(DFATAL) << "Not implemented";
   }
 
@@ -249,10 +260,14 @@ class RpcPeerProxy : public PeerProxy {
                                  rpc::RpcController* controller,
                                  const rpc::ResponseCallback& callback) override;
 
-  void StartTabletCopy(const StartTabletCopyRequestPB* request,
-                       StartTabletCopyResponsePB* response,
-                       rpc::RpcController* controller,
-                       const rpc::ResponseCallback& callback) override;
+  Status StartElection(const RunLeaderElectionRequestPB* request,
+                       RunLeaderElectionResponsePB* response,
+                       rpc::RpcController* controller) override;
+
+  void StartTabletCopyAsync(const StartTabletCopyRequestPB* request,
+                            StartTabletCopyResponsePB* response,
+                            rpc::RpcController* controller,
+                            const rpc::ResponseCallback& callback) override;
 
   std::string PeerName() const override;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/consensus_queue.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.cc b/src/kudu/consensus/consensus_queue.cc
index eb366ef..2c64e1c 100644
--- a/src/kudu/consensus/consensus_queue.cc
+++ b/src/kudu/consensus/consensus_queue.cc
@@ -161,6 +161,7 @@ PeerMessageQueue::PeerMessageQueue(const scoped_refptr<MetricEntity>& metric_ent
     : raft_pool_observers_token_(std::move(raft_pool_observers_token)),
       local_peer_pb_(std::move(local_peer_pb)),
       tablet_id_(std::move(tablet_id)),
+      successor_watch_in_progress_(false),
       log_cache_(metric_entity, std::move(log), local_peer_pb_.permanent_uuid(), tablet_id_),
       metrics_(metric_entity),
       time_manager_(std::move(time_manager)) {
@@ -867,6 +868,18 @@ void PeerMessageQueue::AdvanceQueueWatermark(const char* type,
   }
 }
 
+void PeerMessageQueue::BeginWatchForSuccessor(
+    const boost::optional<string>& successor_uuid) {
+  std::lock_guard<simple_spinlock> l(queue_lock_);
+  successor_watch_in_progress_ = true;
+  designated_successor_uuid_ = successor_uuid;
+}
+
+void PeerMessageQueue::EndWatchForSuccessor() {
+  std::lock_guard<simple_spinlock> l(queue_lock_);
+  successor_watch_in_progress_ = false;
+}
+
 void PeerMessageQueue::UpdateFollowerWatermarks(int64_t committed_index,
                                                 int64_t all_replicated_index) {
   std::lock_guard<simple_spinlock> l(queue_lock_);
@@ -1025,6 +1038,42 @@ void PeerMessageQueue::PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& pre
   }
 }
 
+void PeerMessageQueue::TransferLeadershipIfNeeded(const TrackedPeer& peer,
+                                                  const ConsensusStatusPB& status) {
+  DCHECK(queue_lock_.is_locked());
+  if (!successor_watch_in_progress_) {
+    return;
+  }
+
+  if (designated_successor_uuid_ && peer.uuid() != designated_successor_uuid_.get()) {
+    return;
+  }
+
+  if (queue_state_.mode != PeerMessageQueue::LEADER ||
+      peer.last_exchange_status != PeerStatus::OK) {
+    return;
+  }
+
+  RaftPeerPB* peer_pb;
+  Status s = GetRaftConfigMember(DCHECK_NOTNULL(queue_state_.active_config.get()),
+                                 peer.uuid(), &peer_pb);
+  if (!s.ok() || peer_pb->member_type() != RaftPeerPB::VOTER) {
+    return;
+  }
+
+  bool peer_caught_up =
+      !OpIdEquals(status.last_received_current_leader(), MinimumOpId()) &&
+      OpIdEquals(status.last_received_current_leader(), queue_state_.last_appended);
+  if (!peer_caught_up) {
+    return;
+  }
+
+  VLOG(1) << "Successor watch: peer " << peer.uuid() << " is caught up to "
+          << "the leader at OpId " << OpIdToString(status.last_received_current_leader());
+  successor_watch_in_progress_ = false;
+  NotifyObserversOfSuccessor(peer.uuid());
+}
+
 bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
                                         const ConsensusResponsePB& response) {
   DCHECK(response.IsInitialized()) << "Error: Uninitialized: "
@@ -1085,6 +1134,7 @@ bool PeerMessageQueue::ResponseFromPeer(const std::string& peer_uuid,
       // Check if the peer is a NON_VOTER candidate ready for promotion.
       PromoteIfNeeded(peer, prev_peer_state, status);
 
+      TransferLeadershipIfNeeded(*peer, status);
     } else if (!OpIdEquals(status.last_received_current_leader(), MinimumOpId())) {
       // Their log may have diverged from ours, however we are in the process
       // of replicating our ops to them, so continue doing so. Eventually, we
@@ -1397,6 +1447,16 @@ void PeerMessageQueue::NotifyObserversOfPeerToPromote(const string& peer_uuid) {
       LogPrefixUnlocked() + "Unable to notify RaftConsensus of peer to promote.");
 }
 
+void PeerMessageQueue::NotifyObserversOfSuccessor(const string& peer_uuid) {
+  DCHECK(queue_lock_.is_locked());
+  WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
+      Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),
+           [=](PeerMessageQueueObserver* observer) {
+             observer->NotifyPeerToStartElection(peer_uuid);
+           })),
+      LogPrefixUnlocked() + "Unable to notify RaftConsensus of available successor.");
+}
+
 void PeerMessageQueue::NotifyObserversOfPeerHealthChange() {
   WARN_NOT_OK(raft_pool_observers_token_->SubmitClosure(
       Bind(&PeerMessageQueue::NotifyObserversTask, Unretained(this),

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/consensus_queue.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/consensus_queue.h b/src/kudu/consensus/consensus_queue.h
index 844fefd..d80d200 100644
--- a/src/kudu/consensus/consensus_queue.h
+++ b/src/kudu/consensus/consensus_queue.h
@@ -358,6 +358,13 @@ class PeerMessageQueue {
 
   ~PeerMessageQueue();
 
+  // Begin or end the watch for an eligible successor. If 'successor_uuid' is
+  // set, the queue will notify its observers when 'successor_uuid' is
+  // caught up to the leader. If it is boost::none, it will notify its observers
+  // with the UUID of the first voter that is caught up.
+  void BeginWatchForSuccessor(const boost::optional<std::string>& successor_uuid);
+  void EndWatchForSuccessor();
+
  private:
   FRIEND_TEST(ConsensusQueueTest, TestQueueAdvancesCommittedIndex);
   FRIEND_TEST(ConsensusQueueTest, TestQueueMovesWatermarksBackward);
@@ -458,6 +465,14 @@ class PeerMessageQueue {
   void PromoteIfNeeded(TrackedPeer* peer, const TrackedPeer& prev_peer_state,
                        const ConsensusStatusPB& status);
 
+  // If there is a graceful leadership change underway, notify queue observers
+  // to initiate leadership transfer to the specified peer under the following
+  // conditions:
+  // * 'peer' has fully caught up to the leader
+  // * 'peer' is the designated successor, or no successor was designated
+  void TransferLeadershipIfNeeded(const TrackedPeer& peer,
+                                  const ConsensusStatusPB& status);
+
   // Calculate a peer's up-to-date health status based on internal fields.
   static HealthReportPB::HealthStatus PeerHealthStatus(const TrackedPeer& peer);
 
@@ -469,6 +484,7 @@ class PeerMessageQueue {
                                        int64_t term,
                                        const std::string& reason);
   void NotifyObserversOfPeerToPromote(const std::string& peer_uuid);
+  void NotifyObserversOfSuccessor(const std::string& peer_uuid);
   void NotifyObserversOfPeerHealthChange();
 
   // Notify all PeerMessageQueueObservers using the given callback function.
@@ -541,7 +557,10 @@ class PeerMessageQueue {
 
   // The currently tracked peers.
   PeersMap peers_map_;
-  mutable simple_spinlock queue_lock_; // TODO: rename
+  mutable simple_spinlock queue_lock_; // TODO(todd): rename
+
+  bool successor_watch_in_progress_;
+  boost::optional<std::string> designated_successor_uuid_;
 
   // We assume that we never have multiple threads racing to append to the queue.
   // This fake mutex adds some extra assurance that this implementation property
@@ -575,6 +594,10 @@ class PeerMessageQueueObserver {
   // NON_VOTER to VOTER.
   virtual void NotifyPeerToPromote(const std::string& peer_uuid) = 0;
 
+  // Notify the observer that the specified peer is ready to become leader,
+  // and it should be told to run an election.
+  virtual void NotifyPeerToStartElection(const std::string& peer_uuid) = 0;
+
   // Notify the observer that the health of one of the peers has changed.
   virtual void NotifyPeerHealthChange() = 0;
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/peer_manager.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/peer_manager.cc b/src/kudu/consensus/peer_manager.cc
index 134bc3a..2ec49f0 100644
--- a/src/kudu/consensus/peer_manager.cc
+++ b/src/kudu/consensus/peer_manager.cc
@@ -107,6 +107,18 @@ void PeerManager::SignalRequest(bool force_if_queue_empty) {
   }
 }
 
+Status PeerManager::StartElection(const std::string& uuid) {
+  std::shared_ptr<Peer> peer;
+  {
+    std::lock_guard<simple_spinlock> lock(lock_);
+    peer = FindPtrOrNull(peers_, uuid);
+  }
+  if (!peer) {
+    return Status::NotFound("unknown peer");
+  }
+  return peer->StartElection();
+}
+
 void PeerManager::Close() {
   {
     std::lock_guard<simple_spinlock> lock(lock_);

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/peer_manager.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/peer_manager.h b/src/kudu/consensus/peer_manager.h
index bfde816..a67c325 100644
--- a/src/kudu/consensus/peer_manager.h
+++ b/src/kudu/consensus/peer_manager.h
@@ -62,6 +62,9 @@ class PeerManager {
   // Signals all peers of the current configuration that there is a new request pending.
   void SignalRequest(bool force_if_queue_empty = false);
 
+  // Start an election on the peer with UUID 'uuid'.
+  Status StartElection(const std::string& uuid);
+
   // Closes all peers.
   void Close();
 

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/raft_consensus.cc
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.cc b/src/kudu/consensus/raft_consensus.cc
index f4ba768..fc3fdca 100644
--- a/src/kudu/consensus/raft_consensus.cc
+++ b/src/kudu/consensus/raft_consensus.cc
@@ -48,6 +48,7 @@
 #include "kudu/consensus/quorum_util.h"
 #include "kudu/gutil/bind.h"
 #include "kudu/gutil/bind_helpers.h"
+#include "kudu/gutil/macros.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/port.h"
 #include "kudu/gutil/stl_util.h"
@@ -190,6 +191,7 @@ RaftConsensus::RaftConsensus(
       raft_pool_(raft_pool),
       state_(kNew),
       rng_(GetRandomSeed32()),
+      leader_transfer_in_progress_(false),
       withhold_votes_until_(MonoTime::Min()),
       last_received_cur_leader_(MinimumOpId()),
       failed_elections_since_stable_leader_(0),
@@ -298,6 +300,18 @@ Status RaftConsensus::Start(const ConsensusBootstrapInfo& info,
       },
       MinimumElectionTimeout());
 
+  PeriodicTimer::Options opts;
+  opts.one_shot = true;
+  transfer_period_timer_ = PeriodicTimer::Create(
+      peer_proxy_factory_->messenger(),
+      [w]() {
+        if (auto consensus = w.lock()) {
+          consensus->EndLeaderTransferPeriod();
+        }
+      },
+      MinimumElectionTimeout(),
+      opts);
+
   {
     ThreadRestrictions::AssertWaitAllowed();
     LockGuard l(lock_);
@@ -561,6 +575,63 @@ Status RaftConsensus::StepDown(LeaderStepDownResponsePB* resp) {
   return Status::OK();
 }
 
+Status RaftConsensus::TransferLeadership(const boost::optional<string>& new_leader_uuid,
+                                         LeaderStepDownResponsePB* resp) {
+  TRACE_EVENT0("consensus", "RaftConsensus::TransferLeadership");
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Received request to transfer leadership"
+                                 << (new_leader_uuid ?
+                                    Substitute(" to TS $0", *new_leader_uuid) :
+                                    "");
+  DCHECK((queue_->IsInLeaderMode() && cmeta_->active_role() == RaftPeerPB::LEADER) ||
+         (!queue_->IsInLeaderMode() && cmeta_->active_role() != RaftPeerPB::LEADER));
+  RETURN_NOT_OK(CheckRunningUnlocked());
+  if (cmeta_->active_role() != RaftPeerPB::LEADER) {
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transfer leadership while not leader";
+    resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER);
+    StatusToPB(Status::IllegalState("not currently leader"),
+               resp->mutable_error()->mutable_status());
+    // We return OK so that the tablet service won't overwrite the error code.
+    return Status::OK();
+  }
+  if (new_leader_uuid) {
+    if (*new_leader_uuid == peer_uuid()) {
+      // Short-circuit as we are transferring leadership to ourselves and we
+      // already checked that we are leader.
+      return Status::OK();
+    }
+    if (!IsRaftConfigVoter(*new_leader_uuid, cmeta_->ActiveConfig())) {
+      const string msg = Substitute("tablet server $0 is not a voter in the active config",
+                                    *new_leader_uuid);
+      LOG_WITH_PREFIX_UNLOCKED(INFO) << "Rejecting request to transfer leadership "
+                                     << "because " << msg;
+      return Status::InvalidArgument(msg);
+    }
+  }
+  return BeginLeaderTransferPeriodUnlocked(new_leader_uuid);
+}
+
+Status RaftConsensus::BeginLeaderTransferPeriodUnlocked(
+    const boost::optional<string>& successor_uuid) {
+  DCHECK(lock_.is_locked());
+  if (leader_transfer_in_progress_.CompareAndSwap(false, true)) {
+    return Status::ServiceUnavailable(
+        Substitute("leadership transfer for $0 already in progress",
+                   options_.tablet_id));
+  }
+  leader_transfer_in_progress_.Store(true, kMemOrderAcquire);
+  queue_->BeginWatchForSuccessor(successor_uuid);
+  transfer_period_timer_->Start();
+  return Status::OK();
+}
+
+void RaftConsensus::EndLeaderTransferPeriod() {
+  transfer_period_timer_->Stop();
+  queue_->EndWatchForSuccessor();
+  leader_transfer_in_progress_.Store(false, kMemOrderRelease);
+}
+
 scoped_refptr<ConsensusRound> RaftConsensus::NewRound(
     gscoped_ptr<ReplicateMsg> replicate_msg,
     ConsensusReplicatedCallback replicated_cb) {
@@ -600,6 +671,9 @@ Status RaftConsensus::BecomeLeaderUnlocked() {
   // Don't vote for anyone if we're a leader.
   withhold_votes_until_ = MonoTime::Max();
 
+  // Leadership never starts in a transfer period.
+  EndLeaderTransferPeriod();
+
   queue_->RegisterObserver(this);
   RETURN_NOT_OK(RefreshConsensusQueueAndPeersUnlocked());
 
@@ -822,7 +896,14 @@ void RaftConsensus::NotifyPeerToPromote(const std::string& peer_uuid) {
                                                      shared_from_this(),
                                                      peer_uuid)),
               LogPrefixThreadSafe() + "Unable to start TryPromoteNonVoterTask");
+}
 
+void RaftConsensus::NotifyPeerToStartElection(const string& peer_uuid) {
+  LOG(INFO) << "Instructing follower " << peer_uuid << " to start an election";
+  WARN_NOT_OK(raft_pool_token_->SubmitFunc(std::bind(&RaftConsensus::TryStartElectionOnPeerTask,
+                                                     shared_from_this(),
+                                                     peer_uuid)),
+              LogPrefixThreadSafe() + "Unable to start TryStartElectionOnPeerTask");
 }
 
 void RaftConsensus::NotifyPeerHealthChange() {
@@ -896,6 +977,22 @@ void RaftConsensus::TryPromoteNonVoterTask(const std::string& peer_uuid) {
               LogPrefixThreadSafe() + Substitute("Unable to promote non-voter $0", peer_uuid));
 }
 
+void RaftConsensus::TryStartElectionOnPeerTask(const string& peer_uuid) {
+  ThreadRestrictions::AssertWaitAllowed();
+  LockGuard l(lock_);
+  // Double-check that the peer is a voter in the active config.
+  if (!IsRaftConfigVoter(peer_uuid, cmeta_->ActiveConfig())) {
+    LOG_WITH_PREFIX_UNLOCKED(INFO) << "Not signalling peer " << peer_uuid
+                                   << " to start an election: it's not a voter "
+                                   << "in the active config.";
+    return;
+  }
+  LOG_WITH_PREFIX_UNLOCKED(INFO) << "Signalling peer " << peer_uuid
+                                 << " to start an election";
+  WARN_NOT_OK(peer_manager_->StartElection(peer_uuid),
+              Substitute("unable to start election on peer $0", peer_uuid));
+}
+
 Status RaftConsensus::Update(const ConsensusRequestPB* request,
                              ConsensusResponsePB* response) {
   update_calls_for_tests_.Increment();
@@ -2823,6 +2920,9 @@ Status RaftConsensus::CheckActiveLeaderUnlocked() const {
       // Check for the consistency of the information in the consensus metadata
       // and the state of the consensus queue.
       DCHECK(queue_->IsInLeaderMode());
+      if (leader_transfer_in_progress_.Load()) {
+        return Status::ServiceUnavailable("leader transfer in progress");
+      }
       return Status::OK();
 
     default:

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/consensus/raft_consensus.h
----------------------------------------------------------------------
diff --git a/src/kudu/consensus/raft_consensus.h b/src/kudu/consensus/raft_consensus.h
index f472103..de12806 100644
--- a/src/kudu/consensus/raft_consensus.h
+++ b/src/kudu/consensus/raft_consensus.h
@@ -173,9 +173,32 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
     return failure_detector_;
   }
 
-  // Implement a LeaderStepDown() request.
+  // Performs an abrupt leader step down. This node, if the leader, becomes a
+  // follower immediately and sleeps its failure detector for an extra election
+  // timeout to decrease its chances of being reelected.
   Status StepDown(LeaderStepDownResponsePB* resp);
 
+  // Attempts to gracefully transfer leadership to the peer with uuid
+  // 'new_leader_uuid' or to the next up-to-date peer the leader gets
+  // a response from if 'new_leader_uuid' is boost::none. To allow peers time
+  // to catch up, the leader will not accept write or config change requests
+  // during a 'transfer period' that lasts one election timeout. If no
+  // successor is eligible by the end of the transfer period, leadership
+  // transfer fails and the leader resumes normal operation. The transfer is
+  // asynchronous: once the transfer period is started the method returns
+  // success.
+  // Additional calls to this method during the transfer period are rejected.
+  Status TransferLeadership(const boost::optional<std::string>& new_leader_uuid,
+                            LeaderStepDownResponsePB* resp);
+
+  // Begin or end a leadership transfer period. During a transfer period, a
+  // leader will not accept writes or config changes, but will continue updating
+  // followers. If a leader transfer period is already in progress,
+  // BeginLeaderTransferPeriodUnlocked returns ServiceUnavailable.
+  Status BeginLeaderTransferPeriodUnlocked(
+      const boost::optional<std::string>& successor_uuid);
+  void EndLeaderTransferPeriod();
+
   // Creates a new ConsensusRound, the entity that owns all the data
   // structures required for a consensus round, such as the ReplicateMsg
   // (and later on the CommitMsg). ConsensusRound will also point to and
@@ -345,6 +368,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   void NotifyPeerToPromote(const std::string& peer_uuid) override;
 
+  void NotifyPeerToStartElection(const std::string& peer_uuid) override;
+
   void NotifyPeerHealthChange() override;
 
   // Return the log indexes which the consensus implementation would like to retain.
@@ -661,6 +686,8 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
   // Attempt to promote the given non-voter to a voter.
   void TryPromoteNonVoterTask(const std::string& peer_uuid);
 
+  void TryStartElectionOnPeerTask(const std::string& peer_uuid);
+
   // Called when the failure detector expires.
   // Submits ReportFailureDetectedTask() to a thread pool.
   void ReportFailureDetected();
@@ -822,6 +849,10 @@ class RaftConsensus : public std::enable_shared_from_this<RaftConsensus>,
 
   std::shared_ptr<rpc::PeriodicTimer> failure_detector_;
 
+  AtomicBool leader_transfer_in_progress_;
+  boost::optional<std::string> designated_successor_uuid_;
+  std::shared_ptr<rpc::PeriodicTimer> transfer_period_timer_;
+
   // Lock held while starting a failure-triggered election.
   //
   // After reporting a failure and asynchronously starting an election, the

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/integration-tests/raft_consensus-itest.cc
----------------------------------------------------------------------
diff --git a/src/kudu/integration-tests/raft_consensus-itest.cc b/src/kudu/integration-tests/raft_consensus-itest.cc
index 5ee5bde..29adabc 100644
--- a/src/kudu/integration-tests/raft_consensus-itest.cc
+++ b/src/kudu/integration-tests/raft_consensus-itest.cc
@@ -112,6 +112,8 @@ using kudu::consensus::ConsensusRequestPB;
 using kudu::consensus::ConsensusResponsePB;
 using kudu::consensus::ConsensusServiceProxy;
 using kudu::consensus::EXCLUDE_HEALTH_REPORT;
+using kudu::consensus::LeaderStepDownRequestPB;
+using kudu::consensus::LeaderStepDownResponsePB;
 using kudu::consensus::MajoritySize;
 using kudu::consensus::MakeOpId;
 using kudu::consensus::OpId;
@@ -3018,5 +3020,71 @@ TEST_P(RaftConsensusParamReplicationModesITest, TestRestartWithDifferentUUID) {
     ASSERT_FALSE(files_in_wal_dir.empty());
   });
 }
+
+// Designating graceful leadership transfer to a follower that cannot catch up
+// should eventually fail.
+TEST_F(RaftConsensusITest, TestLeaderTransferWhenFollowerFallsBehindLeaderGC) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  vector<string> ts_flags = {
+    // Disable follower eviction.
+    "--evict_failed_followers=false",
+  };
+  vector<string> master_flags = {
+    // Prevent the master from evicting unrecoverably failed followers.
+    "--catalog_manager_evict_excess_replicas=false",
+  };
+  AddFlagsForLogRolls(&ts_flags); // For CauseFollowerToFallBehindLogGC().
+  NO_FATALS(BuildAndStart(ts_flags, master_flags));
+
+  string leader_uuid;
+  int64_t orig_term;
+  string follower_uuid;
+  NO_FATALS(CauseFollowerToFallBehindLogGC(
+      tablet_servers_, &leader_uuid, &orig_term, &follower_uuid));
+  SCOPED_TRACE(Substitute("leader: $0 follower: $1", leader_uuid, follower_uuid));
+
+  // Wait for remaining majority to agree.
+  TabletServerMap active_tablet_servers = tablet_servers_;
+  ASSERT_EQ(3, active_tablet_servers.size());
+  ASSERT_EQ(1, active_tablet_servers.erase(follower_uuid));
+  ASSERT_OK(WaitForServersToAgree(MonoDelta::FromSeconds(30), active_tablet_servers,
+                                  tablet_id_, 1));
+
+  // Try to transfer leadership to the follower that has fallen behind log GC.
+  auto* leader_ts = FindOrDie(tablet_servers_, leader_uuid);
+  ConsensusServiceProxy* c_proxy = CHECK_NOTNULL(leader_ts->consensus_proxy.get());
+  LeaderStepDownRequestPB req;
+  LeaderStepDownResponsePB resp;
+  RpcController rpc;
+  req.set_dest_uuid(leader_uuid);
+  req.set_tablet_id(tablet_id_);
+  req.set_mode(consensus::LeaderStepDownMode::GRACEFUL);
+  req.set_new_leader_uuid(follower_uuid);
+
+  // The request should succeed.
+  ASSERT_OK(c_proxy->LeaderStepDown(req, &resp, &rpc));
+  ASSERT_FALSE(resp.has_error()) << SecureDebugString(resp);
+
+  // However, the leader will not be able to transfer leadership to the lagging
+  // follower, and eventually will resume normal operation. We check this by
+  // waiting for a write to succeed: both the RPC and its response must be OK.
+  ASSERT_EVENTUALLY([&] {
+    WriteRequestPB w_req;
+    WriteResponsePB w_resp;
+    rpc.Reset();
+    rpc.set_timeout(MonoDelta::FromSeconds(30));
+    w_req.set_tablet_id(tablet_id_);
+    ASSERT_OK(SchemaToPB(schema_, w_req.mutable_schema()));
+    AddTestRowToPB(RowOperationsPB::INSERT, schema_, kTestRowKey, kTestRowIntVal,
+                   "hello world", w_req.mutable_row_operations());
+    ASSERT_OK(leader_ts->tserver_proxy->Write(w_req, &w_resp, &rpc));
+    ASSERT_FALSE(w_resp.has_error()) << SecureDebugString(w_resp);
+  });
+}
+
 }  // namespace tserver
 }  // namespace kudu

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/tools/kudu-admin-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-admin-test.cc b/src/kudu/tools/kudu-admin-test.cc
index ecf6323..22612ac 100644
--- a/src/kudu/tools/kudu-admin-test.cc
+++ b/src/kudu/tools/kudu-admin-test.cc
@@ -47,6 +47,7 @@
 #include "kudu/consensus/metadata.pb.h"
 #include "kudu/consensus/opid.pb.h"
 #include "kudu/consensus/quorum_util.h"
+#include "kudu/gutil/basictypes.h"
 #include "kudu/gutil/gscoped_ptr.h"
 #include "kudu/gutil/map-util.h"
 #include "kudu/gutil/strings/split.h"
@@ -91,6 +92,7 @@ using kudu::cluster::ExternalTabletServer;
 using kudu::consensus::COMMITTED_OPID;
 using kudu::consensus::ConsensusStatePB;
 using kudu::consensus::EXCLUDE_HEALTH_REPORT;
+using kudu::consensus::LeaderStepDownMode;
 using kudu::consensus::OpId;
 using kudu::itest::FindTabletFollowers;
 using kudu::itest::FindTabletLeader;
@@ -1203,50 +1205,312 @@ Status GetTermFromConsensus(const vector<TServerDetails*>& tservers,
       "No leader replica found for tablet $0", tablet_id));
 }
 
-TEST_F(AdminCliTest, TestLeaderStepDown) {
+TEST_F(AdminCliTest, TestAbruptLeaderStepDown) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+  };
+  const vector<string> kTserverFlags = {
+    "--enable_leader_failure_detection=false",
+  };
   FLAGS_num_tablet_servers = 3;
   FLAGS_num_replicas = 3;
-  NO_FATALS(BuildAndStart());
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
 
+  // Wait for the tablet to be running.
   vector<TServerDetails*> tservers;
   AppendValuesFromMap(tablet_servers_, &tservers);
   ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
   for (auto& ts : tservers) {
-    ASSERT_OK(WaitUntilTabletRunning(ts,
-                                     tablet_id_,
-                                     MonoDelta::FromSeconds(10)));
+    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
   }
 
-  int64_t current_term;
-  ASSERT_OK(GetTermFromConsensus(tservers, tablet_id_,
-                                 &current_term));
-
-  // The leader for the given tablet may change anytime, resulting in
-  // the command returning an error code. Hence checking for term advancement
-  // only if the leader_step_down succeeds. It is also unsafe to check
-  // the term advancement without honoring status of the command since
-  // there may not have been another election in the meanwhile.
+  // Elect the leader and wait for the tservers and master to see the leader.
+  const auto* leader = tservers[0];
+  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
+                                  tablet_id_, /*minimum_index=*/1));
+  TServerDetails* master_observed_leader;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
+  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());
+
+  // Ask the leader to step down.
   string stderr;
   Status s = RunKuduTool({
     "tablet",
     "leader_step_down",
+    "--abrupt",
     cluster_->master()->bound_rpc_addr().ToString(),
     tablet_id_
   }, nullptr, &stderr);
-  bool not_currently_leader = stderr.find(
-      Status::IllegalState("").CodeAsString()) != string::npos;
-  ASSERT_TRUE(s.ok() || not_currently_leader) << "stderr: " << stderr;
-  if (s.ok()) {
-    int64_t new_term;
-    ASSERT_EVENTUALLY([&]() {
-        ASSERT_OK(GetTermFromConsensus(tservers, tablet_id_,
-                                       &new_term));
-        ASSERT_GT(new_term, current_term);
-      });
+
+  // There shouldn't be a leader now, since failure detection is disabled.
+  for (const auto* ts : tservers) {
+    s = GetReplicaStatusAndCheckIfLeader(ts, tablet_id_, kTimeout);
+    ASSERT_TRUE(s.IsIllegalState()) << "Expected IllegalState because replica "
+      "should not be the leader: " << s.ToString();
+  }
+}
+
+TEST_F(AdminCliTest, TestGracefulLeaderStepDown) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+  };
+  const vector<string> kTserverFlags = {
+    "--enable_leader_failure_detection=false",
+  };
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+  // Wait for the tablet to be running.
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+  for (auto& ts : tservers) {
+    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
+  }
+
+  // Elect the leader and wait for the tservers and master to see the leader.
+  const auto* leader = tservers[0];
+  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
+                                  tablet_id_, /*minimum_index=*/1));
+  TServerDetails* master_observed_leader;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
+  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());
+
+  // Ask the leader to transfer leadership to a specific peer.
+  const auto new_leader_uuid = tservers[1]->uuid();
+  string stderr;
+  Status s = RunKuduTool({
+    "tablet",
+    "leader_step_down",
+    Substitute("--new_leader_uuid=$0", new_leader_uuid),
+    cluster_->master()->bound_rpc_addr().ToString(),
+    tablet_id_
+  }, nullptr, &stderr);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  // Eventually, the chosen node should become leader.
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
+    ASSERT_EQ(new_leader_uuid, master_observed_leader->uuid());
+  });
+
+  // Ask the leader to transfer leadership.
+  s = RunKuduTool({
+    "tablet",
+    "leader_step_down",
+    cluster_->master()->bound_rpc_addr().ToString(),
+    tablet_id_
+  }, nullptr, &stderr);
+  ASSERT_TRUE(s.ok()) << s.ToString();
+
+  // Eventually, some other node should become leader.
+  const std::unordered_set<string> possible_new_leaders = { tservers[0]->uuid(),
+                                                            tservers[2]->uuid() };
+  ASSERT_EVENTUALLY([&]() {
+    ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
+    ASSERT_TRUE(ContainsKey(possible_new_leaders, master_observed_leader->uuid()));
+  });
+}
+
+// Leader should reject requests to transfer leadership to a non-member of the
+// config.
+TEST_F(AdminCliTest, TestLeaderTransferToEvictedPeer) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  // In this test, tablet leadership is manually controlled and the master
+  // should not rereplicate.
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+    "--master_add_server_when_underreplicated=false",
+  };
+  const vector<string> kTserverFlags = {
+    "--enable_leader_failure_detection=false",
+  };
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+  // Wait for the tablet to be running.
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+  for (auto& ts : tservers) {
+    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
+  }
+
+  // Elect the leader and wait for the tservers and master to see the leader.
+  const auto* leader = tservers[0];
+  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
+                                  tablet_id_, /*minimum_index=*/1));
+  TServerDetails* master_observed_leader;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
+  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());
+
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+  // Evict the first follower.
+  string stderr;
+  const auto evicted_uuid = tservers[1]->uuid();
+  Status s = RunKuduTool({
+    "tablet",
+    "change_config",
+    "remove_replica",
+    master_addr,
+    tablet_id_,
+    evicted_uuid,
+  }, nullptr, &stderr);
+  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;
+
+  // Ask the leader to transfer leadership to the evicted peer.
+  stderr.clear();
+  s = RunKuduTool({
+    "tablet",
+    "leader_step_down",
+    Substitute("--new_leader_uuid=$0", evicted_uuid),
+    master_addr,
+    tablet_id_
+  }, nullptr, &stderr);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr;
+  ASSERT_STR_CONTAINS(stderr,
+                      Substitute("tablet server $0 is not a voter in the active config",
+                                 evicted_uuid));
+}
+
+// Leader should reject requests to transfer leadership to a non-voter of the
+// config.
+TEST_F(AdminCliTest, TestLeaderTransferToNonVoter) {
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  // In this test, tablet leadership is manually controlled and the master
+  // should not rereplicate.
+  const vector<string> kMasterFlags = {
+    "--catalog_manager_wait_for_new_tablets_to_elect_leader=false",
+    "--master_add_server_when_underreplicated=false",
+  };
+  const vector<string> kTserverFlags = {
+    "--enable_leader_failure_detection=false",
+  };
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart(kTserverFlags, kMasterFlags));
+
+  // Wait for the tablet to be running.
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+  for (auto& ts : tservers) {
+    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
+  }
+
+  // Elect the leader and wait for the tservers and master to see the leader.
+  const auto* leader = tservers[0];
+  ASSERT_OK(StartElection(leader, tablet_id_, kTimeout));
+  ASSERT_OK(WaitForServersToAgree(kTimeout, tablet_servers_,
+                                  tablet_id_, /*minimum_index=*/1));
+  TServerDetails* master_observed_leader;
+  ASSERT_OK(GetLeaderReplicaWithRetries(tablet_id_, &master_observed_leader));
+  ASSERT_EQ(leader->uuid(), master_observed_leader->uuid());
+
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+
+  // Demote the first follower to a non-voter.
+  string stderr;
+  const auto non_voter_uuid = tservers[1]->uuid();
+  Status s = RunKuduTool({
+    "tablet",
+    "change_config",
+    "change_replica_type",
+    master_addr,
+    tablet_id_,
+    non_voter_uuid,
+    "NON_VOTER",
+  }, nullptr, &stderr);
+  ASSERT_TRUE(s.ok()) << s.ToString() << " stderr: " << stderr;
+
+  // Ask the leader to transfer leadership to the non-voter.
+  stderr.clear();
+  s = RunKuduTool({
+    "tablet",
+    "leader_step_down",
+    Substitute("--new_leader_uuid=$0", non_voter_uuid),
+    master_addr,
+    tablet_id_
+  }, nullptr, &stderr);
+  ASSERT_TRUE(s.IsRuntimeError()) << s.ToString() << " stderr: " << stderr;
+  ASSERT_STR_CONTAINS(stderr,
+                      Substitute("tablet server $0 is not a voter in the active config",
+                                 non_voter_uuid));
+}
+
+// Leader transfer causes the tablet to stop accepting new writes. This test
+// tests that writes can still succeed even if lots of leader transfers and
+// abrupt stepdowns are happening, as long as the writes have long enough
+// timeouts to ride over the unstable leadership.
+TEST_F(AdminCliTest, TestSimultaneousLeaderTransferAndAbruptStepdown) {
+  if (!AllowSlowTests()) {
+    LOG(WARNING) << "test is skipped; set KUDU_ALLOW_SLOW_TESTS=1 to run";
+    return;
+  }
+
+  const MonoDelta kTimeout = MonoDelta::FromSeconds(10);
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = 3;
+  NO_FATALS(BuildAndStart());
+
+  // Wait for the tablet to be running.
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+  for (auto& ts : tservers) {
+    ASSERT_OK(WaitUntilTabletRunning(ts, tablet_id_, kTimeout));
+  }
+
+  // Start a workload with long timeouts. Everything should eventually go
+  // through but it might take a while given the leadership changes.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.set_timeout_allowed(false);
+  workload.set_write_timeout_millis(60000);
+  workload.set_num_replicas(FLAGS_num_replicas);
+  workload.set_num_write_threads(1);
+  workload.set_write_batch_size(1);
+  workload.Setup();
+  workload.Start();
+
+  const string& master_addr = cluster_->master()->bound_rpc_addr().ToString();
+  while (workload.rows_inserted() < 1000) {
+    // Issue a graceful stepdown and then an abrupt stepdown, every second.
+    // The results are ignored because the tools might fail due to the
+    // constant leadership changes.
+    ignore_result(RunKuduTool({
+      "tablet",
+      "leader_step_down",
+      master_addr,
+      tablet_id_
+    }));
+    ignore_result(RunKuduTool({
+      "tablet",
+      "leader_step_down",
+      "--abrupt",
+      master_addr,
+      tablet_id_
+    }));
+    SleepFor(MonoDelta::FromMilliseconds(1000));
   }
 }
 
-TEST_F(AdminCliTest, TestLeaderStepDownWhenNotPresent) {
+class TestLeaderStepDown :
+    public AdminCliTest,
+    public ::testing::WithParamInterface<LeaderStepDownMode> {
+};
+INSTANTIATE_TEST_CASE_P(, TestLeaderStepDown,
+                        ::testing::Values(LeaderStepDownMode::ABRUPT,
+                                          LeaderStepDownMode::GRACEFUL));
+TEST_P(TestLeaderStepDown, TestLeaderStepDownWhenNotPresent) {
   FLAGS_num_tablet_servers = 3;
   FLAGS_num_replicas = 3;
   NO_FATALS(BuildAndStart(
@@ -1268,6 +1532,7 @@ TEST_F(AdminCliTest, TestLeaderStepDownWhenNotPresent) {
   ASSERT_OK(RunKuduTool({
     "tablet",
     "leader_step_down",
+    Substitute("--abrupt=$0", GetParam() == LeaderStepDownMode::ABRUPT),
     cluster_->master()->bound_rpc_addr().ToString(),
     tablet_id_
   }, &stdout));
@@ -1276,6 +1541,56 @@ TEST_F(AdminCliTest, TestLeaderStepDownWhenNotPresent) {
                                  tablet_id_));
 }
 
+TEST_P(TestLeaderStepDown, TestRepeatedLeaderStepDown) {
+  FLAGS_num_tablet_servers = 3;
+  FLAGS_num_replicas = 3;
+  // Speed up leader failure detection and shorten the leader transfer period.
+  NO_FATALS(BuildAndStart({ "--raft_heartbeat_interval_ms=50" }));
+  vector<TServerDetails*> tservers;
+  AppendValuesFromMap(tablet_servers_, &tservers);
+  ASSERT_EQ(FLAGS_num_tablet_servers, tservers.size());
+  for (auto& ts : tservers) {
+    ASSERT_OK(WaitUntilTabletRunning(ts,
+                                     tablet_id_,
+                                     MonoDelta::FromSeconds(10)));
+  }
+
+  // Start a workload.
+  TestWorkload workload(cluster_.get());
+  workload.set_table_name(kTableId);
+  workload.set_timeout_allowed(false);
+  workload.set_write_timeout_millis(30000);
+  workload.set_num_replicas(FLAGS_num_replicas);
+  workload.set_num_write_threads(4);
+  workload.set_write_batch_size(1);
+  workload.Setup();
+  workload.Start();
+
+  // Issue stepdown requests repeatedly. If we leave some time for an election,
+  // the workload should still make progress.
+  const string abrupt_flag = Substitute("--abrupt=$0",
+                                        GetParam() == LeaderStepDownMode::ABRUPT);
+  string stdout;
+  string stderr;
+  while (workload.rows_inserted() < 2000) {
+    stdout.clear();
+    stderr.clear();
+    Status s = RunKuduTool({
+      "tablet",
+      "leader_step_down",
+      abrupt_flag,
+      cluster_->master()->bound_rpc_addr().ToString(),
+      tablet_id_
+    }, &stdout, &stderr);
+    bool not_currently_leader = stderr.find(
+        Status::IllegalState("").CodeAsString()) != string::npos;
+    ASSERT_TRUE(s.ok() || not_currently_leader) << s.ToString();
+    SleepFor(MonoDelta::FromMilliseconds(1000));
+  }
+
+  ClusterVerifier(cluster_.get()).CheckCluster();
+}
+
 TEST_F(AdminCliTest, TestDeleteTable) {
   FLAGS_num_tablet_servers = 1;
   FLAGS_num_replicas = 1;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/tools/kudu-tool-test.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/kudu-tool-test.cc b/src/kudu/tools/kudu-tool-test.cc
index a4ae0a4..cde6559 100644
--- a/src/kudu/tools/kudu-tool-test.cc
+++ b/src/kudu/tools/kudu-tool-test.cc
@@ -594,7 +594,7 @@ TEST_F(ToolTest, TestModeHelp) {
   {
     const vector<string> kTabletModeRegexes = {
         "change_config.*Change.*Raft configuration",
-        "leader_step_down.*Force the tablet's leader replica to step down"
+        "leader_step_down.*Change.*tablet's leader"
     };
     NO_FATALS(RunTestHelp("tablet", kTabletModeRegexes));
   }

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/tools/tool_action_tablet.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_action_tablet.cc b/src/kudu/tools/tool_action_tablet.cc
index da0209b..259828f 100644
--- a/src/kudu/tools/tool_action_tablet.cc
+++ b/src/kudu/tools/tool_action_tablet.cc
@@ -44,6 +44,19 @@
 #include "kudu/util/status.h"
 #include "kudu/util/string_case.h"
 
+DEFINE_bool(abrupt, false,
+            "Whether the leader should step down without attempting to "
+            "transfer leadership gracefully. A graceful transfer minimizes "
+            "delays in tablet operations, but will fail if the tablet cannot "
+            "arrange a successor.");
+DEFINE_string(new_leader_uuid, "",
+              "UUID of the server that leadership should be transferred to. "
+              "Leadership may only be transferred to a voting member of the "
+              "leader's active config. If the designated successor cannot "
+              "catch up to the leader within one election timeout, leadership "
+              "transfer will not occur. If blank, the leader chooses its own "
+              "successor, attempting to transfer leadership as soon as "
+              "possible. This cannot be set if --abrupt is set.");
 DEFINE_int64(move_copy_timeout_sec, 600,
              "Number of seconds to wait for tablet copy to complete when relocating a tablet");
 DEFINE_int64(move_leader_timeout_sec, 30,
@@ -51,16 +64,9 @@ DEFINE_int64(move_leader_timeout_sec, 30,
 
 using kudu::client::KuduClient;
 using kudu::client::KuduClientBuilder;
-using kudu::client::KuduTablet;
-using kudu::client::KuduTabletServer;
 using kudu::consensus::ADD_PEER;
-using kudu::consensus::BulkChangeConfigRequestPB;
 using kudu::consensus::ChangeConfigType;
-using kudu::consensus::ConsensusStatePB;
-using kudu::consensus::GetConsensusStateRequestPB;
-using kudu::consensus::GetConsensusStateResponsePB;
-using kudu::consensus::GetLastOpIdRequestPB;
-using kudu::consensus::GetLastOpIdResponsePB;
+using kudu::consensus::LeaderStepDownMode;
 using kudu::consensus::MODIFY_PEER;
 using kudu::consensus::RaftPeerPB;
 using kudu::master::MasterServiceProxy;
@@ -137,25 +143,48 @@ Status LeaderStepDown(const RunnerContext& context) {
                                                  kMasterAddressesArg);
   vector<string> master_addresses = Split(master_addresses_str, ",");
   const string& tablet_id = FindOrDie(context.required_args, kTabletIdArg);
+  const LeaderStepDownMode mode = FLAGS_abrupt ? LeaderStepDownMode::ABRUPT :
+                                                 LeaderStepDownMode::GRACEFUL;
+  const boost::optional<string> new_leader_uuid =
+    FLAGS_new_leader_uuid.empty() ? boost::none :
+                                    boost::make_optional(FLAGS_new_leader_uuid);
+  if (mode == LeaderStepDownMode::ABRUPT && new_leader_uuid) {
+    return Status::InvalidArgument("cannot specify both --new_leader_uuid and --abrupt");
+  }
 
   client::sp::shared_ptr<KuduClient> client;
   RETURN_NOT_OK(KuduClientBuilder()
                 .master_server_addrs(master_addresses)
                 .Build(&client));
 
-  // If leader is not present, command can gracefully return.
   string leader_uuid;
   HostPort leader_hp;
-  bool is_no_leader = false;
+  bool no_leader = false;
   Status s = GetTabletLeader(client, tablet_id,
-                             &leader_uuid, &leader_hp, &is_no_leader);
-  if (s.IsNotFound() && is_no_leader) {
+                             &leader_uuid, &leader_hp, &no_leader);
+  if (s.IsNotFound() && no_leader) {
+    // If leadership should be transferred to a specific node, exit with an
+    // error if there's no leader since we can't orchestrate the transfer.
+    if (new_leader_uuid) {
+        return s.CloneAndPrepend(
+            Substitute("unable to transfer leadership to $0", new_leader_uuid.get()));
+    }
+    // Otherwise, a new election should happen soon, which will achieve
+    // something like what the client wanted, so we'll exit gracefully.
     cout << s.ToString() << endl;
     return Status::OK();
   }
   RETURN_NOT_OK(s);
 
+  // If the requested new leader is the leader, the command can short-circuit.
+  if (new_leader_uuid && (leader_uuid == new_leader_uuid.get())) {
+    cout << Substitute("Requested new leader $0 is already the leader",
+                       leader_uuid) << endl;
+    return Status::OK();
+  }
+
   return DoLeaderStepDown(tablet_id, leader_uuid, leader_hp,
+                          mode, new_leader_uuid,
                           client->default_admin_operation_timeout());
 }
 
@@ -288,9 +317,11 @@ unique_ptr<Mode> BuildTabletMode() {
 
   unique_ptr<Action> leader_step_down =
       ActionBuilder("leader_step_down", &LeaderStepDown)
-      .Description("Force the tablet's leader replica to step down")
+      .Description("Change the tablet's leader")
       .AddRequiredParameter({ kMasterAddressesArg, kMasterAddressesArgDesc })
       .AddRequiredParameter({ kTabletIdArg, kTabletIdArgDesc })
+      .AddOptionalParameter("abrupt")
+      .AddOptionalParameter("new_leader_uuid")
       .Build();
 
   unique_ptr<Mode> change_config =

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/tools/tool_replica_util.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_replica_util.cc b/src/kudu/tools/tool_replica_util.cc
index f8e7339..ccec47e 100644
--- a/src/kudu/tools/tool_replica_util.cc
+++ b/src/kudu/tools/tool_replica_util.cc
@@ -60,6 +60,7 @@ using kudu::consensus::GetConsensusStateRequestPB;
 using kudu::consensus::GetConsensusStateResponsePB;
 using kudu::consensus::GetLastOpIdRequestPB;
 using kudu::consensus::GetLastOpIdResponsePB;
+using kudu::consensus::LeaderStepDownMode;
 using kudu::consensus::MODIFY_PEER;
 using kudu::consensus::OpId;
 using kudu::consensus::REMOVE_PEER;
@@ -136,13 +137,24 @@ Status GetConsensusState(const unique_ptr<ConsensusServiceProxy>& proxy,
 Status DoLeaderStepDown(const string& tablet_id,
                         const string& leader_uuid,
                         const HostPort& leader_hp,
+                        LeaderStepDownMode mode,
+                        const boost::optional<string>& new_leader_uuid,
                         const MonoDelta& timeout) {
+  if (mode == LeaderStepDownMode::ABRUPT && new_leader_uuid) {
+    return Status::InvalidArgument(
+        "cannot specify a new leader uuid for an abrupt stepdown");
+  }
+
   unique_ptr<ConsensusServiceProxy> proxy;
   RETURN_NOT_OK(BuildProxy(leader_hp.host(), leader_hp.port(), &proxy));
 
   consensus::LeaderStepDownRequestPB req;
   req.set_dest_uuid(leader_uuid);
   req.set_tablet_id(tablet_id);
+  req.set_mode(mode);
+  if (new_leader_uuid) {
+    req.set_new_leader_uuid(new_leader_uuid.get());
+  }
 
   RpcController rpc;
   rpc.set_timeout(timeout);
@@ -202,6 +214,7 @@ Status CheckCompleteMove(const vector<string>& master_addresses,
   DCHECK(is_complete);
   DCHECK(completion_status);
   *is_complete = false;
+
   // Get the latest leader info. It may change later, due to our actions or
   // outside factors.
   string orig_leader_uuid;
@@ -276,6 +289,7 @@ Status CheckCompleteMove(const vector<string>& master_addresses,
           (is_343_scheme || DoKsckForTablet(master_addresses, tablet_id).ok())) {
         // The leader is the node we intend to remove; make it step down.
         ignore_result(DoLeaderStepDown(tablet_id, orig_leader_uuid, orig_leader_hp,
+                                       LeaderStepDownMode::GRACEFUL, boost::none,
                                        client->default_admin_operation_timeout()));
       }
       from_ts_uuid_in_config = true;

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/tools/tool_replica_util.h
----------------------------------------------------------------------
diff --git a/src/kudu/tools/tool_replica_util.h b/src/kudu/tools/tool_replica_util.h
index 19267a4..07b47c0 100644
--- a/src/kudu/tools/tool_replica_util.h
+++ b/src/kudu/tools/tool_replica_util.h
@@ -54,11 +54,31 @@ Status GetConsensusState(
     consensus::ConsensusStatePB* consensus_state,
     bool* is_3_4_3_replication = nullptr);
 
-// Request current leader replica 'leader_uuid' to step down.
+// Request that the replica with UUID 'leader_uuid' step down.
+// In GRACEFUL mode:
+//   * If 'new_leader_uuid' is not boost::none, the leader will attempt
+//     to gracefully transfer leadership to the replica with that UUID.
+//   * If 'new_leader_uuid' is boost::none, the replica will choose its own
+//     successor, preferring to transfer leadership ASAP.
+// In ABRUPT mode, the replica will step down without arranging a successor.
+// 'new_leader_uuid' has no effect in this mode and must be provided as
+// boost::none.
+// Note that in neither mode does this function guarantee that leadership will
+// change, even if it returns OK. In ABRUPT mode, if the function succeeds,
+// the leader will step down, but it may then be reelected. In GRACEFUL mode,
+// the leader may not relinquish leadership at all, or it may relinquish it and
+// then be reelected, even if the function succeeds. The advantage of GRACEFUL
+// mode is that it is on average less disruptive to tablet operations,
+// particularly when the leadership transfer fails.
+//
+// If a caller wants to ensure leadership changes, it must wait and see if
+// leadership changes as expected and, if not, retry.
 Status DoLeaderStepDown(
     const std::string& tablet_id,
     const std::string& leader_uuid,
     const HostPort& leader_hp,
+    consensus::LeaderStepDownMode mode,
+    const boost::optional<std::string>& new_leader_uuid,
     const MonoDelta& timeout);
 
 // Get information on the current leader replica for the specified tablet.

http://git-wip-us.apache.org/repos/asf/kudu/blob/e61de497/src/kudu/tserver/tablet_service.cc
----------------------------------------------------------------------
diff --git a/src/kudu/tserver/tablet_service.cc b/src/kudu/tserver/tablet_service.cc
index 8957ce5..f8b309e 100644
--- a/src/kudu/tserver/tablet_service.cc
+++ b/src/kudu/tserver/tablet_service.cc
@@ -145,6 +145,7 @@ using kudu::consensus::ConsensusResponsePB;
 using kudu::consensus::GetLastOpIdRequestPB;
 using kudu::consensus::GetNodeInstanceRequestPB;
 using kudu::consensus::GetNodeInstanceResponsePB;
+using kudu::consensus::LeaderStepDownMode;
 using kudu::consensus::LeaderStepDownRequestPB;
 using kudu::consensus::LeaderStepDownResponsePB;
 using kudu::consensus::OpId;
@@ -1139,6 +1140,16 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
                                           RpcContext* context) {
   LOG(INFO) << "Received LeaderStepDown RPC: " << SecureDebugString(*req)
             << " from " << context->requestor_string();
+  if (PREDICT_FALSE(!req->new_leader_uuid().empty() &&
+                    req->mode() == LeaderStepDownMode::ABRUPT)) {
+    Status s = Status::InvalidArgument(
+        "cannot specify a new leader uuid for an abrupt step down");
+    SetupErrorAndRespond(resp->mutable_error(), s,
+                         TabletServerErrorPB::UNKNOWN_ERROR,
+                         context);
+    // The RPC has been answered; stop here so it is not responded to twice.
+    return;
+  }
   if (!CheckUuidMatchOrRespond(tablet_manager_, "LeaderStepDown", req, resp, context)) {
     return;
   }
@@ -1150,14 +1161,24 @@ void ConsensusServiceImpl::LeaderStepDown(const LeaderStepDownRequestPB* req,
 
   shared_ptr<RaftConsensus> consensus;
   if (!GetConsensusOrRespond(replica, resp, context, &consensus)) return;
-  Status s = consensus->StepDown(resp);
-  if (PREDICT_FALSE(!s.ok())) {
-    SetupErrorAndRespond(resp->mutable_error(), s,
-                         TabletServerErrorPB::UNKNOWN_ERROR,
-                         context);
-    return;
+  switch (req->mode()) {
+    case LeaderStepDownMode::ABRUPT:
+      HandleResponse(req, resp, context, consensus->StepDown(resp));
+      break;
+    case LeaderStepDownMode::GRACEFUL: {
+      const auto new_leader_uuid =
+        req->new_leader_uuid().empty() ?
+        boost::none :
+        boost::make_optional(req->new_leader_uuid());
+      Status s = consensus->TransferLeadership(new_leader_uuid, resp);
+      HandleResponse(req, resp, context, s);
+      break;
+    }
+    default:
+      Status s = Status::InvalidArgument(
+          Substitute("unknown LeaderStepDown mode: $0", req->mode()));
+      HandleUnknownError(s, resp, context);
   }
-  context->RespondSuccess();
 }
 
 void ConsensusServiceImpl::GetLastOpId(const consensus::GetLastOpIdRequestPB *req,