You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink was not preserved in this plain-text rendering).
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/19 06:34:37 UTC

[ratis] branch master updated: RATIS-1770. Yield leader to higher priority peer by TransferLeadership (#845)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new ee642b0b5 RATIS-1770. Yield leader to higher priority peer by TransferLeadership (#845)
ee642b0b5 is described below

commit ee642b0b537629108227864f3458daf20950b9bc
Author: Kaijie Chen <ck...@apache.org>
AuthorDate: Sun Mar 19 14:34:31 2023 +0800

    RATIS-1770. Yield leader to higher priority peer by TransferLeadership (#845)
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  68 ++----
 .../apache/ratis/server/impl/RaftServerImpl.java   |  11 +-
 .../org/apache/ratis/server/impl/ServerState.java  |   1 -
 .../ratis/server/impl/TransferLeadership.java      | 235 ++++++++++++++++++---
 .../ratis/server/impl/LeaderElectionTests.java     |   4 +-
 .../shell/cli/sh/election/TransferCommand.java     |   8 +-
 6 files changed, 234 insertions(+), 93 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 694183bcf..6627d8e7b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -26,8 +26,6 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
 import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
 import org.apache.ratis.proto.RaftProtos.RoleInfoProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionReplyProto;
-import org.apache.ratis.proto.RaftProtos.StartLeaderElectionRequestProto;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
@@ -62,6 +60,7 @@ import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -424,6 +423,10 @@ class LeaderStateImpl implements LeaderState {
     return currentTerm;
   }
 
+  TermIndex getLastEntry() {
+    return server.getState().getLastEntry();
+  }
+
   @Override
   public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
     if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
@@ -658,46 +661,14 @@ class LeaderStateImpl implements LeaderState {
     return pendingStepDown.submitAsync(request);
   }
 
-  private synchronized void sendStartLeaderElection(RaftPeerId follower, TermIndex lastEntry) {
-    ServerState state = server.getState();
-    TermIndex currLastEntry = state.getLastEntry();
-    if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
-      LOG.warn("{} can not send StartLeaderElectionRequest to follower:{} because currLastEntry:{} " +
-              "did not match lastEntry:{}", this, follower, currLastEntry, lastEntry);
-      return;
-    }
-    LOG.info("{}: send StartLeaderElectionRequest to follower {} on term {}, lastEntry={}",
-        this, follower, currentTerm, lastEntry);
-
-    final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
-        server.getMemberId(), follower, lastEntry);
-    CompletableFuture.supplyAsync(() -> {
-      server.getLeaderElectionMetrics().onTransferLeadership();
-      try {
-        StartLeaderElectionReplyProto replyProto = server.getServerRpc().startLeaderElection(r);
-        LOG.info("{} received {} reply of StartLeaderElectionRequest from follower:{}",
-            this, replyProto.getServerReply().getSuccess() ? "success" : "fail", follower);
-      } catch (IOException e) {
-        LOG.warn("{} send StartLeaderElectionRequest throw exception", this, e);
+  private static LogAppender chooseUpToDateFollower(List<LogAppender> followers, TermIndex leaderLastEntry) {
+    for(LogAppender f : followers) {
+      if (TransferLeadership.isFollowerUpToDate(f.getFollower(), leaderLastEntry)
+          == TransferLeadership.Result.SUCCESS) {
+        return f;
       }
-      return null;
-    });
-  }
-
-  boolean sendStartLeaderElection(FollowerInfo followerInfo) {
-    final RaftPeerId followerId = followerInfo.getId();
-    final TermIndex leaderLastEntry = server.getState().getLastEntry();
-    if (leaderLastEntry == null) {
-      sendStartLeaderElection(followerId, null);
-      return true;
     }
-
-    final long followerMatchIndex = followerInfo.getMatchIndex();
-    if (followerMatchIndex >= leaderLastEntry.getIndex()) {
-      sendStartLeaderElection(followerId, leaderLastEntry);
-      return true;
-    }
-    return false;
+    return null;
   }
 
   private void prepare() {
@@ -778,7 +749,7 @@ class LeaderStateImpl implements LeaderState {
     } else {
       eventQueue.submit(checkStagingEvent);
     }
-    server.getTransferLeadership().onFollowerAppendEntriesReply(this, follower);
+    server.getTransferLeadership().onFollowerAppendEntriesReply(follower);
   }
 
   @Override
@@ -1044,7 +1015,7 @@ class LeaderStateImpl implements LeaderState {
     }
     final int leaderPriority = leader.getPriority();
 
-    FollowerInfo highestPriorityInfo = null;
+    final List<LogAppender> highestPriorityInfos = new ArrayList<>();
     int highestPriority = Integer.MIN_VALUE;
     for (LogAppender logAppender : senders) {
       final RaftPeer follower = conf.getPeer(logAppender.getFollowerId());
@@ -1053,12 +1024,17 @@ class LeaderStateImpl implements LeaderState {
       }
       final int followerPriority = follower.getPriority();
       if (followerPriority > leaderPriority && followerPriority >= highestPriority) {
-        highestPriority = followerPriority;
-        highestPriorityInfo = logAppender.getFollower();
+        if (followerPriority > highestPriority) {
+          highestPriority = followerPriority;
+          highestPriorityInfos.clear();
+        }
+        highestPriorityInfos.add(logAppender);
       }
     }
-    if (highestPriorityInfo != null) {
-      sendStartLeaderElection(highestPriorityInfo);
+    final TermIndex leaderLastEntry = getLastEntry();
+    final LogAppender appender = chooseUpToDateFollower(highestPriorityInfos, leaderLastEntry);
+    if (appender != null) {
+      server.getTransferLeadership().start(appender);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 74382b968..9204faf68 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -225,7 +225,7 @@ class RaftServerImpl implements RaftServer.Division,
         .setProperties(getRaftServer().getProperties())
         .build());
 
-    this.transferLeadership = new TransferLeadership(this);
+    this.transferLeadership = new TransferLeadership(this, properties);
     this.snapshotRequestHandler = new SnapshotManagementRequestHandler(this);
     this.snapshotInstallationHandler = new SnapshotInstallationHandler(this, properties);
 
@@ -1075,10 +1075,6 @@ class RaftServerImpl implements RaftServer.Division,
     return transferLeadership.isSteppingDown();
   }
 
-  void finishTransferLeadership() {
-    transferLeadership.finish(state.getLeaderId(), false);
-  }
-
   CompletableFuture<RaftClientReply> transferLeadershipAsync(TransferLeadershipRequest request)
       throws IOException {
     if (request.getNewLeader() == null) {
@@ -1464,6 +1460,10 @@ class RaftServerImpl implements RaftServer.Division,
     return commitInfoCache.update(getPeer(), state.getLog().getLastCommittedIndex());
   }
 
+  ExecutorService getServerExecutor() {
+    return serverExecutor;
+  }
+
   @SuppressWarnings("checkstyle:parameternumber")
   private CompletableFuture<AppendEntriesReplyProto> appendEntriesAsync(
       RaftPeerId leaderId, long leaderTerm, TermIndex previous, long leaderCommit, long callId, boolean initializing,
@@ -1853,5 +1853,6 @@ class RaftServerImpl implements RaftServer.Division,
 
   void onGroupLeaderElected() {
     this.firstElectionSinceStartup.set(false);
+    transferLeadership.complete(TransferLeadership.Result.SUCCESS);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index e0832c9be..2954ccdcf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -260,7 +260,6 @@ class ServerState {
       LOG.info("{}: change Leader from {} to {} at term {} for {}{}",
           getMemberId(), oldLeaderId, newLeaderId, getCurrentTerm(), op, suffix);
       if (newLeaderId != null) {
-        server.finishTransferLeadership();
         server.onGroupLeaderElected();
       }
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
index c5c1a46cb..beab02b67 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/TransferLeadership.java
@@ -17,27 +17,110 @@
  */
 package org.apache.ratis.server.impl;
 
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.TransferLeadershipRequest;
 import org.apache.ratis.protocol.exceptions.TransferLeadershipException;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.StringUtils;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 public class TransferLeadership {
   public static final Logger LOG = LoggerFactory.getLogger(TransferLeadership.class);
 
+  private static class Context {
+    private final TransferLeadershipRequest request;
+    private final Supplier<LogAppender> transferee;
+
+    Context(TransferLeadershipRequest request, Supplier<LogAppender> transferee) {
+      this.request = request;
+      this.transferee = transferee;
+    }
+
+    TransferLeadershipRequest getRequest() {
+      return request;
+    }
+
+    RaftPeerId getTransfereeId() {
+      return request.getNewLeader();
+    }
+
+    LogAppender getTransfereeLogAppender() {
+      return transferee.get();
+    }
+  }
+
+  static class Result {
+    enum Type {
+      SUCCESS,
+      DIFFERENT_LEADER,
+      NULL_FOLLOWER,
+      NULL_LOG_APPENDER,
+      NOT_UP_TO_DATE,
+      TIMED_OUT,
+      FAILED_TO_START,
+      COMPLETED_EXCEPTIONALLY,
+    }
+
+    static final Result SUCCESS = new Result(Type.SUCCESS);
+    static final Result DIFFERENT_LEADER = new Result(Type.DIFFERENT_LEADER);
+    static final Result NULL_FOLLOWER = new Result(Type.NULL_FOLLOWER);
+    static final Result NULL_LOG_APPENDER = new Result(Type.NULL_LOG_APPENDER);
+
+    private final Type type;
+    private final String errorMessage;
+    private final Throwable exception;
+
+    private Result(Type type) {
+      this(type, null);
+    }
+
+    private Result(Type type, String errorMessage, Throwable exception) {
+      this.type = type;
+      this.errorMessage = errorMessage;
+      this.exception = exception;
+    }
+
+    Result(Type type, String errorMessage) {
+      this(type, errorMessage, null);
+    }
+
+    Result(Throwable t) {
+      this(Type.COMPLETED_EXCEPTIONALLY, null, t);
+    }
+
+    Type getType() {
+      return type;
+    }
+
+    @Override
+    public String toString() {
+      if (exception == null) {
+        return type + (errorMessage == null ? "" : "(" + errorMessage + ")");
+      }
+      return type + ": " + StringUtils.stringifyException(exception);
+    }
+  }
+
   class PendingRequest {
     private final TransferLeadershipRequest request;
     private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
@@ -54,17 +137,20 @@ public class TransferLeadership {
       return replyFuture;
     }
 
-    void complete(RaftPeerId currentLeader, boolean timeout) {
+    void complete(Result result) {
       if (replyFuture.isDone()) {
         return;
       }
-
+      final RaftPeerId currentLeader = server.getState().getLeaderId();
       if (currentLeader != null && currentLeader.equals(request.getNewLeader())) {
         replyFuture.complete(server.newSuccessReply(request));
-      } else if (timeout) {
+      } else {
+        if (result.getType() == Result.Type.SUCCESS) {
+          result = Result.DIFFERENT_LEADER;
+        }
         final TransferLeadershipException tle = new TransferLeadershipException(server.getMemberId()
             + ": Failed to transfer leadership to " + request.getNewLeader()
-            + " (timed out " + request.getTimeoutMs() + "ms): current leader is " + currentLeader);
+            + " (the current leader is " + currentLeader + "): " + result);
         replyFuture.complete(server.newExceptionReply(request, tle));
       }
     }
@@ -76,11 +162,14 @@ public class TransferLeadership {
   }
 
   private final RaftServerImpl server;
+  private final TimeDuration requestTimeout;
   private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+
   private final AtomicReference<PendingRequest> pending = new AtomicReference<>();
 
-  TransferLeadership(RaftServerImpl server) {
+  TransferLeadership(RaftServerImpl server, RaftProperties properties) {
     this.server = server;
+    this.requestTimeout = RaftServerConfigKeys.Rpc.requestTimeout(properties);
   }
 
   private Optional<RaftPeerId> getTransferee() {
@@ -92,50 +181,130 @@ public class TransferLeadership {
     return pending.get() != null;
   }
 
-  void onFollowerAppendEntriesReply(LeaderStateImpl leaderState, FollowerInfo follower) {
-    final Optional<RaftPeerId> transferee = getTransferee();
-    // If the transferee has just append some entries and becomes up-to-date,
-    // send StartLeaderElection to it
-    if (transferee.filter(t -> t.equals(follower.getId())).isPresent()
-        && leaderState.sendStartLeaderElection(follower)) {
+  static Result isFollowerUpToDate(FollowerInfo follower, TermIndex leaderLastEntry) {
+    if (follower == null) {
+      return Result.NULL_FOLLOWER;
+    } else if (leaderLastEntry != null) {
+      final long followerMatchIndex = follower.getMatchIndex();
+      if (followerMatchIndex < leaderLastEntry.getIndex()) {
+        return new Result(Result.Type.NOT_UP_TO_DATE, "followerMatchIndex = " + followerMatchIndex
+            + " < leaderLastEntry.getIndex() = " + leaderLastEntry.getIndex());
+      }
+    }
+    return Result.SUCCESS;
+  }
+
+  private Result sendStartLeaderElection(FollowerInfo follower) {
+    final TermIndex lastEntry = server.getState().getLastEntry();
+
+    final Result result = isFollowerUpToDate(follower, lastEntry);
+    if (result != Result.SUCCESS) {
+      return result;
+    }
+
+    final RaftPeerId transferee = follower.getId();
+    LOG.info("{}: sendStartLeaderElection to follower {}, lastEntry={}",
+        server.getMemberId(), transferee, lastEntry);
+
+    final RaftProtos.StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
+        server.getMemberId(), transferee, lastEntry);
+    final CompletableFuture<RaftProtos.StartLeaderElectionReplyProto> f = CompletableFuture.supplyAsync(() -> {
+      server.getLeaderElectionMetrics().onTransferLeadership();
+      try {
+        return server.getServerRpc().startLeaderElection(r);
+      } catch (IOException e) {
+        throw new CompletionException("Failed to sendStartLeaderElection to follower " + transferee, e);
+      }
+    }, server.getServerExecutor()).whenComplete((reply, exception) -> {
+      if (reply != null) {
+        LOG.info("{}: Received startLeaderElection reply from {}: success? {}",
+            server.getMemberId(), transferee, reply.getServerReply().getSuccess());
+      } else if (exception != null) {
+        LOG.warn(server.getMemberId() + ": Failed to startLeaderElection for " + transferee, exception);
+      }
+    });
+
+    if (f.isCompletedExceptionally()) { // already failed
+      try {
+        f.join();
+      } catch (Throwable t) {
+        return new Result(t);
+      }
+    }
+    return Result.SUCCESS;
+  }
+
+  /**
+   * If the transferee has just append some entries and becomes up-to-date,
+   * send StartLeaderElection to it
+   */
+  void onFollowerAppendEntriesReply(FollowerInfo follower) {
+    if (!getTransferee().filter(t -> t.equals(follower.getId())).isPresent()) {
+      return;
+    }
+    final Result result = sendStartLeaderElection(follower);
+    if (result == Result.SUCCESS) {
       LOG.info("{}: sent StartLeaderElection to transferee {} after received AppendEntriesResponse",
-          server.getMemberId(), transferee.get());
+          server.getMemberId(), follower.getId());
     }
   }
 
-  private void tryTransferLeadership(LeaderStateImpl leaderState, RaftPeerId transferee) {
+  private Result tryTransferLeadership(Context context) {
+    final RaftPeerId transferee = context.getTransfereeId();
     LOG.info("{}: start transferring leadership to {}", server.getMemberId(), transferee);
-    final LogAppender appender = leaderState.getLogAppender(transferee).orElse(null);
-
+    final LogAppender appender = context.getTransfereeLogAppender();
     if (appender == null) {
-      LOG.error("{}: cannot find LogAppender for transferee {}", server.getMemberId(), transferee);
-      return;
+      return Result.NULL_LOG_APPENDER;
     }
     final FollowerInfo follower = appender.getFollower();
-    if (leaderState.sendStartLeaderElection(follower)) {
-      LOG.info("{}: sent StartLeaderElection to transferee {} immediately as it already has up-to-date log",
-          server.getMemberId(), transferee);
-    } else {
-      LOG.info("{}: notifying LogAppender to send AppendEntries as transferee {} is not up-to-date",
-          server.getMemberId(), transferee);
+    final Result result = sendStartLeaderElection(follower);
+    if (result.getType() == Result.Type.SUCCESS) {
+      LOG.info("{}: {} sent StartLeaderElection to transferee {} immediately as it already has up-to-date log",
+          server.getMemberId(), result, transferee);
+    } else if (result.getType() == Result.Type.NOT_UP_TO_DATE) {
+      LOG.info("{}: {} notifying LogAppender to send AppendEntries to transferee {}",
+          server.getMemberId(), result, transferee);
       appender.notifyLogAppender();
     }
+    return result;
+  }
+
+  void start(LogAppender transferee) {
+    // TransferLeadership will block client request, so we don't want wait too long.
+    // If everything goes well, transferee should be elected within the min rpc timeout.
+    final long timeout = server.properties().minRpcTimeoutMs();
+    final TransferLeadershipRequest request = new TransferLeadershipRequest(ClientId.emptyClientId(),
+        server.getId(), server.getMemberId().getGroupId(), 0, transferee.getFollowerId(), timeout);
+    start(new Context(request, () -> transferee));
   }
 
   CompletableFuture<RaftClientReply> start(LeaderStateImpl leaderState, TransferLeadershipRequest request) {
+    final Context context = new Context(request,
+        JavaUtils.memoize(() -> leaderState.getLogAppender(request.getNewLeader()).orElse(null)));
+    return start(context);
+  }
+
+  private CompletableFuture<RaftClientReply> start(Context context) {
+    final TransferLeadershipRequest request = context.getRequest();
     final MemoizedSupplier<PendingRequest> supplier = JavaUtils.memoize(() -> new PendingRequest(request));
     final PendingRequest previous = pending.getAndUpdate(f -> f != null? f: supplier.get());
     if (previous != null) {
       return createReplyFutureFromPreviousRequest(request, previous);
     }
-    tryTransferLeadership(leaderState, request.getNewLeader());
-
-    // if timeout is not specified in request, default to random election timeout
-    final TimeDuration timeout = request.getTimeoutMs() == 0 ? server.getRandomElectionTimeout()
-        : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
-    scheduler.onTimeout(timeout, () -> finish(server.getState().getLeaderId(), true),
-        LOG, () -> "Failed to transfer leadership to " + request.getNewLeader() + ": timeout after " + timeout);
-    return supplier.get().getReplyFuture();
+    final PendingRequest pendingRequest = supplier.get();
+    final Result result = tryTransferLeadership(context);
+    final Result.Type type = result.getType();
+    if (type != Result.Type.SUCCESS && type != Result.Type.NOT_UP_TO_DATE) {
+      pendingRequest.complete(result);
+    } else {
+      // if timeout is not specified in request, use default request timeout
+      final TimeDuration timeout = request.getTimeoutMs() == 0 ? requestTimeout
+          : TimeDuration.valueOf(request.getTimeoutMs(), TimeUnit.MILLISECONDS);
+      scheduler.onTimeout(timeout, () -> complete(new Result(Result.Type.TIMED_OUT,
+              timeout.toString(TimeUnit.SECONDS, 3))),
+          LOG, () -> "Failed to handle timeout");
+    }
+    return pendingRequest.getReplyFuture();
   }
 
   private CompletableFuture<RaftClientReply> createReplyFutureFromPreviousRequest(
@@ -158,8 +327,8 @@ public class TransferLeadership {
     }
   }
 
-  void finish(RaftPeerId currentLeader, boolean timeout) {
+  void complete(Result result) {
     Optional.ofNullable(pending.getAndSet(null))
-        .ifPresent(r -> r.complete(currentLeader, timeout));
+        .ifPresent(r -> r.complete(result));
   }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index e33c2a512..205de19a2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -260,7 +260,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
               long cost = System.currentTimeMillis() - start;
               Assert.assertTrue(cost > timeoutMs);
               Assert.assertTrue(e.getMessage().contains("Failed to transfer leadership to"));
-              Assert.assertTrue(e.getMessage().contains("timed out"));
+              Assert.assertTrue(e.getMessage().contains(TransferLeadership.Result.Type.TIMED_OUT.toString()));
             }
 
             return true;
@@ -283,7 +283,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
 
         // after transfer timeout, leader should accept request
         RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
-        Assert.assertTrue(reply.getReplierId().equals(leader.getId().toString()));
+        Assert.assertEquals(leader.getId().toString(), reply.getReplierId());
         Assert.assertTrue(reply.isSuccess());
 
         deIsolate(cluster, newLeader.getId());
diff --git a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
index b5180548c..7dba5ae9e 100644
--- a/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
+++ b/ratis-shell/src/main/java/org/apache/ratis/shell/cli/sh/election/TransferCommand.java
@@ -58,12 +58,8 @@ public class TransferCommand extends AbstractRatisCommand {
     super.run(cl);
 
     String strAddr = cl.getOptionValue(ADDRESS_OPTION_NAME);
-    // TODO: Default timeout should be set to 0, which means let server decide (based on election timeout).
-    //       However, occasionally the request could timeout too fast while the transfer is in progress.
-    //       i.e. request timeout doesn't mean transfer leadership has failed.
-    //       Currently, Ratis shell returns merely based on the result of the request.
-    //       So we set a larger default timeout here (3s).
-    final TimeDuration timeoutDefault = TimeDuration.valueOf(3, TimeUnit.SECONDS);
+    // Default timeout is 0, which means let server decide (will use default request timeout).
+    final TimeDuration timeoutDefault = TimeDuration.ZERO;
     // Default timeout for legacy mode matches with the legacy command (version 2.4.x and older).
     final TimeDuration timeoutLegacy = TimeDuration.valueOf(60, TimeUnit.SECONDS);
     final Optional<TimeDuration> timeout = !cl.hasOption(TIMEOUT_OPTION_NAME) ? Optional.empty() :