You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2023/01/20 09:13:36 UTC

[ratis] branch master updated: RATIS-1775. FollowerInfoImpl should not store RaftPeer. (#812)

This is an automated email from the ASF dual-hosted git repository.

dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new a3a631ce1 RATIS-1775. FollowerInfoImpl should not store RaftPeer. (#812)
a3a631ce1 is described below

commit a3a631ce137f5165d35dae8a8470811b58b7b5a3
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Jan 20 17:13:29 2023 +0800

    RATIS-1775. FollowerInfoImpl should not store RaftPeer. (#812)
    
    * RATIS-1775. FollowerInfoImpl should not store RaftPeer.
    
    * getPeer should include LISTENER.
    
    * Update the peer stored in FollowerInfoImpl.
---
 .../apache/ratis/grpc/server/GrpcLogAppender.java  | 10 +++---
 .../java/org/apache/ratis/server/RaftServer.java   | 15 +++++----
 .../apache/ratis/server/leader/FollowerInfo.java   | 11 ++++++-
 .../apache/ratis/server/leader/LogAppender.java    |  2 +-
 .../apache/ratis/server/impl/FollowerInfoImpl.java | 24 +++++++++++---
 .../apache/ratis/server/impl/LeaderStateImpl.java  | 37 +++++++++++-----------
 .../apache/ratis/server/impl/RaftServerImpl.java   |  3 +-
 .../server/impl/SnapshotInstallationHandler.java   |  4 +--
 8 files changed, 65 insertions(+), 41 deletions(-)

diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 6e57891ea..e30f3b8c5 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -23,7 +23,6 @@ import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
 import org.apache.ratis.metrics.Timekeeper;
 import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -463,12 +462,12 @@ public class GrpcLogAppender extends LogAppenderBase {
       if (followerNextIndex >= leaderStartIndex) {
         LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}",
             this, followerNextIndex);
-        notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex, getFollower().getPeer());
+        notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex);
       }
     }
 
-    void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex, RaftPeer peer) {
-      getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, peer);
+    void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex) {
+      getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, getFollower().getPeer());
     }
 
     boolean isDone() {
@@ -540,8 +539,7 @@ public class GrpcLogAppender extends LogAppenderBase {
         case SNAPSHOT_UNAVAILABLE:
           LOG.info("{}: Follower could not install snapshot as it is not available.", this);
           getFollower().setAttemptedToInstallSnapshot();
-          notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX,
-              getFollower().getPeer());
+          notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX);
           removePending(reply);
           break;
         case UNRECOGNIZED:
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index c0afc67eb..1c99e88d8 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,8 +20,8 @@ package org.apache.ratis.server;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.conf.Parameters;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.rpc.RpcType;
 import org.apache.ratis.server.metrics.RaftServerMetrics;
@@ -69,7 +69,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
 
     /** @return the {@link RaftPeer} for this division. */
     default RaftPeer getPeer() {
-      return Optional.ofNullable(getRaftConf().getPeer(getId()))
+      return Optional.ofNullable(getRaftConf().getPeer(getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER))
         .orElseGet(() -> getRaftServer().getPeer());
     }
 
@@ -78,10 +78,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
 
     /** @return the {@link RaftGroup} for this division. */
     default RaftGroup getGroup() {
-      Collection<RaftPeer> allFollowerPeers =
-          getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER);
-      Collection<RaftPeer> allListenerPeers =
-          getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+      final Collection<RaftPeer> allFollowerPeers = getRaftConf().getAllPeers(RaftPeerRole.FOLLOWER);
+      final Collection<RaftPeer> allListenerPeers = getRaftConf().getAllPeers(RaftPeerRole.LISTENER);
       Iterable<RaftPeer> peers = Iterables.concat(allFollowerPeers, allListenerPeers);
       return RaftGroup.valueOf(getMemberId().getGroupId(), peers);
     }
@@ -126,7 +124,10 @@ public interface RaftServer extends Closeable, RpcType.Get,
   /** @return the server ID. */
   RaftPeerId getId();
 
-  /** @return the {@link RaftPeer} for this server. */
+  /**
+   * @return the general {@link RaftPeer} for this server.
+   *         To obtain a specific {@link RaftPeer} for a {@link RaftGroup}, use {@link Division#getPeer()}.
+   */
   RaftPeer getPeer();
 
   /** @return the group IDs the server is part of. */
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index b4ae8458c..7f7e4662c 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.leader;
 
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.util.Timestamp;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,7 +32,15 @@ public interface FollowerInfo {
   /** @return the name of this object. */
   String getName();
 
-  /** @return this follower's peer info. */
+  /** @return this follower's peer id. */
+  RaftPeerId getId();
+
+  /**
+   * Return this follower's {@link RaftPeer}.
+   * To obtain the {@link RaftPeerId}, use {@link #getId()} which is more efficient than this method.
+   *
+   * @return this follower's peer info.
+   */
   RaftPeer getPeer();
 
   /** @return the matchIndex acknowledged by this follower. */
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index f0ff28690..20b18882b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -79,7 +79,7 @@ public interface LogAppender {
 
   /** The same as getFollower().getPeer().getId(). */
   default RaftPeerId getFollowerId() {
-    return getFollower().getPeer().getId();
+    return getFollower().getId();
   }
 
   /** @return the call id for the next {@link AppendEntriesRequestProto}. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 0d7fe2075..c1cb7962c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.RaftGroupMemberId;
 import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.leader.FollowerInfo;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
@@ -26,13 +27,15 @@ import org.apache.ratis.util.Timestamp;
 
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 
 class FollowerInfoImpl implements FollowerInfo {
   private final String name;
   private final Consumer<Object> infoIndexChange;
   private final Consumer<Object> debugIndexChange;
 
-  private final RaftPeer peer;
+  private final AtomicReference<RaftPeer> peer;
+  private final Function<RaftPeerId, RaftPeer> getPeer;
   private final AtomicReference<Timestamp> lastRpcResponseTime;
   private final AtomicReference<Timestamp> lastRpcSendTime;
   private final AtomicReference<Timestamp> lastHeartbeatSendTime;
@@ -43,12 +46,14 @@ class FollowerInfoImpl implements FollowerInfo {
   private volatile boolean attendVote;
   private volatile boolean ackInstallSnapshotAttempt = false;
 
-  FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
+  FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Function<RaftPeerId, RaftPeer> getPeer,
+      Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
     this.name = id + "->" + peer.getId();
     this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
     this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
 
-    this.peer = peer;
+    this.peer = new AtomicReference<>(peer);
+    this.getPeer = getPeer;
     this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
     this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
     this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
@@ -145,9 +150,20 @@ class FollowerInfoImpl implements FollowerInfo {
     return attendVote;
   }
 
+  @Override
+  public RaftPeerId getId() {
+    return peer.get().getId();
+  }
+
   @Override
   public RaftPeer getPeer() {
-    return peer;
+    final RaftPeer newPeer = getPeer.apply(getId());
+    if (newPeer != null) {
+      peer.set(newPeer);
+      return newPeer;
+    } else {
+      return peer.get();
+    }
   }
 
   @Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index b0f342fd7..69f673de5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -565,7 +565,7 @@ class LeaderStateImpl implements LeaderState {
   public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
       List<LogEntryProto> entries, TermIndex previous, long callId) {
     final boolean initializing = isAttendingVote(follower);
-    final RaftPeerId targetId = follower.getPeer().getId();
+    final RaftPeerId targetId = follower.getId();
     return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
         ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
         initializing, previous, server.getCommitInfos(), callId);
@@ -580,16 +580,19 @@ class LeaderStateImpl implements LeaderState {
     }
   }
 
+  private RaftPeer getPeer(RaftPeerId id) {
+    return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
+  }
+
   Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
     final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
-    final List<LogAppender> newAppenders = newPeers.stream()
-        .map(peer -> {
-          final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
-          followerInfoMap.put(peer.getId(), f);
-          raftServerMetrics.addFollower(peer.getId());
-          logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
-          return server.newLogAppender(this, f);
-        }).collect(Collectors.toList());
+    final List<LogAppender> newAppenders = newPeers.stream().map(peer -> {
+      final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote);
+      followerInfoMap.put(peer.getId(), f);
+      raftServerMetrics.addFollower(peer.getId());
+      logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
+      return server.newLogAppender(this, f);
+    }).collect(Collectors.toList());
     senders.addAll(newAppenders);
     return newAppenders;
   }
@@ -620,10 +623,8 @@ class LeaderStateImpl implements LeaderState {
     sender.stop();
     senders.removeAll(Collections.singleton(sender));
 
-    final RaftPeer peer = info.getPeer();
-    if (server.getRaftConf().containsInConf(peer.getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)) {
-      addAndStartSenders(Collections.singleton(peer));
-    }
+    Optional.ofNullable(getPeer(info.getId()))
+        .ifPresent(peer -> addAndStartSenders(Collections.singleton(peer)));
   }
 
   /**
@@ -688,7 +689,7 @@ class LeaderStateImpl implements LeaderState {
   }
 
   boolean sendStartLeaderElection(FollowerInfo followerInfo) {
-    final RaftPeerId followerId = followerInfo.getPeer().getId();
+    final RaftPeerId followerId = followerInfo.getId();
     final TermIndex leaderLastEntry = server.getState().getLastEntry();
     if (leaderLastEntry == null) {
       sendStartLeaderElection(followerId, null);
@@ -793,7 +794,7 @@ class LeaderStateImpl implements LeaderState {
 
   @Override
   public boolean isFollowerBootstrapping(FollowerInfo follower) {
-    return isBootStrappingPeer(follower.getPeer().getId());
+    return isBootStrappingPeer(follower.getId());
   }
 
   private void checkStaging() {
@@ -811,7 +812,7 @@ class LeaderStateImpl implements LeaderState {
         applyOldNewConf();
         senders.stream()
             .map(LogAppender::getFollower)
-            .filter(f -> server.getRaftConf().containsInConf(f.getPeer().getId()))
+            .filter(f -> server.getRaftConf().containsInConf(f.getId()))
             .map(FollowerInfoImpl.class::cast)
             .forEach(FollowerInfoImpl::startAttendVote);
       }
@@ -934,7 +935,7 @@ class LeaderStateImpl implements LeaderState {
 
     int count = includeSelf ? 1 : 0;
     for (FollowerInfo follower: followers) {
-      if (isAcked.test(follower.getPeer().getId())) {
+      if (isAcked.test(follower.getId())) {
         count++;
       }
     }
@@ -1259,7 +1260,7 @@ class LeaderStateImpl implements LeaderState {
       server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo);
       server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo, follower.getPeer());
     }
-    final RaftPeerId followerId = follower.getPeer().getId();
+    final RaftPeerId followerId = follower.getId();
     raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS));
   }
 
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 210d0f18c..bfeb0c19b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1529,8 +1529,7 @@ class RaftServerImpl implements RaftServer.Division,
       final long installedIndex = snapshotInstallationHandler.getInstalledIndex();
       if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
         LOG.info("{}: Follower has completed install the snapshot {}.", this, installedIndex);
-        stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex,
-            getRaftServer().getPeer());
+        stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer());
       }
     }
     return JavaUtils.allOf(futures).whenCompleteAsync(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 2969ca314..e7b574cf3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -305,7 +305,7 @@ class SnapshotInstallationHandler {
             InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         server.getStateMachine().event().notifySnapshotInstalled(
-            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getRaftServer().getPeer());
+            InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
       }
@@ -323,7 +323,7 @@ class SnapshotInstallationHandler {
         inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
         final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex();
         server.getStateMachine().event().notifySnapshotInstalled(
-            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getRaftServer().getPeer());
+            InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
         installedIndex.set(latestInstalledIndex);
         return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
             currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());