You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by lj...@apache.org on 2020/04/29 09:47:55 UTC

[incubator-ratis] branch master updated: RATIS-840. Memory leak of LogAppender.

This is an automated email from the ASF dual-hosted git repository.

ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2816ea6  RATIS-840. Memory leak of LogAppender.
2816ea6 is described below

commit 2816ea6762be18da4b35ee3024338514a32ac413
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Apr 29 15:16:36 2020 +0530

    RATIS-840. Memory leak of LogAppender.
---
 .../org/apache/ratis/server/impl/LeaderState.java  | 58 +++++++++++++++-------
 .../apache/ratis/server/impl/RaftServerImpl.java   |  9 ++--
 .../apache/ratis/grpc/TestLogAppenderWithGrpc.java |  6 ++-
 3 files changed, 50 insertions(+), 23 deletions(-)

diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 962c202..c794ed5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -192,7 +193,8 @@ public class LeaderState {
   private final RaftLog raftLog;
   private final long currentTerm;
   private volatile ConfigurationStagingState stagingState;
-  private List<List<FollowerInfo>> voterLists;
+  private List<List<RaftPeerId>> voterLists;
+  private final Map<RaftPeerId, FollowerInfo> peerIdFollowerInfoMap = new ConcurrentHashMap<>();
 
   /**
    * The list of threads appending entries to followers.
@@ -417,9 +419,12 @@ public class LeaderState {
     final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
     final List<LogAppender> newAppenders = newPeers.stream()
         .map(peer -> {
-          LogAppender logAppender = server.newLogAppender(this, peer, t, nextIndex, attendVote);
-          raftServerMetrics.addFollower(logAppender.getFollower().getPeer());
-          logAppenderMetrics.addFollowerGauges(logAppender.getFollower());
+          final FollowerInfo f = new FollowerInfo(server.getMemberId(), peer, t, nextIndex, attendVote,
+              server.getRpcSlownessTimeoutMs());
+          LogAppender logAppender = server.newLogAppender(this, f);
+          peerIdFollowerInfoMap.put(peer.getId(), f);
+          raftServerMetrics.addFollower(f.getPeer());
+          logAppenderMetrics.addFollowerGauges(f);
           return logAppender;
         }).collect(Collectors.toList());
     senders.addAll(newAppenders);
@@ -435,6 +440,7 @@ public class LeaderState {
   void restartSender(LogAppender sender) {
     final FollowerInfo follower = sender.getFollower();
     LOG.info("{}: Restarting {} for {}", this, sender.getClass().getSimpleName(), follower.getName());
+    sender.stopAppender();
     senders.removeAll(Collections.singleton(sender));
     addAndStartSenders(Collections.singleton(follower.getPeer()));
   }
@@ -611,7 +617,7 @@ public class LeaderState {
     final RaftPeerId selfId = server.getId();
     final RaftConfiguration conf = server.getRaftConf();
 
-    final List<FollowerInfo> followers = voterLists.get(0);
+    final List<RaftPeerId> followers = voterLists.get(0);
     final boolean includeSelf = conf.containsInConf(selfId);
     if (followers.isEmpty() && !includeSelf) {
       return Optional.empty();
@@ -623,7 +629,7 @@ public class LeaderState {
     if (!conf.isTransitional()) {
       return Optional.of(newConf);
     } else { // configuration is in transitional state
-      final List<FollowerInfo> oldFollowers = voterLists.get(1);
+      final List<RaftPeerId> oldFollowers = voterLists.get(1);
       final boolean includeSelfInOldConf = conf.containsInOldConf(selfId);
       if (oldFollowers.isEmpty() && !includeSelfInOldConf) {
         return Optional.empty();
@@ -711,17 +717,35 @@ public class LeaderState {
     notifySenders();
   }
 
-  private static long[] getSorted(List<FollowerInfo> followers, boolean includeSelf,
+  private List<FollowerInfo> getFollowerInfos(List<RaftPeerId> followerIDs) {
+    List<FollowerInfo> followerInfos = new ArrayList<>();
+    for (int i = 0; i < followerIDs.size(); i++) {
+      RaftPeerId id = followerIDs.get(i);
+      if (!peerIdFollowerInfoMap.containsKey(id)) {
+        throw new IllegalArgumentException("RaftPeerId:" + id +
+                " not in peerIdFollowerInfoMap of leader:" + server.getMemberId());
+      }
+
+      followerInfos.add(peerIdFollowerInfoMap.get(id));
+    }
+
+    return followerInfos;
+  }
+
+  private long[] getSorted(List<RaftPeerId> followerIDs, boolean includeSelf,
       ToLongFunction<FollowerInfo> getFollowerIndex, LongSupplier getLogIndex) {
-    final int length = includeSelf ? followers.size() + 1 : followers.size();
+    final int length = includeSelf ? followerIDs.size() + 1 : followerIDs.size();
     if (length == 0) {
       throw new IllegalArgumentException("followers.size() == "
-          + followers.size() + " and includeSelf == " + includeSelf);
+          + followerIDs.size() + " and includeSelf == " + includeSelf);
     }
+
     final long[] indices = new long[length];
-    for (int i = 0; i < followers.size(); i++) {
-      indices[i] = getFollowerIndex.applyAsLong(followers.get(i));
+    List<FollowerInfo> followerInfos = getFollowerInfos(followerIDs);
+    for (int i = 0; i < followerInfos.size(); i++) {
+      indices[i] = getFollowerIndex.applyAsLong(followerInfos.get(i));
     }
+
     if (includeSelf) {
       // note that we also need to wait for the local disk I/O
       indices[length - 1] = getLogIndex.getAsLong();
@@ -731,17 +755,17 @@ public class LeaderState {
     return indices;
   }
 
-  private List<List<FollowerInfo>> divideFollowers(RaftConfiguration conf) {
-    List<List<FollowerInfo>> lists = new ArrayList<>(2);
-    List<FollowerInfo> listForNew = senders.stream()
+  private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
+    List<List<RaftPeerId>> lists = new ArrayList<>(2);
+    List<RaftPeerId> listForNew = senders.stream()
         .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
-        .map(LogAppender::getFollower)
+        .map(sender -> sender.getFollower().getPeer().getId())
         .collect(Collectors.toList());
     lists.add(listForNew);
     if (conf.isTransitional()) {
-      List<FollowerInfo> listForOld = senders.stream()
+      List<RaftPeerId> listForOld = senders.stream()
           .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
-          .map(LogAppender::getFollower)
+          .map(sender -> sender.getFollower().getPeer().getId())
           .collect(Collectors.toList());
       lists.add(listForOld);
     }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index e5b37b9..8526028 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -125,10 +125,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   }
 
   LogAppender newLogAppender(
-      LeaderState leaderState, RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
-      boolean attendVote) {
-    final FollowerInfo f = new FollowerInfo(getMemberId(), peer, lastRpcTime, nextIndex, attendVote,
-        rpcSlownessTimeoutMs);
+      LeaderState leaderState, FollowerInfo f) {
     return getProxy().getFactory().newLogAppender(this, leaderState, f);
   }
 
@@ -144,6 +141,10 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return maxTimeoutMs;
   }
 
+  int getRpcSlownessTimeoutMs() {
+    return rpcSlownessTimeoutMs;
+  }
+
   int getRandomTimeoutMs() {
     return minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 8ecf40d..5747a84 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -125,7 +125,9 @@ public class TestLogAppenderWithGrpc
       }
     }
 
-    // assert INCONSISTENCY counter should become 1
-    Assert.assertEquals(1L, leaderMetrics.getRegistry().counter(counter).getCount());
+    // assert INCONSISTENCY counter >= 1
+    // If the old LogAppender dies before the new LogAppender starts, INCONSISTENCY equals 1;
+    // otherwise INCONSISTENCY is greater than 1.
+    Assert.assertTrue(leaderMetrics.getRegistry().counter(counter).getCount() >= 1L);
   }
 }