You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2021/01/08 02:55:13 UTC
[incubator-ratis] branch master updated: RATIS-1283. Refactor the code for preVote and vote in RaftServerImpl. (#392)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 72a9d0f RATIS-1283. Refactor the code for preVote and vote in RaftServerImpl. (#392)
72a9d0f is described below
commit 72a9d0fb9aa6faf1edbf04701884972b88521301
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Jan 8 10:55:02 2021 +0800
RATIS-1283. Refactor the code for preVote and vote in RaftServerImpl. (#392)
---
.../apache/ratis/server/impl/RaftServerImpl.java | 144 +++----------------
.../org/apache/ratis/server/impl/ServerState.java | 21 +--
.../org/apache/ratis/server/impl/VoteContext.java | 154 +++++++++++++++++++++
3 files changed, 181 insertions(+), 138 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index f9cbeaf..d7ef2c3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -42,6 +42,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
import org.apache.ratis.server.impl.RetryCacheImpl.CacheEntry;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppender;
@@ -992,18 +993,6 @@ class RaftServerImpl implements RaftServer.Division,
return pending.getFuture();
}
- private boolean shouldWithholdVotes(long candidateTerm) {
- if (state.getCurrentTerm() < candidateTerm) {
- return false;
- } else if (getInfo().isLeader()) {
- return true;
- } else {
- // following a leader and not yet timeout
- return getInfo().isFollower() && state.hasLeader()
- && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
- }
- }
-
/**
* check if the remote peer is not included in the current conf
* and should shutdown. should shutdown if all the following stands:
@@ -1023,141 +1012,54 @@ class RaftServerImpl implements RaftServer.Division,
}
@Override
- public RequestVoteReplyProto requestVote(RequestVoteRequestProto r)
- throws IOException {
+ public RequestVoteReplyProto requestVote(RequestVoteRequestProto r) throws IOException {
final RaftRpcRequestProto request = r.getServerRequest();
- if (r.getPreVote()) {
- return requestPreVote(RaftPeerId.valueOf(request.getRequestorId()),
- ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
- r.getCandidateTerm(),
- TermIndex.valueOf(r.getCandidateLastEntry()));
- } else {
- return requestVote(RaftPeerId.valueOf(request.getRequestorId()),
- ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
- r.getCandidateTerm(),
- TermIndex.valueOf(r.getCandidateLastEntry()));
- }
+ return requestVote(r.getPreVote() ? Phase.PRE_VOTE : Phase.ELECTION,
+ RaftPeerId.valueOf(request.getRequestorId()),
+ ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
+ r.getCandidateTerm(),
+ TermIndex.valueOf(r.getCandidateLastEntry()));
}
- private boolean isCurrentLeaderValid() {
- if (getInfo().isLeader()) {
- return role.getLeaderState().map(LeaderStateImpl::checkLeadership).orElse(false);
- } else if (getInfo().isFollower()) {
- return state.hasLeader()
- && role.getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false);
- } else if (getInfo().isCandidate()) {
- return false;
- }
-
- return false;
- }
-
- private RequestVoteReplyProto requestPreVote(
+ private RequestVoteReplyProto requestVote(Phase phase,
RaftPeerId candidateId, RaftGroupId candidateGroupId,
long candidateTerm, TermIndex candidateLastEntry) throws IOException {
CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
candidateId, candidateTerm, candidateLastEntry);
- LOG.info("{}: receive preVote({}, {}, {}, {})",
- getMemberId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
+ LOG.info("{}: receive requestVote({}, {}, {}, {}, {})",
+ getMemberId(), phase, candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
assertLifeCycleState(LifeCycle.States.RUNNING);
assertGroup(candidateId, candidateGroupId);
- boolean preVoteGranted = false;
- boolean shouldShutdown = false;
- final RequestVoteReplyProto reply;
- synchronized (this) {
- boolean isCurrentLeaderValid = isCurrentLeaderValid();
- RaftPeer candidate = getRaftConf().getPeer(candidateId);
-
- // vote for candidate if:
- // 1. current leader is not valid
- // 2. log lags behind candidate or
- // log equals candidate's, and priority less or equal candidate's
- if (!isCurrentLeaderValid && candidate != null) {
- int compare = ServerState.compareLog(state.getLastEntry(), candidateLastEntry);
- int priority = getRaftConf().getPeer(getId()).getPriority();
- if (compare < 0 || (compare == 0 && priority <= candidate.getPriority())) {
- preVoteGranted = true;
- LOG.info("{} role:{} allow pre-vote from:{}", getMemberId(), getRole(), candidateId);
- } else {
- LOG.info("{} role:{} reject pre-vote from:{} because compare:{} isCurrentLeaderValid:{}",
- getMemberId(), getRole(), candidateId, compare, isCurrentLeaderValid);
- }
- }
-
- if (preVoteGranted) {
- final FollowerState fs = role.getFollowerState().orElse(null);
- if (fs != null) {
- fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
- }
- } else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
- shouldShutdown = true;
- }
-
- reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
- preVoteGranted, state.getCurrentTerm(), shouldShutdown);
- LOG.info("{} replies to pre-vote request: {}. Peer's state: {}",
- getMemberId(), ServerStringUtils.toRequestVoteReplyString(reply), state);
- }
- return reply;
- }
-
- private RequestVoteReplyProto requestVote(
- RaftPeerId candidateId, RaftGroupId candidateGroupId,
- long candidateTerm, TermIndex candidateLastEntry) throws IOException {
- CodeInjectionForTesting.execute(REQUEST_VOTE, getId(),
- candidateId, candidateTerm, candidateLastEntry);
- LOG.debug("{}: receive requestVote({}, {}, {}, {})",
- getMemberId(), candidateId, candidateGroupId, candidateTerm, candidateLastEntry);
- assertLifeCycleState(LifeCycle.States.RUNNING);
- assertGroup(candidateId, candidateGroupId);
-
- boolean voteGranted = false;
boolean shouldShutdown = false;
final RequestVoteReplyProto reply;
synchronized (this) {
// Check life cycle state again to avoid the PAUSING/PAUSED state.
assertLifeCycleState(LifeCycle.States.RUNNING);
- FollowerState fs = role.getFollowerState().orElse(null);
- if (shouldWithholdVotes(candidateTerm)) {
- LOG.info("{}-{}: Withhold vote from candidate {} with term {}. State: leader={}, term={}, lastRpcElapsed={}",
- getMemberId(), role, candidateId, candidateTerm, state.getLeaderId(), state.getCurrentTerm(),
- fs != null? fs.getLastRpcTime().elapsedTimeMs() + "ms": null);
- } else if (state.recognizeCandidate(candidateId, candidateTerm)) {
- final boolean termUpdated = changeToFollower(candidateTerm, true, "recognizeCandidate:" + candidateId);
- // if current server is leader, before changeToFollower FollowerState should be null,
- // so after leader changeToFollower we should get FollowerState again.
- fs = role.getFollowerState().orElse(null);
-
- // see Section 5.4.1 Election restriction
- RaftPeer candidate = getRaftConf().getPeer(candidateId);
- if (fs != null && candidate != null) {
- int compare = ServerState.compareLog(state.getLastEntry(), candidateLastEntry);
- int priority = getRaftConf().getPeer(getId()).getPriority();
- LOG.info("{} priority:{} candidate:{} candidatePriority:{} compare:{}",
- this, priority, candidate, candidate.getPriority(), compare);
- // vote for candidate if:
- // 1. log lags behind candidate
- // 2. log equals candidate's, and priority less or equal candidate's
- if (compare < 0 || (compare == 0 && priority <= candidate.getPriority())) {
- state.grantVote(candidateId);
- voteGranted = true;
- }
+
+ final VoteContext context = new VoteContext(this, phase, candidateId);
+ final RaftPeer candidate = context.recognizeCandidate(candidateTerm);
+ final boolean voteGranted = context.decideVote(candidate, candidateLastEntry);
+ if (candidate != null && phase == Phase.ELECTION) {
+ // change server state in the ELECTION phase
+ final boolean termUpdated = changeToFollower(candidateTerm, true, "candidate:" + candidateId);
+ if (voteGranted) {
+ state.grantVote(candidate.getId());
}
if (termUpdated || voteGranted) {
state.persistMetadata(); // sync metafile
}
}
if (voteGranted) {
- fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE);
+ role.getFollowerState().ifPresent(fs -> fs.updateLastRpcTime(FollowerState.UpdateType.REQUEST_VOTE));
} else if(shouldSendShutdown(candidateId, candidateLastEntry)) {
shouldShutdown = true;
}
reply = ServerProtoUtils.toRequestVoteReplyProto(candidateId, getMemberId(),
voteGranted, state.getCurrentTerm(), shouldShutdown);
- if (LOG.isDebugEnabled()) {
- LOG.debug("{} replies to vote request: {}. Peer's state: {}",
- getMemberId(), ServerStringUtils.toRequestVoteReplyString(reply), state);
+ if (LOG.isInfoEnabled()) {
+ LOG.info("{} replies to {} vote request: {}. Peer's state: {}",
+ getMemberId(), phase, ServerStringUtils.toRequestVoteReplyString(reply), state);
}
}
return reply;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 44a4d51..29d8320 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -235,6 +235,10 @@ class ServerState implements Closeable {
log.persistMetadata(RaftStorageMetadata.valueOf(currentTerm.get(), votedFor));
}
+ RaftPeerId getVotedFor() {
+ return votedFor;
+ }
+
/**
* Vote for a candidate and update the local state.
*/
@@ -320,23 +324,6 @@ class ServerState implements Closeable {
return this.leaderId.equals(peerLeaderId);
}
- /**
- * Check if the candidate's term is acceptable
- */
- boolean recognizeCandidate(RaftPeerId candidateId, long candidateTerm) {
- if (!getRaftConf().containsInConf(candidateId)) {
- return false;
- }
- final long current = currentTerm.get();
- if (candidateTerm > current) {
- return true;
- } else if (candidateTerm == current) {
- // has not voted yet or this is a retry
- return votedFor == null || votedFor.equals(candidateId);
- }
- return false;
- }
-
static int compareLog(TermIndex lastEntry, TermIndex candidateLastEntry) {
if (lastEntry == null) {
// If the lastEntry of candidate is null, the proto will transfer an empty TermIndexProto,
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java
new file mode 100644
index 0000000..1ef721a
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/VoteContext.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.impl.LeaderElection.Phase;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class VoteContext {
+ static final Logger LOG = LoggerFactory.getLogger(VoteContext.class);
+
+ private final RaftServerImpl impl;
+ private final RaftConfigurationImpl conf;
+ private final Phase phase;
+ private final RaftPeerId candidateId;
+
+ VoteContext(RaftServerImpl impl, Phase phase, RaftPeerId candidateId) {
+ this.impl = impl;
+ this.conf = impl.getRaftConf();
+ this.phase = phase;
+ this.candidateId = candidateId;
+ }
+
+ private boolean reject(String reason) {
+ return log(false, reason);
+ }
+
+ private boolean log(boolean accept, String reason) {
+ LOG.info("{}-{}: {} {} from {}: {}",
+ impl.getMemberId(), impl.getInfo().getCurrentRole(), accept? "accept": "reject", phase, candidateId, reason);
+ return accept;
+ }
+
+ /** Check if the candidate is in the current conf. */
+ private RaftPeer checkConf() {
+ if (!conf.containsInConf(candidateId)) {
+ reject(candidateId + " is not in current conf " + conf.getCurrentPeers());
+ return null;
+ }
+ return conf.getPeer(candidateId);
+ }
+
+ enum CheckTermResult {
+ FAILED, CHECK_LEADER, SKIP_CHECK_LEADER
+ }
+
+ /** Check the candidate term. */
+ private CheckTermResult checkTerm(long candidateTerm) {
+ if (phase == Phase.PRE_VOTE) {
+ return CheckTermResult.CHECK_LEADER;
+ }
+ // check term
+ final ServerState state = impl.getState();
+ final long currentTerm = state.getCurrentTerm();
+ if (currentTerm > candidateTerm) {
+ reject("current term " + currentTerm + " > candidate's term " + candidateTerm);
+ return CheckTermResult.FAILED;
+ } else if (currentTerm == candidateTerm) {
+ // check if this server has already voted
+ final RaftPeerId votedFor = state.getVotedFor();
+ if (votedFor != null && !votedFor.equals(candidateId)) {
+ reject("already has voted for " + votedFor + " at current term " + currentTerm);
+ return CheckTermResult.FAILED;
+ }
+ return CheckTermResult.CHECK_LEADER;
+ } else {
+ return CheckTermResult.SKIP_CHECK_LEADER; //currentTerm < candidateTerm
+ }
+ }
+
+ /** Check if there is already a leader. */
+ private boolean checkLeader() {
+ // check if this server is the leader
+ final DivisionInfo info = impl.getInfo();
+ if (info.isLeader()) {
+ if (impl.getRole().getLeaderState().map(LeaderStateImpl::checkLeadership).orElse(false)) {
+ return reject("this server is the leader and still has leadership");
+ }
+ }
+
+ // check if this server is a follower and has a valid leader
+ if (info.isFollower()) {
+ final RaftPeerId leader = impl.getState().getLeaderId();
+ if (leader != null
+ && impl.getRole().getFollowerState().map(FollowerState::isCurrentLeaderValid).orElse(false)) {
+ return reject("this server is a follower and still has a valid leader " + leader);
+ }
+ }
+ return true;
+ }
+
+ RaftPeer recognizeCandidate(long candidateTerm) {
+ final RaftPeer candidate = checkConf();
+ if (candidate == null) {
+ return null;
+ }
+ final CheckTermResult r = checkTerm(candidateTerm);
+ if (r == CheckTermResult.FAILED) {
+ return null;
+ }
+ if (r == CheckTermResult.CHECK_LEADER && !checkLeader()) {
+ return null;
+ }
+ return candidate;
+ }
+
+ /**
+ * A server should vote for candidate if:
+ * 1. log lags behind candidate, or
+ * 2. log equals candidate's, and priority less or equal candidate's
+ *
+ * See Section 5.4.1 Election restriction
+ */
+ boolean decideVote(RaftPeer candidate, TermIndex candidateLastEntry) {
+ if (candidate == null) {
+ return false;
+ }
+ // Check last log entry
+ final TermIndex lastEntry = impl.getState().getLastEntry();
+ final int compare = ServerState.compareLog(lastEntry, candidateLastEntry);
+ if (compare < 0) {
+ return log(true, "our last entry " + lastEntry + " < candidate's last entry " + candidateLastEntry);
+ } else if (compare > 0) {
+ return reject("our last entry " + lastEntry + " > candidate's last entry " + candidateLastEntry);
+ }
+
+ // Check priority
+ final int priority = impl.getRaftConf().getPeer(impl.getId()).getPriority();
+ if (priority <= candidate.getPriority()) {
+ return log(true, "our priority " + priority + " <= candidate's priority " + candidate.getPriority());
+ } else {
+ return reject("our priority " + priority + " > candidate's priority " + candidate.getPriority());
+ }
+ }
+}