You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/24 00:50:14 UTC

[incubator-ratis] branch master updated: RATIS-1260. Implement transferLeaderShip in server (#372)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a4141a  RATIS-1260. Implement transferLeaderShip in server (#372)
5a4141a is described below

commit 5a4141a20d6025d1071f836b711a2ec373c4f7a7
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Dec 24 08:50:06 2020 +0800

    RATIS-1260. Implement transferLeaderShip in server (#372)
---
 .../apache/ratis/client/impl/ClientProtoUtils.java |  16 +++
 .../org/apache/ratis/protocol/RaftClientReply.java |  13 ++-
 .../exceptions/LeaderSteppingDownException.java    |  33 ++++++
 .../exceptions/TransferLeadershipException.java    |  33 ++++++
 .../src/test/java/org/apache/ratis/BaseTest.java   |  11 ++
 ratis-proto/src/main/proto/Raft.proto              |   2 +
 .../ratis/server/impl/RaftConfigurationImpl.java   |  11 ++
 .../apache/ratis/server/impl/RaftServerImpl.java   |  85 +++++++++++++--
 .../org/apache/ratis/server/impl/ServerState.java  |   3 +
 .../ratis/server/impl/TransferLeadership.java      | 118 +++++++++++++++++++++
 .../ratis/server/impl/GroupManagementBaseTest.java |  15 +--
 .../ratis/server/impl/LeaderElectionTests.java     |  37 +++++++
 12 files changed, 353 insertions(+), 24 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 50af252..8543523 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -25,10 +25,12 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
@@ -41,9 +43,11 @@ import java.util.stream.Collectors;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.ALREADYCLOSEDEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.DATASTREAMEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERNOTREADYEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.LEADERSTEPPINGDOWNEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTLEADEREXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.NOTREPLICATEDEXCEPTION;
 import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.STATEMACHINEEXCEPTION;
+import static org.apache.ratis.proto.RaftProtos.RaftClientReplyProto.ExceptionDetailsCase.TRANSFERLEADERSHIPEXCEPTION;
 
 public interface ClientProtoUtils {
 
@@ -255,6 +259,14 @@ public interface ClientProtoUtils {
           .map(ProtoUtils::toThrowableProto)
           .ifPresent(b::setAlreadyClosedException);
 
+      Optional.ofNullable(reply.getLeaderSteppingDownException())
+          .map(ProtoUtils::toThrowableProto)
+          .ifPresent(b::setLeaderSteppingDownException);
+
+      Optional.ofNullable(reply.getTransferLeadershipException())
+          .map(ProtoUtils::toThrowableProto)
+          .ifPresent(b::setTransferLeadershipException);
+
       final RaftClientReplyProto serialized = b.build();
       final RaftException e = reply.getException();
       if (e != null) {
@@ -340,6 +352,10 @@ public interface ClientProtoUtils {
       e = new LeaderNotReadyException(ProtoUtils.toRaftGroupMemberId(lnreProto.getServerId()));
     } else if (replyProto.getExceptionDetailsCase().equals(ALREADYCLOSEDEXCEPTION)) {
       e = ProtoUtils.toThrowable(replyProto.getAlreadyClosedException(), AlreadyClosedException.class);
+    } else if (replyProto.getExceptionDetailsCase().equals(LEADERSTEPPINGDOWNEXCEPTION)) {
+      e = ProtoUtils.toThrowable(replyProto.getLeaderSteppingDownException(), LeaderSteppingDownException.class);
+    } else if (replyProto.getExceptionDetailsCase().equals(TRANSFERLEADERSHIPEXCEPTION)) {
+      e = ProtoUtils.toThrowable(replyProto.getTransferLeadershipException(), TransferLeadershipException.class);
     } else {
       e = null;
     }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index e61b534..ada67c9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -21,10 +21,12 @@ import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
 import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
 import org.apache.ratis.protocol.exceptions.DataStreamException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.NotReplicatedException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.ProtoUtils;
@@ -166,7 +168,8 @@ public class RaftClientReply extends RaftClientMessage {
       Preconditions.assertTrue(ReflectionUtils.isInstance(exception,
           AlreadyClosedException.class,
           NotLeaderException.class, NotReplicatedException.class,
-          LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class),
+          LeaderNotReadyException.class, StateMachineException.class, DataStreamException.class,
+          LeaderSteppingDownException.class, TransferLeadershipException.class),
           () -> "Unexpected exception class: " + this);
     }
   }
@@ -234,6 +237,16 @@ public class RaftClientReply extends RaftClientMessage {
     return JavaUtils.cast(exception, LeaderNotReadyException.class);
   }
 
+  /** @return the exception, if it is a {@link LeaderSteppingDownException}; otherwise, return null. */
+  public LeaderSteppingDownException getLeaderSteppingDownException() {
+    return JavaUtils.cast(exception, LeaderSteppingDownException.class);
+  }
+
+  /** @return the exception, if it is a {@link TransferLeadershipException}; otherwise, return null. */
+  public TransferLeadershipException getTransferLeadershipException() {
+    return JavaUtils.cast(exception, TransferLeadershipException.class);
+  }
+
   /** @return the exception, if there is any; otherwise, return null. */
   public RaftException getException() {
     return exception;
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
new file mode 100644
index 0000000..bc81893
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftGroupMemberId;
+
+/**
+ * Thrown by a leader which is stepping down, i.e. has a leadership transfer in progress,
+ * to reject write and reconfiguration requests.
+ */
+public class LeaderSteppingDownException extends RaftException {
+  /** The server which is stepping down; null if unknown. */
+  private final RaftGroupMemberId serverId;
+
+  /** Create an exception saying that the given server is stepping down. */
+  public LeaderSteppingDownException(RaftGroupMemberId id) {
+    super(id + " is in steppingDown");
+    this.serverId = id;
+  }
+
+  /**
+   * Create an exception with a message only, e.g. when it is reconstructed on the client side.
+   * NOTE(review): presumably ProtoUtils.toThrowable instantiates exceptions reflectively through a
+   * (String) constructor, so without this constructor the reply cannot be decoded -- confirm.
+   */
+  public LeaderSteppingDownException(String message) {
+    super(message);
+    this.serverId = null;
+  }
+
+  /** @return the id of the server which is stepping down, or null if it is unknown. */
+  public RaftGroupMemberId getServerId() {
+    return serverId;
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
new file mode 100644
index 0000000..2a503aa
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+import org.apache.ratis.protocol.RaftGroupMemberId;
+
+/**
+ * Thrown when a leader refuses, or fails, to transfer its leadership to another peer.
+ */
+public class TransferLeadershipException extends RaftException {
+  /** The server which raised this exception; null if unknown. */
+  private final RaftGroupMemberId serverId;
+
+  /** Create an exception raised by the given server with the given message. */
+  public TransferLeadershipException(RaftGroupMemberId id, String msg) {
+    super(msg);
+    this.serverId = id;
+  }
+
+  /**
+   * Create an exception with a message only, e.g. when it is reconstructed on the client side.
+   * NOTE(review): presumably ProtoUtils.toThrowable instantiates exceptions reflectively through a
+   * (String) constructor, so without this constructor the reply cannot be decoded -- confirm.
+   */
+  public TransferLeadershipException(String message) {
+    this(null, message);
+  }
+
+  /** @return the id of the server which raised this exception, or null if it is unknown. */
+  public RaftGroupMemberId getServerId() {
+    return serverId;
+  }
+}
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 3b3d8cc..42d1e77 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -77,6 +77,22 @@ public abstract class BaseTest {
     }
   }
 
+  /**
+   * @return a copy of the given peers in which the suggested leader, matched by id, has priority 2
+   *         and every other peer has priority 1.  Only the id, address and priority are copied.
+   */
+  public List<RaftPeer> getPeersWithPriority(List<RaftPeer> peers, RaftPeer suggestedLeader) {
+    final List<RaftPeer> peersWithPriority = new ArrayList<>(peers.size());
+    for (RaftPeer peer : peers) {
+      // Compare ids rather than RaftPeer.equals, which may also compare other fields such as priority.
+      final boolean isSuggested = suggestedLeader != null && peer.getId().equals(suggestedLeader.getId());
+      final int priority = isSuggested? 2: 1;
+      peersWithPriority.add(
+          RaftPeer.newBuilder().setId(peer.getId()).setAddress(peer.getAddress()).setPriority(priority).build());
+    }
+    return peersWithPriority;
+  }
+
   public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
     RoutingTable.Builder builder = RoutingTable.newBuilder();
     RaftPeer previous = primary;
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index d06431a..16c26c0 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -372,6 +372,8 @@ message RaftClientReplyProto {
     LeaderNotReadyExceptionProto leaderNotReadyException = 6;
     ThrowableProto alreadyClosedException = 7;
     ThrowableProto dataStreamException = 8;
+    ThrowableProto leaderSteppingDownException = 9;
+    ThrowableProto transferLeadershipException = 10;
   }
 
   uint64 logIndex = 14; // When the request is a write request and the reply is success, the log index of the transaction
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 8a4ebd1..bb85f75 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -145,6 +145,23 @@ final class RaftConfigurationImpl implements RaftConfiguration {
     return conf.contains(peerId);
   }
 
+  /**
+   * @return true iff the given peer is found by {@code getPeer} and its priority is strictly higher than
+   *         the priority of every other peer in {@link #getCurrentPeers()}; a tie is not the highest.
+   */
+  boolean isHighestPriority(RaftPeerId peerId) {
+    final RaftPeer target = getPeer(peerId);
+    if (target == null) {
+      return false; // NOTE(review): assumes getPeer returns null for an unknown peer; avoids the NPE below.
+    }
+    for (RaftPeer peer : getCurrentPeers()) {
+      if (!peer.getId().equals(peerId) && peer.getPriority() >= target.getPriority()) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   boolean containsInOldConf(RaftPeerId peerId) {
     return oldConf != null && oldConf.contains(peerId);
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e13e432..22494b3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.NotLeaderException;
 import org.apache.ratis.protocol.exceptions.RaftException;
 import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
@@ -32,6 +33,7 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
 import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
 import org.apache.ratis.protocol.exceptions.StaleReadException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.server.DataStreamMap;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.DivisionProperties;
@@ -169,6 +171,8 @@ class RaftServerImpl implements RaftServer.Division,
   // So happens IllegalStateException: ILLEGAL TRANSITION: RUNNING -> RUNNING,
   private final AtomicBoolean startComplete;
 
+  private final TransferLeadership transferLeadership;
+
   RaftServerImpl(RaftGroup group, StateMachine stateMachine, RaftServerProxy proxy) throws IOException {
     final RaftPeerId id = proxy.getId();
     LOG.info("{}: new RaftServerImpl for {} with {}", id, group, stateMachine);
@@ -204,6 +208,8 @@ class RaftServerImpl implements RaftServer.Division,
           .build();
       return client;
     });
+
+    this.transferLeadership = new TransferLeadership(this);
   }
 
   @Override
@@ -589,7 +595,8 @@ class RaftServerImpl implements RaftServer.Division,
   /**
    * @return null if the server is in leader state.
    */
-  private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry) {
+  private CompletableFuture<RaftClientReply> checkLeaderState(RaftClientRequest request, CacheEntry entry,
+      boolean isWrite) {
     try {
       assertGroup(request.getRequestorId(), request.getRaftGroupId());
     } catch (GroupMismatchException e) {
@@ -610,6 +617,13 @@ class RaftServerImpl implements RaftServer.Division,
       final RaftClientReply reply = newExceptionReply(request, lnre);
       return RetryCacheImpl.failWithReply(reply, entry);
     }
+
+    if (isWrite && isSteppingDown()) {
+      final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId());
+      final RaftClientReply reply = newExceptionReply(request, lsde);
+      return RetryCacheImpl.failWithReply(reply, entry);
+    }
+
     return null;
   }
 
@@ -653,7 +667,7 @@ class RaftServerImpl implements RaftServer.Division,
 
     final PendingRequest pending;
     synchronized (this) {
-      reply = checkLeaderState(request, cacheEntry);
+      reply = checkLeaderState(request, cacheEntry, true);
       if (reply != null) {
         return reply;
       }
@@ -718,7 +732,8 @@ class RaftServerImpl implements RaftServer.Division,
       replyFuture = staleReadAsync(request);
     } else {
       // first check the server's leader state
-      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null,
+          !request.is(TypeCase.READ) && !request.is(TypeCase.WATCH));
       if (reply != null) {
         return reply;
       }
@@ -866,18 +881,73 @@ class RaftServerImpl implements RaftServer.Division,
 
   @Override
   public RaftClientReply transferLeadership(TransferLeadershipRequest request) throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    return waitForReply(request, transferLeadershipAsync(request));
+  }
+
+  /** Log the given message and return a reply failed with a {@link TransferLeadershipException}. */
+  private CompletableFuture<RaftClientReply> logAndReturnTransferLeadershipFail(
+      TransferLeadershipRequest request, String msg) {
+    LOG.warn(msg);
+    return CompletableFuture.completedFuture(
+        newExceptionReply(request, new TransferLeadershipException(getMemberId(), msg)));
+  }
+
+  /** @return true iff a leadership transfer is pending on this server. */
+  boolean isSteppingDown() {
+    return transferLeadership.isSteppingDown();
+  }
+
+  /** Finish the pending leadership transfer, if there is any, according to the current leader. */
+  void finishTransferLeadership() {
+    transferLeadership.finish(state.getLeaderId(), false);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
       throws IOException {
-    //TODO(runzhiwang): implement transfer leadership in server
-    return null;
+    LOG.info("{}: receive transferLeadership {}", getMemberId(), request);
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(request.getRequestorId(), request.getRaftGroupId());
+
+    synchronized (this) {
+      CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, false);
+      if (reply != null) {
+        return reply;
+      }
+
+      // This server is the leader (checked above), so a transfer to itself is a no-op.
+      if (getId().equals(request.getNewLeader())) {
+        return CompletableFuture.completedFuture(newSuccessReply(request));
+      }
+
+      final RaftConfigurationImpl conf = getRaftConf();
+      final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
+
+      // make sure there is no raft reconfiguration in progress
+      if (!conf.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " when raft reconfiguration in progress.";
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.containsInConf(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it is not in " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      if (!conf.isHighestPriority(request.getNewLeader())) {
+        String msg = getMemberId() + " refused to transfer leadership to peer " + request.getNewLeader() +
+            " as it does not have the highest priority in " + conf;
+        return logAndReturnTransferLeadershipFail(request, msg);
+      }
+
+      // Hand the request over to transferLeadership, which completes the returned future.
+      return transferLeadership.start(request);
+    }
   }
 
-    @Override
+  @Override
   public RaftClientReply setConfiguration(SetConfigurationRequest request) throws IOException {
     return waitForReply(request, setConfigurationAsync(request));
   }
@@ -891,7 +956,7 @@ class RaftServerImpl implements RaftServer.Division,
     assertLifeCycleState(LifeCycle.States.RUNNING);
     assertGroup(request.getRequestorId(), request.getRaftGroupId());
 
-    CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
+    CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null, true);
     if (reply != null) {
       return reply;
     }
@@ -899,7 +964,8 @@ class RaftServerImpl implements RaftServer.Division,
     final List<RaftPeer> peersInNewConf = request.getPeersInNewConf();
     final PendingRequest pending;
     synchronized (this) {
-      reply = checkLeaderState(request, null);
+      // Re-check under the lock; a leadership transfer may have started since the unlocked check.
+      reply = checkLeaderState(request, null, true);
       if (reply != null) {
         return reply;
       }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 466eeb2..908f0e7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -249,6 +249,9 @@ class ServerState implements Closeable {
       LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
           getMemberId(), leaderId, newLeaderId, getCurrentTerm(), op, suffix);
       leaderId = newLeaderId;
+      if (leaderId != null) {
+        server.finishTransferLeadership();
+      }
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
new file mode 100644
index 0000000..b6db5e3
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TransferLeadershipRequest;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutScheduler;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Track the leadership transfer request, at most one at a time, pending on a leader.
+ * <p>
+ * A pending request is finished either when {@link #finish(RaftPeerId, boolean)} is called,
+ * e.g. by {@link RaftServerImpl#finishTransferLeadership()} when the leader changes,
+ * or when it times out.
+ */
+public class TransferLeadership {
+  public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class);
+
+  /** A transfer request together with the future of its reply. */
+  class PendingRequest {
+    private final TransferLeadershipRequest request;
+    private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+    PendingRequest(TransferLeadershipRequest request) {
+      this.request = request;
+    }
+
+    TransferLeadershipRequest getRequest() {
+      return request;
+    }
+
+    CompletableFuture<RaftClientReply> getReplyFuture() {
+      return replyFuture;
+    }
+
+    /**
+     * Complete the reply: success if the current leader is the requested new leader;
+     * otherwise, failure with a {@link TransferLeadershipException}.
+     * <p>
+     * This request has already been removed from {@code pending} by the caller,
+     * so the reply must be completed in every case; otherwise, it would never complete.
+     *
+     * @param currentLeader the current leader, which may be null.
+     * @param timeout is this called because the request has timed out?
+     */
+    void complete(RaftPeerId currentLeader, boolean timeout) {
+      if (replyFuture.isDone()) {
+        return;
+      }
+
+      if (currentLeader != null && currentLeader.equals(request.getNewLeader())) {
+        replyFuture.complete(server.newSuccessReply(request));
+      } else {
+        // Either timed out or the leadership has moved to some other peer.
+        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
+            "Failed to transfer leadership to " + request.getNewLeader() + ": current leader is " + currentLeader
+                + (timeout? " (timeout)": ""));
+        replyFuture.complete(server.newExceptionReply(request, tle));
+      }
+    }
+
+    @Override
+    public String toString() {
+      return request.toString();
+    }
+  }
+
+  private final RaftServerImpl server;
+  private final TimeoutScheduler scheduler = TimeoutScheduler.getInstance();
+  /** The pending request, if there is any; otherwise, null. */
+  private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
+
+  TransferLeadership(RaftServerImpl server) {
+    this.server = server;
+  }
+
+  /** @return true iff there is a pending transfer request. */
+  boolean isSteppingDown() {
+    return pending.get() != null;
+  }
+
+  /**
+   * Start a leadership transfer for the given request.
+   * <p>
+   * If a request to the same new leader is already pending, the returned future follows its outcome;
+   * if a request to a different new leader is pending, the returned future fails immediately.
+   *
+   * @return a future of the reply to the given request.
+   */
+  CompletableFuture<RaftClientReply> start(TransferLeadershipRequest request) {
+    final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
+    final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
+    if (previous != null) {
+      if (request.getNewLeader().equals(previous.getRequest().getNewLeader())) {
+        final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+        previous.getReplyFuture().whenComplete((r, e) -> {
+          if (e != null) {
+            replyFuture.completeExceptionally(e);
+          } else {
+            replyFuture.complete(r.isSuccess()? server.newSuccessReply(request)
+                : server.newExceptionReply(request, r.getException()));
+          }
+        });
+        return replyFuture;
+      } else {
+        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
+            "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
+        return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
+      }
+    }
+
+    // No previous request: the memoized request has been installed as the pending one.
+    final PendingRequest newRequest = supplier.get();
+    // Time out only this request: by then it may have finished and a newer request may be pending.
+    scheduler.onTimeout(TimeDuration.ONE_MINUTE,
+        () -> finishIfPending(newRequest, server.getState().getLeaderId(), true),
+        LOG, () -> "Timeout check failed for transfer leadership request: " + request);
+    return newRequest.getReplyFuture();
+  }
+
+  /**
+   * Finish the pending request, if there is any.
+   *
+   * @param currentLeader the current leader, which may be null.
+   * @param timeout is this called because the request has timed out?
+   */
+  void finish(RaftPeerId currentLeader, boolean timeout) {
+    Optional.ofNullable(pending.getAndSet(null))
+        .ifPresent(r -> r.complete(currentLeader, timeout));
+  }
+
+  /** Finish the given request only if it is still the pending one. */
+  private void finishIfPending(PendingRequest request, RaftPeerId currentLeader, boolean timeout) {
+    if (pending.compareAndSet(request, null)) {
+      request.complete(currentLeader, timeout);
+    }
+  }
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index e50267a..416c17a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -77,17 +77,6 @@ public abstract class GroupManagementBaseTest extends BaseTest {
     return getClusterFactory().newCluster(peerNum, prop);
   }
 
-  private List<RaftPeer> getPeersWithPriority(List<RaftPeer> peers, int suggestedLeaderIndex) {
-    List<RaftPeer> peersWithPriority = new ArrayList<>();
-    for (int i = 0; i < peers.size(); i++) {
-      RaftPeer peer = peers.get(i);
-      final int priority = i == suggestedLeaderIndex? 2: 1;
-      peersWithPriority.add(
-          RaftPeer.newBuilder().setId(peer.getId()).setAddress(peer.getAddress()).setPriority(priority).build());
-    }
-    return peersWithPriority;
-  }
-
   @Test
   public void testGroupWithPriority() throws Exception {
     final MiniRaftCluster cluster = getCluster(0);
@@ -110,7 +99,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
     List<RaftPeer> peers = cluster.getPeers();
     Random r = new Random(1);
     final int suggestedLeaderIndex = r.nextInt(peers.size());
-    List<RaftPeer> peersWithPriority = getPeersWithPriority(peers, suggestedLeaderIndex);
+    List<RaftPeer> peersWithPriority = getPeersWithPriority(peers, peers.get(suggestedLeaderIndex));
     final RaftGroup newGroup = RaftGroup.valueOf(RaftGroupId.randomId(), peersWithPriority);
     LOG.info("add new group: " + newGroup);
     try (final RaftClient client = cluster.createClient(newGroup)) {
@@ -162,7 +151,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
 
     // change the suggest leader
     final int newSuggestedLeaderIndex = (suggestedLeaderIndex + 1) % peersWithPriority.size();
-    List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newSuggestedLeaderIndex);
+    List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, peers.get(newSuggestedLeaderIndex));
     try (final RaftClient client = cluster.createClient(newGroup)) {
       RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
       Assert.assertTrue(reply.isSuccess());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index cf9cfda..3cd843a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -25,6 +25,7 @@ import org.apache.ratis.metrics.RatisMetricRegistry;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
+import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
@@ -43,6 +44,7 @@ import org.slf4j.Logger;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
@@ -117,6 +119,41 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
   }
 
   @Test
+  public void testTransferLeader() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Thread.sleep(1000);
+
+        List<RaftServer.Division> followers = cluster.getFollowers();
+        Assert.assertEquals(followers.size(), 2);
+        RaftServer.Division newLeader = followers.get(0);
+
+        List<RaftPeer> peers = cluster.getPeers();
+        List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
+        RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+        Assert.assertTrue(reply.isSuccess());
+
+        reply = client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId());
+        assertTrue(reply.isSuccess());
+        Thread.sleep(1000);
+
+        final RaftServer.Division currLeader = waitForLeader(cluster);
+        assertTrue(newLeader.getId() == currLeader.getId());
+
+        reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertNotEquals(reply.getReplierId(), leader.getId());
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      cluster.shutdown();
+    }
+  }
+
+  @Test
   public void testEnforceLeader() throws Exception {
     LOG.info("Running testEnforceLeader");
     final int numServer = 5;