You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/01/19 14:35:30 UTC
[ratis] branch master updated: RATIS-1772. Refactor the startLeaderElection code in LeaderStateImpl. (#811)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8c16c2835 RATIS-1772. Refactor the startLeaderElection code in LeaderStateImpl. (#811)
8c16c2835 is described below
commit 8c16c28351576b893564c5cc621c7d069b1de14a
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Jan 19 22:35:24 2023 +0800
RATIS-1772. Refactor the startLeaderElection code in LeaderStateImpl. (#811)
---
.../apache/ratis/server/impl/LeaderStateImpl.java | 68 ++++++++++------------
1 file changed, 32 insertions(+), 36 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 2a16602b6..b0f342fd7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -661,7 +661,7 @@ class LeaderStateImpl implements LeaderState {
return pendingStepDown.submitAsync(request);
}
- private synchronized void sendStartLeaderElectionToHigherPriorityPeer(RaftPeerId follower, TermIndex lastEntry) {
+ private synchronized void sendStartLeaderElection(RaftPeerId follower, TermIndex lastEntry) {
ServerState state = server.getState();
TermIndex currLastEntry = state.getLastEntry();
if (ServerState.compareLog(currLastEntry, lastEntry) != 0) {
@@ -669,6 +669,8 @@ class LeaderStateImpl implements LeaderState {
"did not match lastEntry:{}", this, follower, currLastEntry, lastEntry);
return;
}
+ LOG.info("{}: send StartLeaderElectionRequest to follower {} on term {}, lastEntry={}",
+ this, follower, currentTerm, lastEntry);
final StartLeaderElectionRequestProto r = ServerProtoUtils.toStartLeaderElectionRequestProto(
server.getMemberId(), follower, lastEntry);
@@ -685,6 +687,22 @@ class LeaderStateImpl implements LeaderState {
});
}
+ boolean sendStartLeaderElection(FollowerInfo followerInfo) {
+ final RaftPeerId followerId = followerInfo.getPeer().getId();
+ final TermIndex leaderLastEntry = server.getState().getLastEntry();
+ if (leaderLastEntry == null) {
+ sendStartLeaderElection(followerId, null);
+ return true;
+ }
+
+ final long followerMatchIndex = followerInfo.getMatchIndex();
+ if (followerMatchIndex >= leaderLastEntry.getIndex()) {
+ sendStartLeaderElection(followerId, leaderLastEntry);
+ return true;
+ }
+ return false;
+ }
+
private void prepare() {
synchronized (server) {
if (isRunning()) {
@@ -720,9 +738,8 @@ class LeaderStateImpl implements LeaderState {
event.execute();
} else if (inStagingState()) {
checkStaging();
- } else {
- yieldLeaderToHigherPriorityPeer();
- checkLeadership();
+ } else if (checkLeadership()) {
+ checkPeersForYieldingLeader();
}
}
}
@@ -1024,52 +1041,31 @@ class LeaderStateImpl implements LeaderState {
return indices;
}
- private void yieldLeaderToHigherPriorityPeer() {
- if (!server.getInfo().isLeader()) {
- return;
- }
-
+ private void checkPeersForYieldingLeader() {
final RaftConfigurationImpl conf = server.getRaftConf();
final RaftPeer leader = conf.getPeer(server.getId());
if (leader == null) {
LOG.error("{} the leader {} is not in the conf {}", this, server.getId(), conf);
return;
}
- int leaderPriority = leader.getPriority();
+ final int leaderPriority = leader.getPriority();
+ FollowerInfo highestPriorityInfo = null;
+ int highestPriority = Integer.MIN_VALUE;
for (LogAppender logAppender : senders.getSenders()) {
- final FollowerInfo followerInfo = logAppender.getFollower();
- final RaftPeerId followerID = followerInfo.getPeer().getId();
- final RaftPeer follower = conf.getPeer(followerID);
+ final RaftPeer follower = conf.getPeer(logAppender.getFollowerId());
if (follower == null) {
- if (conf.getPeer(followerID, RaftPeerRole.LISTENER) == null) {
- LOG.error("{} the follower {} is not in the conf {}", this, followerID, conf);
- }
continue;
}
final int followerPriority = follower.getPriority();
- if (followerPriority <= leaderPriority) {
- continue;
- }
- final TermIndex leaderLastEntry = server.getState().getLastEntry();
- if (leaderLastEntry == null) {
- LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
- "is higher than leader's:{} and leader's lastEntry is null",
- this, followerID, currentTerm, followerPriority, leaderPriority);
-
- sendStartLeaderElectionToHigherPriorityPeer(followerID, null);
- return;
- }
-
- if (followerInfo.getMatchIndex() >= leaderLastEntry.getIndex()) {
- LOG.info("{} send StartLeaderElectionRequest to follower:{} on term:{} because follower's priority:{} " +
- "is higher than leader's:{} and follower's lastEntry index:{} catch up with leader's:{}",
- this, followerID, currentTerm, followerPriority, leaderPriority, followerInfo.getMatchIndex(),
- leaderLastEntry.getIndex());
- sendStartLeaderElectionToHigherPriorityPeer(followerID, leaderLastEntry);
- return;
+ if (followerPriority > leaderPriority && followerPriority >= highestPriority) {
+ highestPriority = followerPriority;
+ highestPriorityInfo = logAppender.getFollower();
}
}
+ if (highestPriorityInfo != null) {
+ sendStartLeaderElection(highestPriorityInfo);
+ }
}
/**