You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/02 01:21:49 UTC
[incubator-ratis] branch master updated: RATIS-1191. Define an
interface for FollowerInfo. (#311)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 9a3589f RATIS-1191. Define an interface for FollowerInfo. (#311)
9a3589f is described below
commit 9a3589f9bbe74bfd612e33abe9b0313f719d776f
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Dec 2 09:21:03 2020 +0800
RATIS-1191. Define an interface for FollowerInfo. (#311)
---
.../java/org/apache/ratis/grpc/GrpcFactory.java | 3 +-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 6 +-
.../org/apache/ratis/hadooprpc/HadoopFactory.java | 4 +-
.../java/org/apache/ratis/netty/NettyFactory.java | 4 +-
.../{FollowerInfo.java => FollowerInfoImpl.java} | 48 +++++++------
.../org/apache/ratis/server/impl/LeaderState.java | 47 +++++++++----
.../org/apache/ratis/server/impl/LogAppender.java | 26 +++----
.../apache/ratis/server/impl/RaftServerImpl.java | 9 +--
.../apache/ratis/server/impl/ServerFactory.java | 15 ++--
.../apache/ratis/server/leader/FollowerInfo.java | 81 ++++++++++++++++++++++
.../ratis/server/metrics/LogAppenderMetrics.java | 19 ++---
.../ratis/server/impl/TestLogAppenderMetrics.java | 49 +++++++++----
.../ratis/server/simulation/SimulatedRpc.java | 4 +-
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 2 +-
14 files changed, 215 insertions(+), 102 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index 33c9a0b..e6efebe 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,6 +25,7 @@ import org.apache.ratis.grpc.server.GrpcLogAppender;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.*;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index c1d4515..7bd4f5c 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -23,7 +23,7 @@ import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.impl.LeaderState;
import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -247,7 +247,7 @@ public class GrpcLogAppender extends LogAppender {
}
private void increaseNextIndex(final long installedSnapshotIndex) {
- getFollower().updateNextIndexToMax(installedSnapshotIndex + 1);
+ getFollower().updateNextIndex(installedSnapshotIndex + 1);
}
/**
@@ -346,7 +346,7 @@ public class GrpcLogAppender extends LogAppender {
private synchronized void updateNextIndex(long replyNextIndex) {
pendingRequests.clear();
- getFollower().updateNextIndex(replyNextIndex);
+ getFollower().setNextIndex(replyNextIndex);
}
private class InstallSnapshotResponseHandler implements StreamObserver<InstallSnapshotReplyProto> {
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index cf4c5fd..f024b81 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -28,7 +28,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.ServerFactory;
-public class HadoopFactory extends ServerFactory.BaseFactory implements ClientFactory {
+public class HadoopFactory implements ServerFactory, ClientFactory {
public static Parameters newRaftParameters(Configuration conf) {
final Parameters p = new Parameters();
HadoopConfigKeys.setConf(p, conf);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 450b95f..7f981ae 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -27,7 +27,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.ServerFactory;
-public class NettyFactory extends ServerFactory.BaseFactory implements ClientFactory {
+public class NettyFactory implements ServerFactory, ClientFactory {
public NettyFactory(Parameters parameters) {}
@Override
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
similarity index 83%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
rename to ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
index c423e68..0f6c1ab 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerInfoImpl.java
@@ -19,18 +19,15 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIndex;
import org.apache.ratis.util.Timestamp;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
-public class FollowerInfo {
- public static final Logger LOG = LoggerFactory.getLogger(FollowerInfo.class);
-
+class FollowerInfoImpl implements FollowerInfo {
private final String name;
private final Consumer<Object> infoIndexChange;
private final Consumer<Object> debugIndexChange;
@@ -43,10 +40,8 @@ public class FollowerInfo {
private final RaftLogIndex commitIndex = new RaftLogIndex("commitIndex", RaftLog.INVALID_LOG_INDEX);
private final RaftLogIndex snapshotIndex = new RaftLogIndex("snapshotIndex", 0L);
private volatile boolean attendVote;
- private final int rpcSlownessTimeoutMs;
- FollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex,
- boolean attendVote, int rpcSlownessTimeoutMs) {
+ FollowerInfoImpl(RaftGroupMemberId id, RaftPeer peer, Timestamp lastRpcTime, long nextIndex, boolean attendVote) {
this.name = id + "->" + peer.getId();
this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
@@ -56,56 +51,66 @@ public class FollowerInfo {
this.lastRpcSendTime = new AtomicReference<>(lastRpcTime);
this.nextIndex = new RaftLogIndex("nextIndex", nextIndex);
this.attendVote = attendVote;
- this.rpcSlownessTimeoutMs = rpcSlownessTimeoutMs;
}
+ @Override
public long getMatchIndex() {
return matchIndex.get();
}
+ @Override
public boolean updateMatchIndex(long newMatchIndex) {
return matchIndex.updateToMax(newMatchIndex, debugIndexChange);
}
- /** @return the commit index acked by the follower. */
- long getCommitIndex() {
+ @Override
+ public long getCommitIndex() {
return commitIndex.get();
}
- boolean updateCommitIndex(long newCommitIndex) {
+ @Override
+ public boolean updateCommitIndex(long newCommitIndex) {
return commitIndex.updateToMax(newCommitIndex, debugIndexChange);
}
- long getSnapshotIndex() {
+ @Override
+ public long getSnapshotIndex() {
return snapshotIndex.get();
}
+ @Override
public long getNextIndex() {
return nextIndex.get();
}
+ @Override
public void increaseNextIndex(long newNextIndex) {
nextIndex.updateIncreasingly(newNextIndex, debugIndexChange);
}
+ @Override
public void decreaseNextIndex(long newNextIndex) {
nextIndex.updateUnconditionally(old -> old <= 0L? old: Math.min(old - 1, newNextIndex), infoIndexChange);
}
- public void updateNextIndex(long newNextIndex) {
+ @Override
+ public void setNextIndex(long newNextIndex) {
nextIndex.updateUnconditionally(old -> newNextIndex >= 0 ? newNextIndex : old, infoIndexChange);
}
- public void updateNextIndexToMax(long newNextIndex) {
+ @Override
+ public void updateNextIndex(long newNextIndex) {
nextIndex.updateToMax(newNextIndex, infoIndexChange);
}
+ @Override
public void setSnapshotIndex(long newSnapshotIndex) {
snapshotIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
matchIndex.setUnconditionally(newSnapshotIndex, infoIndexChange);
nextIndex.setUnconditionally(newSnapshotIndex + 1, infoIndexChange);
}
+ @Override
public String getName() {
return name;
}
@@ -126,29 +131,28 @@ public class FollowerInfo {
return attendVote;
}
+ @Override
public RaftPeer getPeer() {
return peer;
}
- /** Update lastRpcResponseTime to the current time. */
+ @Override
public void updateLastRpcResponseTime() {
lastRpcResponseTime.set(Timestamp.currentTime());
}
- Timestamp getLastRpcResponseTime() {
+ @Override
+ public Timestamp getLastRpcResponseTime() {
return lastRpcResponseTime.get();
}
- /** Update lastRpcSendTime to the current time. */
+ @Override
public void updateLastRpcSendTime() {
lastRpcSendTime.set(Timestamp.currentTime());
}
+ @Override
public Timestamp getLastRpcTime() {
return Timestamp.latest(lastRpcResponseTime.get(), lastRpcSendTime.get());
}
-
- boolean isSlow() {
- return lastRpcResponseTime.get().elapsedTimeMs() > rpcSlownessTimeoutMs;
- }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 30eec01..7002c44 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.LogAppenderMetrics;
import org.apache.ratis.server.metrics.RaftServerMetrics;
@@ -316,6 +317,14 @@ public class LeaderState {
return currentTerm;
}
+ boolean handleResponseTerm(FollowerInfo follower, long followerTerm) {
+ if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
+ submitStepDownEvent(followerTerm, LeaderState.StepDownReason.HIGHER_TERM);
+ return true;
+ }
+ return false;
+ }
+
TimeDuration getSyncInterval() {
return syncInterval;
}
@@ -432,9 +441,10 @@ public class LeaderState {
}
}
- AppendEntriesRequestProto newAppendEntriesRequestProto(RaftPeerId targetId,
- TermIndex previous, List<LogEntryProto> entries, boolean initializing,
- long callId) {
+ AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
+ List<LogEntryProto> entries, TermIndex previous, long callId) {
+ final boolean initializing = isAttendingVote(follower);
+ final RaftPeerId targetId = follower.getPeer().getId();
return ServerProtoUtils.toAppendEntriesRequestProto(server.getMemberId(), targetId, currentTerm, entries,
ServerImplUtils.effectiveCommitIndex(raftLog.getLastCommittedIndex(), previous, entries.size()),
initializing, previous, server.getCommitInfos(), callId);
@@ -451,12 +461,11 @@ public class LeaderState {
final Timestamp t = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final List<LogAppender> newAppenders = newPeers.stream()
.map(peer -> {
- final FollowerInfo f = new FollowerInfo(server.getMemberId(), peer, t, nextIndex, attendVote,
- server.getRpcSlownessTimeoutMs());
+ final FollowerInfo f = new FollowerInfoImpl(server.getMemberId(), peer, t, nextIndex, attendVote);
LogAppender logAppender = server.newLogAppender(this, f);
peerIdFollowerInfoMap.put(peer.getId(), f);
raftServerMetrics.addFollower(peer.getId());
- logAppenderMetrics.addFollowerGauges(f);
+ logAppenderMetrics.addFollowerGauges(peer.getId(), f::getNextIndex, f::getMatchIndex, f::getLastRpcTime);
return logAppender;
}).collect(Collectors.toList());
senders.addAll(newAppenders);
@@ -570,9 +579,8 @@ public class LeaderState {
* caught-up.
* 3. Otherwise the peer is making progressing. Keep waiting.
*/
- private BootStrapProgress checkProgress(FollowerInfo follower,
- long committed) {
- Preconditions.assertTrue(!follower.isAttendingVote());
+ private BootStrapProgress checkProgress(FollowerInfo follower, long committed) {
+ Preconditions.assertTrue(!isAttendingVote(follower));
final Timestamp progressTime = Timestamp.currentTime().addTimeMs(-server.getMaxTimeoutMs());
final Timestamp timeoutTime = Timestamp.currentTime().addTimeMs(-3L * server.getMaxTimeoutMs());
if (follower.getLastRpcResponseTime().compareTo(timeoutTime) < 0) {
@@ -589,13 +597,17 @@ public class LeaderState {
private Collection<BootStrapProgress> checkAllProgress(long committed) {
Preconditions.assertTrue(inStagingState());
return senders.stream()
- .filter(sender -> !sender.getFollower().isAttendingVote())
+ .filter(sender -> !isAttendingVote(sender.getFollower()))
.map(sender -> checkProgress(sender.getFollower(), committed))
.collect(Collectors.toCollection(ArrayList::new));
}
- void submitCheckStagingEvent() {
- eventQueue.submit(checkStagingEvent);
+ void submitEventOnSuccessAppend(FollowerInfo follower) {
+ if (isAttendingVote(follower)) {
+ submitUpdateCommitEvent();
+ } else {
+ eventQueue.submit(checkStagingEvent);
+ }
}
private void checkStaging() {
@@ -611,7 +623,10 @@ public class LeaderState {
} else if (!reports.contains(BootStrapProgress.PROGRESSING)) {
// all caught up!
applyOldNewConf();
- senders.forEach(s -> s.getFollower().startAttendVote());
+ senders.stream()
+ .map(LogAppender::getFollower)
+ .map(FollowerInfoImpl.class::cast)
+ .forEach(FollowerInfoImpl::startAttendVote);
}
}
}
@@ -961,7 +976,7 @@ public class LeaderState {
void fail(BootStrapProgress progress) {
final String message = this + ": Fail to set configuration " + newConf + " due to " + progress;
LOG.debug(message);
- stopAndRemoveSenders(s -> !s.getFollower().isAttendingVote());
+ stopAndRemoveSenders(s -> !isAttendingVote(s.getFollower()));
LeaderState.this.stagingState = null;
// send back failure response to client's request
@@ -987,6 +1002,10 @@ public class LeaderState {
return senders.stream();
}
+ private static boolean isAttendingVote(FollowerInfo follower) {
+ return ((FollowerInfoImpl)follower).isAttendingVote();
+ }
+
/**
* Record Follower Heartbeat Elapsed Time.
*/
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index aac6c05..a784409 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
@@ -231,8 +232,7 @@ public class LogAppender {
final long heartbeatRemainingMs = getHeartbeatRemainingTime();
if (heartbeatRemainingMs <= 0L || heartbeat) {
// heartbeat
- return leaderState.newAppendEntriesRequestProto(
- getFollowerId(), previous, Collections.emptyList(), !follower.isAttendingVote(), callId);
+ return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(), previous, callId);
}
Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
@@ -254,8 +254,7 @@ public class LogAppender {
follower.getName(), entry, time, exception));
buffer.clear();
assertProtos(protos, followerNext, previous, snapshotIndex);
- return leaderState.newAppendEntriesRequestProto(
- getFollowerId(), previous, protos, !follower.isAttendingVote(), callId);
+ return leaderState.newAppendEntriesRequestProto(follower, protos, previous, callId);
}
private void assertProtos(List<LogEntryProto> protos, long nextIndex, TermIndex previous, long snapshotIndex) {
@@ -554,19 +553,15 @@ public class LogAppender {
}
protected void submitEventOnSuccessAppend() {
- if (follower.isAttendingVote()) {
- leaderState.submitUpdateCommitEvent();
- } else {
- leaderState.submitCheckStagingEvent();
- }
+ leaderState.submitEventOnSuccessAppend(follower);
}
protected void checkSlowness() {
- if (follower.isSlow()) {
+ final TimeDuration lastRpcResponseElapsed = follower.getLastRpcResponseTime().elapsedTime();
+ if (lastRpcResponseElapsed.compareTo(server.getRpcSlownessTimeout()) > 0) {
server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getRoleInfoProto());
}
- leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(),
- follower.getLastRpcResponseTime().elapsedTime());
+ leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(), lastRpcResponseElapsed);
}
public synchronized void notifyAppend() {
@@ -603,12 +598,7 @@ public class LogAppender {
protected boolean checkResponseTerm(long responseTerm) {
synchronized (server) {
- if (isAppenderRunning() && follower.isAttendingVote()
- && responseTerm > leaderState.getCurrentTerm()) {
- leaderState.submitStepDownEvent(responseTerm, LeaderState.StepDownReason.HIGHER_TERM);
- return true;
- }
+ return isAppenderRunning() && leaderState.handleResponseTerm(follower, responseTerm);
}
- return false;
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 02ad561..6971e7c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -33,6 +33,7 @@ import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
@@ -93,7 +94,7 @@ public class RaftServerImpl implements RaftServer.Division,
private final int minTimeoutMs;
private final int maxTimeoutMs;
private final TimeDuration leaderStepDownWaitTime;
- private final int rpcSlownessTimeoutMs;
+ private final TimeDuration rpcSlownessTimeout;
private final TimeDuration sleepDeviationThreshold;
private final boolean installSnapshotEnabled;
@@ -131,7 +132,7 @@ public class RaftServerImpl implements RaftServer.Division,
final RaftProperties properties = proxy.getProperties();
minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
- rpcSlownessTimeoutMs = RaftServerConfigKeys.Rpc.slownessTimeout(properties).toIntExact(TimeUnit.MILLISECONDS);
+ this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
@@ -178,8 +179,8 @@ public class RaftServerImpl implements RaftServer.Division,
return maxTimeoutMs;
}
- int getRpcSlownessTimeoutMs() {
- return rpcSlownessTimeoutMs;
+ TimeDuration getRpcSlownessTimeout() {
+ return rpcSlownessTimeout;
}
TimeDuration getRandomElectionTimeout() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
index e0149f2..7300ddc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.rpc.RpcFactory;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
@@ -33,15 +34,9 @@ public interface ServerFactory extends RpcFactory {
}
/** Create a new {@link LogAppender}. */
- LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f);
+ default LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f) {
+ return new LogAppender(server, state, f);
+ }
RaftServerRpc newRaftServerRpc(RaftServer server);
-
- abstract class BaseFactory implements ServerFactory {
- @Override
- public LogAppender newLogAppender(
- RaftServerImpl server, LeaderState state, FollowerInfo f) {
- return new LogAppender(server, state, f);
- }
- }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
new file mode 100644
index 0000000..487576f
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/FollowerInfo.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.util.Timestamp;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Information of a follower, provided the local server is the Leader
+ */
+public interface FollowerInfo {
+ Logger LOG = LoggerFactory.getLogger(FollowerInfo.class);
+
+ /** @return the name of this object. */
+ String getName();
+
+ /** @return this follower's peer info. */
+ RaftPeer getPeer();
+
+ /** @return the matchIndex acknowledged by this follower. */
+ long getMatchIndex();
+
+ /** Update this follower's matchIndex. */
+ boolean updateMatchIndex(long newMatchIndex);
+
+ /** @return the commitIndex acknowledged by this follower. */
+ long getCommitIndex();
+
+ /** Update follower's commitIndex. */
+ boolean updateCommitIndex(long newCommitIndex);
+
+ /** @return the snapshotIndex acknowledged by this follower. */
+ long getSnapshotIndex();
+
+ /** Set follower's snapshotIndex. */
+ void setSnapshotIndex(long newSnapshotIndex);
+
+ /** @return the nextIndex for this follower. */
+ long getNextIndex();
+
+ /** Increase the nextIndex for this follower. */
+ void increaseNextIndex(long newNextIndex);
+
+ /** Decrease the nextIndex for this follower. */
+ void decreaseNextIndex(long newNextIndex);
+
+ /** Set the nextIndex for this follower. */
+ void setNextIndex(long newNextIndex);
+
+ /** Update the nextIndex for this follower. */
+ void updateNextIndex(long newNextIndex);
+
+ /** @return the lastRpcResponseTime . */
+ Timestamp getLastRpcResponseTime();
+
+ /** Update lastRpcResponseTime to the current time. */
+ void updateLastRpcResponseTime();
+
+ /** Update lastRpcSendTime to the current time. */
+ void updateLastRpcSendTime();
+
+ /** @return the latest of the lastRpcSendTime and the lastRpcResponseTime . */
+ Timestamp getLastRpcTime();
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
index e4213c0..ea6c940 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/metrics/LogAppenderMetrics.java
@@ -25,7 +25,11 @@ import org.apache.ratis.metrics.MetricRegistryInfo;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.metrics.RatisMetrics;
import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.Timestamp;
+
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
public final class LogAppenderMetrics extends RatisMetrics {
public static final String RATIS_LOG_APPENDER_METRICS = "log_appender";
@@ -41,13 +45,10 @@ public final class LogAppenderMetrics extends RatisMetrics {
RATIS_LOG_APPENDER_METRICS, RATIS_LOG_APPENDER_METRICS_DESC));
}
- public void addFollowerGauges(FollowerInfo followerInfo) {
- registry.gauge(String.format(FOLLOWER_NEXT_INDEX,
- followerInfo.getPeer().getId().toString()),
- () -> followerInfo::getNextIndex);
- registry.gauge(String.format(FOLLOWER_MATCH_INDEX, followerInfo.getPeer().getId().toString()),
- () -> followerInfo::getMatchIndex);
- registry.gauge(String.format(FOLLOWER_RPC_RESP_TIME, followerInfo.getPeer().getId().toString()),
- () -> () -> followerInfo.getLastRpcTime().elapsedTimeMs());
+ public void addFollowerGauges(RaftPeerId id, LongSupplier getNextIndex, LongSupplier getMatchIndex,
+ Supplier<Timestamp> getLastRpcTime) {
+ registry.gauge(String.format(FOLLOWER_NEXT_INDEX, id), () -> getNextIndex::getAsLong);
+ registry.gauge(String.format(FOLLOWER_MATCH_INDEX, id), () -> getMatchIndex::getAsLong);
+ registry.gauge(String.format(FOLLOWER_RPC_RESP_TIME, id), () -> () -> getLastRpcTime.get().elapsedTimeMs());
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
index 74148f9..f2e3383 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestLogAppenderMetrics.java
@@ -20,13 +20,10 @@ package org.apache.ratis.server.impl;
import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_MATCH_INDEX;
import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_NEXT_INDEX;
import static org.apache.ratis.server.metrics.RaftLogMetrics.FOLLOWER_RPC_RESP_TIME;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
-import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.metrics.LogAppenderMetrics;
import org.apache.ratis.util.Timestamp;
@@ -40,20 +37,18 @@ public class TestLogAppenderMetrics {
private RatisMetricRegistry ratisMetricRegistry;
private RaftPeerId raftPeerId;
- private FollowerInfo followerInfo;
+ private MyFollowerInfo followerInfo;
@Before
public void setup() {
RaftGroupId raftGroupId = RaftGroupId.randomId();
raftPeerId = RaftPeerId.valueOf("TestId");
- final RaftPeer raftPeer = RaftPeer.newBuilder().setId(raftPeerId).build();
RaftGroupMemberId raftGroupMemberId = RaftGroupMemberId.valueOf(raftPeerId, raftGroupId);
- LogAppender logAppender = mock(LogAppender.class);
- followerInfo = new TestFollowerInfo(raftGroupMemberId, raftPeer, Timestamp.currentTime(), 100L, true, 1000);
- when(logAppender.getFollower()).thenReturn(followerInfo);
+ followerInfo = new MyFollowerInfo(100L);
LogAppenderMetrics logAppenderMetrics = new LogAppenderMetrics(raftGroupMemberId);
ratisMetricRegistry = logAppenderMetrics.getRegistry();
- logAppenderMetrics.addFollowerGauges(followerInfo);
+ logAppenderMetrics.addFollowerGauges(raftPeerId, followerInfo::getNextIndex, followerInfo::getMatchIndex,
+ followerInfo::getLastRpcTime);
}
@Test
@@ -75,11 +70,37 @@ public class TestLogAppenderMetrics {
Assert.assertNotNull(rpcTime.getValue());
}
- private static class TestFollowerInfo extends FollowerInfo {
- TestFollowerInfo(RaftGroupMemberId id, RaftPeer peer, Timestamp
- lastRpcTime, long nextIndex, boolean attendVote, int
- rpcSlownessTimeoutMs) {
- super(id, peer, lastRpcTime, nextIndex, attendVote, rpcSlownessTimeoutMs);
+ private static class MyFollowerInfo {
+ private volatile long nextIndex;
+ private volatile long matchIndex;
+ private volatile Timestamp lastRpcTime = Timestamp.currentTime();
+
+ MyFollowerInfo(long nextIndex) {
+ this.nextIndex = nextIndex;
+ }
+
+ long getNextIndex() {
+ return nextIndex;
+ }
+
+ void updateNextIndex(long nextIndex) {
+ this.nextIndex = nextIndex;
+ }
+
+ long getMatchIndex() {
+ return matchIndex;
+ }
+
+ void updateMatchIndex(long matchIndex) {
+ this.matchIndex = matchIndex;
+ }
+
+ Timestamp getLastRpcTime() {
+ return lastRpcTime;
+ }
+
+ void updateLastRpcResponseTime() {
+ lastRpcTime = Timestamp.currentTime();
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index 11fe2ee..f5ee907 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -41,7 +41,7 @@ class SimulatedRpc implements RpcType {
return new Factory(parameters);
}
- static class Factory extends ServerFactory.BaseFactory implements ClientFactory {
+ static class Factory implements ServerFactory, ClientFactory {
static String SERVER_REQUEST_REPLY_KEY = "raft.simulated.serverRequestReply";
static String CLIENT_TO_SERVER_REQUEST_REPLY_KEY = "raft.simulated.client2serverRequestReply";
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 38d31c5..067baff 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -27,7 +27,7 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.FollowerInfo;
+import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;