You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by dr...@apache.org on 2023/01/20 09:13:36 UTC
[ratis] branch master updated: RATIS-1775. FollowerInfoImpl should not store RaftPeer. (#812)
This is an automated email from the ASF dual-hosted git repository.
dragonyliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new a3a631ce1 RATIS-1775. FollowerInfoImpl should not store RaftPeer. (#812)
a3a631ce1 is described below
commit a3a631ce137f5165d35dae8a8470811b58b7b5a3
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Jan 20 17:13:29 2023 +0800
RATIS-1775. FollowerInfoImpl should not store RaftPeer. (#812)
* RATIS-1775. FollowerInfoImpl should not store RaftPeer.
* getPeer should include LISTENER.
* Update the peer stored in FollowerInfoImpl.
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 10 +++---
.../java/org/apache/ratis/server/RaftServer.java | 15 +++++----
.../apache/ratis/server/leader/FollowerInfo.java | 11 ++++++-
.../apache/ratis/server/leader/LogAppender.java | 2 +-
.../apache/ratis/server/impl/FollowerInfoImpl.java | 24 +++++++++++---
.../apache/ratis/server/impl/LeaderStateImpl.java | 37 +++++++++++-----------
.../apache/ratis/server/impl/RaftServerImpl.java | 3 +-
.../server/impl/SnapshotInstallationHandler.java | 4 +--
8 files changed, 65 insertions(+), 41 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 6e57891ea..e30f3b8c5 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -23,7 +23,6 @@ import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.metrics.Timekeeper;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -463,12 +462,12 @@ public class GrpcLogAppender extends LogAppenderBase {
if (followerNextIndex >= leaderStartIndex) {
LOG.info("{}: Follower can catch up leader after install the snapshot, as leader's start index is {}",
this, followerNextIndex);
- notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex, getFollower().getPeer());
+ notifyInstallSnapshotFinished(InstallSnapshotResult.SUCCESS, followerSnapshotIndex);
}
}
- void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex, RaftPeer peer) {
- getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, peer);
+ void notifyInstallSnapshotFinished(InstallSnapshotResult result, long snapshotIndex) {
+ getServer().getStateMachine().event().notifySnapshotInstalled(result, snapshotIndex, getFollower().getPeer());
}
boolean isDone() {
@@ -540,8 +539,7 @@ public class GrpcLogAppender extends LogAppenderBase {
case SNAPSHOT_UNAVAILABLE:
LOG.info("{}: Follower could not install snapshot as it is not available.", this);
getFollower().setAttemptedToInstallSnapshot();
- notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX,
- getFollower().getPeer());
+ notifyInstallSnapshotFinished(InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, RaftLog.INVALID_LOG_INDEX);
removePending(reply);
break;
case UNRECOGNIZED:
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
index c0afc67eb..1c99e88d8 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -20,8 +20,8 @@ package org.apache.ratis.server;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.*;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.metrics.RaftServerMetrics;
@@ -69,7 +69,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the {@link RaftPeer} for this division. */
default RaftPeer getPeer() {
- return Optional.ofNullable(getRaftConf().getPeer(getId()))
+ return Optional.ofNullable(getRaftConf().getPeer(getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER))
.orElseGet(() -> getRaftServer().getPeer());
}
@@ -78,10 +78,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the {@link RaftGroup} for this division. */
default RaftGroup getGroup() {
- Collection<RaftPeer> allFollowerPeers =
- getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.FOLLOWER);
- Collection<RaftPeer> allListenerPeers =
- getRaftConf().getAllPeers(RaftProtos.RaftPeerRole.LISTENER);
+ final Collection<RaftPeer> allFollowerPeers = getRaftConf().getAllPeers(RaftPeerRole.FOLLOWER);
+ final Collection<RaftPeer> allListenerPeers = getRaftConf().getAllPeers(RaftPeerRole.LISTENER);
Iterable<RaftPeer> peers = Iterables.concat(allFollowerPeers, allListenerPeers);
return RaftGroup.valueOf(getMemberId().getGroupId(), peers);
}
@@ -126,7 +124,10 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the server ID. */
RaftPeerId getId();
- /** @return the {@link RaftPeer} for this server. */
+ /**
+ * @return the general {@link RaftPeer} for this server.
+ * To obtain a specific {@link RaftPeer} for a {@link RaftGroup}, use {@link Division#getPeer()}.
+ */
RaftPeer getPeer();
/** @return the group IDs the server is part of. */
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
index b4ae8458c..7f7e4662c 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.leader;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.Timestamp;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,7 +32,15 @@ public interface FollowerInfo {
/** @return the name of this object. */
String getName();
- /** @return this follower's peer info. */
+ /** @return this follower's peer id. */
+ RaftPeerId getId();
+
+ /**
+ * Return this follower's {@link RaftPeer}.
+ * To obtain the {@link RaftPeerId}, use {@link #getId()} which is more efficient than this method.
+ *
+ * @return this follower's peer info.
+ */
RaftPeer getPeer();
/** @return the matchIndex acknowledged by this follower. */
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index f0ff28690..20b18882b 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -79,7 +79,7 @@ public interface LogAppender {
/** The same as getFollower().getPeer().getId(). */
default RaftPeerId getFollowerId() {
- return getFollower().getPeer().getId();
+ return getFollower().getId();
}
/** @return the call id for the next {@link AppendEntriesRequestProto}. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index 0d7fe2075..c1cb7962c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIndex;
@@ -26,13 +27,15 @@ import org.apache.ratis.util.Timestamp;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
+import java.util.function.Function;
class FollowerInfoImpl implements FollowerInfo {
private final String name;
private final Consumer<Object> infoIndexChange;
private final Consumer<Object> debugIndexChange;
- private final RaftPeer peer;
+ private final AtomicReference<RaftPeer> peer;
+ private final Function<RaftPeerId, RaftPeer> getPeer;
private final AtomicReference<Timestamp> lastRpcResponseTime;
private final AtomicReference<Timestamp> lastRpcSendTime;
private final AtomicReference<Timestamp> lastHeartbeatSendTime;
@@ -43,12 +46,14 @@ class FollowerInfoImpl implements FollowerInfo {
private volatile boolean attendVote;
private volatile boolean ackInstallSnapshotAttempt = false;
- FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
+ FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Function<RaftPeerId, RaftPeer> getPeer,
+ Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
this.name = id + "->" + peer.getId();
this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
- this.peer = peer;
+ this.peer = new AtomicReference<>(peer);
+ this.getPeer = getPeer;
this.lastRpcResponseTime = new AtomicReference<>(lastRpcTime);
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
this.lastHeartbeatSendTime = new AtomicReference<>(lastRpcTime);
@@ -145,9 +150,20 @@ class FollowerInfoImpl implements FollowerInfo {
return attendVote;
}
+ @Override
+ public RaftPeerId getId() {
+ return peer.get().getId();
+ }
+
@Override
public RaftPeer getPeer() {
- return peer;
+ final RaftPeer newPeer = getPeer.apply(getId());
+ if (newPeer != null) {
+ peer.set(newPeer);
+ return newPeer;
+ } else {
+ return peer.get();
+ }
}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index b0f342fd7..69f673de5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -565,7 +565,7 @@ class LeaderStateImpl implements LeaderState {
public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
List<LogEntryProto> entries, TermIndex previous, long callId) {
final boolean initializing = isAttendingVote(follower);
- final RaftPeerId targetId = follower.getPeer().getId();
+ final RaftPeerId targetId = follower.getId();
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
initializing, previous, server.getCommitInfos(), callId);
@@ -580,16 +580,19 @@ class LeaderStateImpl implements LeaderState {
}
}
+ private RaftPeer getPeer(RaftPeerId id) {
+ return server.getRaftConf().getPeer(id, RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER);
+ }
+
Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
- final List<LogAppender> newAppenders = newPeers.stream()
- .map(peer -> {
- final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
- followerInfoMap.put(peer.getId(), f);
- raftServerMetrics.addFollower(peer.getId());
- logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
- return server.newLogAppender(this, f);
- }).collect(Collectors.toList());
+ final List<LogAppender> newAppenders = newPeers.stream().map(peer -> {
+ final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, this::getPeer, t, nextIndex, attendVote);
+ followerInfoMap.put(peer.getId(), f);
+ raftServerMetrics.addFollower(peer.getId());
+ logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
+ return server.newLogAppender(this, f);
+ }).collect(Collectors.toList());
senders.addAll(newAppenders);
return newAppenders;
}
@@ -620,10 +623,8 @@ class LeaderStateImpl implements LeaderState {
sender.stop();
senders.removeAll(Collections.singleton(sender));
- final RaftPeer peer = info.getPeer();
- if (server.getRaftConf().containsInConf(peer.getId(), RaftPeerRole.FOLLOWER, RaftPeerRole.LISTENER)) {
- addAndStartSenders(Collections.singleton(peer));
- }
+ Optional.ofNullable(getPeer(info.getId()))
+ .ifPresent(peer -> addAndStartSenders(Collections.singleton(peer)));
}
/**
@@ -688,7 +689,7 @@ class LeaderStateImpl implements LeaderState {
}
boolean sendStartLeaderElection(FollowerInfo followerInfo) {
- final RaftPeerId followerId = followerInfo.getPeer().getId();
+ final RaftPeerId followerId = followerInfo.getId();
final TermIndex leaderLastEntry = server.getState().getLastEntry();
if (leaderLastEntry == null) {
sendStartLeaderElection(followerId, null);
@@ -793,7 +794,7 @@ class LeaderStateImpl implements LeaderState {
@Override
public boolean isFollowerBootstrapping(FollowerInfo follower) {
- return isBootStrappingPeer(follower.getPeer().getId());
+ return isBootStrappingPeer(follower.getId());
}
private void checkStaging() {
@@ -811,7 +812,7 @@ class LeaderStateImpl implements LeaderState {
applyOldNewConf();
senders.stream()
.map(LogAppender::getFollower)
- .filter(f -> server.getRaftConf().containsInConf(f.getPeer().getId()))
+ .filter(f -> server.getRaftConf().containsInConf(f.getId()))
.map(FollowerInfoImpl.class::cast)
.forEach(FollowerInfoImpl::startAttendVote);
}
@@ -934,7 +935,7 @@ class LeaderStateImpl implements LeaderState {
int count = includeSelf ? 1 : 0;
for (FollowerInfo follower: followers) {
- if (isAcked.test(follower.getPeer().getId())) {
+ if (isAcked.test(follower.getId())) {
count++;
}
}
@@ -1259,7 +1260,7 @@ class LeaderStateImpl implements LeaderState {
server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo);
server.getStateMachine().leaderEvent().notifyFollowerSlowness(leaderInfo, follower.getPeer());
}
- final RaftPeerId followerId = follower.getPeer().getId();
+ final RaftPeerId followerId = follower.getId();
raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 210d0f18c..bfeb0c19b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -1529,8 +1529,7 @@ class RaftServerImpl implements RaftServer.Division,
final long installedIndex = snapshotInstallationHandler.getInstalledIndex();
if (installedIndex >= RaftLog.LEAST_VALID_LOG_INDEX) {
LOG.info("{}: Follower has completed install the snapshot {}.", this, installedIndex);
- stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex,
- getRaftServer().getPeer());
+ stateMachine.event().notifySnapshotInstalled(InstallSnapshotResult.SUCCESS, installedIndex, getPeer());
}
}
return JavaUtils.allOf(futures).whenCompleteAsync(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
index 2969ca314..e7b574cf3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/SnapshotInstallationHandler.java
@@ -305,7 +305,7 @@ class SnapshotInstallationHandler {
InstallSnapshotResult.SNAPSHOT_UNAVAILABLE);
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
server.getStateMachine().event().notifySnapshotInstalled(
- InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getRaftServer().getPeer());
+ InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, INVALID_LOG_INDEX, server.getPeer());
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_UNAVAILABLE, -1);
}
@@ -323,7 +323,7 @@ class SnapshotInstallationHandler {
inProgressInstallSnapshotIndex.set(INVALID_LOG_INDEX);
final long latestInstalledIndex = latestInstalledSnapshotTermIndex.getIndex();
server.getStateMachine().event().notifySnapshotInstalled(
- InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getRaftServer().getPeer());
+ InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledIndex, server.getPeer());
installedIndex.set(latestInstalledIndex);
return ServerProtoUtils.toInstallSnapshotReplyProto(leaderId, getMemberId(),
currentTerm, InstallSnapshotResult.SNAPSHOT_INSTALLED, latestInstalledSnapshotTermIndex.getIndex());