You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/01/06 14:32:36 UTC

[incubator-ratis] branch master updated: RATIS-993. Support pre vote (#161)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 35f17fa  RATIS-993. Support pre vote (#161)
35f17fa is described below

commit 35f17fa5a0a7066f667e7e7a346cb849e3ebd858
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Jan 6 22:31:14 2021 +0800

    RATIS-993. Support pre vote (#161)
---
 ratis-proto/src/main/proto/Raft.proto              |   1 +
 .../apache/ratis/server/impl/FollowerState.java    |   4 +-
 .../apache/ratis/server/impl/LeaderElection.java   | 143 ++++++++++++++++-----
 .../apache/ratis/server/impl/LeaderStateImpl.java  |  11 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  86 +++++++++++--
 .../org/apache/ratis/server/impl/RoleInfo.java     |   4 +-
 .../apache/ratis/server/impl/ServerProtoUtils.java |   5 +-
 .../ratis/server/impl/LeaderElectionTests.java     |  46 ++++++-
 .../server/simulation/SimulatedRequestReply.java   |   2 +-
 9 files changed, 246 insertions(+), 56 deletions(-)

diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 3bcad1b..0b2dda6 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -152,6 +152,7 @@ message RequestVoteRequestProto {
   RaftRpcRequestProto serverRequest = 1;
   uint64 candidateTerm = 2;
   TermIndexProto candidateLastEntry = 3;
+  bool preVote = 4;
 }
 
 message RequestVoteReplyProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index c54291a..d22f86b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -86,7 +86,7 @@ class FollowerState extends Daemon {
     return outstandingOp.get();
   }
 
-  boolean shouldWithholdVotes() {
+  boolean isCurrentLeaderValid() {
     return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
   }
 
@@ -134,7 +134,7 @@ class FollowerState extends Daemon {
                 this, lastRpcTime.elapsedTime(), electionTimeout);
             server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
             // election timeout, should become a candidate
-            server.changeToCandidate();
+            server.changeToCandidate(false);
             break;
           }
         }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 41d0dc3..e2bc46d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -64,10 +64,14 @@ class LeaderElection implements Runnable {
 
   private ResultAndTerm logAndReturn(Result result,
       Map<RaftPeerId, RequestVoteReplyProto> responses,
-      List<Exception> exceptions, long newTerm) {
-    LOG.info(this + ": Election " + result + "; received " + responses.size() + " response(s) "
-        + responses.values().stream().map(ServerStringUtils::toRequestVoteReplyString).collect(Collectors.toList())
-        + " and " + exceptions.size() + " exception(s); " + server.getState());
+      List<Exception> exceptions, long newTerm, boolean preVote) {
+    LOG.info("{}: {} {} received {} response(s):{}",
+        this,
+        preVote ? "Pre-vote " : "Election ",
+        result,
+        responses.size(),
+        responses.values().stream().map(ServerStringUtils::toRequestVoteReplyString).collect(Collectors.toList()));
+
     int i = 0;
     for(Exception e : exceptions) {
       final int j = i++;
@@ -120,12 +124,14 @@ class LeaderElection implements Runnable {
   private final Daemon daemon;
 
   private final RaftServerImpl server;
+  private final boolean force;
 
-  LeaderElection(RaftServerImpl server) {
+  LeaderElection(RaftServerImpl server, boolean force) {
     this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
     this.lifeCycle = new LifeCycle(this);
     this.daemon = new Daemon(this);
     this.server = server;
+    this.force = force;
   }
 
   void start() {
@@ -166,7 +172,19 @@ class LeaderElection implements Runnable {
     Timer.Context electionContext =
         server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
     try {
-      askForVotes();
+      /**
+       * See the thesis section 9.6: In the Pre-Vote algorithm, a candidate
+       * only increments its term and starts a real election if it first learns
+       * from a majority of the cluster that they would be willing to grant
+       * the candidate their votes (if the candidate’s log is sufficiently
+       * up-to-date, and the voters have not received heartbeats from a valid
+       * leader for at least a baseline election timeout).
+       */
+      boolean preVotePass = force ? true : askForPreVotes();
+
+      if (preVotePass) {
+        askForVotes();
+      }
     } catch(Exception e) {
       final LifeCycle.State state = lifeCycle.getCurrentState();
       if (state.isClosingOrClosed()) {
@@ -198,10 +216,81 @@ class LeaderElection implements Runnable {
     return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
   }
 
+  private ResultAndTerm submitRequestAndWaitResult(
+      final ServerState state, final RaftConfigurationImpl conf, final long electionTerm, boolean preVote)
+      throws InterruptedException {
+    final ResultAndTerm r;
+    final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
+    if (others.isEmpty()) {
+      r = new ResultAndTerm(Result.PASSED, electionTerm);
+    } else {
+      TermIndex lastEntry = state.getLastEntry();
+      final Executor voteExecutor = new Executor(this, others.size());
+      try {
+        final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor, preVote);
+        r = waitForResults(electionTerm, submitted, conf, voteExecutor, preVote);
+      } finally {
+        voteExecutor.shutdown();
+      }
+    }
+
+    return r;
+  }
+
   /**
    * After a peer changes its role to candidate, it invokes this method to
    * send out requestVote rpc to all other peers.
    */
+  private boolean askForPreVotes() throws InterruptedException, IOException {
+    final ServerState state = server.getState();
+    if (shouldRun()) {
+      // one round of request pre-votes
+      final long electionTerm;
+      final RaftConfigurationImpl conf;
+      synchronized (server) {
+        if (!shouldRun()) {
+          return false;
+        }
+        state.setLeader(null, "initPreVote");
+        conf = state.getRaftConf();
+        electionTerm = state.getCurrentTerm();
+      }
+
+      LOG.info("{}: begin a pre-vote at term {} for {}", this, electionTerm, conf);
+      final ResultAndTerm r = submitRequestAndWaitResult(state, conf, electionTerm, true);
+      LOG.info("{} pre-vote result is {}.", this, r.result);
+
+      synchronized (server) {
+        if (!shouldRun(electionTerm)) {
+          return false; // term already passed or this should not run anymore.
+        }
+
+        switch (r.result) {
+          case PASSED:
+            return true;
+          case REJECTED:
+          case TIMEOUT:
+            server.changeToFollowerAndPersistMetadata(state.getCurrentTerm(), r.result);
+            return false;
+          case SHUTDOWN:
+            LOG.info("{} received shutdown response when requesting pre-vote.", this);
+            server.getRaftServer().close();
+            return false;
+          case DISCOVERED_A_NEW_TERM:
+            LOG.error("{} should not happen {} when requesting pre-vote.", this, r.result);
+            return false;
+          default: throw new IllegalArgumentException("Unable to process result " + r.result);
+        }
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * After a peer changes its role to candidate and passes the pre-vote, it invokes this method to
+   * send out requestVote rpc to all other peers.
+   */
   private void askForVotes() throws InterruptedException, IOException {
     final ServerState state = server.getState();
     while (shouldRun()) {
@@ -218,21 +307,7 @@ class LeaderElection implements Runnable {
       }
       LOG.info("{}: begin an election at term {} for {}", this, electionTerm, conf);
 
-      TermIndex lastEntry = state.getLastEntry();
-
-      final ResultAndTerm r;
-      final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
-      if (others.isEmpty()) {
-        r = new ResultAndTerm(Result.PASSED, electionTerm);
-      } else {
-        final Executor voteExecutor = new Executor(this, others.size());
-        try {
-          final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
-          r = waitForResults(electionTerm, submitted, conf, voteExecutor);
-        } finally {
-          voteExecutor.shutdown();
-        }
-      }
+      final ResultAndTerm r = submitRequestAndWaitResult(state, conf, electionTerm, false);
 
       synchronized (server) {
         if (!shouldRun(electionTerm)) {
@@ -261,11 +336,11 @@ class LeaderElection implements Runnable {
   }
 
   private int submitRequests(final long electionTerm, final TermIndex lastEntry,
-      Collection<RaftPeer> others, Executor voteExecutor) {
+      Collection<RaftPeer> others, Executor voteExecutor, boolean preVote) {
     int submitted = 0;
     for (final RaftPeer peer : others) {
       final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
-          server.getMemberId(), peer.getId(), electionTerm, lastEntry);
+          server.getMemberId(), peer.getId(), electionTerm, lastEntry, preVote);
       voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
       submitted++;
     }
@@ -288,7 +363,7 @@ class LeaderElection implements Runnable {
   }
 
   private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
-      RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
+      RaftConfigurationImpl conf, Executor voteExecutor, boolean preVote) throws InterruptedException {
     final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
     final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
     final List<Exception> exceptions = new ArrayList<>();
@@ -303,9 +378,9 @@ class LeaderElection implements Runnable {
         if (conf.hasMajority(votedPeers, server.getId())) {
           // if some higher priority peer did not response when timeout, but candidate get majority,
           // candidate pass vote
-          return logAndReturn(Result.PASSED, responses, exceptions, -1);
+          return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
         } else {
-          return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+          return logAndReturn(Result.TIMEOUT, responses, exceptions, -1, preVote);
         }
       }
 
@@ -328,16 +403,16 @@ class LeaderElection implements Runnable {
           continue;
         }
         if (r.getShouldShutdown()) {
-          return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
+          return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1, preVote);
         }
-        if (r.getTerm() > electionTerm) {
+        if (!preVote && r.getTerm() > electionTerm) {
           return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
-              exceptions, r.getTerm());
+              exceptions, r.getTerm(), false);
         }
 
         // If any peer with higher priority rejects vote, candidate can not pass vote
         if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) {
-          return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+          return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
         }
 
         // remove higher priority peer, so that we check higherPriorityPeers empty to make sure
@@ -348,12 +423,12 @@ class LeaderElection implements Runnable {
           votedPeers.add(replierId);
           // If majority and all peers with higher priority have voted, candidate pass vote
           if (higherPriorityPeers.size() == 0 && conf.hasMajority(votedPeers, server.getId())) {
-            return logAndReturn(Result.PASSED, responses, exceptions, -1);
+            return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
           }
         } else {
           rejectedPeers.add(replierId);
           if (conf.majorityRejectVotes(rejectedPeers)) {
-            return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+            return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
           }
         }
       } catch(ExecutionException e) {
@@ -364,9 +439,9 @@ class LeaderElection implements Runnable {
     }
     // received all the responses
     if (conf.hasMajority(votedPeers, server.getId())) {
-      return logAndReturn(Result.PASSED, responses, exceptions, -1);
+      return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
     } else {
-      return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+      return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 4bf79c2..1a38734 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -910,18 +910,18 @@ class LeaderStateImpl implements LeaderState {
    * if an election timeout elapses without a successful
    * round of heartbeats to a majority of its cluster.
    */
-  private void checkLeadership() {
+
+  public boolean checkLeadership() {
     if (!server.getInfo().isLeader()) {
-      return;
+      return false;
     }
-
     // The initial value of lastRpcResponseTime in FollowerInfo is set by
     // LeaderState::addSenders(), which is fake and used to trigger an
     // immediate round of AppendEntries request. Since candidates collect
     // votes from majority before becoming leader, without seeing higher term,
     // ideally, A leader is legal for election timeout if become leader soon.
     if (server.getRole().getRoleElapsedTimeMs() < server.getMaxTimeoutMs()) {
-      return;
+      return true;
     }
 
     final List<RaftPeerId> activePeers = senders.stream()
@@ -935,7 +935,7 @@ class LeaderStateImpl implements LeaderState {
 
     if (conf.hasMajority(activePeers, server.getId())) {
       // leadership check passed
-      return;
+      return true;
     }
 
     LOG.warn(this + ": Lost leadership on term: " + currentTerm
@@ -946,6 +946,7 @@ class LeaderStateImpl implements LeaderState {
 
     // step down as follower
     stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS);
+    return false;
   }
 
   void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 09234aa..f9cbeaf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -551,7 +551,7 @@ class RaftServerImpl implements RaftServer.Division,
     return roleInfo.build();
   }
 
-  synchronized void changeToCandidate() {
+  synchronized void changeToCandidate(boolean forceStartLeaderElection) {
     Preconditions.assertTrue(getInfo().isFollower());
     role.shutdownFollowerState();
     setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
@@ -559,7 +559,7 @@ class RaftServerImpl implements RaftServer.Division,
       stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto());
     }
     // start election
-    role.startLeaderElection(this);
+    role.startLeaderElection(this, forceStartLeaderElection);
   }
 
   @Override
@@ -1000,7 +1000,7 @@ class RaftServerImpl implements RaftServer.Division,
     } else {
       // following a leader and not yet timeout
       return getInfo().isFollower() && state.hasLeader()
-          && role.getFollowerState().map(FollowerState::shouldWithholdVotes).orElse(false);
+          && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
     }
   }
 
@@ -1026,10 +1026,80 @@ class RaftServerImpl implements RaftServer.Division,
   public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
       throws IOException {
     final RaftRpcRequestProto request = r.getServerRequest();
-    return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
-        ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
-        r.getCandidateTerm(),
-        TermIndex.valueOf(r.getCandidateLastEntry()));
+    if (r.getPreVote()) {
+      return requestPreVote(RaftPeerId.valueOf(request.getRequestorId()),
+          ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+          r.getCandidateTerm(),
+          TermIndex.valueOf(r.getCandidateLastEntry()));
+    } else {
+      return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
+          ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+          r.getCandidateTerm(),
+          TermIndex.valueOf(r.getCandidateLastEntry()));
+    }
+  }
+
+  private boolean isCurrentLeaderValid() {
+    if (getInfo().isLeader()) {
+      return role.getLeaderState().map(LeaderStateImpl::checkLeadership).orElse(false);
+    } else if (getInfo().isFollower()) {
+      return state.hasLeader()
+          && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
+    } else if (getInfo().isCandidate()) {
+      return false;
+    }
+
+    return false;
+  }
+
+  private RequestVoteReplyProto requestPreVote(
+      RaftPeerId candidateId, RaftGroupId candidateGroupId,
+      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
+    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
+        candidateId, candidateTerm, candidateLastEntry);
+    LOG.info("{}: receive preVote({}, {}, {}, {})",
+        getMemberId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(candidateId, candidateGroupId);
+
+    boolean preVoteGranted = false;
+    boolean shouldShutdown = false;
+    final RequestVoteReplyProto reply;
+    synchronized (this) {
+      boolean isCurrentLeaderValid = isCurrentLeaderValid();
+      RaftPeer candidate = getRaftConf().getPeer(candidateId);
+
+      // grant the pre-vote to the candidate if both of the following hold:
+      // 1. the current leader is not valid, and
+      // 2. this server's log lags behind the candidate's, or
+      //    the logs are equal and this server's priority is less than or equal to the candidate's
+      if (!isCurrentLeaderValid && candidate != null) {
+        int compare = ServerState.compareLog(state.getLastEntry(), candidateLastEntry);
+        int priority = getRaftConf().getPeer(getId()).getPriority();
+        if (compare < 0 || (compare == 0 && priority <= candidate.getPriority())) {
+          preVoteGranted = true;
+          LOG.info("{} role:{} allow pre-vote from:{}", getMemberId(), getRole(), candidateId);
+        } else {
+          LOG.info("{} role:{} reject pre-vote from:{} because compare:{} isCurrentLeaderValid:{}",
+              getMemberId(), getRole(), candidateId, compare, isCurrentLeaderValid);
+        }
+      }
+
+      if (preVoteGranted) {
+        final FollowerState fs = role.getFollowerState().orElse(null);
+        if (fs != null) {
+          fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
+        }
+      } else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
+        shouldShutdown = true;
+      }
+
+      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
+          preVoteGranted, state.getCurrentTerm(), shouldShutdown);
+      LOG.info("{} replies to pre-vote request: {}. Peer's state: {}",
+          getMemberId(), ServerStringUtils.toRequestVoteReplyString(reply), state);
+    }
+    return reply;
   }
 
   private RequestVoteReplyProto requestVote(
@@ -1436,7 +1506,7 @@ class RaftServerImpl implements RaftServer.Division,
         return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
       }
 
-      changeToCandidate();
+      changeToCandidate(true);
       return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
     }
   }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index f909a9c..617b617 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -111,8 +111,8 @@ class RoleInfo {
     }
   }
 
-  void startLeaderElection(RaftServerImpl server) {
-    updateAndGet(leaderElection, new LeaderElection(server)).start();
+  void startLeaderElection(RaftServerImpl server, boolean force) {
+    updateAndGet(leaderElection, new LeaderElection(server, force)).start();
   }
 
   void shutdownLeaderElection() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index ca672f3..c4e7f90 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -50,10 +50,11 @@ final class ServerProtoUtils {
   }
 
   static RequestVoteRequestProto toRequestVoteRequestProto(
-      RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) {
+      RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry, boolean preVote) {
     final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
         .setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId))
-        .setCandidateTerm(term);
+        .setCandidateTerm(term)
+        .setPreVote(preVote);
     Optional.ofNullable(lastEntry).map(TermIndex::toProto).ifPresent(b::setCandidateLastEntry);
     return b.build();
   }
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 2605b29..d838ac8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -60,6 +60,7 @@ import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECT
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -339,7 +340,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
   @Test
   public void testImmediatelyRevertedToFollower() {
     RaftServerImpl server = createMockServer(true);
-    LeaderElection subject = new LeaderElection(server);
+    LeaderElection subject = new LeaderElection(server, false);
 
     try {
       subject.startInForeground();
@@ -353,7 +354,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
   @Test
   public void testShutdownBeforeStart() {
     RaftServerImpl server = createMockServer(false);
-    LeaderElection subject = new LeaderElection(server);
+    LeaderElection subject = new LeaderElection(server, false);
 
     try {
       subject.shutdown();
@@ -365,6 +366,47 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
+  @Test
+  public void testPreVote() {
+    try(final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+
+      RaftServer.Division leader = waitForLeader(cluster);
+      final long savedTerm = leader.getInfo().getCurrentTerm();
+
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+
+        final List<RaftServer.Division> followers = cluster.getFollowers();
+        assertEquals(followers.size(), 2);
+
+        RaftServer.Division follower = followers.get(0);
+        isolate(cluster, follower.getId());
+        // send a message so that the isolated follower's log lags behind the others
+        RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertTrue(reply.isSuccess());
+
+        // wait for the isolated follower to time out and trigger a pre-vote
+        Thread.sleep(2000);
+        deIsolate(cluster, follower.getId());
+        Thread.sleep(2000);
+        // with pre-vote, the leader will not step down
+        RaftServer.Division newleader = waitForLeader(cluster);
+        assertNotNull(newleader);
+        assertEquals(newleader.getId(), leader.getId());
+        // with pre-vote, term will not change
+        assertEquals(leader.getInfo().getCurrentTerm(), savedTerm);
+
+        reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      cluster.shutdown();
+    } catch (Exception e) {
+      fail(e.getMessage());
+    }
+  }
+
   private static RaftServerImpl createMockServer(boolean alive) {
     final DivisionInfo info = mock(DivisionInfo.class);
     when(info.isAlive()).thenReturn(alive);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
index 9e4299d..0874e74 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
@@ -134,7 +134,6 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage, REPLY extends RaftRp
     REQUEST request;
     try {
       // delay request for testing
-      RaftTestUtil.delay(q.delayTakeRequestTo::get);
       while (true) {
         request = q.takeRequest();
         Preconditions.assertTrue(qid.equals(request.getReplierId()));
@@ -149,6 +148,7 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage, REPLY extends RaftRp
         }
         break;
       }
+      RaftTestUtil.delay(q.delayTakeRequestTo::get);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       throw IOUtils.toInterruptedIOException("", e);