You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/04/29 09:47:55 UTC
[incubator-ratis] branch master updated: RATIS-840. Memory leak of
LogAppender.
This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2816ea6 RATIS-840. Memory leak of LogAppender.
2816ea6 is described below
commit 2816ea6762be18da4b35ee3024338514a32ac413
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Apr 29 15:16:36 2020 +0530
RATIS-840. Memory leak of LogAppender.
---
.../org/apache/ratis/server/impl/LeaderState.java | 58 +++++++++++++++-------
.../apache/ratis/server/impl/RaftServerImpl.java | 9 ++--
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 6 ++-
3 files changed, 50 insertions(+), 23 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 962c202..c794ed5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -192,7 +193,8 @@ public class LeaderState {
private final RaftLog raftLog;
private final long currentTerm;
private volatile ConfigurationStagingState stagingState;
- private List<List<FollowerInfo>> voterLists;
+ private List<List<RaftPeerId>> voterLists;
+ private final Map<RaftPeerId, FollowerInfo> peerIdFollowerInfoMap = new ConcurrentHashMap<>();
/**
* The list of threads appending entries to followers.
@@ -417,9 +419,12 @@ public class LeaderState {
final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final List<LogAppender> newAppenders = newPeers.stream()
.map(peer -> {
- LogAppender logAppender = server.newLogAppender(this, peer, t, nextIndex, attendVote);
- raftServerMetrics.addFollower(logAppender.getFollower().getPeer());
- logAppenderMetrics.addFollowerGauges(logAppender.getFollower());
+ final FollowerInfo f = new FollowerInfo(server.getMemberId(), peer, t, nextIndex, attendVote,
+ server.getRpcSlownessTimeoutMs());
+ LogAppender logAppender = server.newLogAppender(this, f);
+ peerIdFollowerInfoMap.put(peer.getId(), f);
+ raftServerMetrics.addFollower(f.getPeer());
+ logAppenderMetrics.addFollowerGauges(f);
return logAppender;
}).collect(Collectors.toList());
senders.addAll(newAppenders);
@@ -435,6 +440,7 @@ public class LeaderState {
void restartSender(LogAppender sender) {
final FollowerInfo follower = sender.getFollower();
LOG.info("{}: Restarting {} for {}", this, sender.getClass().getSimpleName(), follower.getName());
+ sender.stopAppender();
senders.removeAll(Collections.singleton(sender));
addAndStartSenders(Collections.singleton(follower.getPeer()));
}
@@ -611,7 +617,7 @@ public class LeaderState {
final RaftPeerId selfId = server.getId();
final RaftConfiguration conf = server.getRaftConf();
- final List<FollowerInfo> followers = voterLists.get(0);
+ final List<RaftPeerId> followers = voterLists.get(0);
final boolean includeSelf = conf.containsInConf(selfId);
if (followers.isEmpty() && !includeSelf) {
return Optional.empty();
@@ -623,7 +629,7 @@ public class LeaderState {
if (!conf.isTransitional()) {
return Optional.of(newConf);
} else { // configuration is in transitional state
- final List<FollowerInfo> oldFollowers = voterLists.get(1);
+ final List<RaftPeerId> oldFollowers = voterLists.get(1);
final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
if (oldFollowers.isEmpty() && !includeSelfInOldConf) {
return Optional.empty();
@@ -711,17 +717,35 @@ public class LeaderState {
notifySenders();
}
- private static long[] getSorted(List<FollowerInfo> followers, boolean includeSelf,
+ private List<FollowerInfo> getFollowerInfos(List<RaftPeerId> followerIDs) {
+ List<FollowerInfo> followerInfos = new ArrayList<>();
+ for (int i = 0; i < followerIDs.size(); i++) {
+ RaftPeerId id = followerIDs.get(i);
+ if (!peerIdFollowerInfoMap.containsKey(id)) {
+ throw new IllegalArgumentException("RaftPeerId:" + id +
+ " not in peerIdFollowerInfoMap of leader:" + server.getMemberId());
+ }
+
+ followerInfos.add(peerIdFollowerInfoMap.get(id));
+ }
+
+ return followerInfos;
+ }
+
+ private long[] getSorted(List<RaftPeerId> followerIDs, boolean includeSelf,
ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) {
- final int length = includeSelf ? followers.size() + 1 : followers.size();
+ final int length = includeSelf ? followerIDs.size() + 1 : followerIDs.size();
if (length == 0) {
throw new IllegalArgumentException("followers.size() == "
- + followers.size() + " and includeSelf == " + includeSelf);
+ + followerIDs.size() + " and includeSelf == " + includeSelf);
}
+
final long[] indices = new long[length];
- for (int i = 0; i < followers.size(); i++) {
- indices[i] = getFollowerIndex.applyAsLong(followers.get(i));
+ List<FollowerInfo> followerInfos = getFollowerInfos(followerIDs);
+ for (int i = 0; i < followerInfos.size(); i++) {
+ indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i));
}
+
if (includeSelf) {
// note that we also need to wait for the local disk I/O
indices[length - 1] = getLogIndex.getAsLong();
@@ -731,17 +755,17 @@ public class LeaderState {
return indices;
}
- private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
- List<List<FollowerInfo>> lists = new ArrayList<>(2);
- List<FollowerInfo> listForNew = senders.stream()
+ private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
+ List<List<RaftPeerId>> lists = new ArrayList<>(2);
+ List<RaftPeerId> listForNew = senders.stream()
.filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
- .map(LogAppender::getFollower)
+ .map(sender -> sender.getFollower().getPeer().getId())
.collect(Collectors.toList());
lists.add(listForNew);
if (conf.isTransitional()) {
- List<FollowerInfo> listForOld = senders.stream()
+ List<RaftPeerId> listForOld = senders.stream()
.filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
- .map(LogAppender::getFollower)
+ .map(sender -> sender.getFollower().getPeer().getId())
.collect(Collectors.toList());
lists.add(listForOld);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e5b37b9..8526028 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -125,10 +125,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
LogAppender newLogAppender(
- LeaderState leaderState, RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
- boolean attendVote) {
- final FollowerInfo f = new FollowerInfo(getMemberId(), peer, lastRpcTime, nextIndex, attendVote,
- rpcSlownessTimeoutMs);
+ LeaderState leaderState, FollowerInfo f) {
return getProxy().getFactory().newLogAppender(this, leaderState, f);
}
@@ -144,6 +141,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
return maxTimeoutMs;
}
+ int getRpcSlownessTimeoutMs() {
+ return rpcSlownessTimeoutMs;
+ }
+
int getRandomTimeoutMs() {
return minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 8ecf40d..5747a84 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -125,7 +125,9 @@ public class TestLogAppenderWithGrpc
}
}
- // assert INCONSISTENCY counter should become 1
- Assert.assertEquals(1L, leaderMetrics.getRegistry().counter(counter).getCount());
+ // assert INCONSISTENCY counter >= 1
+ // If old LogAppender die before new LogAppender start, INCONSISTENCY equal to 1,
+ // else INCONSISTENCY greater than 1
+ Assert.assertTrue(leaderMetrics.getRegistry().counter(counter).getCount() >= 1L);
}
}