You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/31 08:20:22 UTC

[ratis] branch master updated: RATIS-1824. Membership change may fail when a Listener is present in the cluster. (#865)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 0b42e9445 RATIS-1824. Membership change may fail when a Listener is present in the cluster.  (#865)
0b42e9445 is described below

commit 0b42e9445bb64d9ba600b8ce7ac0f92e9fca293c
Author: qian0817 <qi...@gmail.com>
AuthorDate: Fri Mar 31 16:20:16 2023 +0800

    RATIS-1824. Membership change may fail when a Listener is present in the cluster.  (#865)
---
 .../apache/ratis/server/impl/FollowerInfoImpl.java | 16 ++++++-------
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 24 ++++++++++----------
 .../ratis/server/impl/LeaderElectionTests.java     | 26 ++++++++++++++++++++++
 3 files changed, 46 insertions(+), 20 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 891a01c76..245cbc888 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -43,11 +43,11 @@ class FollowerInfoImpl implements FollowerInfo {
   private final RaftLogIndex matchIndex = new RaftLogIndex("matchIndex", RaftLog.INVALID_LOG_INDEX);
   private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
   private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
-  private volatile boolean attendVote;
+  private volatile boolean caughtUp;
   private volatile boolean ackInstallSnapshotAttempt = false;
 
   FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Function<RaftPeerId, RaftPeer> getPeer,
-      Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
+      Timestamp lastRpcTime, long nextIndex, boolean caughtUp) {
     this.name = id + "->" + peer.getId();
     this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
     this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
@@ -58,7 +58,7 @@ class FollowerInfoImpl implements FollowerInfo {
     this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
     this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
     this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
-    this.attendVote = attendVote;
+    this.caughtUp = caughtUp;
   }
 
   @Override
@@ -140,17 +140,17 @@ class FollowerInfoImpl implements FollowerInfo {
   @Override
   public String toString() {
     return name + "(c" + getCommitIndex() + ",m" + getMatchIndex() + ",n" + getNextIndex()
-        + ", attendVote=" + attendVote +
+        + ", caughtUp=" + caughtUp +
         ", lastRpcSendTime=" + lastRpcSendTime.get().elapsedTimeMs() +
         ", lastRpcResponseTime=" + lastRpcResponseTime.get().elapsedTimeMs() + ")";
   }
 
-  void startAttendVote() {
-    attendVote = true;
+  void catchUp() {
+    caughtUp = true;
   }
 
-  boolean isAttendingVote() {
-    return attendVote;
+  boolean isCaughtUp() {
+    return caughtUp;
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 6627d8e7b..7ea4d738d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -364,7 +364,7 @@ class LeaderStateImpl implements LeaderState {
 
     final Collection<RaftPeer> listeners = conf.getAllPeers(RaftPeerRole.LISTENER);
     if (!listeners.isEmpty()) {
-      addSenders(listeners, placeHolderIndex, false);
+      addSenders(listeners, placeHolderIndex, true);
     }
   }
 
@@ -429,7 +429,7 @@ class LeaderStateImpl implements LeaderState {
 
   @Override
   public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
-    if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
+    if (isCaughtUp(follower) && followerTerm > getCurrentTerm()) {
       submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM);
       return true;
     }
@@ -561,7 +561,7 @@ class LeaderStateImpl implements LeaderState {
   @Override
   public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
       List<LogEntryProto> entries, TermIndex previous, long callId) {
-    final boolean initializing = isAttendingVote(follower);
+    final boolean initializing = isCaughtUp(follower);
     final RaftPeerId targetId = follower.getId();
     return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
         ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
@@ -581,10 +581,10 @@ class LeaderStateImpl implements LeaderState {
     return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
   }
 
-  private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
+  private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean caughtUp) {
     final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final List<LogAppender> newAppenders = newPeers.stream().map(peer -> {
-      final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote);
+      final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, caughtUp);
       followerInfoMap.put(peer.getId(), f);
       raftServerMetrics.addFollower(peer.getId());
       logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
@@ -726,7 +726,7 @@ class LeaderStateImpl implements LeaderState {
    * 3. Otherwise the peer is making progressing. Keep waiting.
    */
   private BootStrapProgress checkProgress(FollowerInfo follower, long committed) {
-    Preconditions.assertTrue(!isAttendingVote(follower));
+    Preconditions.assertTrue(!isCaughtUp(follower));
     final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs());
     if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
@@ -744,7 +744,7 @@ class LeaderStateImpl implements LeaderState {
 
   @Override
   public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
-    if (isAttendingVote(follower)) {
+    if (isCaughtUp(follower)) {
       submitUpdateCommitEvent();
     } else {
       eventQueue.submit(checkStagingEvent);
@@ -766,7 +766,7 @@ class LeaderStateImpl implements LeaderState {
       // check progress for the new followers
       final EnumSet<BootStrapProgress> reports = getLogAppenders()
           .map(LogAppender::getFollower)
-          .filter(follower -> !isAttendingVote(follower))
+          .filter(follower -> !isCaughtUp(follower))
           .map(follower -> checkProgress(follower, commitIndex))
           .collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
       if (reports.contains(BootStrapProgress.NOPROGRESS)) {
@@ -778,7 +778,7 @@ class LeaderStateImpl implements LeaderState {
             .map(LogAppender::getFollower)
             .filter(f -> server.getRaftConf().containsInConf(f.getId()))
             .map(FollowerInfoImpl.class::cast)
-            .forEach(FollowerInfoImpl::startAttendVote);
+            .forEach(FollowerInfoImpl::catchUp);
       }
     }
   }
@@ -1184,7 +1184,7 @@ class LeaderStateImpl implements LeaderState {
     void fail(BootStrapProgress progress) {
       final String message = this + ": Fail to set configuration " + newConf + " due to " + progress;
       LOG.debug(message);
-      stopAndRemoveSenders(s -> !isAttendingVote(s.getFollower()));
+      stopAndRemoveSenders(s -> !isCaughtUp(s.getFollower()));
 
       stagingState = null;
       // send back failure response to client's request
@@ -1214,8 +1214,8 @@ class LeaderStateImpl implements LeaderState {
     return getLogAppenders().filter(a -> a.getFollowerId().equals(id)).findAny();
   }
 
-  private static boolean isAttendingVote(FollowerInfo follower) {
-    return ((FollowerInfoImpl)follower).isAttendingVote();
+  private static boolean isCaughtUp(FollowerInfo follower) {
+    return ((FollowerInfoImpl)follower).isCaughtUp();
   }
 
   @Override
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index dc22e2743..9e2b7bd2d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -410,6 +410,32 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
+  @Test
+  public void testAddFollowerWhenExistsListener() throws Exception {
+    try (final MiniRaftCluster cluster = newCluster(3, 1)) {
+      cluster.start();
+      final RaftServer.Division leader = waitForLeader(cluster);
+      try (RaftClient client = cluster.createClient(leader.getId())) {
+        client.io().send(new RaftTestUtil.SimpleMessage("message"));
+        List<RaftPeer> servers = cluster.getPeers();
+        Assert.assertEquals(4, servers.size());
+        List<RaftPeer> listener = new ArrayList<>(
+            leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER));
+        Assert.assertEquals(1, listener.size());
+        MiniRaftCluster.PeerChanges changes = cluster.addNewPeers(1, true, false);
+        ArrayList<RaftPeer> newPeers = new ArrayList<>(Arrays.asList(changes.newPeers));
+        newPeers.addAll(leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER));
+        RaftClientReply reply = client.admin().setConfiguration(newPeers, listener);
+        Assert.assertTrue(reply.isSuccess());
+        Assert.assertEquals(4,
+            leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER).size());
+        Assert.assertEquals(1,
+            leader.getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER).size());
+      }
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testRemoveListener() throws Exception {
     try(final MiniRaftCluster cluster = newCluster(3,1)) {