You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/01/06 14:32:36 UTC
[incubator-ratis] branch master updated: RATIS-993. Support pre vote (#161)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 35f17fa RATIS-993. Support pre vote (#161)
35f17fa is described below
commit 35f17fa5a0a7066f667e7e7a346cb849e3ebd858
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Jan 6 22:31:14 2021 +0800
RATIS-993. Support pre vote (#161)
---
ratis-proto/src/main/proto/Raft.proto | 1 +
.../apache/ratis/server/impl/FollowerState.java | 4 +-
.../apache/ratis/server/impl/LeaderElection.java | 143 ++++++++++++++++-----
.../apache/ratis/server/impl/LeaderStateImpl.java | 11 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 86 +++++++++++--
.../org/apache/ratis/server/impl/RoleInfo.java | 4 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 5 +-
.../ratis/server/impl/LeaderElectionTests.java | 46 ++++++-
.../server/simulation/SimulatedRequestReply.java | 2 +-
9 files changed, 246 insertions(+), 56 deletions(-)
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 3bcad1b..0b2dda6 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -152,6 +152,7 @@ message RequestVoteRequestProto {
RaftRpcRequestProto serverRequest = 1;
uint64 candidateTerm = 2;
TermIndexProto candidateLastEntry = 3;
+ bool preVote = 4;
}
message RequestVoteReplyProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index c54291a..d22f86b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -86,7 +86,7 @@ class FollowerState extends Daemon {
return outstandingOp.get();
}
- boolean shouldWithholdVotes() {
+ boolean isCurrentLeaderValid() {
return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
}
@@ -134,7 +134,7 @@ class FollowerState extends Daemon {
this, lastRpcTime.elapsedTime(), electionTimeout);
server.getLeaderElectionMetrics().onLeaderElectionTimeout(); // Update timeout metric counters.
// election timeout, should become a candidate
- server.changeToCandidate();
+ server.changeToCandidate(false);
break;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 41d0dc3..e2bc46d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -64,10 +64,14 @@ class LeaderElection implements Runnable {
private ResultAndTerm logAndReturn(Result result,
Map<RaftPeerId, RequestVoteReplyProto> responses,
- List<Exception> exceptions, long newTerm) {
- LOG.info(this + ": Election " + result + "; received " + responses.size() + " response(s) "
- + responses.values().stream().map(ServerStringUtils::toRequestVoteReplyString).collect(Collectors.toList())
- + " and " + exceptions.size() + " exception(s); " + server.getState());
+ List<Exception> exceptions, long newTerm, boolean preVote) {
+ LOG.info("{}: {} {} received {} response(s):{}",
+ this,
+ preVote ? "Pre-vote " : "Election ",
+ result,
+ responses.size(),
+ responses.values().stream().map(ServerStringUtils::toRequestVoteReplyString).collect(Collectors.toList()));
+
int i = 0;
for(Exception e : exceptions) {
final int j = i++;
@@ -120,12 +124,14 @@ class LeaderElection implements Runnable {
private final Daemon daemon;
private final RaftServerImpl server;
+ private final boolean force;
- LeaderElection(RaftServerImpl server) {
+ LeaderElection(RaftServerImpl server, boolean force) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
this.daemon = new Daemon(this);
this.server = server;
+ this.force = force;
}
void start() {
@@ -166,7 +172,19 @@ class LeaderElection implements Runnable {
Timer.Context electionContext =
server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
try {
- askForVotes();
+ /**
+ * See the thesis section 9.6: In the Pre-Vote algorithm, a candidate
+ * only increments its term and starts a real election if it first learns
+ * from a majority of the cluster that they would be willing to grant
+ * the candidate their votes (if the candidate’s log is sufficiently
+ * up-to-date, and the voters have not received heartbeats from a valid
+ * leader for at least a baseline election timeout).
+ */
+ boolean preVotePass = force ? true : askForPreVotes();
+
+ if (preVotePass) {
+ askForVotes();
+ }
} catch(Exception e) {
final LifeCycle.State state = lifeCycle.getCurrentState();
if (state.isClosingOrClosed()) {
@@ -198,10 +216,81 @@ class LeaderElection implements Runnable {
return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
}
+  /**
+   * Send one round of (pre-)vote requests to all other peers in {@code conf}
+   * and wait for the outcome.
+   *
+   * @param state the server state from which the last log entry is read
+   * @param conf the configuration whose other peers are asked
+   * @param electionTerm the term carried in the requests
+   * @param preVote true for a pre-vote round, false for a real election round
+   * @return {@link Result#PASSED} at {@code electionTerm} when there is no other peer;
+   *         otherwise whatever {@code waitForResults} returns
+   */
+  private ResultAndTerm submitRequestAndWaitResult(
+      final ServerState state, final RaftConfigurationImpl conf, final long electionTerm, boolean preVote)
+      throws InterruptedException {
+    final Collection<RaftPeer> peers = conf.getOtherPeers(server.getId());
+    if (peers.isEmpty()) {
+      // nobody else to ask, so the round passes trivially
+      return new ResultAndTerm(Result.PASSED, electionTerm);
+    }
+
+    final TermIndex last = state.getLastEntry();
+    final Executor executor = new Executor(this, peers.size());
+    try {
+      final int count = submitRequests(electionTerm, last, peers, executor, preVote);
+      return waitForResults(electionTerm, count, conf, executor, preVote);
+    } finally {
+      // always release the executor, whether the round completed or threw
+      executor.shutdown();
+    }
+  }
+
/**
* After a peer changes its role to candidate, it invokes this method to
* send out requestVote rpc to all other peers.
*/
+  private boolean askForPreVotes() throws InterruptedException, IOException {
+    final ServerState state = server.getState();
+    if (!shouldRun()) {
+      return false;
+    }
+
+    // One round of request pre-votes: capture the term and the configuration
+    // while holding the server lock.
+    final long term;
+    final RaftConfigurationImpl raftConf;
+    synchronized (server) {
+      if (!shouldRun()) {
+        return false;
+      }
+      state.setLeader(null, "initPreVote");
+      raftConf = state.getRaftConf();
+      term = state.getCurrentTerm();
+    }
+
+    LOG.info("{}: begin a pre-vote at term {} for {}", this, term, raftConf);
+    final ResultAndTerm outcome = submitRequestAndWaitResult(state, raftConf, term, true);
+    LOG.info("{} pre-vote result is {}.", this, outcome.result);
+
+    synchronized (server) {
+      if (!shouldRun(term)) {
+        // term already passed or this should not run anymore.
+        return false;
+      }
+
+      switch (outcome.result) {
+        case PASSED:
+          // a real election may follow
+          return true;
+        case REJECTED:
+        case TIMEOUT:
+          // go back to follower at the current term
+          server.changeToFollowerAndPersistMetadata(state.getCurrentTerm(), outcome.result);
+          return false;
+        case SHUTDOWN:
+          LOG.info("{} received shutdown response when requesting pre-vote.", this);
+          server.getRaftServer().close();
+          return false;
+        case DISCOVERED_A_NEW_TERM:
+          // the wait loop skips the term check in pre-vote mode, so this is not expected
+          LOG.error("{} should not happen {} when requesting pre-vote.", this, outcome.result);
+          return false;
+        default:
+          throw new IllegalArgumentException("Unable to process result " + outcome.result);
+      }
+    }
+  }
+
+ /**
+ * After a peer changes its role to candidate and passes the pre-vote, it invokes this method to
+ * send out requestVote rpc to all other peers.
+ */
private void askForVotes() throws InterruptedException, IOException {
final ServerState state = server.getState();
while (shouldRun()) {
@@ -218,21 +307,7 @@ class LeaderElection implements Runnable {
}
LOG.info("{}: begin an election at term {} for {}", this, electionTerm, conf);
- TermIndex lastEntry = state.getLastEntry();
-
- final ResultAndTerm r;
- final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
- if (others.isEmpty()) {
- r = new ResultAndTerm(Result.PASSED, electionTerm);
- } else {
- final Executor voteExecutor = new Executor(this, others.size());
- try {
- final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor);
- r = waitForResults(electionTerm, submitted, conf, voteExecutor);
- } finally {
- voteExecutor.shutdown();
- }
- }
+ final ResultAndTerm r = submitRequestAndWaitResult(state, conf, electionTerm, false);
synchronized (server) {
if (!shouldRun(electionTerm)) {
@@ -261,11 +336,11 @@ class LeaderElection implements Runnable {
}
private int submitRequests(final long electionTerm, final TermIndex lastEntry,
- Collection<RaftPeer> others, Executor voteExecutor) {
+ Collection<RaftPeer> others, Executor voteExecutor, boolean preVote) {
int submitted = 0;
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
- server.getMemberId(), peer.getId(), electionTerm, lastEntry);
+ server.getMemberId(), peer.getId(), electionTerm, lastEntry, preVote);
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted++;
}
@@ -288,7 +363,7 @@ class LeaderElection implements Runnable {
}
private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
- RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
+ RaftConfigurationImpl conf, Executor voteExecutor, boolean preVote) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
@@ -303,9 +378,9 @@ class LeaderElection implements Runnable {
if (conf.hasMajority(votedPeers, server.getId())) {
// if some higher priority peer did not response when timeout, but candidate get majority,
// candidate pass vote
- return logAndReturn(Result.PASSED, responses, exceptions, -1);
+ return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
} else {
- return logAndReturn(Result.TIMEOUT, responses, exceptions, -1);
+ return logAndReturn(Result.TIMEOUT, responses, exceptions, -1, preVote);
}
}
@@ -328,16 +403,16 @@ class LeaderElection implements Runnable {
continue;
}
if (r.getShouldShutdown()) {
- return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1);
+ return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1, preVote);
}
- if (r.getTerm() > electionTerm) {
+ if (!preVote && r.getTerm() > electionTerm) {
return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
- exceptions, r.getTerm());
+ exceptions, r.getTerm(), false);
}
// If any peer with higher priority rejects vote, candidate can not pass vote
if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) {
- return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+ return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
}
// remove higher priority peer, so that we check higherPriorityPeers empty to make sure
@@ -348,12 +423,12 @@ class LeaderElection implements Runnable {
votedPeers.add(replierId);
// If majority and all peers with higher priority have voted, candidate pass vote
if (higherPriorityPeers.size() == 0 && conf.hasMajority(votedPeers, server.getId())) {
- return logAndReturn(Result.PASSED, responses, exceptions, -1);
+ return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
}
} else {
rejectedPeers.add(replierId);
if (conf.majorityRejectVotes(rejectedPeers)) {
- return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+ return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
}
}
} catch(ExecutionException e) {
@@ -364,9 +439,9 @@ class LeaderElection implements Runnable {
}
// received all the responses
if (conf.hasMajority(votedPeers, server.getId())) {
- return logAndReturn(Result.PASSED, responses, exceptions, -1);
+ return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
} else {
- return logAndReturn(Result.REJECTED, responses, exceptions, -1);
+ return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 4bf79c2..1a38734 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -910,18 +910,18 @@ class LeaderStateImpl implements LeaderState {
* if an election timeout elapses without a successful
* round of heartbeats to a majority of its cluster.
*/
- private void checkLeadership() {
+
+ public boolean checkLeadership() {
if (!server.getInfo().isLeader()) {
- return;
+ return false;
}
-
// The initial value of lastRpcResponseTime in FollowerInfo is set by
// LeaderState::addSenders(), which is fake and used to trigger an
// immediate round of AppendEntries request. Since candidates collect
// votes from majority before becoming leader, without seeing higher term,
// ideally, A leader is legal for election timeout if become leader soon.
if (server.getRole().getRoleElapsedTimeMs() < server.getMaxTimeoutMs()) {
- return;
+ return true;
}
final List<RaftPeerId> activePeers = senders.stream()
@@ -935,7 +935,7 @@ class LeaderStateImpl implements LeaderState {
if (conf.hasMajority(activePeers, server.getId())) {
// leadership check passed
- return;
+ return true;
}
LOG.warn(this + ": Lost leadership on term: " + currentTerm
@@ -946,6 +946,7 @@ class LeaderStateImpl implements LeaderState {
// step down as follower
stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS);
+ return false;
}
void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 09234aa..f9cbeaf 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -551,7 +551,7 @@ class RaftServerImpl implements RaftServer.Division,
return roleInfo.build();
}
- synchronized void changeToCandidate() {
+ synchronized void changeToCandidate(boolean forceStartLeaderElection) {
Preconditions.assertTrue(getInfo().isFollower());
role.shutdownFollowerState();
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
@@ -559,7 +559,7 @@ class RaftServerImpl implements RaftServer.Division,
stateMachine.followerEvent().notifyExtendedNoLeader(getRoleInfoProto());
}
// start election
- role.startLeaderElection(this);
+ role.startLeaderElection(this, forceStartLeaderElection);
}
@Override
@@ -1000,7 +1000,7 @@ class RaftServerImpl implements RaftServer.Division,
} else {
// following a leader and not yet timeout
return getInfo().isFollower() && state.hasLeader()
- && role.getFollowerState().map(FollowerState::shouldWithholdVotes).orElse(false);
+ && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
}
}
@@ -1026,10 +1026,80 @@ class RaftServerImpl implements RaftServer.Division,
public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
throws IOException {
final RaftRpcRequestProto request = r.getServerRequest();
- return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
- ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
- r.getCandidateTerm(),
- TermIndex.valueOf(r.getCandidateLastEntry()));
+ if (r.getPreVote()) {
+ return requestPreVote(RaftPeerId.valueOf(request.getRequestorId()),
+ ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+ r.getCandidateTerm(),
+ TermIndex.valueOf(r.getCandidateLastEntry()));
+ } else {
+ return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
+ ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+ r.getCandidateTerm(),
+ TermIndex.valueOf(r.getCandidateLastEntry()));
+ }
+ }
+
+  /**
+   * Tell whether this server currently sees a valid leader; used when
+   * deciding on a pre-vote request.
+   *
+   * <ul>
+   * <li>Leader: delegates to {@code LeaderStateImpl.checkLeadership()}.
+   *     Note that the check is not read-only: it steps this leader down
+   *     when the leadership check fails.</li>
+   * <li>Follower: a leader must be known and the follower state must still
+   *     consider it valid.</li>
+   * <li>Any other role (e.g. candidate): there is no valid leader.</li>
+   * </ul>
+   *
+   * @return true if the current leader is considered valid by this server.
+   */
+  private boolean isCurrentLeaderValid() {
+    if (getInfo().isLeader()) {
+      return role.getLeaderState().map(LeaderStateImpl::checkLeadership).orElse(false);
+    }
+    if (getInfo().isFollower()) {
+      return state.hasLeader()
+          && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
+    }
+    // candidate or any other role
+    return false;
+  }
+
+  /**
+   * Handle a pre-vote request from a candidate.
+   *
+   * The pre-vote is granted only if all of the following hold:
+   * 1. this server does not see a valid leader;
+   * 2. the candidate is a peer in the current configuration;
+   * 3. the log of this server lags behind the candidate's log, or the logs
+   *    are equal and the priority of this server is less than or equal to
+   *    the candidate's priority.
+   *
+   * When the pre-vote is granted and this server has a follower state, its
+   * last rpc time is updated. When it is not granted, the reply may ask the
+   * candidate to shut down, as decided by {@code shouldSendShutdown}.
+   *
+   * @param candidateId the id of the requesting candidate
+   * @param candidateGroupId the group of the requesting candidate
+   * @param candidateTerm the term sent by the candidate (only logged and passed to test hooks here)
+   * @param candidateLastEntry the last log entry of the candidate
+   * @return the reply carrying the decision and the current term of this server
+   * @throws IOException if the life cycle state or the group assertion fails
+   */
+  private RequestVoteReplyProto requestPreVote(
+      RaftPeerId candidateId, RaftGroupId candidateGroupId,
+      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
+    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
+        candidateId, candidateTerm, candidateLastEntry);
+    LOG.info("{}: receive preVote({}, {}, {}, {})",
+        getMemberId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
+    assertLifeCycleState(LifeCycle.States.RUNNING);
+    assertGroup(candidateId, candidateGroupId);
+
+    final RequestVoteReplyProto reply;
+    synchronized (this) {
+      final boolean leaderValid = isCurrentLeaderValid();
+      final RaftPeer candidate = getRaftConf().getPeer(candidateId);
+
+      boolean preVoteGranted = false;
+      if (leaderValid || candidate == null) {
+        // Log the reason here as well; otherwise these rejections are silent.
+        LOG.info("{} role:{} reject pre-vote from:{} because isCurrentLeaderValid:{} candidateInConf:{}",
+            getMemberId(), getRole(), candidateId, leaderValid, candidate != null);
+      } else {
+        final int compare = ServerState.compareLog(state.getLastEntry(), candidateLastEntry);
+        // NOTE(review): assumes this server is a peer in the current configuration -- confirm
+        final int priority = getRaftConf().getPeer(getId()).getPriority();
+        preVoteGranted = compare < 0 || (compare == 0 && priority <= candidate.getPriority());
+        if (preVoteGranted) {
+          LOG.info("{} role:{} allow pre-vote from:{}", getMemberId(), getRole(), candidateId);
+        } else {
+          LOG.info("{} role:{} reject pre-vote from:{} because compare:{} isCurrentLeaderValid:{}",
+              getMemberId(), getRole(), candidateId, compare, leaderValid);
+        }
+      }
+
+      boolean shouldShutdown = false;
+      if (preVoteGranted) {
+        role.getFollowerState().ifPresent(
+            fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE));
+      } else if (shouldSendShutdown(candidateId, candidateLastEntry)) {
+        shouldShutdown = true;
+      }
+
+      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
+          preVoteGranted, state.getCurrentTerm(), shouldShutdown);
+      LOG.info("{} replies to pre-vote request: {}. Peer's state: {}",
+          getMemberId(), ServerStringUtils.toRequestVoteReplyString(reply), state);
+    }
+    return reply;
+  }
private RequestVoteReplyProto requestVote(
@@ -1436,7 +1506,7 @@ class RaftServerImpl implements RaftServer.Division,
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), false);
}
- changeToCandidate();
+ changeToCandidate(true);
return ServerProtoUtils.toStartLeaderElectionReplyProto(leaderId, getMemberId(), true);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index f909a9c..617b617 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -111,8 +111,8 @@ class RoleInfo {
}
}
- void startLeaderElection(RaftServerImpl server) {
- updateAndGet(leaderElection, new LeaderElection(server)).start();
+ void startLeaderElection(RaftServerImpl server, boolean force) {
+ updateAndGet(leaderElection, new LeaderElection(server, force)).start();
}
void shutdownLeaderElection() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index ca672f3..c4e7f90 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -50,10 +50,11 @@ final class ServerProtoUtils {
}
static RequestVoteRequestProto toRequestVoteRequestProto(
- RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) {
+ RaftGroupMemberId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry, boolean preVote) {
final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
.setServerRequest(ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId, replyId))
- .setCandidateTerm(term);
+ .setCandidateTerm(term)
+ .setPreVote(preVote);
Optional.ofNullable(lastEntry).map(TermIndex::toProto).ifPresent(b::setCandidateLastEntry);
return b.build();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index 2605b29..d838ac8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -60,6 +60,7 @@ import static org.apache.ratis.server.metrics.LeaderElectionMetrics.LEADER_ELECT
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.junit.Assert.assertNotNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -339,7 +340,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
@Test
public void testImmediatelyRevertedToFollower() {
RaftServerImpl server = createMockServer(true);
- LeaderElection subject = new LeaderElection(server);
+ LeaderElection subject = new LeaderElection(server, false);
try {
subject.startInForeground();
@@ -353,7 +354,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
@Test
public void testShutdownBeforeStart() {
RaftServerImpl server = createMockServer(false);
- LeaderElection subject = new LeaderElection(server);
+ LeaderElection subject = new LeaderElection(server, false);
try {
subject.shutdown();
@@ -365,6 +366,47 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
}
}
+  /**
+   * Isolate a follower until it times out, then reconnect it. Since its log
+   * lags behind the others, its pre-vote must fail: the leader must stay
+   * the same and the term must not change.
+   */
+  @Test
+  public void testPreVote() throws Exception {
+    try (final MiniRaftCluster cluster = newCluster(3)) {
+      cluster.start();
+
+      final RaftServer.Division leader = waitForLeader(cluster);
+      final long savedTerm = leader.getInfo().getCurrentTerm();
+
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+
+        final List<RaftServer.Division> followers = cluster.getFollowers();
+        assertEquals(2, followers.size());
+
+        final RaftServer.Division follower = followers.get(0);
+        isolate(cluster, follower.getId());
+        // send message so that the isolated follower's log lags behind the others
+        RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertTrue(reply.isSuccess());
+
+        // wait for the follower to time out and trigger pre-vote
+        Thread.sleep(2000);
+        deIsolate(cluster, follower.getId());
+        Thread.sleep(2000);
+        // with pre-vote, the leader will not step down
+        final RaftServer.Division newleader = waitForLeader(cluster);
+        assertNotNull(newleader);
+        assertEquals(leader.getId(), newleader.getId());
+        // with pre-vote, the term will not change
+        assertEquals(savedTerm, leader.getInfo().getCurrentTerm());
+
+        reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        Assert.assertTrue(reply.isSuccess());
+      }
+
+      cluster.shutdown();
+    }
+  }
+
private static RaftServerImpl createMockServer(boolean alive) {
final DivisionInfo info = mock(DivisionInfo.class);
when(info.isAlive()).thenReturn(alive);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
index 9e4299d..0874e74 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRequestReply.java
@@ -134,7 +134,6 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage, REPLY extends RaftRp
REQUEST request;
try {
// delay request for testing
- RaftTestUtil.delay(q.delayTakeRequestTo::get);
while (true) {
request = q.takeRequest();
Preconditions.assertTrue(qid.equals(request.getReplierId()));
@@ -149,6 +148,7 @@ class SimulatedRequestReply<REQUEST extends RaftRpcMessage, REPLY extends RaftRp
}
break;
}
+ RaftTestUtil.delay(q.delayTakeRequestTo::get);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw IOUtils.toInterruptedIOException("", e);