You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2023/01/31 12:02:34 UTC
[ratis] branch master updated: RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable. (#813)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7626a7ea0 RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable. (#813)
7626a7ea0 is described below
commit 7626a7ea0c7cf2ebe4ba9518996b598d617804e9
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Jan 31 20:02:27 2023 +0800
RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable. (#813)
* RATIS-1774. Change SenderList in LeaderStateImpl to implement Iterable.
* Remove checkAllProgress(..)
* Revert renaming getLogAppenders() to getSenders().
---
.../apache/ratis/server/impl/LeaderStateImpl.java | 73 ++++++++++------------
.../apache/ratis/server/impl/RaftServerImpl.java | 4 +-
2 files changed, 34 insertions(+), 43 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 69f673de5..a3c0cc28d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -62,11 +62,12 @@ import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.Timestamp;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -79,12 +80,12 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
import java.util.function.ToLongFunction;
import java.util.stream.Collectors;
import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
import static org.apache.ratis.server.RaftServer.Division.LOG;
import static org.apache.ratis.server.RaftServerConfigKeys.Write.FOLLOWER_GAP_RATIO_MAX_KEY;
@@ -193,23 +194,16 @@ class LeaderStateImpl implements LeaderState {
* Since each mutation induces a copy of the list, only bulk operations
* (addAll and removeAll) are supported.
*/
- static class SenderList {
+ static class SenderList implements Iterable<LogAppender> {
private final List<LogAppender> senders;
SenderList() {
this.senders = new CopyOnWriteArrayList<>();
}
- Stream<LogAppender> stream() {
- return senders.stream();
- }
-
- List<LogAppender> getSenders() {
- return senders;
- }
-
- void forEach(Consumer<LogAppender> action) {
- senders.forEach(action);
+ @Override
+ public Iterator<LogAppender> iterator() {
+ return senders.iterator();
}
void addAll(Collection<LogAppender> newSenders) {
@@ -555,7 +549,7 @@ class LeaderStateImpl implements LeaderState {
}
void updateFollowerCommitInfos(CommitInfoCache cache, List<CommitInfoProto> protos) {
- for (LogAppender sender : senders.getSenders()) {
+ for (LogAppender sender : senders) {
FollowerInfo info = sender.getFollower();
protos.add(cache.update(info.getPeer(), info.getCommitIndex()));
}
@@ -574,7 +568,7 @@ class LeaderStateImpl implements LeaderState {
/**
* Update sender list for setConfiguration request
*/
- void addAndStartSenders(Collection<RaftPeer> newPeers) {
+ private void addAndStartSenders(Collection<RaftPeer> newPeers) {
if (!newPeers.isEmpty()) {
addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
}
@@ -584,7 +578,7 @@ class LeaderStateImpl implements LeaderState {
return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
}
- Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
+ private Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final List<LogAppender> newAppenders = newPeers.stream().map(peer -> {
final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote);
@@ -597,8 +591,11 @@ class LeaderStateImpl implements LeaderState {
return newAppenders;
}
- void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
- final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
+ private void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
+ stopAndRemoveSenders(getLogAppenders().filter(predicate).collect(Collectors.toList()));
+ }
+
+ private void stopAndRemoveSenders(Collection<LogAppender> toStop) {
toStop.forEach(LogAppender::stop);
senders.removeAll(toStop);
}
@@ -620,8 +617,7 @@ class LeaderStateImpl implements LeaderState {
final FollowerInfo info = sender.getFollower();
LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), info.getName());
- sender.stop();
- senders.removeAll(Collections.singleton(sender));
+ stopAndRemoveSenders(Collections.singleton(sender));
Optional.ofNullable(getPeer(info.getId()))
.ifPresent(peer -> addAndStartSenders(Collections.singleton(peer)));
@@ -775,14 +771,6 @@ class LeaderStateImpl implements LeaderState {
}
}
- private Collection<BootStrapProgress> checkAllProgress(long committed) {
- Preconditions.assertTrue(inStagingState());
- return senders.stream()
- .filter(sender -> !isAttendingVote(sender.getFollower()))
- .map(sender -> checkProgress(sender.getFollower(), committed))
- .collect(Collectors.toCollection(ArrayList::new));
- }
-
@Override
public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
if (isAttendingVote(follower)) {
@@ -802,15 +790,19 @@ class LeaderStateImpl implements LeaderState {
// it is possible that the bootstrapping is done. Then, fallback to UPDATE_COMMIT
updateCommitEvent.execute();
} else {
- final long committedIndex = server.getState().getLog()
- .getLastCommittedIndex();
- Collection<BootStrapProgress> reports = checkAllProgress(committedIndex);
+ final long commitIndex = server.getState().getLog().getLastCommittedIndex();
+ // check progress for the new followers
+ final EnumSet<BootStrapProgress> reports = getLogAppenders()
+ .map(LogAppender::getFollower)
+ .filter(follower -> !isAttendingVote(follower))
+ .map(follower -> checkProgress(follower, commitIndex))
+ .collect(Collectors.toCollection(() -> EnumSet.noneOf(BootStrapProgress.class)));
if (reports.contains(BootStrapProgress.NOPROGRESS)) {
stagingState.fail(BootStrapProgress.NOPROGRESS);
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
// all caught up!
applyOldNewConf();
- senders.stream()
+ getLogAppenders()
.map(LogAppender::getFollower)
.filter(f -> server.getRaftConf().containsInConf(f.getId()))
.map(FollowerInfoImpl.class::cast)
@@ -1053,7 +1045,7 @@ class LeaderStateImpl implements LeaderState {
FollowerInfo highestPriorityInfo = null;
int highestPriority = Integer.MIN_VALUE;
- for (LogAppender logAppender : senders.getSenders()) {
+ for (LogAppender logAppender : senders) {
final RaftPeer follower = conf.getPeer(logAppender.getFollowerId());
if (follower == null) {
continue;
@@ -1088,7 +1080,7 @@ class LeaderStateImpl implements LeaderState {
return true;
}
- final List<RaftPeerId> activePeers = senders.stream()
+ final List<RaftPeerId> activePeers = getLogAppenders()
.filter(sender -> sender.getFollower()
.getLastRpcResponseTime()
.elapsedTimeMs() <= server.getMaxTimeoutMs())
@@ -1106,7 +1098,7 @@ class LeaderStateImpl implements LeaderState {
+ ". Election timeout: " + server.getMaxTimeoutMs() + "ms"
+ ". In charge for: " + server.getRole().getRoleElapsedTimeMs() + "ms"
+ ". Conf: " + conf);
- senders.stream().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));
+ getLogAppenders().map(LogAppender::getFollower).forEach(f -> LOG.warn("Follower {}", f));
// step down as follower
stepDown(currentTerm, StepDownReason.LOST_MAJORITY_HEARTBEATS);
@@ -1173,7 +1165,7 @@ class LeaderStateImpl implements LeaderState {
}
long[] getFollowerNextIndices() {
- return senders.stream().mapToLong(s -> s.getFollower().getNextIndex()).toArray();
+ return getLogAppenders().mapToLong(s -> s.getFollower().getNextIndex()).toArray();
}
static Map<RaftPeerId, RaftPeer> newMap(Collection<RaftPeer> peers, String str) {
@@ -1237,15 +1229,14 @@ class LeaderStateImpl implements LeaderState {
/**
* @return the RaftPeer (address and id) information of the followers.
*/
- List<RaftPeer> getFollowers() {
- return Collections.unmodifiableList(senders.stream()
+ Stream<RaftPeer> getFollowers() {
+ return getLogAppenders()
.map(sender -> sender.getFollower().getPeer())
- .filter(peer -> server.getRaftConf().containsInConf(peer.getId()))
- .collect(Collectors.toList()));
+ .filter(peer -> server.getRaftConf().containsInConf(peer.getId()));
}
Stream<LogAppender> getLogAppenders() {
- return senders.stream();
+ return StreamSupport.stream(senders.spliterator(), false);
}
private static boolean isAttendingVote(FollowerInfo follower) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index bfeb0c19b..fc87f9719 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1835,8 +1835,8 @@ class RaftServerImpl implements RaftServer.Division,
@Override
public List<String> getFollowers() {
- return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElse(Collections.emptyList())
- .stream().map(RaftPeer::toString).collect(Collectors.toList());
+ return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElseGet(Stream::empty)
+ .map(RaftPeer::toString).collect(Collectors.toList());
}
@Override