You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/31 08:20:22 UTC
[ratis] branch master updated: RATIS-1824. Membership change may fail when a Listener is present in the cluster. (#865)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 0b42e9445 RATIS-1824. Membership change may fail when a Listener is present in the cluster. (#865)
0b42e9445 is described below
commit 0b42e9445bb64d9ba600b8ce7ac0f92e9fca293c
Author: qian0817 <qi...@gmail.com>
AuthorDate: Fri Mar 31 16:20:16 2023 +0800
RATIS-1824. Membership change may fail when a Listener is present in the cluster. (#865)
---
.../apache/ratis/server/impl/FollowerInfoImpl.java | 16 ++++++-------
.../apache/ratis/server/impl/LeaderStateImpl.java | 24 ++++++++++----------
.../ratis/server/impl/LeaderElectionTests.java | 26 ++++++++++++++++++++++
3 files changed, 46 insertions(+), 20 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 891a01c76..245cbc888 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -43,11 +43,11 @@ class FollowerInfoImpl implements FollowerInfo {
private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
- private volatile boolean attendVote;
+ private volatile boolean caughtUp;
private volatile boolean ackInstallSnapshotAttempt = false;
FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Function<RaftPeerId, RaftPeer> getPeer,
- Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
+ Timestamp lastRpcTime, long nextIndex, boolean caughtUp) {
this.name = id + "->" + peer.getId();
this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
@@ -58,7 +58,7 @@ class FollowerInfoImpl implements FollowerInfo {
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
- this.attendVote = attendVote;
+ this.caughtUp = caughtUp;
}
@Override
@@ -140,17 +140,17 @@ class FollowerInfoImpl implements FollowerInfo {
@Override
public String toString() {
return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" + getNextIndex()
- + ", attendVote=" + attendVote +
+ + ", caughtUp=" + caughtUp +
", lastRpcSendTime=" + lastRpcSendTime.get().elapsedTimeMs() +
", lastRpcResponseTime=" + lastRpcResponseTime.get().elapsedTimeMs() + ")";
}
- void startAttendVote() {
- attendVote = true;
+ void catchUp() {
+ caughtUp = true;
}
- boolean isAttendingVote() {
- return attendVote;
+ boolean isCaughtUp() {
+ return caughtUp;
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 6627d8e7b..7ea4d738d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -364,7 +364,7 @@ class LeaderStateImpl implements LeaderState {
final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER);
if (!listeners.isEmpty()) {
- addSenders(listeners, placeHolderIndex, false);
+ addSenders(listeners, placeHolderIndex, true);
}
}
@@ -429,7 +429,7 @@ class LeaderStateImpl implements LeaderState {
@Override
public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
- if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
+ if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) {
submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM);
return true;
}
@@ -561,7 +561,7 @@ class LeaderStateImpl implements LeaderState {
@Override
public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
List<LogEntryProto> entries, TermIndex previous, long callId) {
- final boolean initializing = isAttendingVote(follower);
+ final boolean initializing = isCaughtUp(follower);
final RaftPeerId targetId = follower.getId();
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
@@ -581,10 +581,10 @@ class LeaderStateImpl implements LeaderState {
return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
}
- private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
+ private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean caughtUp) {
final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final List<LogAppender> newAppenders = newPeers.stream().map(peer -> {
- final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote);
+ final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, caughtUp);
followerInfoMap.put(peer.getId(), f);
raftServerMetrics.addFollower(peer.getId());
logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
@@ -726,7 +726,7 @@ class LeaderStateImpl implements LeaderState {
* 3. Otherwise the peer is making progress. Keep waiting.
*/
private BootStrapProgress checkProgress(FollowerInfo follower, long committed) {
- Preconditions.assertTrue(!isAttendingVote(follower));
+ Preconditions.assertTrue(!isCaughtUp(follower));
final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs());
if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
@@ -744,7 +744,7 @@ class LeaderStateImpl implements LeaderState {
@Override
public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
- if (isAttendingVote(follower)) {
+ if (isCaughtUp(follower)) {
submitUpdateCommitEvent();
} else {
eventQueue.submit(checkStagingEvent);
@@ -766,7 +766,7 @@ class LeaderStateImpl implements LeaderState {
// check progress for the new followers
final EnumSet<BootStrapProgress> reports = getLogAppenders()
.map(LogAppender::getFollower)
- .filter(follower -> !isAttendingVote(follower))
+ .filter(follower -> !isCaughtUp(follower))
.map(follower -> checkProgress(follower, commitIndex))
.collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
if (reports.contains(BootStrapProgress.NOPROGRESS)) {
@@ -778,7 +778,7 @@ class LeaderStateImpl implements LeaderState {
.map(LogAppender::getFollower)
.filter(f -> server.getRaftConf().containsInConf(f.getId()))
.map(FollowerInfoImpl.class::cast)
- .forEach(FollowerInfoImpl::startAttendVote);
+ .forEach(FollowerInfoImpl::catchUp);
}
}
}
@@ -1184,7 +1184,7 @@ class LeaderStateImpl implements LeaderState {
void fail(BootStrapProgress progress) {
final String message = this + ": Fail to set configuration " + newConf + " due to " + progress;
LOG.debug(message);
- stopAndRemoveSenders(s -> !isAttendingVote(s.getFollower()));
+ stopAndRemoveSenders(s -> !isCaughtUp(s.getFollower()));
stagingState = null;
// send back failure response to client's request
@@ -1214,8 +1214,8 @@ class LeaderStateImpl implements LeaderState {
return getLogAppenders().filter(a -> a.getFollowerId().equals(id)).findAny();
}
- private static boolean isAttendingVote(FollowerInfo follower) {
- return ((FollowerInfoImpl)follower).isAttendingVote();
+ private static boolean isCaughtUp(FollowerInfo follower) {
+ return ((FollowerInfoImpl)follower).isCaughtUp();
}
@Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index dc22e2743..9e2b7bd2d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -410,6 +410,32 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
}
}
+ @Test
+ public void testAddFollowerWhenExistsListener() throws Exception {
+ try (final MiniRaftCluster cluster = newCluster(3, 1)) {
+ cluster.start();
+ final RaftServer.Division leader = waitForLeader(cluster);
+ try (RaftClient client = cluster.createClient(leader.getId())) {
+ client.io().send(new RaftTestUtil.SimpleMessage("message"));
+ List<RaftPeer> servers = cluster.getPeers();
+ Assert.assertEquals(4, servers.size());
+ List<RaftPeer> listener = new ArrayList<>(
+ leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER));
+ Assert.assertEquals(1, listener.size());
+ MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1, true, false);
+ ArrayList<RaftPeer> newPeers = new ArrayList<>(Arrays.asList(changes.newPeers));
+ newPeers.addAll(leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER));
+ RaftClientReply reply = client.admin().setConfiguration(newPeers, listener);
+ Assert.assertTrue(reply.isSuccess());
+ Assert.assertEquals(4,
+ leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER).size());
+ Assert.assertEquals(1,
+ leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size());
+ }
+ cluster.shutdown();
+ }
+ }
+
@Test
public void testRemoveListener() throws Exception {
try(final MiniRaftCluster cluster = newCluster(3,1)) {