You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2021/01/08 02:55:13 UTC

[incubator-ratis] branch master updated: RATIS-1283. Refactor the code for preVote and vote in RaftServerImpl. (#392)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 72a9d0f  RATIS-1283. Refactor the code for preVote and vote in RaftServerImpl. (#392)
72a9d0f is described below

commit 72a9d0fb9aa6faf1edbf04701884972b88521301
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Jan 8 10:55:02 2021 +0800

    RATIS-1283. Refactor the code for preVote and vote in RaftServerImpl. (#392)
---
 .../apache/ratis/server/impl/RaftServerImpl.java   | 144 +++----------------
 .../org/apache/ratis/server/impl/ServerState.java  |  21 +--
 .../org/apache/ratis/server/impl/VoteContext.java  | 154 +++++++++++++++++++++
 3 files changed, 181 insertions(+), 138 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f9cbeaf..d7ef2c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -42,6 +42,7 @@ import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.RaftServerMXBean;
 import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
 import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
 import org.apache.ratis.server.leader.LeaderState;
 import org.apache.ratis.server.leader.LogAppender;
@@ -992,18 +993,6 @@ class RaftServerImpl implements RaftServer.Division,
     return pending.getFuture();
   }
 
-  private boolean shouldWithholdVotes(long candidateTerm) {
-    if (state.getCurrentTerm() < candidateTerm) {
-      return false;
-    } else if (getInfo().isLeader()) {
-      return true;
-    } else {
-      // following a leader and not yet timeout
-      return getInfo().isFollower() && state.hasLeader()
-          && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
-    }
-  }
-
   /**
    * check if the remote peer is not included in the current conf
    * and should shutdown. should shutdown if all the following stands:
@@ -1023,141 +1012,54 @@ class RaftServerImpl implements RaftServer.Division,
   }
 
   @Override
-  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
-      throws IOException {
+  public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException {
     final RaftRpcRequestProto request = r.getServerRequest();
-    if (r.getPreVote()) {
-      return requestPreVote(RaftPeerId.valueOf(request.getRequestorId()),
-          ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
-          r.getCandidateTerm(),
-          TermIndex.valueOf(r.getCandidateLastEntry()));
-    } else {
-      return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
-          ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
-          r.getCandidateTerm(),
-          TermIndex.valueOf(r.getCandidateLastEntry()));
-    }
+    return requestVote(r.getPreVote() ? Phase.PRE_VOTE : Phase.ELECTION,
+        RaftPeerId.valueOf(request.getRequestorId()),
+        ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+        r.getCandidateTerm(),
+        TermIndex.valueOf(r.getCandidateLastEntry()));
   }
 
-  private boolean isCurrentLeaderValid() {
-    if (getInfo().isLeader()) {
-      return role.getLeaderState().map(LeaderStateImpl::checkLeadership).orElse(false);
-    } else if (getInfo().isFollower()) {
-      return state.hasLeader()
-          && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
-    } else if (getInfo().isCandidate()) {
-      return false;
-    }
-
-    return false;
-  }
-
-  private RequestVoteReplyProto requestPreVote(
+  private RequestVoteReplyProto requestVote(Phase phase,
       RaftPeerId candidateId, RaftGroupId candidateGroupId,
       long candidateTerm, TermIndex candidateLastEntry) throws IOException {
     CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
         candidateId, candidateTerm, candidateLastEntry);
-    LOG.info("{}: receive preVote({}, {}, {}, {})",
-        getMemberId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
+    LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
+        getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
     assertLifeCycleState(LifeCycle.States.RUNNING);
     assertGroup(candidateId, candidateGroupId);
 
-    boolean preVoteGranted = false;
-    boolean shouldShutdown = false;
-    final RequestVoteReplyProto reply;
-    synchronized (this) {
-      boolean isCurrentLeaderValid = isCurrentLeaderValid();
-      RaftPeer candidate = getRaftConf().getPeer(candidateId);
-
-      // vote for candidate if:
-      // 1. current leader is not valid
-      // 2. log lags behind candidate or
-      //    log equals candidate's, and priority less or equal candidate's
-      if (!isCurrentLeaderValid && candidate != null) {
-        int compare = ServerState.compareLog(state.getLastEntry(), candidateLastEntry);
-        int priority = getRaftConf().getPeer(getId()).getPriority();
-        if (compare < 0 || (compare == 0 && priority <= candidate.getPriority())) {
-          preVoteGranted = true;
-          LOG.info("{} role:{} allow pre-vote from:{}", getMemberId(), getRole(), candidateId);
-        } else {
-          LOG.info("{} role:{} reject pre-vote from:{} because compare:{} isCurrentLeaderValid:{}",
-              getMemberId(), getRole(), candidateId, compare, isCurrentLeaderValid);
-        }
-      }
-
-      if (preVoteGranted) {
-        final FollowerState fs = role.getFollowerState().orElse(null);
-        if (fs != null) {
-          fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
-        }
-      } else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
-        shouldShutdown = true;
-      }
-
-      reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
-          preVoteGranted, state.getCurrentTerm(), shouldShutdown);
-      LOG.info("{} replies to pre-vote request: {}. Peer's state: {}",
-          getMemberId(), ServerStringUtils.toRequestVoteReplyString(reply), state);
-    }
-    return reply;
-  }
-
-  private RequestVoteReplyProto requestVote(
-      RaftPeerId candidateId, RaftGroupId candidateGroupId,
-      long candidateTerm, TermIndex candidateLastEntry) throws IOException {
-    CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
-        candidateId, candidateTerm, candidateLastEntry);
-    LOG.debug("{}: receive requestVote({}, {}, {}, {})",
-        getMemberId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
-    assertLifeCycleState(LifeCycle.States.RUNNING);
-    assertGroup(candidateId, candidateGroupId);
-
-    boolean voteGranted = false;
     boolean shouldShutdown = false;
     final RequestVoteReplyProto reply;
     synchronized (this) {
       // Check life cycle state again to avoid the PAUSING/PAUSED state.
       assertLifeCycleState(LifeCycle.States.RUNNING);
-      FollowerState fs = role.getFollowerState().orElse(null);
-      if (shouldWithholdVotes(candidateTerm)) {
-        LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}",
-            getMemberId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
-            fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
-      } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
-        final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId);
-        // if current server is leader, before changeToFollower FollowerState should be null,
-        // so after leader changeToFollower we should get FollowerState again.
-        fs = role.getFollowerState().orElse(null);
-
-        // see Section 5.4.1 Election restriction
-        RaftPeer candidate = getRaftConf().getPeer(candidateId);
-        if (fs != null && candidate != null) {
-          int compare = ServerState.compareLog(state.getLastEntry(), candidateLastEntry);
-          int priority = getRaftConf().getPeer(getId()).getPriority();
-          LOG.info("{} priority:{} candidate:{} candidatePriority:{} compare:{}",
-              this, priority, candidate, candidate.getPriority(), compare);
-          // vote for candidate if:
-          // 1. log lags behind candidate
-          // 2. log equals candidate's, and priority less or equal candidate's
-          if (compare < 0 || (compare == 0 && priority <= candidate.getPriority())) {
-            state.grantVote(candidateId);
-            voteGranted = true;
-          }
+
+      final VoteContext context = new VoteContext(this, phase, candidateId);
+      final RaftPeer candidate = context.recognizeCandidate(candidateTerm);
+      final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
+      if (candidate != null && phase == Phase.ELECTION) {
+        // change server state in the ELECTION phase
+        final boolean termUpdated = changeToFollower(candidateTerm, true, "candidate:" + candidateId);
+        if (voteGranted) {
+          state.grantVote(candidate.getId());
         }
         if (termUpdated || voteGranted) {
           state.persistMetadata(); // sync metafile
         }
       }
       if (voteGranted) {
-        fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
+        role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE));
       } else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
         shouldShutdown = true;
       }
       reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
           voteGranted, state.getCurrentTerm(), shouldShutdown);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("{} replies to vote request: {}. Peer's state: {}",
-            getMemberId(), ServerStringUtils.toRequestVoteReplyString(reply), state);
+      if (LOG.isInfoEnabled()) {
+        LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
+            getMemberId(), phase, ServerStringUtils.toRequestVoteReplyString(reply), state);
       }
     }
     return reply;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 44a4d51..29d8320 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -235,6 +235,10 @@ class ServerState implements Closeable {
     log.persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
   }
 
+  RaftPeerId getVotedFor() {
+    return votedFor;
+  }
+
   /**
    * Vote for a candidate and update the local state.
    */
@@ -320,23 +324,6 @@ class ServerState implements Closeable {
     return this.leaderId.equals(peerLeaderId);
   }
 
-  /**
-   * Check if the candidate's term is acceptable
-   */
-  boolean recognizeCandidate(RaftPeerId candidateId, long candidateTerm) {
-    if (!getRaftConf().containsInConf(candidateId)) {
-      return false;
-    }
-    final long current = currentTerm.get();
-    if (candidateTerm > current) {
-      return true;
-    } else if (candidateTerm == current) {
-      // has not voted yet or this is a retry
-      return votedFor == null || votedFor.equals(candidateId);
-    }
-    return false;
-  }
-
   static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
     if (lastEntry == null) {
       // If the lastEntry of candidate is null, the proto will transfer an empty TermIndexProto,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java
new file mode 100644
index 0000000..1ef721a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class VoteContext {
+  static final Logger LOG = LoggerFactory.getLogger(VoteContext.class);
+
+  private final RaftServerImpl impl;
+  private final RaftConfigurationImpl conf;
+  private final Phase phase;
+  private final RaftPeerId candidateId;
+
+  VoteContext(RaftServerImpl impl, Phase phase, RaftPeerId candidateId) {
+    this.impl = impl;
+    this.conf = impl.getRaftConf();
+    this.phase = phase;
+    this.candidateId = candidateId;
+  }
+
+  private boolean reject(String reason) {
+    return log(false, reason);
+  }
+
+  private boolean log(boolean accept, String reason) {
+    LOG.info("{}-{}: {} {} from {}: {}",
+        impl.getMemberId(), impl.getInfo().getCurrentRole(), accept? "accept": "reject", phase, candidateId, reason);
+    return accept;
+  }
+
+  /** Check if the candidate is in the current conf. */
+  private RaftPeer checkConf() {
+    if (!conf.containsInConf(candidateId)) {
+      reject(candidateId + " is not in current conf " + conf.getCurrentPeers());
+      return null;
+    }
+    return conf.getPeer(candidateId);
+  }
+
+  enum CheckTermResult {
+    FAILED, CHECK_LEADER, SKIP_CHECK_LEADER
+  }
+
+  /** Check the candidate term. */
+  private CheckTermResult checkTerm(long candidateTerm) {
+    if (phase == Phase.PRE_VOTE) {
+      return CheckTermResult.CHECK_LEADER;
+    }
+    // check term
+    final ServerState state = impl.getState();
+    final long currentTerm = state.getCurrentTerm();
+    if (currentTerm > candidateTerm) {
+      reject("current term " + currentTerm + " > candidate's term " + candidateTerm);
+      return CheckTermResult.FAILED;
+    } else if (currentTerm == candidateTerm) {
+      // check if this server has already voted
+      final RaftPeerId votedFor = state.getVotedFor();
+      if (votedFor != null && !votedFor.equals(candidateId)) {
+        reject("already has voted for " + votedFor + " at current term " + currentTerm);
+        return CheckTermResult.FAILED;
+      }
+      return CheckTermResult.CHECK_LEADER;
+    } else {
+      return CheckTermResult.SKIP_CHECK_LEADER; //currentTerm < candidateTerm
+    }
+  }
+
+  /** Check if there is already a leader. */
+  private boolean checkLeader() {
+    // check if this server is the leader
+    final DivisionInfo info = impl.getInfo();
+    if (info.isLeader()) {
+      if (impl.getRole().getLeaderState().map(LeaderStateImpl::checkLeadership).orElse(false)) {
+        return reject("this server is the leader and still has leadership");
+      }
+    }
+
+    // check if this server is a follower and has a valid leader
+    if (info.isFollower()) {
+      final RaftPeerId leader = impl.getState().getLeaderId();
+      if (leader != null
+          && impl.getRole().getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false)) {
+        return reject("this server is a follower and still has a valid leader " + leader);
+      }
+    }
+    return true;
+  }
+
+  RaftPeer recognizeCandidate(long candidateTerm) {
+    final RaftPeer candidate = checkConf();
+    if (candidate == null) {
+      return null;
+    }
+    final CheckTermResult r = checkTerm(candidateTerm);
+    if (r == CheckTermResult.FAILED) {
+      return null;
+    }
+    if (r == CheckTermResult.CHECK_LEADER && !checkLeader()) {
+      return null;
+    }
+    return candidate;
+  }
+
+  /**
+   * A server should vote for candidate if:
+   * 1. log lags behind candidate, or
+   * 2. log equals candidate's, and priority less or equal candidate's
+   *
+   * See Section 5.4.1 Election restriction
+   */
+  boolean decideVote(RaftPeer candidate, TermIndex candidateLastEntry) {
+    if (candidate == null) {
+      return false;
+    }
+    // Check last log entry
+    final TermIndex lastEntry = impl.getState().getLastEntry();
+    final int compare = ServerState.compareLog(lastEntry, candidateLastEntry);
+    if (compare < 0) {
+      return log(true, "our last entry " + lastEntry + " < candidate's last entry " + candidateLastEntry);
+    } else if (compare > 0) {
+      return reject("our last entry " + lastEntry + " > candidate's last entry " + candidateLastEntry);
+    }
+
+    // Check priority
+    final int priority = impl.getRaftConf().getPeer(impl.getId()).getPriority();
+    if (priority <= candidate.getPriority()) {
+      return log(true, "our priority " + priority + " <= candidate's priority " + candidate.getPriority());
+    } else {
+      return reject("our priority " + priority + " > candidate's priority " + candidate.getPriority());
+    }
+  }
+}