You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2021/01/07 12:00:26 UTC
[incubator-ratis] branch master updated: RATIS-1282. Refactor the code for preVote and vote in LeaderElection. (#391)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 402579e RATIS-1282. Refactor the code for preVote and vote in LeaderElection. (#391)
402579e is described below
commit 402579eb0abe17f829cb7f529d80a6e95cd4091e
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jan 7 20:00:15 2021 +0800
RATIS-1282. Refactor the code for preVote and vote in LeaderElection. (#391)
---
.../apache/ratis/server/impl/LeaderElection.java | 235 ++++++++++-----------
.../org/apache/ratis/server/impl/ServerState.java | 20 +-
.../ratis/server/impl/LeaderElectionTests.java | 6 +-
3 files changed, 132 insertions(+), 129 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index e2bc46d..81d275e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -51,7 +51,6 @@ import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.HashSet;
import java.util.Set;
-import java.util.stream.Collectors;
import static org.apache.ratis.util.LifeCycle.State.NEW;
import static org.apache.ratis.util.LifeCycle.State.RUNNING;
@@ -59,37 +58,74 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
import com.codahale.metrics.Timer;
+/**
+ * For a candidate to start an election for becoming the leader.
+ * There are two phases: Pre-Vote and Election.
+ *
+ * In Pre-Vote, the candidate does not change its term and tries to learn
+ * if a majority of the cluster would be willing to grant the candidate their votes
+ * (if the candidate’s log is sufficiently up-to-date,
+ * and the voters have not received heartbeats from a valid leader
+ * for at least a baseline election timeout).
+ *
+ * Once the Pre-Vote has passed, the candidate increments its term and starts a real Election.
+ *
+ * See
+ * Ongaro, D. Consensus: Bridging Theory and Practice. PhD thesis, Stanford University, 2014.
+ * Available at https://github.com/ongardie/dissertation
+ */
class LeaderElection implements Runnable {
public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
- private ResultAndTerm logAndReturn(Result result,
- Map<RaftPeerId, RequestVoteReplyProto> responses,
- List<Exception> exceptions, long newTerm, boolean preVote) {
- LOG.info("{}: {} {} received {} response(s):{}",
- this,
- preVote ? "Pre-vote " : "Election ",
- result,
- responses.size(),
- responses.values().stream().map(ServerStringUtils::toRequestVoteReplyString).collect(Collectors.toList()));
+ private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses,
+ List<Exception> exceptions) {
+ return logAndReturn(phase, result, responses, exceptions, null);
+ }
+ private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses,
+ List<Exception> exceptions, Long newTerm) {
+ final ResultAndTerm resultAndTerm = new ResultAndTerm(result, newTerm);
+ LOG.info("{}: {} {} received {} response(s) and {} exception(s):",
+ this, phase, resultAndTerm, responses.size(), exceptions.size());
int i = 0;
+ for(RequestVoteReplyProto reply : responses.values()) {
+ LOG.info(" Response {}: {}", i++, ServerStringUtils.toRequestVoteReplyString(reply));
+ }
for(Exception e : exceptions) {
final int j = i++;
LogUtils.infoOrTrace(LOG, () -> " Exception " + j, e);
}
- return new ResultAndTerm(result, newTerm);
+ return resultAndTerm;
+ }
+
+ enum Phase {
+ PRE_VOTE,
+ ELECTION
}
enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN}
private static class ResultAndTerm {
private final Result result;
- private final long term;
+ private final Long term;
- ResultAndTerm(Result result, long term) {
+ ResultAndTerm(Result result, Long term) {
this.result = result;
this.term = term;
}
+
+ long maxTerm(long thatTerm) {
+ return this.term != null && this.term > thatTerm ? this.term: thatTerm;
+ }
+
+ Result getResult() {
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return result + (term != null? " (term=" + term + ")": "");
+ }
}
static class Executor {
@@ -117,6 +153,29 @@ class LeaderElection implements Runnable {
}
}
+ static class ConfAndTerm {
+ private final RaftConfigurationImpl conf;
+ private final long term;
+
+ ConfAndTerm(RaftConfigurationImpl conf, long term) {
+ this.conf = conf;
+ this.term = term;
+ }
+
+ long getTerm() {
+ return term;
+ }
+
+ RaftConfigurationImpl getConf() {
+ return conf;
+ }
+
+ @Override
+ public String toString() {
+ return "term=" + term + ", " + conf;
+ }
+ }
+
private static final AtomicInteger COUNT = new AtomicInteger();
private final String name;
@@ -124,14 +183,14 @@ class LeaderElection implements Runnable {
private final Daemon daemon;
private final RaftServerImpl server;
- private final boolean force;
+ private final boolean skipPreVote;
- LeaderElection(RaftServerImpl server, boolean force) {
+ LeaderElection(RaftServerImpl server, boolean skipPreVote) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
this.lifeCycle = new LifeCycle(this);
this.daemon = new Daemon(this);
this.server = server;
- this.force = force;
+ this.skipPreVote = skipPreVote;
}
void start() {
@@ -169,21 +228,12 @@ class LeaderElection implements Runnable {
return;
}
- Timer.Context electionContext =
- server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
+ final Timer.Context electionContext = server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
try {
- /**
- * See the thesis section 9.6: In the Pre-Vote algorithm, a candidate
- * only increments its term and start a real election if it first learns
- * from a majority of the cluster that they would be willing to grant
- * the candidate their votes (if the candidate’s log is sufficiently
- * up-to-date, and the voters have not received heartbeats from a valid
- * leader for at least a baseline election timeout).
- */
- boolean preVotePass = force ? true : askForPreVotes();
-
- if (preVotePass) {
- askForVotes();
+ if (skipPreVote || askForVotes(Phase.PRE_VOTE)) {
+ if (askForVotes(Phase.ELECTION)) {
+ server.changeToLeader();
+ }
}
} catch(Exception e) {
final LifeCycle.State state = lifeCycle.getCurrentState();
@@ -216,19 +266,18 @@ class LeaderElection implements Runnable {
return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
}
- private ResultAndTerm submitRequestAndWaitResult(
- final ServerState state, final RaftConfigurationImpl conf, final long electionTerm, boolean preVote)
+ private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationImpl conf, long electionTerm)
throws InterruptedException {
final ResultAndTerm r;
final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
if (others.isEmpty()) {
r = new ResultAndTerm(Result.PASSED, electionTerm);
} else {
- TermIndex lastEntry = state.getLastEntry();
+ final TermIndex lastEntry = server.getState().getLastEntry();
final Executor voteExecutor = new Executor(this, others.size());
try {
- final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor, preVote);
- r = waitForResults(electionTerm, submitted, conf, voteExecutor, preVote);
+ final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
+ r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
} finally {
voteExecutor.shutdown();
}
@@ -237,110 +286,55 @@ class LeaderElection implements Runnable {
return r;
}
- /**
- * After a peer changes its role to candidate, it invokes this method to
- * send out requestVote rpc to all other peers.
- */
- private boolean askForPreVotes() throws InterruptedException, IOException {
- final ServerState state = server.getState();
- if (shouldRun()) {
- // one round of request pre-votes
+ /** Send requestVote rpc to all other peers for the given phase. */
+ private boolean askForVotes(Phase phase) throws InterruptedException, IOException {
+ for(int round = 0; shouldRun(); round++) {
final long electionTerm;
final RaftConfigurationImpl conf;
synchronized (server) {
if (!shouldRun()) {
return false;
}
- state.setLeader(null, "initPreVote");
- conf = state.getRaftConf();
- electionTerm = state.getCurrentTerm();
+ final ConfAndTerm confAndTerm = server.getState().initElection(phase);
+ electionTerm = confAndTerm.getTerm();
+ conf = confAndTerm.getConf();
}
- LOG.info("{}: begin a pre-vote at term {} for {}", this, electionTerm, conf);
- final ResultAndTerm r = submitRequestAndWaitResult(state, conf, electionTerm, true);
- LOG.info("{} pre-vote result is {}.", this, r.result);
+ LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
+ final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
+ LOG.info("{} {} round {}: result {}", this, phase, round, r);
synchronized (server) {
if (!shouldRun(electionTerm)) {
return false; // term already passed or this should not run anymore.
}
- switch (r.result) {
+ switch (r.getResult()) {
case PASSED:
return true;
- case REJECTED:
- case TIMEOUT:
- server.changeToFollowerAndPersistMetadata(state.getCurrentTerm(), r.result);
- return false;
case SHUTDOWN:
- LOG.info("{} received shutdown response when requesting pre-vote.", this);
server.getRaftServer().close();
return false;
+ case TIMEOUT:
+ continue; // should retry
+ case REJECTED:
case DISCOVERED_A_NEW_TERM:
- LOG.error("{} should not happen {} when requesting pre-vote.", this, r.result);
+ final long term = r.maxTerm(server.getState().getCurrentTerm());
+ server.changeToFollowerAndPersistMetadata(term, r);
return false;
default: throw new IllegalArgumentException("Unable to process result " + r.result);
}
}
}
-
return false;
}
- /**
- * After a peer changes its role to candidate and pass pre-vote, it invokes this method to
- * send out requestVote rpc to all other peers.
- */
- private void askForVotes() throws InterruptedException, IOException {
- final ServerState state = server.getState();
- while (shouldRun()) {
- // one round of requestVotes
- final long electionTerm;
- final RaftConfigurationImpl conf;
- synchronized (server) {
- if (!shouldRun()) {
- break;
- }
- electionTerm = state.initElection();
- conf = state.getRaftConf();
- state.persistMetadata();
- }
- LOG.info("{}: begin an election at term {} for {}", this, electionTerm, conf);
-
- final ResultAndTerm r = submitRequestAndWaitResult(state, conf, electionTerm, false);
-
- synchronized (server) {
- if (!shouldRun(electionTerm)) {
- return; // term already passed or this should not run anymore.
- }
-
- switch (r.result) {
- case PASSED:
- server.changeToLeader();
- return;
- case SHUTDOWN:
- LOG.info("{} received shutdown response when requesting votes.", this);
- server.getRaftServer().close();
- return;
- case REJECTED:
- case DISCOVERED_A_NEW_TERM:
- final long term = Math.max(r.term, state.getCurrentTerm());
- server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
- return;
- case TIMEOUT: // should start another election
- continue;
- default: throw new IllegalArgumentException("Unable to process result " + r.result);
- }
- }
- }
- }
-
- private int submitRequests(final long electionTerm, final TermIndex lastEntry,
- Collection<RaftPeer> others, Executor voteExecutor, boolean preVote) {
+ private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
+ Collection<RaftPeer> others, Executor voteExecutor) {
int submitted = 0;
for (final RaftPeer peer : others) {
final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
- server.getMemberId(), peer.getId(), electionTerm, lastEntry, preVote);
+ server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
submitted++;
}
@@ -362,8 +356,8 @@ class LeaderElection implements Runnable {
return higherPriorityPeers;
}
- private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
- RaftConfigurationImpl conf, Executor voteExecutor, boolean preVote) throws InterruptedException {
+ private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted,
+ RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
final List<Exception> exceptions = new ArrayList<>();
@@ -378,9 +372,9 @@ class LeaderElection implements Runnable {
if (conf.hasMajority(votedPeers, server.getId())) {
// if some higher priority peer did not response when timeout, but candidate get majority,
// candidate pass vote
- return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.PASSED, responses, exceptions);
} else {
- return logAndReturn(Result.TIMEOUT, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.TIMEOUT, responses, exceptions);
}
}
@@ -403,16 +397,15 @@ class LeaderElection implements Runnable {
continue;
}
if (r.getShouldShutdown()) {
- return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.SHUTDOWN, responses, exceptions);
}
- if (!preVote && r.getTerm() > electionTerm) {
- return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
- exceptions, r.getTerm(), false);
+ if (r.getTerm() > electionTerm) {
+ return logAndReturn(phase, Result.DISCOVERED_A_NEW_TERM, responses, exceptions, r.getTerm());
}
// If any peer with higher priority rejects vote, candidate can not pass vote
if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) {
- return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
// remove higher priority peer, so that we check higherPriorityPeers empty to make sure
@@ -423,12 +416,12 @@ class LeaderElection implements Runnable {
votedPeers.add(replierId);
// If majority and all peers with higher priority have voted, candidate pass vote
if (higherPriorityPeers.size() == 0 && conf.hasMajority(votedPeers, server.getId())) {
- return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.PASSED, responses, exceptions);
}
} else {
rejectedPeers.add(replierId);
if (conf.majorityRejectVotes(rejectedPeers)) {
- return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
}
} catch(ExecutionException e) {
@@ -439,9 +432,9 @@ class LeaderElection implements Runnable {
}
// received all the responses
if (conf.hasMajority(votedPeers, server.getId())) {
- return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.PASSED, responses, exceptions);
} else {
- return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
+ return logAndReturn(phase, Result.REJECTED, responses, exceptions);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 908f0e7..44a4d51 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.*;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.LogProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -215,10 +216,19 @@ class ServerState implements Closeable {
/**
* Become a candidate and start leader election
*/
- long initElection() {
- votedFor = getMemberId().getPeerId();
- setLeader(null, "initElection");
- return currentTerm.incrementAndGet();
+ LeaderElection.ConfAndTerm initElection(Phase phase) throws IOException {
+ setLeader(null, phase);
+ final long term;
+ if (phase == Phase.PRE_VOTE) {
+ term = getCurrentTerm();
+ } else if (phase == Phase.ELECTION) {
+ term = currentTerm.incrementAndGet();
+ votedFor = getMemberId().getPeerId();
+ persistMetadata();
+ } else {
+ throw new IllegalArgumentException("Unexpected phase " + phase);
+ }
+ return new LeaderElection.ConfAndTerm(getRaftConf(), term);
}
void persistMetadata() throws IOException {
@@ -233,7 +243,7 @@ class ServerState implements Closeable {
setLeader(null, "grantVote");
}
- void setLeader(RaftPeerId newLeaderId, String op) {
+ void setLeader(RaftPeerId newLeaderId, Object op) {
if (!Objects.equals(leaderId, newLeaderId)) {
String suffix;
if (newLeaderId == null) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index d838ac8..a6753d2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -372,7 +372,6 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
cluster.start();
RaftServer.Division leader = waitForLeader(cluster);
- final long savedTerm = leader.getInfo().getCurrentTerm();
try (RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new RaftTestUtil.SimpleMessage("message"));
@@ -386,7 +385,8 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assert.assertTrue(reply.isSuccess());
- // wait follower timeout and trigger pre-vote
+ final long savedTerm = leader.getInfo().getCurrentTerm();
+ LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
Thread.sleep(2000);
deIsolate(cluster, follower.getId());
Thread.sleep(2000);
@@ -395,7 +395,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
assertNotNull(newleader);
assertEquals(newleader.getId(), leader.getId());
// with pre-vote, term will not change
- assertEquals(leader.getInfo().getCurrentTerm(), savedTerm);
+ assertEquals(savedTerm, leader.getInfo().getCurrentTerm());
reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
Assert.assertTrue(reply.isSuccess());