You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2023/03/01 01:22:18 UTC
[ratis] 02/16: RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch branch-2_tmp
in repository https://gitbox.apache.org/repos/asf/ratis.git
commit 3923d64bf02c0e2fbc0915dae5fcc0ea9b12204e
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Dec 6 05:00:47 2022 -0800
RATIS-1751. Race condition between LeaderStateImpl & ServerState. (#789)
(cherry picked from commit 1c00461b93a2d259bf810713b00a9791a6bd292d)
---
.../apache/ratis/server/impl/LeaderStateImpl.java | 119 +++++++++++++--------
.../ratis/server/impl/PeerConfiguration.java | 7 +-
.../ratis/server/impl/RaftConfigurationImpl.java | 11 +-
3 files changed, 90 insertions(+), 47 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 4a9c07bee..b55389343 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -225,6 +225,72 @@ class LeaderStateImpl implements LeaderState {
}
}
+ /** Immutable holder caching the {@link FollowerInfo} lists of the current and the old peers of one {@link RaftConfigurationImpl}. */
+ static class CurrentOldFollowerInfos {
+ private final RaftConfigurationImpl conf; // null when the lists did not match the conf sizes at construction
+ private final List<FollowerInfo> current; // unmodifiable view
+ private final List<FollowerInfo> old; // unmodifiable view, or null when constructed with a null old list
+
+ CurrentOldFollowerInfos(RaftConfigurationImpl conf, List<FollowerInfo> current, List<FollowerInfo> old) {
+ // Store null instead of conf when either list size differs from its peer configuration, so that isSameConf(..) fails and the lists are rebuilt on the next lookup.
+ this.conf = isSameSize(current, conf.getConf()) && isSameSize(old, conf.getOldConf())? conf: null;
+ this.current = Collections.unmodifiableList(current);
+ this.old = old == null? null: Collections.unmodifiableList(old);
+ }
+
+ RaftConfigurationImpl getConf() {
+ return conf;
+ }
+
+ List<FollowerInfo> getCurrent() {
+ return current;
+ }
+
+ List<FollowerInfo> getOld() {
+ return old;
+ }
+ }
+
+ static boolean isSameSize(List<FollowerInfo> infos, PeerConfiguration conf) { // a null conf matches only a null list
+ return conf == null? infos == null: conf.size() == infos.size();
+ }
+
+ /** @return true iff cached is non-null and its conf is the very same object (compared with ==, not equals) as the given conf. */
+ static boolean isSameConf(CurrentOldFollowerInfos cached, RaftConfigurationImpl conf) {
+ return cached != null && cached.getConf() == conf;
+ }
+
+ static class FollowerInfoMap { // maps peer ids to FollowerInfo and caches the per-conf lists derived from that map
+ private final Map<RaftPeerId, FollowerInfo> map = new ConcurrentHashMap<>();
+
+ private volatile CurrentOldFollowerInfos followerInfos; // most recently built lists; null until the first update
+
+ void put(RaftPeerId id, FollowerInfo info) {
+ map.put(id, info);
+ }
+
+ CurrentOldFollowerInfos getFollowerInfos(RaftConfigurationImpl conf) {
+ final CurrentOldFollowerInfos cached = followerInfos; // read the volatile field once
+ if (isSameConf(cached, conf)) {
+ return cached;
+ }
+
+ return update(conf);
+ }
+
+ synchronized CurrentOldFollowerInfos update(RaftConfigurationImpl conf) {
+ if (!isSameConf(followerInfos, conf)) { // check again while holding the lock; rebuild only if the cache is still for a different conf
+ followerInfos = new CurrentOldFollowerInfos(conf, getFollowerInfos(conf.getConf()),
+ Optional.ofNullable(conf.getOldConf()).map(this::getFollowerInfos).orElse(null));
+ }
+ return followerInfos;
+ }
+
+ private List<FollowerInfo> getFollowerInfos(PeerConfiguration peers) {
+ return peers.streamPeerIds().map(map::get).filter(Objects::nonNull).collect(Collectors.toList()); // ids with no entry in the map are skipped
+ }
+ }
+
private final StateUpdateEvent updateCommitEvent =
new StateUpdateEvent(StateUpdateEvent.Type.UPDATE_COMMIT, -1, this::updateCommit);
private final StateUpdateEvent checkStagingEvent =
@@ -235,8 +301,8 @@ class LeaderStateImpl implements LeaderState {
private final RaftLog raftLog;
private final long currentTerm;
private volatile ConfigurationStagingState stagingState;
- private List<List<RaftPeerId>> voterLists;
- private final Map<RaftPeerId, FollowerInfo> peerIdFollowerInfoMap = new ConcurrentHashMap<>();
+
+ private final FollowerInfoMap followerInfoMap = new FollowerInfoMap();
/**
* The list of threads appending entries to followers.
@@ -299,7 +365,6 @@ class LeaderStateImpl implements LeaderState {
if (!listeners.isEmpty()) {
addSenders(listeners, placeHolderIndex, false, RaftPeerRole.LISTENER);
}
- voterLists = divideFollowers(conf);
}
LogEntryProto start() {
@@ -472,7 +537,6 @@ class LeaderStateImpl implements LeaderState {
private void updateConfiguration(long logIndex, RaftConfigurationImpl newConf) {
Preconditions.assertTrue(logIndex == newConf.getLogEntryIndex());
- voterLists = divideFollowers(newConf);
server.getState().setRaftConf(newConf);
}
@@ -508,7 +572,7 @@ class LeaderStateImpl implements LeaderState {
final List<LogAppender> newAppenders = newPeers.stream()
.map(peer -> {
final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
- peerIdFollowerInfoMap.put(peer.getId(), f);
+ followerInfoMap.put(peer.getId(), f);
if (role == RaftPeerRole.FOLLOWER) {
raftServerMetrics.addFollower(peer.getId());
logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
@@ -780,7 +844,8 @@ class LeaderStateImpl implements LeaderState {
final RaftPeerId selfId = server.getId();
final RaftConfigurationImpl conf = server.getRaftConf();
- final List<RaftPeerId> followers = voterLists.get(0);
+ final CurrentOldFollowerInfos infos = followerInfoMap.getFollowerInfos(conf);
+ final List<FollowerInfo> followers = infos.getCurrent();
final boolean includeSelf = conf.containsInConf(selfId);
if (followers.isEmpty() && !includeSelf) {
return Optional.empty();
@@ -792,7 +857,7 @@ class LeaderStateImpl implements LeaderState {
if (!conf.isTransitional()) {
return Optional.of(newConf);
} else { // configuration is in transitional state
- final List<RaftPeerId> oldFollowers = voterLists.get(1);
+ final List<FollowerInfo> oldFollowers = infos.getOld();
final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
if (oldFollowers.isEmpty() && !includeSelfInOldConf) {
return Optional.empty();
@@ -882,31 +947,14 @@ class LeaderStateImpl implements LeaderState {
notifySenders();
}
- private List<FollowerInfo> getFollowerInfos(List<RaftPeerId> followerIDs) {
- List<FollowerInfo> followerInfos = new ArrayList<>();
- for (int i = 0; i < followerIDs.size(); i++) {
- RaftPeerId id = followerIDs.get(i);
- if (!peerIdFollowerInfoMap.containsKey(id)) {
- throw new IllegalArgumentException("RaftPeerId:" + id +
- " not in peerIdFollowerInfoMap of leader:" + server.getMemberId());
- }
-
- followerInfos.add(peerIdFollowerInfoMap.get(id));
- }
-
- return followerInfos;
- }
-
- private long[] getSorted(List<RaftPeerId> followerIDs, boolean includeSelf,
+ private long[] getSorted(List<FollowerInfo> followerInfos, boolean includeSelf,
ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) {
- final int length = includeSelf ? followerIDs.size() + 1 : followerIDs.size();
+ final int length = includeSelf ? followerInfos.size() + 1 : followerInfos.size();
if (length == 0) {
- throw new IllegalArgumentException("followers.size() == "
- + followerIDs.size() + " and includeSelf == " + includeSelf);
+ throw new IllegalArgumentException("followerInfos is empty and includeSelf == " + includeSelf);
}
final long[] indices = new long[length];
- List<FollowerInfo> followerInfos = getFollowerInfos(followerIDs);
for (int i = 0; i < followerInfos.size(); i++) {
indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i));
}
@@ -920,23 +968,6 @@ class LeaderStateImpl implements LeaderState {
return indices;
}
- private List<List<RaftPeerId>> divideFollowers(RaftConfigurationImpl conf) {
- List<List<RaftPeerId>> lists = new ArrayList<>(2);
- List<RaftPeerId> listForNew = senders.stream()
- .map(LogAppender::getFollowerId)
- .filter(conf::containsInConf)
- .collect(Collectors.toList());
- lists.add(listForNew);
- if (conf.isTransitional()) {
- List<RaftPeerId> listForOld = senders.stream()
- .map(LogAppender::getFollowerId)
- .filter(conf::containsInOldConf)
- .collect(Collectors.toList());
- lists.add(listForOld);
- }
- return lists;
- }
-
private void yieldLeaderToHigherPriorityPeer() {
if (!server.getInfo().isLeader()) {
return;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
index 6730b6181..38e3602e8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PeerConfiguration.java
@@ -31,10 +31,11 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.stream.Stream;
/**
* The peer configuration of a raft cluster.
- *
+ * <p>
* The objects of this class are immutable.
*/
class PeerConfiguration {
@@ -95,6 +96,10 @@ class PeerConfiguration {
return peers.size();
}
+ Stream<RaftPeerId> streamPeerIds() { // ids are the keys of the peers map only
+ return peers.keySet().stream();
+ }
+
@Override
public String toString() {
return "peers:" + peers.values() + "|listeners:" + listeners.values();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
index 43818395a..3e53451f0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftConfigurationImpl.java
@@ -34,10 +34,10 @@ import java.util.stream.Collectors;
/**
* The configuration of the raft cluster.
- *
+ * <p>
* The configuration is stable if there is no on-going peer change. Otherwise,
* the configuration is transitional, i.e. in the middle of a peer change.
- *
+ * <p>
* The objects of this class are immutable.
*/
final class RaftConfigurationImpl implements RaftConfiguration {
@@ -157,6 +157,13 @@ final class RaftConfigurationImpl implements RaftConfiguration {
}
}
+ PeerConfiguration getConf() { // plain accessor for the conf field
+ return conf;
+ }
+
+ PeerConfiguration getOldConf() { // plain accessor for the oldConf field; LeaderStateImpl treats the result as possibly null
+ return oldConf;
+ }
boolean isHighestPriority(RaftPeerId peerId) {
RaftPeer target = getPeer(peerId);