You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/03/24 12:33:04 UTC
[ratis] branch master updated: RATIS-1558. Support Listener in RaftServerImpl (#627)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 77f21ae RATIS-1558. Support Listener in RaftServerImpl (#627)
77f21ae is described below
commit 77f21ae82879120afdb1638a9cd0f3679ccbede1
Author: Yaolong Liu <ly...@163.com>
AuthorDate: Thu Mar 24 20:32:58 2022 +0800
RATIS-1558. Support Listener in RaftServerImpl (#627)
---
.../apache/ratis/server/impl/FollowerState.java | 18 ++++++++++-----
.../apache/ratis/server/impl/LeaderStateImpl.java | 2 ++
.../apache/ratis/server/impl/RaftServerImpl.java | 26 +++++++++++++++++-----
3 files changed, 36 insertions(+), 10 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 7f5e5a8..52ae033 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
@@ -109,10 +110,19 @@ class FollowerState extends Daemon {
return true;
}
+ private boolean shouldRun() {
+ final DivisionInfo info = server.getInfo();
+ final boolean run = isRunning && (info.isFollower() || info.isListener());
+ if (!run) {
+ LOG.info("{}: Stopping now (isRunning? {}, role = {})", this, isRunning, info.getCurrentRole());
+ }
+ return run;
+ }
+
@Override
public void run() {
final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
- while (isRunning && server.getInfo().isFollower()) {
+ while (shouldRun()) {
final TimeDuration electionTimeout = server.getRandomElectionTimeout();
try {
final TimeDuration extraSleep = electionTimeout.sleep();
@@ -122,14 +132,12 @@ class FollowerState extends Daemon {
continue;
}
- final boolean isFollower = server.getInfo().isFollower();
- if (!isRunning || !isFollower) {
- LOG.info("{}: Stopping now (isRunning? {}, isFollower? {})", this, isRunning, isFollower);
+ if (!shouldRun()) {
break;
}
synchronized (server) {
if (outstandingOp.get() == 0
- && isRunning
+ && isRunning && server.getInfo().isFollower()
&& lastRpcTime.elapsedTime().compareTo(electionTimeout) >= 0
&& !lostMajorityHeartbeatsRecently()) {
LOG.info("{}: change to CANDIDATE, lastRpcElapsedTime:{}, electionTimeout:{}",
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 5390905..49ad684 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -706,6 +706,7 @@ class LeaderStateImpl implements LeaderState {
applyOldNewConf();
senders.stream()
.map(LogAppender::getFollower)
+ .filter(f -> server.getRaftConf().containsInConf(f.getPeer().getId()))
.map(FollowerInfoImpl.class::cast)
.forEach(FollowerInfoImpl::startAttendVote);
}
@@ -1100,6 +1101,7 @@ class LeaderStateImpl implements LeaderState {
List<RaftPeer> getFollowers() {
return Collections.unmodifiableList(senders.stream()
.map(sender -> sender.getFollower().getPeer())
+ .filter(peer -> server.getRaftConf().containsInConf(peer.getId()))
.collect(Collectors.toList()));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 51447be..a62fad0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -313,7 +313,10 @@ class RaftServerImpl implements RaftServer.Division,
final RaftConfigurationImpl conf = getRaftConf();
if (conf != null && conf.containsInBothConfs(getId())) {
LOG.info("{}: start as a follower, conf={}", getMemberId(), conf);
- startAsFollower();
+ startAsPeer(RaftPeerRole.FOLLOWER);
+ } else if (conf != null && conf.containsInConf(getId(), RaftPeerRole.LISTENER)) {
+ LOG.info("{}: start as a listener, conf={}", getMemberId(), conf);
+ startAsPeer(RaftPeerRole.LISTENER);
} else {
LOG.info("{}: start with initializing state, conf={}", getMemberId(), conf);
startInitializing();
@@ -335,11 +338,21 @@ class RaftServerImpl implements RaftServer.Division,
}
/**
- * The peer belongs to the current configuration, should start as a follower
+ * The peer belongs to the current configuration, should start as a follower or listener
*/
- private void startAsFollower() {
- setRole(RaftPeerRole.FOLLOWER, "startAsFollower");
- role.startFollowerState(this, "startAsFollower");
+ private void startAsPeer(RaftPeerRole newRole) {
+ Object reason = "";
+ if (newRole == RaftPeerRole.FOLLOWER) {
+ reason = "startAsFollower";
+ setRole(RaftPeerRole.FOLLOWER, reason);
+ } else if (newRole == RaftPeerRole.LISTENER) {
+ reason = "startAsListener";
+ setRole(RaftPeerRole.LISTENER, reason);
+ } else {
+ throw new IllegalArgumentException("Unexpected role " + newRole);
+ }
+ role.startFollowerState(this, reason);
+
lifeCycle.transition(RUNNING);
}
@@ -485,6 +498,9 @@ class RaftServerImpl implements RaftServer.Division,
*/
private synchronized boolean changeToFollower(long newTerm, boolean force, Object reason) {
final RaftPeerRole old = role.getCurrentRole();
+ if (old == RaftPeerRole.LISTENER) {
+ throw new IllegalStateException("Unexpected role " + old);
+ }
final boolean metadataUpdated = state.updateCurrentTerm(newTerm);
if (old != RaftPeerRole.FOLLOWER || force) {