You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2023/01/31 12:02:34 UTC

[ratis] branch master updated: RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable. (#813)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 7626a7ea0 RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable. (#813)
7626a7ea0 is described below

commit 7626a7ea0c7cf2ebe4ba9518996b598d617804e9
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Jan 31 20:02:27 2023 +0800

    RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable. (#813)
    
    * RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable.
    
    * Remove checkAllProgress(..)
    
    * Revert renaming getLogAppenders() to getSenders().
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 73 ++++++++++------------
 .../apache/ratis/server/impl/RaftServerImpl.java   |  4 +-
 2 files changed, 34 insertions(+), 43 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 69f673de5..a3c0cc28d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -62,11 +62,12 @@ import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.Timestamp;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -79,12 +80,12 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
 import java.util.function.ToLongFunction;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
 
 import static org.apache.ratis.server.RaftServer.Division.LOG;
 import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;
@@ -193,23 +194,16 @@ class LeaderStateImpl implements LeaderState {
    * Since each mutation induces a copy of the list, only bulk operations
    * (addAll and removeAll) are supported.
    */
-  static class SenderList {
+  static class SenderList implements Iterable<LogAppender> {
     private final List<LogAppender> senders;
 
     SenderList() {
       this.senders = new CopyOnWriteArrayList<>();
     }
 
-    Stream<LogAppender> stream() {
-      return senders.stream();
-    }
-
-    List<LogAppender> getSenders() {
-      return senders;
-    }
-
-    void forEach(Consumer<LogAppender> action) {
-      senders.forEach(action);
+    @Override
+    public Iterator<LogAppender> iterator() {
+      return senders.iterator();
     }
 
     void addAll(Collection<LogAppender> newSenders) {
@@ -555,7 +549,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
-    for (LogAppender sender : senders.getSenders()) {
+    for (LogAppender sender : senders) {
       FollowerInfo info = sender.getFollower();
       protos.add(cache.update(info.getPeer(), info.getCommitIndex()));
     }
@@ -574,7 +568,7 @@ class LeaderStateImpl implements LeaderState {
   /**
    * Update sender list for setConfiguration request
    */
-  void addAndStartSenders(Collection<RaftPeer> newPeers) {
+  private void addAndStartSenders(Collection<RaftPeer> newPeers) {
     if (!newPeers.isEmpty()) {
       addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
     }
@@ -584,7 +578,7 @@ class LeaderStateImpl implements LeaderState {
     return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
   }
 
-  Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
+  private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
     final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final List<LogAppender> newAppenders = newPeers.stream().map(peer -> {
       final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote);
@@ -597,8 +591,11 @@ class LeaderStateImpl implements LeaderState {
     return newAppenders;
   }
 
-  void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
-    final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
+  private void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
+    stopAndRemoveSenders(getLogAppenders().filter(predicate).collect(Collectors.toList()));
+  }
+
+  private void stopAndRemoveSenders(Collection<LogAppender> toStop) {
     toStop.forEach(LogAppender::stop);
     senders.removeAll(toStop);
   }
@@ -620,8 +617,7 @@ class LeaderStateImpl implements LeaderState {
 
     final FollowerInfo info = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName());
-    sender.stop();
-    senders.removeAll(Collections.singleton(sender));
+    stopAndRemoveSenders(Collections.singleton(sender));
 
     Optional.ofNullable(getPeer(info.getId()))
         .ifPresent(peer -> addAndStartSenders(Collections.singleton(peer)));
@@ -775,14 +771,6 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
-  private Collection<BootStrapProgress> checkAllProgress(long committed) {
-    Preconditions.assertTrue(inStagingState());
-    return senders.stream()
-        .filter(sender -> !isAttendingVote(sender.getFollower()))
-        .map(sender -> checkProgress(sender.getFollower(), committed))
-        .collect(Collectors.toCollection(ArrayList::new));
-  }
-
   @Override
   public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
     if (isAttendingVote(follower)) {
@@ -802,15 +790,19 @@ class LeaderStateImpl implements LeaderState {
       // it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT
       updateCommitEvent.execute();
     } else {
-      final long committedIndex = server.getState().getLog()
-          .getLastCommittedIndex();
-      Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
+      final long commitIndex = server.getState().getLog().getLastCommittedIndex();
+      // check progress for the new followers
+      final EnumSet<BootStrapProgress> reports = getLogAppenders()
+          .map(LogAppender::getFollower)
+          .filter(follower -> !isAttendingVote(follower))
+          .map(follower -> checkProgress(follower, commitIndex))
+          .collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
       if (reports.contains(BootStrapProgress.NOPROGRESS)) {
         stagingState.fail(BootStrapProgress.NOPROGRESS);
       } else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
         // all caught up!
         applyOldNewConf();
-        senders.stream()
+        getLogAppenders()
             .map(LogAppender::getFollower)
             .filter(f -> server.getRaftConf().containsInConf(f.getId()))
             .map(FollowerInfoImpl.class::cast)
@@ -1053,7 +1045,7 @@ class LeaderStateImpl implements LeaderState {
 
     FollowerInfo highestPriorityInfo = null;
     int highestPriority = Integer.MIN_VALUE;
-    for (LogAppender logAppender : senders.getSenders()) {
+    for (LogAppender logAppender : senders) {
       final RaftPeer follower = conf.getPeer(logAppender.getFollowerId());
       if (follower == null) {
         continue;
@@ -1088,7 +1080,7 @@ class LeaderStateImpl implements LeaderState {
       return true;
     }
 
-    final List<RaftPeerId> activePeers = senders.stream()
+    final List<RaftPeerId> activePeers = getLogAppenders()
         .filter(sender -> sender.getFollower()
                                 .getLastRpcResponseTime()
                                 .elapsedTimeMs() <= server.getMaxTimeoutMs())
@@ -1106,7 +1098,7 @@ class LeaderStateImpl implements LeaderState {
         + ". Election timeout: " + server.getMaxTimeoutMs() + "ms"
         + ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms"
         + ". Conf: " + conf);
-    senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));
+    getLogAppenders().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));
 
     // step down as follower
     stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS);
@@ -1173,7 +1165,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   long[] getFollowerNextIndices() {
-    return senders.stream().mapToLong(s -> s.getFollower().getNextIndex()).toArray();
+    return getLogAppenders().mapToLong(s -> s.getFollower().getNextIndex()).toArray();
   }
 
   static Map<RaftPeerId, RaftPeer> newMap(Collection<RaftPeer> peers, String str) {
@@ -1237,15 +1229,14 @@ class LeaderStateImpl implements LeaderState {
   /**
    * @return the RaftPeer (address and id) information of the followers.
    */
-  List<RaftPeer> getFollowers() {
-    return Collections.unmodifiableList(senders.stream()
+  Stream<RaftPeer> getFollowers() {
+    return getLogAppenders()
         .map(sender -> sender.getFollower().getPeer())
-        .filter(peer -> server.getRaftConf().containsInConf(peer.getId()))
-        .collect(Collectors.toList()));
+        .filter(peer -> server.getRaftConf().containsInConf(peer.getId()));
   }
 
   Stream<LogAppender> getLogAppenders() {
-    return senders.stream();
+    return StreamSupport.stream(senders.spliterator(), false);
   }
 
   private static boolean isAttendingVote(FollowerInfo follower) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index bfeb0c19b..fc87f9719 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1835,8 +1835,8 @@ class RaftServerImpl implements RaftServer.Division,
 
     @Override
     public List<String> getFollowers() {
-      return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElse(Collections.emptyList())
-          .stream().map(RaftPeer::toString).collect(Collectors.toList());
+      return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElseGet(Stream::empty)
+          .map(RaftPeer::toString).collect(Collectors.toList());
     }
 
     @Override