You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2021/01/07 12:00:26 UTC

[incubator-ratis] branch master updated: RATIS-1282. Refactor the code for preVote and vote in LeaderElection. (#391)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 402579e  RATIS-1282. Refactor the code for preVote and vote in LeaderElection. (#391)
402579e is described below

commit 402579eb0abe17f829cb7f529d80a6e95cd4091e
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jan 7 20:00:15 2021 +0800

    RATIS-1282. Refactor the code for preVote and vote in LeaderElection. (#391)
---
 .../apache/ratis/server/impl/LeaderElection.java   | 235 ++++++++++-----------
 .../org/apache/ratis/server/impl/ServerState.java  |  20 +-
 .../ratis/server/impl/LeaderElectionTests.java     |   6 +-
 3 files changed, 132 insertions(+), 129 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index e2bc46d..81d275e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -51,7 +51,6 @@ import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static org.apache.ratis.util.LifeCycle.State.NEW;
 import static org.apache.ratis.util.LifeCycle.State.RUNNING;
@@ -59,37 +58,74 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
 
 import com.codahale.metrics.Timer;
 
+/**
+ * For a candidate to start an election for becoming the leader.
+ * There are two phases: Pre-Vote and Election.
+ *
+ * In Pre-Vote, the candidate does not change its term and tries to learn
+ * if a majority of the cluster would be willing to grant the candidate their votes
+ * (if the candidate’s log is sufficiently up-to-date,
+ * and the voters have not received heartbeats from a valid leader
+ * for at least a baseline election timeout).
+ *
+ * Once the Pre-Vote has passed, the candidate increments its term and starts a real Election.
+ *
+ * See
+ * Ongaro, D. Consensus: Bridging Theory and Practice. PhD thesis, Stanford University, 2014.
+ * Available at https://github.com/ongardie/dissertation
+ */
 class LeaderElection implements Runnable {
   public static final Logger LOG = LoggerFactory.getLogger(LeaderElection.class);
 
-  private ResultAndTerm logAndReturn(Result result,
-      Map<RaftPeerId, RequestVoteReplyProto> responses,
-      List<Exception> exceptions, long newTerm, boolean preVote) {
-    LOG.info("{}: {} {} received {} response(s):{}",
-        this,
-        preVote ? "Pre-vote " : "Election ",
-        result,
-        responses.size(),
-        responses.values().stream().map(ServerStringUtils::toRequestVoteReplyString).collect(Collectors.toList()));
+  private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses,
+      List<Exception> exceptions) {
+    return logAndReturn(phase, result, responses, exceptions, null);
+  }
 
+  private ResultAndTerm logAndReturn(Phase phase, Result result, Map<RaftPeerId, RequestVoteReplyProto> responses,
+      List<Exception> exceptions, Long newTerm) {
+    final ResultAndTerm resultAndTerm = new ResultAndTerm(result, newTerm);
+    LOG.info("{}: {} {} received {} response(s) and {} exception(s):",
+        this, phase, resultAndTerm, responses.size(), exceptions.size());
     int i = 0;
+    for(RequestVoteReplyProto reply : responses.values()) {
+      LOG.info("  Response {}: {}", i++, ServerStringUtils.toRequestVoteReplyString(reply));
+    }
     for(Exception e : exceptions) {
       final int j = i++;
       LogUtils.infoOrTrace(LOG, () -> "  Exception " + j, e);
     }
-    return new ResultAndTerm(result, newTerm);
+    return resultAndTerm;
+  }
+
+  enum Phase {
+    PRE_VOTE,
+    ELECTION
   }
 
   enum Result {PASSED, REJECTED, TIMEOUT, DISCOVERED_A_NEW_TERM, SHUTDOWN}
 
   private static class ResultAndTerm {
     private final Result result;
-    private final long term;
+    private final Long term;
 
-    ResultAndTerm(Result result, long term) {
+    ResultAndTerm(Result result, Long term) {
       this.result = result;
       this.term = term;
     }
+
+    long maxTerm(long thatTerm) {
+      return this.term != null && this.term > thatTerm ? this.term: thatTerm;
+    }
+
+    Result getResult() {
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return result + (term != null? " (term=" + term + ")": "");
+    }
   }
 
   static class Executor {
@@ -117,6 +153,29 @@ class LeaderElection implements Runnable {
     }
   }
 
+  static class ConfAndTerm {
+    private final RaftConfigurationImpl conf;
+    private final long term;
+
+    ConfAndTerm(RaftConfigurationImpl conf, long term) {
+      this.conf = conf;
+      this.term = term;
+    }
+
+    long getTerm() {
+      return term;
+    }
+
+    RaftConfigurationImpl getConf() {
+      return conf;
+    }
+
+    @Override
+    public String toString() {
+      return "term=" + term + ", " + conf;
+    }
+  }
+
   private static final AtomicInteger COUNT = new AtomicInteger();
 
   private final String name;
@@ -124,14 +183,14 @@ class LeaderElection implements Runnable {
   private final Daemon daemon;
 
   private final RaftServerImpl server;
-  private final boolean force;
+  private final boolean skipPreVote;
 
-  LeaderElection(RaftServerImpl server, boolean force) {
+  LeaderElection(RaftServerImpl server, boolean skipPreVote) {
     this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass()) + COUNT.incrementAndGet();
     this.lifeCycle = new LifeCycle(this);
     this.daemon = new Daemon(this);
     this.server = server;
-    this.force = force;
+    this.skipPreVote = skipPreVote;
   }
 
   void start() {
@@ -169,21 +228,12 @@ class LeaderElection implements Runnable {
       return;
     }
 
-    Timer.Context electionContext =
-        server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
+    final Timer.Context electionContext = server.getLeaderElectionMetrics().getLeaderElectionTimer().time();
     try {
-      /**
-       * See the thesis section 9.6: In the Pre-Vote algorithm, a candidate
-       * only increments its term and start a real election if it first learns
-       * from a majority of the cluster that they would be willing to grant
-       * the candidate their votes (if the candidate’s log is sufficiently
-       * up-to-date, and the voters have not received heartbeats from a valid
-       * leader for at least a baseline election timeout).
-       */
-      boolean preVotePass = force ? true : askForPreVotes();
-
-      if (preVotePass) {
-        askForVotes();
+      if (skipPreVote || askForVotes(Phase.PRE_VOTE)) {
+        if (askForVotes(Phase.ELECTION)) {
+          server.changeToLeader();
+        }
       }
     } catch(Exception e) {
       final LifeCycle.State state = lifeCycle.getCurrentState();
@@ -216,19 +266,18 @@ class LeaderElection implements Runnable {
     return shouldRun() && server.getState().getCurrentTerm() == electionTerm;
   }
 
-  private ResultAndTerm submitRequestAndWaitResult(
-      final ServerState state, final RaftConfigurationImpl conf, final long electionTerm, boolean preVote)
+  private ResultAndTerm submitRequestAndWaitResult(Phase phase, RaftConfigurationImpl conf, long electionTerm)
       throws InterruptedException {
     final ResultAndTerm r;
     final Collection<RaftPeer> others = conf.getOtherPeers(server.getId());
     if (others.isEmpty()) {
       r = new ResultAndTerm(Result.PASSED, electionTerm);
     } else {
-      TermIndex lastEntry = state.getLastEntry();
+      final TermIndex lastEntry = server.getState().getLastEntry();
       final Executor voteExecutor = new Executor(this, others.size());
       try {
-        final int submitted = submitRequests(electionTerm, lastEntry, others, voteExecutor, preVote);
-        r = waitForResults(electionTerm, submitted, conf, voteExecutor, preVote);
+        final int submitted = submitRequests(phase, electionTerm, lastEntry, others, voteExecutor);
+        r = waitForResults(phase, electionTerm, submitted, conf, voteExecutor);
       } finally {
         voteExecutor.shutdown();
       }
@@ -237,110 +286,55 @@ class LeaderElection implements Runnable {
     return r;
   }
 
-  /**
-   * After a peer changes its role to candidate, it invokes this method to
-   * send out requestVote rpc to all other peers.
-   */
-  private boolean askForPreVotes() throws InterruptedException, IOException {
-    final ServerState state = server.getState();
-    if (shouldRun()) {
-      // one round of request pre-votes
+  /** Send requestVote rpc to all other peers for the given phase. */
+  private boolean askForVotes(Phase phase) throws InterruptedException, IOException {
+    for(int round = 0; shouldRun(); round++) {
       final long electionTerm;
       final RaftConfigurationImpl conf;
       synchronized (server) {
         if (!shouldRun()) {
           return false;
         }
-        state.setLeader(null, "initPreVote");
-        conf = state.getRaftConf();
-        electionTerm = state.getCurrentTerm();
+        final ConfAndTerm confAndTerm = server.getState().initElection(phase);
+        electionTerm = confAndTerm.getTerm();
+        conf = confAndTerm.getConf();
       }
 
-      LOG.info("{}: begin a pre-vote at term {} for {}", this, electionTerm, conf);
-      final ResultAndTerm r = submitRequestAndWaitResult(state, conf, electionTerm, true);
-      LOG.info("{} pre-vote result is {}.", this, r.result);
+      LOG.info("{} {} round {}: submit vote requests at term {} for {}", this, phase, round, electionTerm, conf);
+      final ResultAndTerm r = submitRequestAndWaitResult(phase, conf, electionTerm);
+      LOG.info("{} {} round {}: result {}", this, phase, round, r);
 
       synchronized (server) {
         if (!shouldRun(electionTerm)) {
           return false; // term already passed or this should not run anymore.
         }
 
-        switch (r.result) {
+        switch (r.getResult()) {
           case PASSED:
             return true;
-          case REJECTED:
-          case TIMEOUT:
-            server.changeToFollowerAndPersistMetadata(state.getCurrentTerm(), r.result);
-            return false;
           case SHUTDOWN:
-            LOG.info("{} received shutdown response when requesting pre-vote.", this);
             server.getRaftServer().close();
             return false;
+          case TIMEOUT:
+            continue; // should retry
+          case REJECTED:
           case DISCOVERED_A_NEW_TERM:
-            LOG.error("{} should not happen {} when requesting pre-vote.", this, r.result);
+            final long term = r.maxTerm(server.getState().getCurrentTerm());
+            server.changeToFollowerAndPersistMetadata(term, r);
             return false;
           default: throw new IllegalArgumentException("Unable to process result " + r.result);
         }
       }
     }
-
     return false;
   }
 
-  /**
-   * After a peer changes its role to candidate and pass pre-vote, it invokes this method to
-   * send out requestVote rpc to all other peers.
-   */
-  private void askForVotes() throws InterruptedException, IOException {
-    final ServerState state = server.getState();
-    while (shouldRun()) {
-      // one round of requestVotes
-      final long electionTerm;
-      final RaftConfigurationImpl conf;
-      synchronized (server) {
-        if (!shouldRun()) {
-          break;
-        }
-        electionTerm = state.initElection();
-        conf = state.getRaftConf();
-        state.persistMetadata();
-      }
-      LOG.info("{}: begin an election at term {} for {}", this, electionTerm, conf);
-
-      final ResultAndTerm r = submitRequestAndWaitResult(state, conf, electionTerm, false);
-
-      synchronized (server) {
-        if (!shouldRun(electionTerm)) {
-          return; // term already passed or this should not run anymore.
-        }
-
-        switch (r.result) {
-          case PASSED:
-            server.changeToLeader();
-            return;
-          case SHUTDOWN:
-            LOG.info("{} received shutdown response when requesting votes.", this);
-            server.getRaftServer().close();
-            return;
-          case REJECTED:
-          case DISCOVERED_A_NEW_TERM:
-            final long term = Math.max(r.term, state.getCurrentTerm());
-            server.changeToFollowerAndPersistMetadata(term, Result.DISCOVERED_A_NEW_TERM);
-            return;
-          case TIMEOUT: // should start another election
-            continue;
-          default: throw new IllegalArgumentException("Unable to process result " + r.result);
-        }
-      }
-    }
-  }
-
-  private int submitRequests(final long electionTerm, final TermIndex lastEntry,
-      Collection<RaftPeer> others, Executor voteExecutor, boolean preVote) {
+  private int submitRequests(Phase phase, long electionTerm, TermIndex lastEntry,
+      Collection<RaftPeer> others, Executor voteExecutor) {
     int submitted = 0;
     for (final RaftPeer peer : others) {
       final RequestVoteRequestProto r = ServerProtoUtils.toRequestVoteRequestProto(
-          server.getMemberId(), peer.getId(), electionTerm, lastEntry, preVote);
+          server.getMemberId(), peer.getId(), electionTerm, lastEntry, phase == Phase.PRE_VOTE);
       voteExecutor.submit(() -> server.getServerRpc().requestVote(r));
       submitted++;
     }
@@ -362,8 +356,8 @@ class LeaderElection implements Runnable {
     return higherPriorityPeers;
   }
 
-  private ResultAndTerm waitForResults(final long electionTerm, final int submitted,
-      RaftConfigurationImpl conf, Executor voteExecutor, boolean preVote) throws InterruptedException {
+  private ResultAndTerm waitForResults(Phase phase, long electionTerm, int submitted,
+      RaftConfigurationImpl conf, Executor voteExecutor) throws InterruptedException {
     final Timestamp timeout = Timestamp.currentTime().addTime(server.getRandomElectionTimeout());
     final Map<RaftPeerId, RequestVoteReplyProto> responses = new HashMap<>();
     final List<Exception> exceptions = new ArrayList<>();
@@ -378,9 +372,9 @@ class LeaderElection implements Runnable {
         if (conf.hasMajority(votedPeers, server.getId())) {
           // if some higher priority peer did not response when timeout, but candidate get majority,
           // candidate pass vote
-          return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
+          return logAndReturn(phase, Result.PASSED, responses, exceptions);
         } else {
-          return logAndReturn(Result.TIMEOUT, responses, exceptions, -1, preVote);
+          return logAndReturn(phase, Result.TIMEOUT, responses, exceptions);
         }
       }
 
@@ -403,16 +397,15 @@ class LeaderElection implements Runnable {
           continue;
         }
         if (r.getShouldShutdown()) {
-          return logAndReturn(Result.SHUTDOWN, responses, exceptions, -1, preVote);
+          return logAndReturn(phase, Result.SHUTDOWN, responses, exceptions);
         }
-        if (!preVote && r.getTerm() > electionTerm) {
-          return logAndReturn(Result.DISCOVERED_A_NEW_TERM, responses,
-              exceptions, r.getTerm(), false);
+        if (r.getTerm() > electionTerm) {
+          return logAndReturn(phase, Result.DISCOVERED_A_NEW_TERM, responses, exceptions, r.getTerm());
         }
 
         // If any peer with higher priority rejects vote, candidate can not pass vote
         if (!r.getServerReply().getSuccess() && higherPriorityPeers.contains(replierId)) {
-          return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
+          return logAndReturn(phase, Result.REJECTED, responses, exceptions);
         }
 
         // remove higher priority peer, so that we check higherPriorityPeers empty to make sure
@@ -423,12 +416,12 @@ class LeaderElection implements Runnable {
           votedPeers.add(replierId);
           // If majority and all peers with higher priority have voted, candidate pass vote
           if (higherPriorityPeers.size() == 0 && conf.hasMajority(votedPeers, server.getId())) {
-            return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
+            return logAndReturn(phase, Result.PASSED, responses, exceptions);
           }
         } else {
           rejectedPeers.add(replierId);
           if (conf.majorityRejectVotes(rejectedPeers)) {
-            return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
+            return logAndReturn(phase, Result.REJECTED, responses, exceptions);
           }
         }
       } catch(ExecutionException e) {
@@ -439,9 +432,9 @@ class LeaderElection implements Runnable {
     }
     // received all the responses
     if (conf.hasMajority(votedPeers, server.getId())) {
-      return logAndReturn(Result.PASSED, responses, exceptions, -1, preVote);
+      return logAndReturn(phase, Result.PASSED, responses, exceptions);
     } else {
-      return logAndReturn(Result.REJECTED, responses, exceptions, -1, preVote);
+      return logAndReturn(phase, Result.REJECTED, responses, exceptions);
     }
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 908f0e7..44a4d51 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -22,6 +22,7 @@ import org.apache.ratis.protocol.*;
 import org.apache.ratis.protocol.exceptions.StateMachineException;
 import org.apache.ratis.server.RaftConfiguration;
 import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
 import org.apache.ratis.server.protocol.TermIndex;
 import org.apache.ratis.server.raftlog.LogProtoUtils;
 import org.apache.ratis.server.raftlog.RaftLog;
@@ -215,10 +216,19 @@ class ServerState implements Closeable {
   /**
    * Become a candidate and start leader election
    */
-  long initElection() {
-    votedFor = getMemberId().getPeerId();
-    setLeader(null, "initElection");
-    return currentTerm.incrementAndGet();
+  LeaderElection.ConfAndTerm initElection(Phase phase) throws IOException {
+    setLeader(null, phase);
+    final long term;
+    if (phase == Phase.PRE_VOTE) {
+      term = getCurrentTerm();
+    } else if (phase == Phase.ELECTION) {
+      term = currentTerm.incrementAndGet();
+      votedFor = getMemberId().getPeerId();
+      persistMetadata();
+    } else {
+      throw new IllegalArgumentException("Unexpected phase " + phase);
+    }
+    return new LeaderElection.ConfAndTerm(getRaftConf(), term);
   }
 
   void persistMetadata() throws IOException {
@@ -233,7 +243,7 @@ class ServerState implements Closeable {
     setLeader(null, "grantVote");
   }
 
-  void setLeader(RaftPeerId newLeaderId, String op) {
+  void setLeader(RaftPeerId newLeaderId, Object op) {
     if (!Objects.equals(leaderId, newLeaderId)) {
       String suffix;
       if (newLeaderId == null) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index d838ac8..a6753d2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -372,7 +372,6 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
       cluster.start();
 
       RaftServer.Division leader = waitForLeader(cluster);
-      final long savedTerm = leader.getInfo().getCurrentTerm();
 
       try (RaftClient client = cluster.createClient(leader.getId())) {
         client.io().send(new RaftTestUtil.SimpleMessage("message"));
@@ -386,7 +385,8 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         RaftClientReply reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Assert.assertTrue(reply.isSuccess());
 
-        // wait follower timeout and trigger pre-vote
+        final long savedTerm = leader.getInfo().getCurrentTerm();
+        LOG.info("Wait follower {} timeout and trigger pre-vote", follower.getId());
         Thread.sleep(2000);
         deIsolate(cluster, follower.getId());
         Thread.sleep(2000);
@@ -395,7 +395,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
         assertNotNull(newleader);
         assertEquals(newleader.getId(), leader.getId());
         // with pre-vote, term will not change
-        assertEquals(leader.getInfo().getCurrentTerm(), savedTerm);
+        assertEquals(savedTerm, leader.getInfo().getCurrentTerm());
 
         reply = client.io().send(new RaftTestUtil.SimpleMessage("message"));
         Assert.assertTrue(reply.isSuccess());