You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/25 11:08:01 UTC

[incubator-ratis] branch master updated: RATIS-1261. Add timeout in TransferLeadershipRequest (#373)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new c219b98  RATIS-1261. Add timeout in TransferLeadershipRequest (#373)
c219b98 is described below

commit c219b9841eaa239c965faa850c2cf9685f8d42b9
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Dec 25 19:07:52 2020 +0800

    RATIS-1261. Add timeout in TransferLeadershipRequest (#373)
    
    * RATIS-1261. Add timeout in TransferLeadershipRequest
---
 .../java/org/apache/ratis/client/RaftClient.java   |  2 +-
 .../org/apache/ratis/client/impl/BlockingImpl.java |  5 +-
 .../apache/ratis/client/impl/ClientProtoUtils.java | 21 ++++---
 .../apache/ratis/client/impl/RaftClientImpl.java   |  5 +-
 .../apache/ratis/protocol/RaftClientRequest.java   | 24 +++++++-
 .../ratis/protocol/TransferLeadershipRequest.java  |  4 +-
 .../exceptions/LeaderSteppingDownException.java    | 12 ++--
 .../exceptions/TransferLeadershipException.java    | 12 ++--
 .../java/org/apache/ratis/util/TimeDuration.java   |  5 ++
 .../grpc/client/GrpcClientProtocolClient.java      |  5 +-
 .../java/org/apache/ratis/netty/NettyRpcProxy.java |  4 +-
 ratis-proto/src/main/proto/Raft.proto              |  1 +
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  8 +--
 .../apache/ratis/server/impl/RaftServerImpl.java   |  4 +-
 .../ratis/server/impl/TransferLeadership.java      | 10 +--
 .../ratis/server/impl/LeaderElectionTests.java     | 71 ++++++++++++++++++++--
 16 files changed, 144 insertions(+), 49 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 0adf35b..78e0b77 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -69,7 +69,7 @@ public interface RaftClient extends Closeable {
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
 
   /** Transfer leadership to the given server.*/
-  RaftClientReply transferLeadership(RaftGroupId group, RaftPeerId newLeader) throws IOException;
+  RaftClientReply transferLeadership(RaftGroupId group, RaftPeerId newLeader, long timeoutMs) throws IOException;
 
   /** @return a {@link Builder}. */
   static Builder newBuilder() {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
index 7d03e62..ee15c5a 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/BlockingImpl.java
@@ -32,7 +32,9 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.rpc.CallId;
 import org.apache.ratis.util.TimeDuration;
@@ -96,7 +98,8 @@ class BlockingImpl implements BlockingApi {
         if (reply != null) {
           return reply;
         }
-      } catch (GroupMismatchException | StateMachineException e) {
+      } catch (GroupMismatchException | StateMachineException | TransferLeadershipException |
+          LeaderSteppingDownException e) {
         throw e;
       } catch (IOException e) {
         ioe = e;
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 92459fd..d00c2b4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -63,12 +63,12 @@ public interface ClientProtoUtils {
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(RaftGroupMemberId requestorId, RaftPeerId replyId) {
     return toRaftRpcRequestProtoBuilder(requestorId.getPeerId().toByteString(),
-        replyId.toByteString(), requestorId.getGroupId(), null, null, null);
+        replyId.toByteString(), requestorId.getGroupId(), null, null, null, 0);
   }
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
       ByteString requesterId, ByteString replyId, RaftGroupId groupId, Long callId,
-      SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable) {
+      SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
     if (slidingWindowEntry == null) {
       slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
     }
@@ -78,7 +78,8 @@ public interface ClientProtoUtils {
         .setReplyId(replyId)
         .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
         .setCallId(Optional.ofNullable(callId).orElseGet(CallId::getDefault))
-        .setSlidingWindowEntry(slidingWindowEntry);
+        .setSlidingWindowEntry(slidingWindowEntry)
+        .setTimeoutMs(timeoutMs);
 
     if (routingTable != null) {
       b.setRoutingTable(routingTable.toProto());
@@ -89,9 +90,10 @@ public interface ClientProtoUtils {
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
       ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId,
-      SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable) {
+      SlidingWindowEntry slidingWindowEntry, RoutingTable routingTable, long timeoutMs) {
     return toRaftRpcRequestProtoBuilder(
-        requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry, routingTable);
+        requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry, routingTable,
+        timeoutMs);
   }
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
@@ -102,7 +104,8 @@ public interface ClientProtoUtils {
         request.getRaftGroupId(),
         request.getCallId(),
         request.getSlidingWindowEntry(),
-        request.getRoutingTable());
+        request.getRoutingTable(),
+        request.getTimeoutMs());
   }
 
   static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p) {
@@ -156,6 +159,7 @@ public interface ClientProtoUtils {
         .setType(type)
         .setSlidingWindowEntry(request.getSlidingWindowEntry())
         .setRoutingTable(getRoutingTable(request))
+        .setTimeoutMs(request.getTimeoutMs())
         .build();
   }
 
@@ -206,7 +210,7 @@ public interface ClientProtoUtils {
       long seqNum, ByteString content) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
-            clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false), null))
+            clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false), null, 0))
         .setWrite(WriteRequestTypeProto.getDefaultInstance())
         .setMessage(toClientMessageEntryProtoBuilder(content))
         .build();
@@ -441,7 +445,8 @@ public interface ClientProtoUtils {
         RaftPeerId.valueOf(m.getReplyId()),
         ProtoUtils.toRaftGroupId(m.getRaftGroupId()),
         p.getRpcRequest().getCallId(),
-        newLeader.getId());
+        newLeader.getId(),
+        m.getTimeoutMs());
   }
 
   static TransferLeadershipRequestProto toTransferLeadershipRequestProto(
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index b318a28..25a1586 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -221,11 +221,12 @@ public final class RaftClientImpl implements RaftClient {
   }
 
   @Override
-  public RaftClientReply transferLeadership(RaftGroupId raftGroupId, RaftPeerId newLeader) throws IOException {
+  public RaftClientReply transferLeadership(RaftGroupId raftGroupId, RaftPeerId newLeader, long timeoutMs)
+      throws IOException {
     Objects.requireNonNull(newLeader, "newLeader == null");
     final long callId = CallId.getAndIncrement();
     return io().sendRequestWithRetry(() -> new TransferLeadershipRequest(
-        clientId, leaderId, groupId, callId, newLeader));
+        clientId, leaderId, groupId, callId, newLeader, timeoutMs));
   }
 
   // TODO: change peersInNewConf to List<RaftPeer>
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 4aca94f..6f7a28b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -237,10 +237,11 @@ public class RaftClientRequest extends RaftClientMessage {
     private Type type;
     private SlidingWindowEntry slidingWindowEntry;
     private RoutingTable routingTable;
+    private long timeoutMs;
 
     public RaftClientRequest build() {
       return new RaftClientRequest(
-          clientId, serverId, groupId, callId, message, type, slidingWindowEntry, routingTable);
+          clientId, serverId, groupId, callId, message, type, slidingWindowEntry, routingTable, timeoutMs);
     }
 
     public Builder setClientId(ClientId clientId) {
@@ -282,6 +283,11 @@ public class RaftClientRequest extends RaftClientMessage {
       this.routingTable = routingTable;
       return this;
     }
+
+    public Builder setTimeoutMs(long timeoutMs) {
+      this.timeoutMs = timeoutMs;
+      return this;
+    }
   }
 
   public static Builder newBuilder() {
@@ -308,20 +314,28 @@ public class RaftClientRequest extends RaftClientMessage {
 
   private final RoutingTable routingTable;
 
+  private final long timeoutMs;
+
   protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
-    this(clientId, serverId, groupId, callId, null, type, null, null);
+    this(clientId, serverId, groupId, callId, null, type, null, null, 0);
+  }
+
+  protected RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type,
+      long timeoutMs) {
+    this(clientId, serverId, groupId, callId, null, type, null, null, timeoutMs);
   }
 
   @SuppressWarnings("parameternumber")
   private RaftClientRequest(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
       long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry,
-      RoutingTable routingTable) {
+      RoutingTable routingTable, long timeoutMs) {
     super(clientId, serverId, groupId, callId);
     this.message = message;
     this.type = type;
     this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
     this.routingTable = routingTable;
+    this.timeoutMs = timeoutMs;
   }
 
   @Override
@@ -349,6 +363,10 @@ public class RaftClientRequest extends RaftClientMessage {
     return routingTable;
   }
 
+  public long getTimeoutMs() {
+    return timeoutMs;
+  }
+
   @Override
   public String toString() {
     return super.toString() + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
index 979ba7a..c80e489 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TransferLeadershipRequest.java
@@ -21,8 +21,8 @@ public class TransferLeadershipRequest extends RaftClientRequest {
   private final RaftPeerId newLeader;
 
   public TransferLeadershipRequest(
-      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, RaftPeerId newLeader) {
-    super(clientId, serverId, groupId, callId, readRequestType());
+      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, RaftPeerId newLeader, long timeoutMs) {
+    super(clientId, serverId, groupId, callId, readRequestType(), timeoutMs);
     this.newLeader = newLeader;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
index bc81893..a1d7ea9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/LeaderSteppingDownException.java
@@ -17,17 +17,13 @@
  */
 package org.apache.ratis.protocol.exceptions;
 
-import org.apache.ratis.protocol.RaftGroupMemberId;
-
 public class LeaderSteppingDownException extends RaftException {
-  private final RaftGroupMemberId serverId;
 
-  public LeaderSteppingDownException(RaftGroupMemberId id) {
-    super(id + " is in steppingDown");
-    this.serverId = id;
+  public LeaderSteppingDownException(String message) {
+    super(message);
   }
 
-  public RaftGroupMemberId getServerId() {
-    return serverId;
+  public LeaderSteppingDownException(String message, Throwable t) {
+    super(message, t);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
index 2a503aa..f859a28 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/TransferLeadershipException.java
@@ -17,17 +17,13 @@
  */
 package org.apache.ratis.protocol.exceptions;
 
-import org.apache.ratis.protocol.RaftGroupMemberId;
-
 public class TransferLeadershipException extends RaftException {
-  private final RaftGroupMemberId serverId;
 
-  public TransferLeadershipException(RaftGroupMemberId id, String msg) {
-    super(msg);
-    this.serverId = id;
+  public TransferLeadershipException(String message) {
+    super(message);
   }
 
-  public RaftGroupMemberId getServerId() {
-    return serverId;
+  public TransferLeadershipException(String message, Throwable t) {
+    super(message, t);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
index 54d0dab..a7d2e41 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/TimeDuration.java
@@ -205,6 +205,11 @@ public final class TimeDuration implements Comparable<TimeDuration> {
     return valueOf(this.toLong(minUnit) + that.toLong(minUnit), minUnit);
   }
 
+  /** @return (this + (thatDuration, thatUnit)) in the minimum unit among them. */
+  public TimeDuration add(long thatDuration, TimeUnit thatUnit) {
+    return add(TimeDuration.valueOf(thatDuration, thatUnit));
+  }
+
   /** @return (this - that) in the minimum unit among them. */
   public TimeDuration subtract(TimeDuration that) {
     Objects.requireNonNull(that, "that == null");
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index e29baa4..1d366f9 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -68,6 +68,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -181,8 +182,10 @@ public class GrpcClientProtocolClient implements Closeable {
 
   RaftClientReplyProto transferLeadership(
       TransferLeadershipRequestProto request) throws IOException {
+    TimeDuration newDuration = requestTimeoutDuration.add(
+        request.getRpcRequest().getTimeoutMs(), TimeUnit.MILLISECONDS);
     return blockingCall(() -> blockingStub
-        .withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
+        .withDeadlineAfter(newDuration.getDuration(), newDuration.getUnit())
         .transferLeadership(request));
   }
 
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 52f9915..65a8052 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -42,6 +42,7 @@ import java.util.LinkedList;
 import java.util.Queue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.ratis.proto.netty.NettyProtos.RaftNettyServerReplyProto.RaftNettyServerReplyCase.EXCEPTIONREPLY;
@@ -182,7 +183,8 @@ public class NettyRpcProxy implements Closeable {
 
     try {
       channelFuture.sync();
-      return reply.get(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit());
+      TimeDuration newDuration = requestTimeoutDuration.add(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
+      return reply.get(newDuration.getDuration(), newDuration.getUnit());
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw IOUtils.toInterruptedIOException(ProtoUtils.toString(request)
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 468c458..1d5e57e 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -111,6 +111,7 @@ message RaftRpcRequestProto {
   RaftGroupIdProto raftGroupId = 3;
   uint64 callId = 4;
 
+  uint64 timeoutMs = 13;
   RoutingTableProto routingTable = 14;
   SlidingWindowEntry slidingWindowEntry = 15;
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 8b6d391..d54e337 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -872,9 +872,9 @@ class LeaderStateImpl implements LeaderState {
 
       final TermIndex leaderLastEntry = server.getState().getLastEntry();
       if (leaderLastEntry == null) {
-        LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
+        LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
                 "and leader's lastEntry is null",
-            this, currentTerm, followerPriority, leaderPriority);
+            this, currentTerm, followerID, followerPriority, leaderPriority);
 
         // step down as follower
         yieldLeaderToHigherPriorityPeer(currentTerm, leaderLastEntry);
@@ -882,9 +882,9 @@ class LeaderStateImpl implements LeaderState {
       }
 
       if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
-        LOG.info("{} stepDown leadership on term:{} because follower's priority:{} is higher than leader's:{} " +
+        LOG.info("{} stepDown leadership on term:{} because follower {}'s priority:{} is higher than leader's:{} " +
                 "and follower's lastEntry index:{} catch up with leader's:{}",
-            this, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
+            this, currentTerm, followerID, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
             leaderLastEntry.getIndex());
 
         // step down as follower
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 22494b3..f24c3bc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -619,7 +619,7 @@ class RaftServerImpl implements RaftServer.Division,
     }
 
     if (isWrite && isSteppingDown()) {
-      final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId());
+      final LeaderSteppingDownException lsde = new LeaderSteppingDownException(getMemberId() + " is stepping down");
       final RaftClientReply reply = newExceptionReply(request, lsde);
       return RetryCacheImpl.failWithReply(reply, entry);
     }
@@ -888,7 +888,7 @@ class RaftServerImpl implements RaftServer.Division,
       TransferLeadershipRequest request, String msg) {
     LOG.warn(msg);
     return CompletableFuture.completedFuture(
-        newExceptionReply(request, new TransferLeadershipException(getMemberId(), msg)));
+        newExceptionReply(request, new TransferLeadershipException(msg)));
   }
 
   boolean isSteppingDown() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index b6db5e3..03b1dfc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -30,6 +30,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 public class TransferLeadership {
@@ -59,8 +60,9 @@ public class TransferLeadership {
       if (currentLeader != null && currentLeader.equals(request.getNewLeader())) {
         replyFuture.complete(server.newSuccessReply(request));
       } else if (timeout) {
-        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
-            "Failed to transfer leadership to " + request.getNewLeader() + ": current leader is " + currentLeader);
+        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId()
+            + ": Failed to transfer leadership to " + request.getNewLeader()
+            + " (timed out " + request.getTimeoutMs() + "ms): current leader is " + currentLeader);
         replyFuture.complete(server.newExceptionReply(request, tle));
       }
     }
@@ -99,13 +101,13 @@ public class TransferLeadership {
         });
         return replyFuture;
       } else {
-        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId(),
+        final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId() +
             "Failed to transfer leadership to " + request.getNewLeader() + ": a previous " + previous + " exists");
         return CompletableFuture.completedFuture(server.newExceptionReply(request, tle));
       }
     }
 
-    scheduler.onTimeout(TimeDuration.ONE_MINUTE,
+    scheduler.onTimeout(TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS),
         () -> finish(server.getState().getLeaderId(), true),
         LOG, () -> "Timeout check failed for append entry request: " + request);
     return supplier.get().getReplyFuture();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 3cd843a..4a45bbe 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -27,6 +27,8 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.LeaderSteppingDownException;
+import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
 import org.apache.ratis.server.DivisionInfo;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -46,6 +48,7 @@ import java.io.IOException;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
@@ -126,7 +129,6 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
       final RaftServer.Division leader = waitForLeader(cluster);
       try (RaftClient client = cluster.createClient(leader.getId())) {
         client.io().send(new RaftTestUtil.SimpleMessage("message"));
-        Thread.sleep(1000);
 
         List<RaftServer.Division> followers = cluster.getFollowers();
         Assert.assertEquals(followers.size(), 2);
@@ -137,15 +139,14 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
         Assert.assertTrue(reply.isSuccess());
 
-        reply = client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId());
+        reply = client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId(), 20000);
         assertTrue(reply.isSuccess());
-        Thread.sleep(1000);
 
         final RaftServer.Division currLeader = waitForLeader(cluster);
         assertTrue(newLeader.getId() == currLeader.getId());
 
         reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
-        Assert.assertNotEquals(reply.getReplierId(), leader.getId());
+        Assert.assertTrue(reply.getReplierId().equals(newLeader.getId().toString()));
         Assert.assertTrue(reply.isSuccess());
       }
 
@@ -154,6 +155,68 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
   }
 
   @Test
+  public void testTransferLeaderTimeout() throws Exception {
+    try(final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        List<RaftServer.Division> followers = cluster.getFollowers();
+        Assert.assertEquals(followers.size(), 2);
+        RaftServer.Division newLeader = followers.get(0);
+
+        // isolate new leader, so that transfer leadership will timeout
+        isolate(cluster, newLeader.getId());
+
+        List<RaftPeer> peers = cluster.getPeers();
+        List<RaftPeer> peersWithNewPriority = getPeersWithPriority(peers, newLeader.getPeer());
+        RaftClientReply reply = client.setConfiguration(peersWithNewPriority.toArray(new RaftPeer[0]));
+        Assert.assertTrue(reply.isSuccess());
+
+        CompletableFuture<Boolean> transferTimeoutFuture = CompletableFuture.supplyAsync(() -> {
+          try {
+            long timeoutMs = 5000;
+            long start = System.currentTimeMillis();
+            try {
+              client.transferLeadership(leader.getGroup().getGroupId(), newLeader.getId(), timeoutMs);
+            } catch (TransferLeadershipException e) {
+              long cost = System.currentTimeMillis() - start;
+              Assert.assertTrue(cost > timeoutMs);
+              Assert.assertTrue(e.getMessage().contains("Failed to transfer leadership to"));
+              Assert.assertTrue(e.getMessage().contains("timed out"));
+            }
+
+            return true;
+          } catch (IOException e) {
+            return false;
+          }
+        });
+
+        // before transfer timeout, leader should in steppingDown
+        JavaUtils.attemptRepeatedly(() -> {
+          try {
+            client.io().send(new RaftTestUtil.SimpleMessage("message"));
+          } catch (LeaderSteppingDownException e) {
+            Assert.assertTrue(e.getMessage().contains("is stepping down"));
+          }
+          return null;
+        }, 5, TimeDuration.ONE_SECOND, "check leader steppingDown", RaftServer.LOG);
+
+        Assert.assertTrue(transferTimeoutFuture.get());
+
+        // after transfer timeout, leader should accept request
+        reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString()));
+        Assert.assertTrue(reply.isSuccess());
+
+        deIsolate(cluster, newLeader.getId());
+      }
+
+      cluster.shutdown();
+    }
+  }
+
+  @Test
   public void testEnforceLeader() throws Exception {
     LOG.info("Running testEnforceLeader");
     final int numServer = 5;