You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/07 05:34:21 UTC
[incubator-ratis] branch master updated: RATIS-1208. Separate LogAppender interface from the implementation. (#326)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new e5a052c RATIS-1208. Separate LogAppender interface from the implementation. (#326)
e5a052c is described below
commit e5a052c18766293b94cc89e3a34a469193009537
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 7 13:34:14 2020 +0800
RATIS-1208. Separate LogAppender interface from the implementation. (#326)
* RATIS-1208. Separate LogAppender interface from the implementation.
* Fix a bug in SimpleStateMachine4Testing.
---
.../java/org/apache/ratis/grpc/GrpcFactory.java | 4 +-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 42 +++---
.../org/apache/ratis/hadooprpc/HadoopFactory.java | 2 +-
.../java/org/apache/ratis/netty/NettyFactory.java | 2 +-
.../java/org/apache/ratis/server/RaftServer.java | 3 +-
.../ratis/server/{impl => }/ServerFactory.java | 8 +-
.../apache/ratis/server/impl/LeaderStateImpl.java | 25 ++--
.../{LogAppender.java => LogAppenderBase.java} | 148 +++++++--------------
.../ratis/server/impl/LogAppenderDaemon.java | 3 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 1 +
.../apache/ratis/server/impl/RaftServerProxy.java | 1 +
.../apache/ratis/server/leader/LeaderState.java | 1 -
.../apache/ratis/server/leader/LogAppender.java | 118 ++++++++++++++++
.../java/org/apache/ratis/LogAppenderTests.java | 2 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 1 +
.../ratis/server/impl/RaftServerTestUtil.java | 1 +
.../ratis/server/simulation/SimulatedRpc.java | 2 +-
.../statemachine/SimpleStateMachine4Testing.java | 2 +-
.../ratis/datastream/DataStreamBaseTest.java | 3 +-
19 files changed, 223 insertions(+), 146 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index c055848..3f33e44 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -26,8 +26,8 @@ import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 85d3572..26329db 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -24,9 +24,9 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.LogAppenderBase;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
-import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -49,7 +49,7 @@ import com.codahale.metrics.Timer;
/**
* A new log appender implementation using grpc bi-directional stream API.
*/
-public class GrpcLogAppender extends LogAppender {
+public class GrpcLogAppender extends LogAppenderBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
private final RequestMap pendingRequests = new RequestMap();
@@ -80,7 +80,7 @@ public class GrpcLogAppender extends LogAppender {
}
@Override
- protected GrpcService getServerRpc() {
+ public GrpcService getServerRpc() {
return (GrpcService)super.getServerRpc();
}
@@ -114,10 +114,18 @@ public class GrpcLogAppender extends LogAppender {
getFollower().decreaseNextIndex(nextIndex);
}
+ private boolean haveLogEntriesToSendOut() {
+ return shouldAppendEntries(getFollower().getNextIndex());
+ }
+
+ private boolean isFollowerCommitBehindLastCommitIndex() {
+ return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
+ }
+
@Override
- protected void runAppenderImpl() throws IOException {
+ public void run() throws IOException {
boolean installSnapshotRequired;
- for(; isAppenderRunning(); mayWait()) {
+ for(; isRunning(); mayWait()) {
installSnapshotRequired = false;
//HB period is expired OR we have messages OR follower is behind with commit index
@@ -174,13 +182,13 @@ public class GrpcLogAppender extends LogAppender {
}
@Override
- public void stopAppender() {
+ public void stop() {
grpcServerMetrics.unregister();
- super.stopAppender();
+ super.stop();
}
@Override
- protected boolean shouldSendRequest() {
+ public boolean shouldSendRequest() {
return appendLogRequestObserver == null || super.shouldSendRequest();
}
@@ -203,7 +211,7 @@ public class GrpcLogAppender extends LogAppender {
// prepare and enqueue the append request. note changes on follower's
// nextIndex and ops on pendingRequests should always be associated
// together and protected by the lock
- pending = createRequest(callId++, excludeLogEntries);
+ pending = newAppendEntriesRequest(callId++, excludeLogEntries);
if (pending == null) {
return;
}
@@ -216,7 +224,7 @@ public class GrpcLogAppender extends LogAppender {
s = appendLogRequestObserver;
}
- if (isAppenderRunning()) {
+ if (isRunning()) {
sendRequest(request, pending, s);
}
}
@@ -316,7 +324,7 @@ public class GrpcLogAppender extends LogAppender {
default:
throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
}
- notifyAppend();
+ notifyLogAppender();
}
/**
@@ -324,7 +332,7 @@ public class GrpcLogAppender extends LogAppender {
*/
@Override
public void onError(Throwable t) {
- if (!isAppenderRunning()) {
+ if (!isRunning()) {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
@@ -382,7 +390,7 @@ public class GrpcLogAppender extends LogAppender {
void close() {
done.set(true);
- GrpcLogAppender.this.notifyAppend();
+ notifyLogAppender();
}
synchronized boolean hasAllResponse() {
@@ -439,7 +447,7 @@ public class GrpcLogAppender extends LogAppender {
@Override
public void onError(Throwable t) {
- if (!isAppenderRunning()) {
+ if (!isRunning()) {
LOG.info("{} is stopped", GrpcLogAppender.this);
return;
}
@@ -477,7 +485,7 @@ public class GrpcLogAppender extends LogAppender {
try {
snapshotRequestObserver = getClient().installSnapshot(responseHandler);
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
- if (isAppenderRunning()) {
+ if (isRunning()) {
snapshotRequestObserver.onNext(request);
getFollower().updateLastRpcSendTime();
responseHandler.addPending(request);
@@ -496,7 +504,7 @@ public class GrpcLogAppender extends LogAppender {
}
synchronized (this) {
- while (isAppenderRunning() && !responseHandler.isDone()) {
+ while (isRunning() && !responseHandler.isDone()) {
try {
wait();
} catch (InterruptedException ignored) {
@@ -541,7 +549,7 @@ public class GrpcLogAppender extends LogAppender {
}
synchronized (this) {
- if (isAppenderRunning() && !responseHandler.isDone()) {
+ if (isRunning() && !responseHandler.isDone()) {
try {
wait();
} catch (InterruptedException ignored) {
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
index f024b81..6cdfd36 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/HadoopFactory.java
@@ -26,7 +26,7 @@ import org.apache.ratis.hadooprpc.server.HadoopRpcService;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
public class HadoopFactory implements ServerFactory, ClientFactory {
public static Parameters newRaftParameters(Configuration conf) {
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
index 7f981ae..24f0142 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyFactory.java
@@ -25,7 +25,7 @@ import org.apache.ratis.netty.server.NettyRpcService;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
public class NettyFactory implements ServerFactory, ClientFactory {
public NettyFactory(Parameters parameters) {}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index b76705f..4ea4776 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -21,7 +21,6 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
-import org.apache.ratis.rpc.RpcFactory;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.ServerImplUtils;
@@ -130,7 +129,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
}
/** @return the factory for creating server components. */
- RpcFactory getFactory();
+ ServerFactory getFactory();
/** Start this server. */
void start() throws IOException;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
similarity index 89%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
rename to ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
index 12d5a74..600f02d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
@@ -15,13 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server;
import org.apache.ratis.rpc.RpcFactory;
-import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.LogAppenderBase;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
/** A factory interface for creating server components. */
public interface ServerFactory extends RpcFactory {
@@ -36,7 +36,7 @@ public interface ServerFactory extends RpcFactory {
/** Create a new {@link LogAppender}. */
default LogAppender newLogAppender(RaftServer.Division server, LeaderState state, FollowerInfo f) {
- return new LogAppender(server, state, f);
+ return new LogAppenderBase(server, state, f);
}
RaftServerRpc newRaftServerRpc(RaftServer server);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3d52e44..0352fb9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -35,6 +35,7 @@ import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.metrics.LogAppenderMetrics;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.TermIndex;
@@ -284,7 +285,7 @@ class LeaderStateImpl implements LeaderState {
server.getId().toString(), null);
raftLog.append(placeHolder);
processor.start();
- senders.forEach(LogAppender::startAppender);
+ senders.forEach(LogAppender::start);
return placeHolder;
}
@@ -295,7 +296,7 @@ class LeaderStateImpl implements LeaderState {
void stop() {
this.running = false;
// do not interrupt event processor since it may be in the middle of logSync
- senders.forEach(LogAppender::stopAppender);
+ senders.forEach(LogAppender::stop);
final NotLeaderException nle = server.generateNotLeaderException();
final Collection<CommitInfoProto> commitInfos = server.getCommitInfos();
try {
@@ -313,7 +314,7 @@ class LeaderStateImpl implements LeaderState {
}
void notifySenders() {
- senders.forEach(LogAppender::notifyAppend);
+ senders.forEach(LogAppender::notifyLogAppender);
}
boolean inStagingState() {
@@ -466,7 +467,7 @@ class LeaderStateImpl implements LeaderState {
* Update sender list for setConfiguration request
*/
void addAndStartSenders(Collection<RaftPeer> newPeers) {
- addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::startAppender);
+ addSenders(newPeers, RaftLog.LEAST_VALID_LOG_INDEX, false).forEach(LogAppender::start);
}
Collection<LogAppender> addSenders(Collection<RaftPeer> newPeers, long nextIndex, boolean attendVote) {
@@ -486,7 +487,7 @@ class LeaderStateImpl implements LeaderState {
void stopAndRemoveSenders(Predicate<LogAppender> predicate) {
final List<LogAppender> toStop = senders.stream().filter(predicate).collect(Collectors.toList());
- toStop.forEach(LogAppender::stopAppender);
+ toStop.forEach(LogAppender::stop);
senders.removeAll(toStop);
}
@@ -494,7 +495,7 @@ class LeaderStateImpl implements LeaderState {
public void restart(LogAppender sender) {
final FollowerInfo follower = sender.getFollower();
LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName());
- sender.stopAppender();
+ sender.stop();
senders.removeAll(Collections.singleton(sender));
addAndStartSenders(Collections.singleton(follower.getPeer()));
}
@@ -504,7 +505,7 @@ class LeaderStateImpl implements LeaderState {
*/
private void updateSenders(RaftConfiguration conf) {
Preconditions.assertTrue(conf.isStable() && !inStagingState());
- stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollower().getPeer().getId()));
+ stopAndRemoveSenders(s -> !conf.containsInConf(s.getFollowerId()));
}
void submitStepDownEvent(StepDownReason reason) {
@@ -845,14 +846,14 @@ class LeaderStateImpl implements LeaderState {
private List<List<RaftPeerId>> divideFollowers(RaftConfiguration conf) {
List<List<RaftPeerId>> lists = new ArrayList<>(2);
List<RaftPeerId> listForNew = senders.stream()
- .filter(sender -> conf.containsInConf(sender.getFollower().getPeer().getId()))
- .map(sender -> sender.getFollower().getPeer().getId())
+ .map(LogAppender::getFollowerId)
+ .filter(conf::containsInConf)
.collect(Collectors.toList());
lists.add(listForNew);
if (conf.isTransitional()) {
List<RaftPeerId> listForOld = senders.stream()
- .filter(sender -> conf.containsInOldConf(sender.getFollower().getPeer().getId()))
- .map(sender -> sender.getFollower().getPeer().getId())
+ .map(LogAppender::getFollowerId)
+ .filter(conf::containsInOldConf)
.collect(Collectors.toList());
lists.add(listForOld);
}
@@ -923,7 +924,7 @@ class LeaderStateImpl implements LeaderState {
.filter(sender -> sender.getFollower()
.getLastRpcResponseTime()
.elapsedTimeMs() <= server.getMaxTimeoutMs())
- .map(sender -> sender.getFollower().getPeer().getId())
+ .map(LogAppender::getFollowerId)
.collect(Collectors.toList());
final RaftConfiguration conf = server.getRaftConf();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
similarity index 74%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
rename to ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
index a922e00..6b045fd 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
@@ -18,25 +18,35 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
import org.apache.ratis.server.raftlog.RaftLogIOException;
-import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.statemachine.SnapshotInfo;
-import org.apache.ratis.util.*;
+import org.apache.ratis.util.DataQueue;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SizeInBytes;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.InterruptedIOException;
-import java.util.*;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
@@ -44,31 +54,27 @@ import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTAL
/**
* A daemon thread appending log entries to a follower peer.
*/
-public class LogAppender {
- public static final Logger LOG = LoggerFactory.getLogger(LogAppender.class);
+public class LogAppenderBase implements LogAppender {
+ public static final Logger LOG = LoggerFactory.getLogger(LogAppenderBase.class);
private final String name;
private final RaftServer.Division server;
private final LeaderState leaderState;
- private final RaftLog raftLog;
private final FollowerInfo follower;
private final DataQueue<EntryWithData> buffer;
private final int snapshotChunkMaxSize;
- private final long halfMinTimeoutMs;
private final LogAppenderDaemon daemon;
- public LogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+ public LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
this.server = server;
this.leaderState = leaderState;
- this.raftLog = server.getRaftLog();
final RaftProperties properties = server.getRaftServer().getProperties();
this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
- this.halfMinTimeoutMs = server.properties().minRpcTimeoutMs() / 2;
final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
@@ -76,44 +82,38 @@ public class LogAppender {
this.daemon = new LogAppenderDaemon(this);
}
- protected RaftServer.Division getServer() {
+ @Override
+ public final RaftServer.Division getServer() {
return server;
}
- protected RaftServerRpc getServerRpc() {
- return server.getRaftServer().getServerRpc();
- }
-
- public RaftLog getRaftLog() {
- return raftLog;
- }
-
@Override
public String toString() {
return name;
}
- void startAppender() {
+ @Override
+ public void start() {
daemon.tryToStart();
}
- public boolean isAppenderRunning() {
+ @Override
+ public boolean isRunning() {
return daemon.isWorking();
}
- public void stopAppender() {
+ @Override
+ public void stop() {
daemon.tryToClose();
}
- public FollowerInfo getFollower() {
+ @Override
+ public final FollowerInfo getFollower() {
return follower;
}
- protected RaftPeerId getFollowerId() {
- return getFollower().getPeer().getId();
- }
-
- public LeaderState getLeaderState() {
+ @Override
+ public final LeaderState getLeaderState() {
return leaderState;
}
@@ -123,7 +123,7 @@ public class LogAppender {
}
final long previousIndex = nextIndex - 1;
- final TermIndex previous = raftLog.getTermIndex(previousIndex);
+ final TermIndex previous = getRaftLog().getTermIndex(previousIndex);
if (previous != null) {
return previous;
}
@@ -139,8 +139,8 @@ public class LogAppender {
return null;
}
- protected AppendEntriesRequestProto createRequest(long callId,
- boolean heartbeat) throws RaftLogIOException {
+ @Override
+ public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException {
final TermIndex previous = getPrevious(follower.getNextIndex());
final long snapshotIndex = follower.getSnapshotIndex();
final long heartbeatRemainingMs = getHeartbeatRemainingTime();
@@ -151,11 +151,11 @@ public class LogAppender {
Preconditions.assertTrue(buffer.isEmpty(), () -> "buffer has " + buffer.getNumElements() + " elements.");
- final long leaderNext = raftLog.getNextIndex();
+ final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
final long halfMs = heartbeatRemainingMs/2;
for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
- if (!buffer.offer(raftLog.getEntryWithData(next++))) {
+ if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
break;
}
}
@@ -195,16 +195,16 @@ public class LogAppender {
throws InterruptedException, InterruptedIOException, RaftLogIOException {
int retry = 0;
AppendEntriesRequestProto request = null;
- while (isAppenderRunning()) { // keep retrying for IOException
+ while (isRunning()) { // keep retrying for IOException
try {
if (request == null || request.getEntriesCount() == 0) {
- request = createRequest(DEFAULT_CALLID, false);
+ request = newAppendEntriesRequest(DEFAULT_CALLID, false);
}
if (request == null) {
LOG.trace("{} no entries to send now, wait ...", this);
return null;
- } else if (!isAppenderRunning()) {
+ } else if (!isRunning()) {
LOG.info("{} is stopped. Skip appendEntries.", this);
return null;
}
@@ -224,14 +224,15 @@ public class LogAppender {
}
handleException(ioe);
}
- if (isAppenderRunning()) {
+ if (isRunning()) {
server.properties().rpcSleepTime().sleep();
}
}
return null;
}
- protected InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
+ @Override
+ public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
synchronized (server) {
return ServerProtoUtils.toInstallSnapshotRequestProto(server.getMemberId(), getFollowerId(),
@@ -239,8 +240,8 @@ public class LogAppender {
}
}
- protected Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(
- String requestId, SnapshotInfo snapshot) {
+ @Override
+ public Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
return ServerImplUtils.newInstallSnapshotRequests(server, getFollowerId(), requestId,
snapshot, snapshotChunkMaxSize);
}
@@ -274,29 +275,14 @@ public class LogAppender {
return reply;
}
- protected SnapshotInfo shouldInstallSnapshot() {
- final long logStartIndex = raftLog.getStartIndex();
- // we should install snapshot if the follower needs to catch up and:
- // 1. there is no local log entry but there is snapshot
- // 2. or the follower's next index is smaller than the log start index
- if (follower.getNextIndex() < raftLog.getNextIndex()) {
- final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
- if (follower.getNextIndex() < logStartIndex ||
- (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
- return snapshot;
- }
- }
- return null;
- }
-
- /** Check and send appendEntries RPC */
- protected void runAppenderImpl() throws InterruptedException, IOException {
- while (isAppenderRunning()) {
+ @Override
+ public void run() throws InterruptedException, IOException {
+ while (isRunning()) {
if (shouldSendRequest()) {
SnapshotInfo snapshot = shouldInstallSnapshot();
if (snapshot != null) {
LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
- this, follower.getNextIndex(), raftLog.getStartIndex(), snapshot);
+ this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot);
final InstallSnapshotReplyProto r = installSnapshot(snapshot);
if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
@@ -309,7 +295,7 @@ public class LogAppender {
}
}
}
- if (isAppenderRunning() && !shouldAppendEntries(follower.getNextIndex())) {
+ if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) {
final long waitTime = getHeartbeatRemainingTime();
if (waitTime > 0) {
synchronized (this) {
@@ -358,42 +344,4 @@ public class LogAppender {
LOG.trace("TRACE", e);
getServerRpc().handleException(getFollowerId(), e, false);
}
-
- public synchronized void notifyAppend() {
- this.notify();
- }
-
- /** Should the leader send appendEntries RPC to this follower? */
- protected boolean shouldSendRequest() {
- return shouldAppendEntries(follower.getNextIndex()) || heartbeatTimeout();
- }
-
- protected boolean haveLogEntriesToSendOut() {
- return shouldAppendEntries(follower.getNextIndex());
- }
-
- protected boolean isFollowerCommitBehindLastCommitIndex() {
- return raftLog.getLastCommittedIndex() > follower.getCommitIndex();
- }
-
- private boolean shouldAppendEntries(long followerIndex) {
- return followerIndex < raftLog.getNextIndex();
- }
-
- protected boolean heartbeatTimeout() {
- return getHeartbeatRemainingTime() <= 0;
- }
-
- /**
- * @return the time in milliseconds that the leader should send a heartbeat.
- */
- protected long getHeartbeatRemainingTime() {
- return halfMinTimeoutMs - follower.getLastRpcTime().elapsedTimeMs();
- }
-
- protected boolean checkResponseTerm(long responseTerm) {
- synchronized (server) {
- return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm);
- }
- }
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
index 4d0a662..4cecc9c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
@@ -74,7 +75,7 @@ class LogAppenderDaemon {
private void run() {
try {
if (lifeCycle.transition(TRY_TO_RUN) == RUNNING) {
- logAppender.runAppenderImpl();
+ logAppender.run();
}
lifeCycle.compareAndTransition(RUNNING, CLOSING);
} catch (InterruptedException e) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 3bca7a0..254087f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -41,6 +41,7 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.leader.LeaderState;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index 834b90a..f285d57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -35,6 +35,7 @@ import org.apache.ratis.protocol.exceptions.GroupMismatchException;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.util.JvmPauseMonitor;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 24649d7..5460392 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -19,7 +19,6 @@ package org.apache.ratis.server.leader;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
-import org.apache.ratis.server.impl.LogAppender;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.JavaUtils;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
new file mode 100644
index 0000000..843e6ca
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.protocol.TermIndex;
+import org.apache.ratis.server.raftlog.RaftLog;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * A {@link LogAppender} is used by the leader to send appendEntries requests to a particular follower.
+ */
+public interface LogAppender {
+ Logger LOG = LoggerFactory.getLogger(LogAppender.class);
+
+ RaftServer.Division getServer();
+
+ default RaftServerRpc getServerRpc() {
+ return getServer().getRaftServer().getServerRpc();
+ }
+
+ default RaftLog getRaftLog() {
+ return getServer().getRaftLog();
+ }
+
+ void start();
+
+ boolean isRunning();
+
+ void stop();
+
+ LeaderState getLeaderState();
+
+ FollowerInfo getFollower();
+
+ default RaftPeerId getFollowerId() {
+ return getFollower().getPeer().getId();
+ }
+
+ AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException;
+
+ InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex);
+
+ Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);
+
+ default SnapshotInfo shouldInstallSnapshot() {
+ // Return the latest snapshot if the follower's next index is behind the local log's next index and either:
+ // 1. the local log has no start index (RaftLog.INVALID_LOG_INDEX) but a snapshot exists,
+ // 2. or the follower's next index is smaller than the log start index; otherwise return null.
+ final long followerNextIndex = getFollower().getNextIndex();
+ if (followerNextIndex < getRaftLog().getNextIndex()) {
+ final long logStartIndex = getRaftLog().getStartIndex();
+ final SnapshotInfo snapshot = getServer().getStateMachine().getLatestSnapshot();
+ if (followerNextIndex < logStartIndex || (logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
+ return snapshot;
+ }
+ }
+ return null;
+ }
+
+ void run() throws InterruptedException, IOException;
+
+ default void notifyLogAppender() {
+ synchronized (this) {
+ notify();
+ }
+ }
+
+ /** @return true if there are entries to append to this follower or the heartbeat has timed out. */
+ default boolean shouldSendRequest() {
+ return shouldAppendEntries(getFollower().getNextIndex()) || heartbeatTimeout();
+ }
+
+ default boolean shouldAppendEntries(long followerIndex) {
+ return followerIndex < getRaftLog().getNextIndex();
+ }
+
+ default boolean heartbeatTimeout() {
+ return getHeartbeatRemainingTime() <= 0;
+ }
+
+ /**
+ * @return the remaining time in milliseconds before a heartbeat is due, i.e. half of minRpcTimeoutMs minus the elapsed time of the follower's last RPC time; a non-positive value means the heartbeat has timed out.
+ */
+ default long getHeartbeatRemainingTime() {
+ return getServer().properties().minRpcTimeoutMs()/2 - getFollower().getLastRpcTime().elapsedTimeMs();
+ }
+
+ default boolean checkResponseTerm(long responseTerm) {
+ synchronized (getServer()) {
+ return isRunning() && getLeaderState().onFollowerTerm(getFollower(), responseTerm);
+ }
+ }
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index ab37bba..f9dec53 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -30,7 +30,7 @@ import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.impl.RaftServerTestUtil;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 3a52585..2167f7e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -35,6 +35,7 @@ import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.raftlog.memory.MemoryRaftLog;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index f216189..8e09b8d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -30,6 +30,7 @@ import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.storage.RaftStorage;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
index f5ee907..0399b41 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/SimulatedRpc.java
@@ -23,7 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.util.JavaUtils;
import java.util.Objects;
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index e25c9d2..6af26bf 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -374,7 +374,7 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
@Override
public CompletableFuture<ByteString> read(LogEntryProto entry) {
return blocking.getFuture(Blocking.Type.READ_STATE_MACHINE_DATA)
- .thenApply((v) -> null);
+ .thenApply(v -> STATE_MACHINE_DATA);
}
@Override
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 0533323..da44b68 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -51,13 +51,12 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
-import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerTestUtil;
-import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.ServerFactory;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;