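You are viewing a plain-text version of this message. The canonical version is the original posting in the commits@ratis.apache.org mailing-list archive (the hyperlink to it was lost in this plain-text rendering).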
Posted to commits@ratis.apache.org by dr...@apache.org on 2022/12/06 13:00:52 UTC

[ratis] branch master updated: RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c00461b9 RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)
1c00461b9 is described below

commit 1c00461b93a2d259bf810713b00a9791a6bd292d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Dec 6 05:00:47 2022 -0800

    RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)
    
    * RATIS-1751. Race condition between LeaderStateImpl & ServerState.
    
    * Fix IllegalArgumentException.
    
    * Fix findbugs.
---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 130 +++++++++++++--------
 .../ratis/server/impl/PeerConfiguration.java       |   7 +-
 .../ratis/server/impl/RaftConfigurationImpl.java   |  11 +-
 3 files changed, 96 insertions(+), 52 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 7aba10aa5..f390f70d3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -229,6 +229,72 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
+  /** For caching {@link FollowerInfo}s.  This class is immutable. */
+  static class CurrentOldFollowerInfos {
+    private final RaftConfigurationImpl conf;
+    private final List<FollowerInfo> current;
+    private final List<FollowerInfo> old;
+
+    CurrentOldFollowerInfos(RaftConfigurationImpl conf, List<FollowerInfo> current, List<FollowerInfo> old) {
+      // set null when the sizes are not the same so that it will update next time.
+      this.conf = isSameSize(current, conf.getConf()) && isSameSize(old, conf.getOldConf())? conf: null;
+      this.current = Collections.unmodifiableList(current);
+      this.old = old == null? null: Collections.unmodifiableList(old);
+    }
+
+    RaftConfigurationImpl getConf() {
+      return conf;
+    }
+
+    List<FollowerInfo> getCurrent() {
+      return current;
+    }
+
+    List<FollowerInfo> getOld() {
+      return old;
+    }
+  }
+
+  static boolean isSameSize(List<FollowerInfo> infos, PeerConfiguration conf) {
+    return conf == null? infos == null: conf.size() == infos.size();
+  }
+
+  /** Use == to compare if the confs are the same object. */
+  static boolean isSameConf(CurrentOldFollowerInfos cached, RaftConfigurationImpl conf) {
+    return cached != null && cached.getConf() == conf;
+  }
+
+  static class FollowerInfoMap {
+    private final Map<RaftPeerId, FollowerInfo> map = new ConcurrentHashMap<>();
+
+    private volatile CurrentOldFollowerInfos followerInfos;
+
+    void put(RaftPeerId id, FollowerInfo info) {
+      map.put(id, info);
+    }
+
+    CurrentOldFollowerInfos getFollowerInfos(RaftConfigurationImpl conf) {
+      final CurrentOldFollowerInfos cached = followerInfos;
+      if (isSameConf(cached, conf)) {
+        return cached;
+      }
+
+      return update(conf);
+    }
+
+    synchronized CurrentOldFollowerInfos update(RaftConfigurationImpl conf) {
+      if (!isSameConf(followerInfos, conf)) { // compare again synchronized
+        followerInfos = new CurrentOldFollowerInfos(conf, getFollowerInfos(conf.getConf()),
+            Optional.ofNullable(conf.getOldConf()).map(this::getFollowerInfos).orElse(null));
+      }
+      return followerInfos;
+    }
+
+    private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
+      return peers.streamPeerIds().map(map::get).filter(Objects::nonNull).collect(Collectors.toList());
+    }
+  }
+
   private final StateUpdateEvent updateCommitEvent =
       new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit);
   private final StateUpdateEvent checkStagingEvent =
@@ -239,8 +305,8 @@ class LeaderStateImpl implements LeaderState {
   private final RaftLog raftLog;
   private final long currentTerm;
   private volatile ConfigurationStagingState stagingState;
-  private List<List<RaftPeerId>> voterLists;
-  private final Map<RaftPeerId, FollowerInfo> peerIdFollowerInfoMap = new ConcurrentHashMap<>();
+
+  private final FollowerInfoMap followerInfoMap = new FollowerInfoMap();
 
   /**
    * The list of threads appending entries to followers.
@@ -306,7 +372,6 @@ class LeaderStateImpl implements LeaderState {
     if (!listeners.isEmpty()) {
       addSenders(listeners, placeHolderIndex, false, RaftPeerRole.LISTENER);
     }
-    voterLists = divideFollowers(conf);
   }
 
   LogEntryProto start() {
@@ -481,7 +546,6 @@ class LeaderStateImpl implements LeaderState {
 
   private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
     Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
-    voterLists = divideFollowers(newConf);
     server.getState().setRaftConf(newConf);
   }
 
@@ -517,7 +581,7 @@ class LeaderStateImpl implements LeaderState {
     final List<LogAppender> newAppenders = newPeers.stream()
         .map(peer -> {
           final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
-          peerIdFollowerInfoMap.put(peer.getId(), f);
+          followerInfoMap.put(peer.getId(), f);
           if (role == RaftPeerRole.FOLLOWER) {
             raftServerMetrics.addFollower(peer.getId());
             logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
@@ -789,7 +853,8 @@ class LeaderStateImpl implements LeaderState {
     final RaftPeerId selfId = server.getId();
     final RaftConfigurationImpl conf = server.getRaftConf();
 
-    final List<RaftPeerId> followers = voterLists.get(0);
+    final CurrentOldFollowerInfos infos = followerInfoMap.getFollowerInfos(conf);
+    final List<FollowerInfo> followers = infos.getCurrent();
     final boolean includeSelf = conf.containsInConf(selfId);
     if (followers.isEmpty() && !includeSelf) {
       return Optional.empty();
@@ -801,7 +866,7 @@ class LeaderStateImpl implements LeaderState {
     if (!conf.isTransitional()) {
       return Optional.of(newConf);
     } else { // configuration is in transitional state
-      final List<RaftPeerId> oldFollowers = voterLists.get(1);
+      final List<FollowerInfo> oldFollowers = infos.getOld();
       final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
       if (oldFollowers.isEmpty() && !includeSelfInOldConf) {
         return Optional.empty();
@@ -817,28 +882,29 @@ class LeaderStateImpl implements LeaderState {
     final RaftPeerId selfId = server.getId();
     final RaftConfigurationImpl conf = server.getRaftConf();
 
-    final List<RaftPeerId> followers = voterLists.get(0);
+    final CurrentOldFollowerInfos infos = followerInfoMap.getFollowerInfos(conf);
+    final List<FollowerInfo> followers = infos.getCurrent();
     final boolean includeSelf = conf.containsInConf(selfId);
     final boolean newConf = hasMajority(isAcked, followers, includeSelf);
 
     if (!conf.isTransitional()) {
       return newConf;
     } else {
-      final List<RaftPeerId> oldFollowers = voterLists.get(1);
+      final List<FollowerInfo> oldFollowers = infos.getOld();
       final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
       final boolean oldConf = hasMajority(isAcked, oldFollowers, includeSelfInOldConf);
       return newConf && oldConf;
     }
   }
 
-  private boolean hasMajority(Predicate<RaftPeerId> isAcked, List<RaftPeerId> followers, boolean includeSelf) {
+  private boolean hasMajority(Predicate<RaftPeerId> isAcked, List<FollowerInfo> followers, boolean includeSelf) {
     if (followers.isEmpty() && !includeSelf) {
       return true;
     }
 
     int count = includeSelf ? 1 : 0;
-    for (RaftPeerId follower: followers) {
-      if (isAcked.test(follower)) {
+    for (FollowerInfo follower: followers) {
+      if (isAcked.test(follower.getPeer().getId())) {
         count++;
       }
     }
@@ -924,31 +990,14 @@ class LeaderStateImpl implements LeaderState {
     notifySenders();
   }
 
-  private List<FollowerInfo> getFollowerInfos(List<RaftPeerId> followerIDs) {
-    List<FollowerInfo> followerInfos = new ArrayList<>();
-    for (int i = 0; i < followerIDs.size(); i++) {
-      RaftPeerId id = followerIDs.get(i);
-      if (!peerIdFollowerInfoMap.containsKey(id)) {
-        throw new IllegalArgumentException("RaftPeerId:" + id +
-                " not in peerIdFollowerInfoMap of leader:" + server.getMemberId());
-      }
-
-      followerInfos.add(peerIdFollowerInfoMap.get(id));
-    }
-
-    return followerInfos;
-  }
-
-  private long[] getSorted(List<RaftPeerId> followerIDs, boolean includeSelf,
+  private long[] getSorted(List<FollowerInfo> followerInfos, boolean includeSelf,
       ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) {
-    final int length = includeSelf ? followerIDs.size() + 1 : followerIDs.size();
+    final int length = includeSelf ? followerInfos.size() + 1 : followerInfos.size();
     if (length == 0) {
-      throw new IllegalArgumentException("followers.size() == "
-          + followerIDs.size() + " and includeSelf == " + includeSelf);
+      throw new IllegalArgumentException("followerInfos is empty and includeSelf == " + includeSelf);
     }
 
     final long[] indices = new long[length];
-    List<FollowerInfo> followerInfos = getFollowerInfos(followerIDs);
     for (int i = 0; i < followerInfos.size(); i++) {
       indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i));
     }
@@ -962,23 +1011,6 @@ class LeaderStateImpl implements LeaderState {
     return indices;
   }
 
-  private List<List<RaftPeerId>> divideFollowers(RaftConfigurationImpl conf) {
-    List<List<RaftPeerId>> lists = new ArrayList<>(2);
-    List<RaftPeerId> listForNew = senders.stream()
-        .map(LogAppender::getFollowerId)
-        .filter(conf::containsInConf)
-        .collect(Collectors.toList());
-    lists.add(listForNew);
-    if (conf.isTransitional()) {
-      List<RaftPeerId> listForOld = senders.stream()
-          .map(LogAppender::getFollowerId)
-          .filter(conf::containsInOldConf)
-          .collect(Collectors.toList());
-      lists.add(listForOld);
-    }
-    return lists;
-  }
-
   private void yieldLeaderToHigherPriorityPeer() {
     if (!server.getInfo().isLeader()) {
       return;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 6730b6181..38e3602e8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -31,10 +31,11 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.stream.Stream;
 
 /**
  * The peer configuration of a raft cluster.
- *
+ * <p>
  * The objects of this class are immutable.
  */
 class PeerConfiguration {
@@ -95,6 +96,10 @@ class PeerConfiguration {
     return peers.size();
   }
 
+  Stream<RaftPeerId> streamPeerIds() {
+    return peers.keySet().stream();
+  }
+
   @Override
   public String toString() {
     return "peers:" + peers.values() + "|listeners:" + listeners.values();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 43818395a..3e53451f0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -34,10 +34,10 @@ import java.util.stream.Collectors;
 
 /**
  * The configuration of the raft cluster.
- *
+ * <p>
  * The configuration is stable if there is no on-going peer change. Otherwise,
  * the configuration is transitional, i.e. in the middle of a peer change.
- *
+ * <p>
  * The objects of this class are immutable.
  */
 final class RaftConfigurationImpl implements RaftConfiguration {
@@ -157,6 +157,13 @@ final class RaftConfigurationImpl implements RaftConfiguration {
     }
   }
 
+  PeerConfiguration getConf() {
+    return conf;
+  }
+
+  PeerConfiguration getOldConf() {
+    return oldConf;
+  }
 
   boolean isHighestPriority(RaftPeerId peerId) {
     RaftPeer target = getPeer(peerId);