You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/07 08:49:26 UTC
[incubator-ratis] branch master updated: RATIS-1214. Separate the
default LogAppender implementation from LogAppenderBase. (#331)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 80d857b RATIS-1214. Separate the default LogAppender implementation from LogAppenderBase. (#331)
80d857b is described below
commit 80d857b7418855c4653487e8fe7fe77dccbb3afd
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Dec 7 16:49:18 2020 +0800
RATIS-1214. Separate the default LogAppender implementation from LogAppenderBase. (#331)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 27 ++-
.../org/apache/ratis/server/ServerFactory.java | 3 +-
.../apache/ratis/server/impl/ServerImplUtils.java | 8 -
.../{impl => leader}/InstallSnapshotRequests.java | 3 +-
.../apache/ratis/server/leader/LogAppender.java | 61 +++++--
.../server/{impl => leader}/LogAppenderBase.java | 174 +------------------
.../server/{impl => leader}/LogAppenderDaemon.java | 3 +-
.../ratis/server/leader/LogAppenderDefault.java | 185 +++++++++++++++++++++
8 files changed, 261 insertions(+), 203 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 26329db..579e1cb 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -24,10 +24,10 @@ import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.LogAppenderBase;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.leader.LogAppenderBase;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
@@ -114,10 +114,6 @@ public class GrpcLogAppender extends LogAppenderBase {
getFollower().decreaseNextIndex(nextIndex);
}
- private boolean haveLogEntriesToSendOut() {
- return shouldAppendEntries(getFollower().getNextIndex());
- }
-
private boolean isFollowerCommitBehindLastCommitIndex() {
return getRaftLog().getLastCommittedIndex() > getFollower().getCommitIndex();
}
@@ -129,7 +125,7 @@ public class GrpcLogAppender extends LogAppenderBase {
installSnapshotRequired = false;
//HB period is expired OR we have messages OR follower is behind with commit index
- if (haveLogEntriesToSendOut() || heartbeatTimeout() || isFollowerCommitBehindLastCommitIndex()) {
+ if (shouldSendAppendEntries() || isFollowerCommitBehindLastCommitIndex()) {
if (installSnapshotEnabled) {
SnapshotInfo snapshot = shouldInstallSnapshot();
@@ -156,11 +152,11 @@ public class GrpcLogAppender extends LogAppenderBase {
private long getWaitTimeMs() {
if (haveTooManyPendingRequests()) {
- return getHeartbeatRemainingTime(); // Should wait for a short time
- } else if (haveLogEntriesToSendOut() || heartbeatTimeout()) {
+ return getHeartbeatRemainingTimeMs(); // Should wait for a short time
+ } else if (shouldSendAppendEntries()) {
return 0L;
}
- return Math.min(10L, getHeartbeatRemainingTime());
+ return Math.min(10L, getHeartbeatRemainingTimeMs());
}
private void mayWait() {
@@ -188,8 +184,8 @@ public class GrpcLogAppender extends LogAppenderBase {
}
@Override
- public boolean shouldSendRequest() {
- return appendLogRequestObserver == null || super.shouldSendRequest();
+ public boolean shouldSendAppendEntries() {
+ return appendLogRequestObserver == null || super.shouldSendAppendEntries();
}
/**
@@ -313,7 +309,7 @@ public class GrpcLogAppender extends LogAppenderBase {
break;
case NOT_LEADER:
grpcServerMetrics.onRequestNotLeader(getFollowerId().toString());
- if (checkResponseTerm(reply.getTerm())) {
+ if (onFollowerTerm(reply.getTerm())) {
return;
}
break;
@@ -429,7 +425,7 @@ public class GrpcLogAppender extends LogAppenderBase {
removePending(reply);
break;
case NOT_LEADER:
- checkResponseTerm(reply.getTerm());
+ onFollowerTerm(reply.getTerm());
break;
case CONF_MISMATCH:
LOG.error("{}: Configuration Mismatch ({}): Leader {} has it set to {} but follower {} has it set to {}",
@@ -565,12 +561,13 @@ public class GrpcLogAppender extends LogAppenderBase {
* @return the first available log's start term index
*/
private TermIndex shouldNotifyToInstallSnapshot() {
- if (getFollower().getNextIndex() < getRaftLog().getStartIndex()) {
+ final long leaderStartIndex = getRaftLog().getStartIndex();
+ if (getFollower().getNextIndex() < leaderStartIndex) {
// The Leader does not have the logs from the Follower's last log
// index onwards. And install snapshot is disabled. So the Follower
// should be notified to install the latest snapshot through its
// State Machine.
- return getRaftLog().getTermIndex(getRaftLog().getStartIndex());
+ return getRaftLog().getTermIndex(leaderStartIndex);
}
return null;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
index 600f02d..b788bc9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/ServerFactory.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server;
import org.apache.ratis.rpc.RpcFactory;
-import org.apache.ratis.server.impl.LogAppenderBase;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.leader.LogAppender;
@@ -36,7 +35,7 @@ public interface ServerFactory extends RpcFactory {
/** Create a new {@link LogAppender}. */
default LogAppender newLogAppender(RaftServer.Division server, LeaderState state, FollowerInfo f) {
- return new LogAppenderBase(server, state, f);
+ return LogAppender.newLogAppenderDefault(server, state, f);
}
RaftServerRpc newRaftServerRpc(RaftServer server);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
index 1117ceb..db452da 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerImplUtils.java
@@ -19,13 +19,11 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.Parameters;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.statemachine.SnapshotInfo;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.IOUtils;
import org.apache.ratis.util.JavaUtils;
@@ -68,12 +66,6 @@ public final class ServerImplUtils {
return proxy;
}
- public static Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(RaftServer.Division server,
- RaftPeerId targetId, String requestId, SnapshotInfo snapshot, int snapshotChunkMaxSize) {
- return new InstallSnapshotRequests(server, targetId, requestId, snapshot, snapshotChunkMaxSize);
- }
-
-
static long effectiveCommitIndex(long leaderCommitIndex, TermIndex followerPrevious, int numAppendEntries) {
final long p = Optional.ofNullable(followerPrevious).map(TermIndex::getIndex).orElse(RaftLog.LEAST_VALID_LOG_INDEX);
return Math.min(leaderCommitIndex, p + numAppendEntries);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
similarity index 98%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
rename to ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
index 448770a..31e32dc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/InstallSnapshotRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/InstallSnapshotRequests.java
@@ -15,12 +15,13 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.leader;
import org.apache.ratis.proto.RaftProtos.FileChunkProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.storage.FileChunkReader;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.statemachine.SnapshotInfo;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
index 843e6ca..483ef33 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppender.java
@@ -37,36 +37,67 @@ import java.io.IOException;
public interface LogAppender {
Logger LOG = LoggerFactory.getLogger(LogAppender.class);
+ /** Create the default {@link LogAppender}. */
+ static LogAppender newLogAppenderDefault(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+ return new LogAppenderDefault(server, leaderState, f);
+ }
+
+ /** @return the server. */
RaftServer.Division getServer();
+ /** The same as getServer().getRaftServer().getServerRpc(). */
default RaftServerRpc getServerRpc() {
return getServer().getRaftServer().getServerRpc();
}
+ /** The same as getServer().getRaftLog(). */
default RaftLog getRaftLog() {
return getServer().getRaftLog();
}
+ /** Start this {@link LogAppender}. */
void start();
+ /** Is this {@link LogAppender} running? */
boolean isRunning();
+ /** Stop this {@link LogAppender}. */
void stop();
+ /** @return the leader state. */
LeaderState getLeaderState();
+ /** @return the follower information for this {@link LogAppender}. */
FollowerInfo getFollower();
+ /** The same as getFollower().getPeer().getId(). */
default RaftPeerId getFollowerId() {
return getFollower().getPeer().getId();
}
+ /**
+ * Create a {@link AppendEntriesRequestProto} object using the {@link FollowerInfo} of this {@link LogAppender}.
+ * The {@link AppendEntriesRequestProto} object may contain zero or more log entries.
+ * When there is zero log entries, the {@link AppendEntriesRequestProto} object is a heartbeat.
+ *
+ * @param callId The call id of the returned request.
+ * @param heartbeat the returned request must be a heartbeat.
+ *
+ * @return a new {@link AppendEntriesRequestProto} object.
+ */
AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException;
+ /** @return a new {@link InstallSnapshotRequestProto} object. */
InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex);
+ /** @return an {@link Iterable} of {@link InstallSnapshotRequestProto} for sending the given snapshot. */
Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot);
+ /**
+ * Should this {@link LogAppender} send a snapshot to the follower?
+ *
+ * @return the snapshot if it should install a snapshot; otherwise, return null.
+ */
default SnapshotInfo shouldInstallSnapshot() {
// we should install snapshot if the follower needs to catch up and:
// 1. there is no local log entry but there is snapshot
@@ -82,37 +113,47 @@ public interface LogAppender {
return null;
}
+ /** Define how this {@link LogAppender} should run. */
void run() throws InterruptedException, IOException;
+ /**
+ * Similar to {@link #notify()}, wake up this {@link LogAppender} for an event, which can be:
+ * (1) new log entries available,
+ * (2) log indices changed, or
+ * (3) a snapshot installation completed.
+ */
default void notifyLogAppender() {
synchronized (this) {
notify();
}
}
- /** Should the leader send appendEntries RPC to this follower? */
- default boolean shouldSendRequest() {
- return shouldAppendEntries(getFollower().getNextIndex()) || heartbeatTimeout();
+ /** Should the leader send appendEntries RPC to the follower? */
+ default boolean shouldSendAppendEntries() {
+ return hasAppendEntries() || shouldHeartbeat();
}
- default boolean shouldAppendEntries(long followerIndex) {
- return followerIndex < getRaftLog().getNextIndex();
+ /** Does it has outstanding appendEntries? */
+ default boolean hasAppendEntries() {
+ return getFollower().getNextIndex() < getRaftLog().getNextIndex();
}
- default boolean heartbeatTimeout() {
- return getHeartbeatRemainingTime() <= 0;
+ /** The same as getHeartbeatRemainingTime() <= 0. */
+ default boolean shouldHeartbeat() {
+ return getHeartbeatRemainingTimeMs() <= 0;
}
/**
* @return the time in milliseconds that the leader should send a heartbeat.
*/
- default long getHeartbeatRemainingTime() {
+ default long getHeartbeatRemainingTimeMs() {
return getServer().properties().minRpcTimeoutMs()/2 - getFollower().getLastRpcTime().elapsedTimeMs();
}
- default boolean checkResponseTerm(long responseTerm) {
+ /** Handle the event that the follower has replied a term. */
+ default boolean onFollowerTerm(long followerTerm) {
synchronized (getServer()) {
- return isRunning() && getLeaderState().onFollowerTerm(getFollower(), responseTerm);
+ return isRunning() && getLeaderState().onFollowerTerm(getFollower(), followerTerm);
}
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
similarity index 51%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
rename to ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
index 6b045fd..43a84c0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderBase.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderBase.java
@@ -15,20 +15,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.leader;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
-import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.leader.FollowerInfo;
-import org.apache.ratis.server.leader.LeaderState;
-import org.apache.ratis.server.leader.LogAppender;
+import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLog.EntryWithData;
@@ -38,25 +33,15 @@ import org.apache.ratis.util.DataQueue;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.SizeInBytes;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
-import java.util.UUID;
-
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
/**
- * A daemon thread appending log entries to a follower peer.
+ * An abstract implementation of {@link LogAppender}.
*/
-public class LogAppenderBase implements LogAppender {
- public static final Logger LOG = LoggerFactory.getLogger(LogAppenderBase.class);
-
+public abstract class LogAppenderBase implements LogAppender {
private final String name;
private final RaftServer.Division server;
private final LeaderState leaderState;
@@ -67,7 +52,7 @@ public class LogAppenderBase implements LogAppender {
private final LogAppenderDaemon daemon;
- public LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+ protected LogAppenderBase(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
this.server = server;
@@ -143,7 +128,7 @@ public class LogAppenderBase implements LogAppender {
public AppendEntriesRequestProto newAppendEntriesRequest(long callId, boolean heartbeat) throws RaftLogIOException {
final TermIndex previous = getPrevious(follower.getNextIndex());
final long snapshotIndex = follower.getSnapshotIndex();
- final long heartbeatRemainingMs = getHeartbeatRemainingTime();
+ final long heartbeatRemainingMs = getHeartbeatRemainingTimeMs();
if (heartbeatRemainingMs <= 0L || heartbeat) {
// heartbeat
return leaderState.newAppendEntriesRequestProto(follower, Collections.emptyList(), previous, callId);
@@ -154,7 +139,7 @@ public class LogAppenderBase implements LogAppender {
final long leaderNext = getRaftLog().getNextIndex();
final long followerNext = follower.getNextIndex();
final long halfMs = heartbeatRemainingMs/2;
- for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTime() - halfMs > 0; ) {
+ for (long next = followerNext; leaderNext > next && getHeartbeatRemainingTimeMs() - halfMs > 0; ) {
if (!buffer.offer(getRaftLog().getEntryWithData(next++))) {
break;
}
@@ -163,7 +148,7 @@ public class LogAppenderBase implements LogAppender {
return null;
}
- final List<LogEntryProto> protos = buffer.pollList(getHeartbeatRemainingTime(), EntryWithData::getEntry,
+ final List<LogEntryProto> protos = buffer.pollList(getHeartbeatRemainingTimeMs(), EntryWithData::getEntry,
(entry, time, exception) -> LOG.warn("{}: Failed to get {} in {}: {}",
follower.getName(), entry, time, exception));
buffer.clear();
@@ -190,47 +175,6 @@ public class LogAppenderBase implements LogAppender {
}
}
- /** Send an appendEntries RPC; retry indefinitely. */
- private AppendEntriesReplyProto sendAppendEntriesWithRetries()
- throws InterruptedException, InterruptedIOException, RaftLogIOException {
- int retry = 0;
- AppendEntriesRequestProto request = null;
- while (isRunning()) { // keep retrying for IOException
- try {
- if (request == null || request.getEntriesCount() == 0) {
- request = newAppendEntriesRequest(DEFAULT_CALLID, false);
- }
-
- if (request == null) {
- LOG.trace("{} no entries to send now, wait ...", this);
- return null;
- } else if (!isRunning()) {
- LOG.info("{} is stopped. Skip appendEntries.", this);
- return null;
- }
-
- follower.updateLastRpcSendTime();
- final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
- follower.updateLastRpcResponseTime();
-
- getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
- return r;
- } catch (InterruptedIOException | RaftLogIOException e) {
- throw e;
- } catch (IOException ioe) {
- // TODO should have more detailed retry policy here.
- if (retry++ % 10 == 0) { // to reduce the number of messages
- LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry++, ioe);
- }
- handleException(ioe);
- }
- if (isRunning()) {
- server.properties().rpcSleepTime().sleep();
- }
- }
- return null;
- }
-
@Override
public InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
@@ -242,106 +186,6 @@ public class LogAppenderBase implements LogAppender {
@Override
public Iterable<InstallSnapshotRequestProto> newInstallSnapshotRequests(String requestId, SnapshotInfo snapshot) {
- return ServerImplUtils.newInstallSnapshotRequests(server, getFollowerId(), requestId,
- snapshot, snapshotChunkMaxSize);
- }
-
- private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
- String requestId = UUID.randomUUID().toString();
- InstallSnapshotReplyProto reply = null;
- try {
- for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
- follower.updateLastRpcSendTime();
- reply = getServerRpc().installSnapshot(request);
- follower.updateLastRpcResponseTime();
-
- if (!reply.getServerReply().getSuccess()) {
- return reply;
- }
- }
- } catch (InterruptedIOException iioe) {
- throw iioe;
- } catch (Exception ioe) {
- LOG.warn("{}: Failed to installSnapshot {}: {}", this, snapshot, ioe);
- handleException(ioe);
- return null;
- }
-
- if (reply != null) {
- follower.setSnapshotIndex(snapshot.getTermIndex().getIndex());
- LOG.info("{}: installSnapshot {} successfully", this, snapshot);
- server.getRaftServerMetrics().getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC).inc();
- }
- return reply;
- }
-
- @Override
- public void run() throws InterruptedException, IOException {
- while (isRunning()) {
- if (shouldSendRequest()) {
- SnapshotInfo snapshot = shouldInstallSnapshot();
- if (snapshot != null) {
- LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
- this, follower.getNextIndex(), getRaftLog().getStartIndex(), snapshot);
-
- final InstallSnapshotReplyProto r = installSnapshot(snapshot);
- if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
- checkResponseTerm(r.getTerm());
- } // otherwise if r is null, retry the snapshot installation
- } else {
- final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
- if (r != null) {
- handleReply(r);
- }
- }
- }
- if (isRunning() && !shouldAppendEntries(follower.getNextIndex())) {
- final long waitTime = getHeartbeatRemainingTime();
- if (waitTime > 0) {
- synchronized (this) {
- wait(waitTime);
- }
- }
- }
- getLeaderState().checkHealth(getFollower());
- }
- }
-
- private void handleReply(AppendEntriesReplyProto reply) throws IllegalArgumentException {
- if (reply != null) {
- switch (reply.getResult()) {
- case SUCCESS:
- final long oldNextIndex = follower.getNextIndex();
- final long nextIndex = reply.getNextIndex();
- if (nextIndex < oldNextIndex) {
- throw new IllegalStateException("nextIndex=" + nextIndex
- + " < oldNextIndex=" + oldNextIndex
- + ", reply=" + ServerProtoUtils.toString(reply));
- }
-
- if (nextIndex > oldNextIndex) {
- follower.updateMatchIndex(nextIndex - 1);
- follower.increaseNextIndex(nextIndex);
- getLeaderState().onFollowerSuccessAppendEntries(getFollower());
- }
- break;
- case NOT_LEADER:
- // check if should step down
- checkResponseTerm(reply.getTerm());
- break;
- case INCONSISTENCY:
- follower.decreaseNextIndex(reply.getNextIndex());
- break;
- case UNRECOGNIZED:
- LOG.warn("{}: received {}", this, reply.getResult());
- break;
- default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
- }
- }
- }
-
- private void handleException(Exception e) {
- LOG.trace("TRACE", e);
- getServerRpc().handleException(getFollowerId(), e, false);
+ return new InstallSnapshotRequests(server, getFollowerId(), requestId, snapshot, snapshotChunkMaxSize);
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
similarity index 97%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
rename to ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
index 4cecc9c..6b2d607 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppenderDaemon.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDaemon.java
@@ -15,9 +15,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server.leader;
-import org.apache.ratis.server.leader.LogAppender;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
new file mode 100644
index 0000000..5eb85d8
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotReplyProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotRequestProto;
+import org.apache.ratis.proto.RaftProtos.InstallSnapshotResult;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.ServerProtoUtils;
+import org.apache.ratis.server.raftlog.RaftLogIOException;
+import org.apache.ratis.statemachine.SnapshotInfo;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.UUID;
+
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.LOG_APPENDER_INSTALL_SNAPSHOT_METRIC;
+
+/**
+ * The default implementation of {@link LogAppender}
+ * using {@link org.apache.ratis.server.protocol.RaftServerProtocol}.
+ */
+class LogAppenderDefault extends LogAppenderBase {
+ LogAppenderDefault(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
+ super(server, leaderState, f);
+ }
+
+ /** Send an appendEntries RPC; retry indefinitely. */
+ private AppendEntriesReplyProto sendAppendEntriesWithRetries()
+ throws InterruptedException, InterruptedIOException, RaftLogIOException {
+ int retry = 0;
+ AppendEntriesRequestProto request = null;
+ while (isRunning()) { // keep retrying for IOException
+ try {
+ if (request == null || request.getEntriesCount() == 0) {
+ request = newAppendEntriesRequest(DEFAULT_CALLID, false);
+ }
+
+ if (request == null) {
+ LOG.trace("{} no entries to send now, wait ...", this);
+ return null;
+ } else if (!isRunning()) {
+ LOG.info("{} is stopped. Skip appendEntries.", this);
+ return null;
+ }
+
+ getFollower().updateLastRpcSendTime();
+ final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
+ getFollower().updateLastRpcResponseTime();
+
+ getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
+ return r;
+ } catch (InterruptedIOException | RaftLogIOException e) {
+ throw e;
+ } catch (IOException ioe) {
+ // TODO should have more detailed retry policy here.
+ if (retry++ % 10 == 0) { // to reduce the number of messages
+ LOG.warn("{}: Failed to appendEntries (retry={}): {}", this, retry++, ioe);
+ }
+ handleException(ioe);
+ }
+ if (isRunning()) {
+ getServer().properties().rpcSleepTime().sleep();
+ }
+ }
+ return null;
+ }
+
+ private InstallSnapshotReplyProto installSnapshot(SnapshotInfo snapshot) throws InterruptedIOException {
+ String requestId = UUID.randomUUID().toString();
+ InstallSnapshotReplyProto reply = null;
+ try {
+ for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
+ getFollower().updateLastRpcSendTime();
+ reply = getServerRpc().installSnapshot(request);
+ getFollower().updateLastRpcResponseTime();
+
+ if (!reply.getServerReply().getSuccess()) {
+ return reply;
+ }
+ }
+ } catch (InterruptedIOException iioe) {
+ throw iioe;
+ } catch (Exception ioe) {
+ LOG.warn("{}: Failed to installSnapshot {}: {}", this, snapshot, ioe);
+ handleException(ioe);
+ return null;
+ }
+
+ if (reply != null) {
+ getFollower().setSnapshotIndex(snapshot.getTermIndex().getIndex());
+ LOG.info("{}: installSnapshot {} successfully", this, snapshot);
+ getServer().getRaftServerMetrics().getCounter(LOG_APPENDER_INSTALL_SNAPSHOT_METRIC).inc();
+ }
+ return reply;
+ }
+
+ @Override
+ public void run() throws InterruptedException, IOException {
+ while (isRunning()) {
+ if (shouldSendAppendEntries()) {
+ SnapshotInfo snapshot = shouldInstallSnapshot();
+ if (snapshot != null) {
+ LOG.info("{}: followerNextIndex = {} but logStartIndex = {}, send snapshot {} to follower",
+ this, getFollower().getNextIndex(), getRaftLog().getStartIndex(), snapshot);
+
+ final InstallSnapshotReplyProto r = installSnapshot(snapshot);
+ if (r != null && r.getResult() == InstallSnapshotResult.NOT_LEADER) {
+ onFollowerTerm(r.getTerm());
+ } // otherwise if r is null, retry the snapshot installation
+ } else {
+ final AppendEntriesReplyProto r = sendAppendEntriesWithRetries();
+ if (r != null) {
+ handleReply(r);
+ }
+ }
+ }
+ if (isRunning() && !hasAppendEntries()) {
+ final long waitTime = getHeartbeatRemainingTimeMs();
+ if (waitTime > 0) {
+ synchronized (this) {
+ wait(waitTime);
+ }
+ }
+ }
+ getLeaderState().checkHealth(getFollower());
+ }
+ }
+
+ private void handleReply(AppendEntriesReplyProto reply) throws IllegalArgumentException {
+ if (reply != null) {
+ switch (reply.getResult()) {
+ case SUCCESS:
+ final long oldNextIndex = getFollower().getNextIndex();
+ final long nextIndex = reply.getNextIndex();
+ if (nextIndex < oldNextIndex) {
+ throw new IllegalStateException("nextIndex=" + nextIndex
+ + " < oldNextIndex=" + oldNextIndex
+ + ", reply=" + ServerProtoUtils.toString(reply));
+ }
+
+ if (nextIndex > oldNextIndex) {
+ getFollower().updateMatchIndex(nextIndex - 1);
+ getFollower().increaseNextIndex(nextIndex);
+ getLeaderState().onFollowerSuccessAppendEntries(getFollower());
+ }
+ break;
+ case NOT_LEADER:
+ // check if should step down
+ onFollowerTerm(reply.getTerm());
+ break;
+ case INCONSISTENCY:
+ getFollower().decreaseNextIndex(reply.getNextIndex());
+ break;
+ case UNRECOGNIZED:
+ LOG.warn("{}: received {}", this, reply.getResult());
+ break;
+ default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
+ }
+ }
+ }
+
+ private void handleException(Exception e) {
+ LOG.trace("TRACE", e);
+ getServerRpc().handleException(getFollowerId(), e, false);
+ }
+}