You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/04 13:04:19 UTC
[incubator-ratis] branch master updated: RATIS-1205. Change LogAppender to use RaftServer.Division and LeaderState. (#323)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 3569b13 RATIS-1205. Change LogAppender to use RaftServer.Division and LeaderState. (#323)
3569b13 is described below
commit 3569b13d684ec1ac50f3a6d4c7560d5711ef22af
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Fri Dec 4 21:04:13 2020 +0800
RATIS-1205. Change LogAppender to use RaftServer.Division and LeaderState. (#323)
---
.../apache/ratis/grpc/server/GrpcLogAppender.java | 9 ++--
.../apache/ratis/server/DivisionProperties.java | 55 +++++++++++++++++++
.../java/org/apache/ratis/server/RaftServer.java | 14 +++++
.../ratis/server/impl/DivisionPropertiesImpl.java | 61 ++++++++++++++++++++++
.../apache/ratis/server/impl/FollowerState.java | 2 +-
.../apache/ratis/server/impl/LeaderStateImpl.java | 40 ++++++++------
.../org/apache/ratis/server/impl/LogAppender.java | 56 ++++++++------------
.../apache/ratis/server/impl/RaftServerImpl.java | 30 +++++------
.../apache/ratis/server/impl/RaftServerProxy.java | 8 +--
.../org/apache/ratis/server/impl/ServerState.java | 2 +-
.../apache/ratis/server/leader/LeaderState.java | 26 +++++++++
.../ratis/server/impl/RaftServerTestUtil.java | 2 +-
.../ratis/datastream/DataStreamBaseTest.java | 15 +++++-
13 files changed, 237 insertions(+), 83 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 38a2e41..85d3572 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -140,8 +140,7 @@ public class GrpcLogAppender extends LogAppender {
appendLog(installSnapshotRequired || haveTooManyPendingRequests());
}
- checkSlowness();
-
+ getLeaderState().checkHealth(getFollower());
}
Optional.ofNullable(appendLogRequestObserver).ifPresent(StreamObserver::onCompleted);
@@ -299,9 +298,9 @@ public class GrpcLogAppender extends LogAppender {
switch (reply.getResult()) {
case SUCCESS:
grpcServerMetrics.onRequestSuccess(getFollowerId().toString(), reply.getIsHearbeat());
- updateCommitIndex(reply.getFollowerCommit());
+ getLeaderState().onFollowerCommitIndex(getFollower(), reply.getFollowerCommit());
if (getFollower().updateMatchIndex(reply.getMatchIndex())) {
- submitEventOnSuccessAppend();
+ getLeaderState().onFollowerSuccessAppendEntries(getFollower());
}
break;
case NOT_LEADER:
@@ -417,7 +416,7 @@ public class GrpcLogAppender extends LogAppender {
final long followerSnapshotIndex = reply.getSnapshotIndex();
LOG.info("{}: Already Installed Snapshot Index {}.", this, followerSnapshotIndex);
getFollower().setSnapshotIndex(followerSnapshotIndex);
- updateCommitIndex(followerSnapshotIndex);
+ getLeaderState().onFollowerCommitIndex(getFollower(), followerSnapshotIndex);
increaseNextIndex(followerSnapshotIndex);
removePending(reply);
break;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DivisionProperties.java b/ratis-server/src/main/java/org/apache/ratis/server/DivisionProperties.java
new file mode 100644
index 0000000..a7b8c9d
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DivisionProperties.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server;
+
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * The properties set for a server division.
+ *
+ * @see RaftServerConfigKeys
+ */
+public interface DivisionProperties {
+ Logger LOG = LoggerFactory.getLogger(DivisionProperties.class);
+
+ /** @return the minimum rpc timeout. */
+ TimeDuration minRpcTimeout();
+
+ /** @return the minimum rpc timeout in milliseconds. */
+ default int minRpcTimeoutMs() {
+ return minRpcTimeout().toIntExact(TimeUnit.MILLISECONDS);
+ }
+
+ /** @return the maximum rpc timeout. */
+ TimeDuration maxRpcTimeout();
+
+ /** @return the maximum rpc timeout in milliseconds. */
+ default int maxRpcTimeoutMs() {
+ return maxRpcTimeout().toIntExact(TimeUnit.MILLISECONDS);
+ }
+
+ /** @return the rpc sleep time period. */
+ TimeDuration rpcSleepTime();
+
+ /** @return the rpc slowness timeout. */
+ TimeDuration rpcSlownessTimeout();
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index d6e267b..b76705f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -25,6 +25,7 @@ import org.apache.ratis.rpc.RpcFactory;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -50,6 +51,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
interface Division extends Closeable {
Logger LOG = LoggerFactory.getLogger(Division.class);
+ /** @return the {@link DivisionProperties} for this division. */
+ DivisionProperties properties();
+
/** @return the {@link RaftGroupMemberId} for this division. */
RaftGroupMemberId getMemberId();
@@ -79,6 +83,9 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the {@link RaftServer} containing this division. */
RaftServer getRaftServer();
+ /** @return the {@link RaftServerMetrics} for this division. */
+ RaftServerMetrics getRaftServerMetrics();
+
/** @return the {@link StateMachine} for this division. */
StateMachine getStateMachine();
@@ -115,6 +122,13 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the server properties. */
RaftProperties getProperties();
+ /** @return the rpc service. */
+ RaftServerRpc getServerRpc();
+
+ default RpcType getRpcType() {
+ return getFactory().getRpcType();
+ }
+
/** @return the factory for creating server components. */
RpcFactory getFactory();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java
new file mode 100644
index 0000000..63cbc02
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DivisionPropertiesImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.impl;
+
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.server.DivisionProperties;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+
+class DivisionPropertiesImpl implements DivisionProperties {
+ private final TimeDuration rpcTimeoutMin;
+ private final TimeDuration rpcTimeoutMax;
+ private final TimeDuration rpcSleepTime;
+ private final TimeDuration rpcSlownessTimeout;
+
+ DivisionPropertiesImpl(RaftProperties properties) {
+ this.rpcTimeoutMin = RaftServerConfigKeys.Rpc.timeoutMin(properties);
+ this.rpcTimeoutMax = RaftServerConfigKeys.Rpc.timeoutMax(properties);
+ Preconditions.assertTrue(rpcTimeoutMax.compareTo(rpcTimeoutMin) >= 0,
+ "rpcTimeoutMax = %s < rpcTimeoutMin = %s", rpcTimeoutMax, rpcTimeoutMin);
+
+ this.rpcSleepTime = RaftServerConfigKeys.Rpc.sleepTime(properties);
+ this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
+ }
+
+ @Override
+ public TimeDuration minRpcTimeout() {
+ return rpcTimeoutMin;
+ }
+
+ @Override
+ public TimeDuration maxRpcTimeout() {
+ return rpcTimeoutMax;
+ }
+
+ @Override
+ public TimeDuration rpcSleepTime() {
+ return rpcSleepTime;
+ }
+
+ @Override
+ public TimeDuration rpcSlownessTimeout() {
+ return rpcSlownessTimeout;
+ }
+}
\ No newline at end of file
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index 14bff6d..c54291a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -87,7 +87,7 @@ class FollowerState extends Daemon {
}
boolean shouldWithholdVotes() {
- return lastRpcTime.elapsedTimeMs() < server.getMinTimeoutMs();
+ return lastRpcTime.elapsedTime().compareTo(server.properties().minRpcTimeout()) < 0;
}
void stopRunning() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 1543dc3..3d52e44 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -242,7 +242,6 @@ class LeaderStateImpl implements LeaderState {
private volatile boolean running = true;
private final int stagingCatchupGap;
- private final TimeDuration syncInterval;
private final long placeHolderIndex;
private final RaftServerMetrics raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
@@ -253,7 +252,6 @@ class LeaderStateImpl implements LeaderState {
final RaftProperties properties = server.getRaftServer().getProperties();
stagingCatchupGap = RaftServerConfigKeys.stagingCatchupGap(properties);
- syncInterval = RaftServerConfigKeys.Rpc.sleepTime(properties);
final ServerState state = server.getState();
this.raftLog = state.getLog();
@@ -326,7 +324,8 @@ class LeaderStateImpl implements LeaderState {
return currentTerm;
}
- boolean handleResponseTerm(FollowerInfo follower, long followerTerm) {
+ @Override
+ public boolean onFollowerTerm(FollowerInfo follower, long followerTerm) {
if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM);
return true;
@@ -334,10 +333,6 @@ class LeaderStateImpl implements LeaderState {
return false;
}
- TimeDuration getSyncInterval() {
- return syncInterval;
- }
-
/**
* Start bootstrapping new peers
*/
@@ -413,7 +408,14 @@ class LeaderStateImpl implements LeaderState {
}
}
- void commitIndexChanged() {
+ @Override
+ public void onFollowerCommitIndex(FollowerInfo follower, long commitIndex) {
+ if (follower.updateCommitIndex(commitIndex)) {
+ commitIndexChanged();
+ }
+ }
+
+ private void commitIndexChanged() {
getMajorityMin(FollowerInfo::getCommitIndex, raftLog::getLastCommittedIndex).ifPresent(m -> {
// Normally, leader commit index is always ahead of followers.
// However, after a leader change, the new leader commit index may
@@ -450,7 +452,8 @@ class LeaderStateImpl implements LeaderState {
}
}
- AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
+ @Override
+ public AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
List<LogEntryProto> entries, TermIndex previous, long callId) {
final boolean initializing = isAttendingVote(follower);
final RaftPeerId targetId = follower.getPeer().getId();
@@ -487,7 +490,8 @@ class LeaderStateImpl implements LeaderState {
senders.removeAll(toStop);
}
- void restartSender(LogAppender sender) {
+ @Override
+ public void restart(LogAppender sender) {
final FollowerInfo follower = sender.getFollower();
LOG.info("{}: Restarting {} for {}", this, JavaUtils.getClassSimpleName(sender.getClass()), follower.getName());
sender.stopAppender();
@@ -611,7 +615,8 @@ class LeaderStateImpl implements LeaderState {
.collect(Collectors.toCollection(ArrayList::new));
}
- void submitEventOnSuccessAppend(FollowerInfo follower) {
+ @Override
+ public void onFollowerSuccessAppendEntries(FollowerInfo follower) {
if (isAttendingVote(follower)) {
submitUpdateCommitEvent();
} else {
@@ -767,7 +772,7 @@ class LeaderStateImpl implements LeaderState {
LOG.info("{} is not included in the new configuration {}. Will shutdown server...", this, conf);
try {
// leave some time for all RPC senders to send out new conf entry
- Thread.sleep(server.getMinTimeoutMs());
+ server.properties().minRpcTimeout().sleep();
} catch (InterruptedException ignored) {
Thread.currentThread().interrupt();
}
@@ -1015,10 +1020,13 @@ class LeaderStateImpl implements LeaderState {
return ((FollowerInfoImpl)follower).isAttendingVote();
}
- /**
- * Record Follower Heartbeat Elapsed Time.
- */
- void recordFollowerHeartbeatElapsedTime(RaftPeerId followerId, TimeDuration elapsedTime) {
+ @Override
+ public void checkHealth(FollowerInfo follower) {
+ final TimeDuration elapsedTime = follower.getLastRpcResponseTime().elapsedTime();
+ if (elapsedTime.compareTo(server.properties().rpcSlownessTimeout()) > 0) {
+ server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getInfo().getRoleInfoProto());
+ }
+ final RaftPeerId followerId = follower.getPeer().getId();
raftServerMetrics.recordFollowerHeartbeatElapsedTime(followerId, elapsedTime.toLong(TimeUnit.NANOSECONDS));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index d9aeab4..493b820 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -96,7 +96,7 @@ public class LogAppender {
lifeCycle.transitionIfNotEqual(EXCEPTION);
}
if (lifeCycle.getCurrentState() == EXCEPTION) {
- leaderState.restartSender(LogAppender.this);
+ leaderState.restart(LogAppender.this);
}
}
}
@@ -136,8 +136,8 @@ public class LogAppender {
}
private final String name;
- private final RaftServerImpl server;
- private final LeaderStateImpl leaderState;
+ private final RaftServer.Division server;
+ private final LeaderState leaderState;
private final RaftLog raftLog;
private final FollowerInfo follower;
@@ -150,13 +150,13 @@ public class LogAppender {
public LogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
- this.server = (RaftServerImpl) server;
- this.leaderState = (LeaderStateImpl) leaderState;
- this.raftLog = this.server.getState().getLog();
+ this.server = server;
+ this.leaderState = leaderState;
+ this.raftLog = server.getRaftLog();
final RaftProperties properties = server.getRaftServer().getProperties();
this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
- this.halfMinTimeoutMs = this.server.getMinTimeoutMs() / 2;
+ this.halfMinTimeoutMs = server.properties().minRpcTimeoutMs() / 2;
final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
@@ -201,6 +201,10 @@ public class LogAppender {
return getFollower().getPeer().getId();
}
+ public LeaderState getLeaderState() {
+ return leaderState;
+ }
+
private TermIndex getPrevious(long nextIndex) {
if (nextIndex == RaftLog.LEAST_VALID_LOG_INDEX) {
return null;
@@ -212,7 +216,7 @@ public class LogAppender {
return previous;
}
- final SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+ final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
if (snapshot != null) {
final TermIndex snapshotTermIndex = snapshot.getTermIndex();
if (snapshotTermIndex.getIndex() == previousIndex) {
@@ -294,10 +298,10 @@ public class LogAppender {
}
follower.updateLastRpcSendTime();
- final AppendEntriesReplyProto r = server.getServerRpc().appendEntries(request);
+ final AppendEntriesReplyProto r = getServerRpc().appendEntries(request);
follower.updateLastRpcResponseTime();
- updateCommitIndex(r.getFollowerCommit());
+ getLeaderState().onFollowerCommitIndex(getFollower(), r.getFollowerCommit());
return r;
} catch (InterruptedIOException | RaftLogIOException e) {
throw e;
@@ -309,18 +313,12 @@ public class LogAppender {
handleException(ioe);
}
if (isAppenderRunning()) {
- leaderState.getSyncInterval().sleep();
+ server.properties().rpcSleepTime().sleep();
}
}
return null;
}
- protected void updateCommitIndex(long commitIndex) {
- if (follower.updateCommitIndex(commitIndex)) {
- leaderState.commitIndexChanged();
- }
- }
-
protected InstallSnapshotRequestProto newInstallSnapshotNotificationRequest(TermIndex firstAvailableLogTermIndex) {
Preconditions.assertTrue(firstAvailableLogTermIndex.getIndex() > 0);
synchronized (server) {
@@ -340,7 +338,7 @@ public class LogAppender {
try {
for (InstallSnapshotRequestProto request : newInstallSnapshotRequests(requestId, snapshot)) {
follower.updateLastRpcSendTime();
- reply = server.getServerRpc().installSnapshot(request);
+ reply = getServerRpc().installSnapshot(request);
follower.updateLastRpcResponseTime();
if (!reply.getServerReply().getSuccess()) {
@@ -369,7 +367,7 @@ public class LogAppender {
// 1. there is no local log entry but there is snapshot
// 2. or the follower's next index is smaller than the log start index
if (follower.getNextIndex() < raftLog.getNextIndex()) {
- SnapshotInfo snapshot = server.getState().getLatestSnapshot();
+ final SnapshotInfo snapshot = server.getStateMachine().getLatestSnapshot();
if (follower.getNextIndex() < logStartIndex ||
(logStartIndex == RaftLog.INVALID_LOG_INDEX && snapshot != null)) {
return snapshot;
@@ -406,7 +404,7 @@ public class LogAppender {
}
}
}
- checkSlowness();
+ getLeaderState().checkHealth(getFollower());
}
}
@@ -425,7 +423,7 @@ public class LogAppender {
if (nextIndex > oldNextIndex) {
follower.updateMatchIndex(nextIndex - 1);
follower.increaseNextIndex(nextIndex);
- submitEventOnSuccessAppend();
+ getLeaderState().onFollowerSuccessAppendEntries(getFollower());
}
break;
case NOT_LEADER:
@@ -445,19 +443,7 @@ public class LogAppender {
private void handleException(Exception e) {
LOG.trace("TRACE", e);
- server.getServerRpc().handleException(follower.getPeer().getId(), e, false);
- }
-
- protected void submitEventOnSuccessAppend() {
- leaderState.submitEventOnSuccessAppend(follower);
- }
-
- protected void checkSlowness() {
- final TimeDuration lastRpcResponseElapsed = follower.getLastRpcResponseTime().elapsedTime();
- if (lastRpcResponseElapsed.compareTo(server.getRpcSlownessTimeout()) > 0) {
- server.getStateMachine().leaderEvent().notifyFollowerSlowness(server.getInfo().getRoleInfoProto());
- }
- leaderState.recordFollowerHeartbeatElapsedTime(follower.getPeer().getId(), lastRpcResponseElapsed);
+ getServerRpc().handleException(getFollowerId(), e, false);
}
public synchronized void notifyAppend() {
@@ -494,7 +480,7 @@ public class LogAppender {
protected boolean checkResponseTerm(long responseTerm) {
synchronized (server) {
- return isAppenderRunning() && leaderState.handleResponseTerm(follower, responseTerm);
+ return isAppenderRunning() && leaderState.onFollowerTerm(follower, responseTerm);
}
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 4a9bac6..3bca7a0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -34,6 +34,7 @@ import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.DivisionProperties;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -134,10 +135,9 @@ class RaftServerImpl implements RaftServer.Division,
private final StateMachine stateMachine;
private final Info info = new Info();
- private final int minTimeoutMs;
+ private final DivisionProperties divisionProperties;
private final int maxTimeoutMs;
private final TimeDuration leaderStepDownWaitTime;
- private final TimeDuration rpcSlownessTimeout;
private final TimeDuration sleepDeviationThreshold;
private final boolean installSnapshotEnabled;
@@ -173,14 +173,11 @@ class RaftServerImpl implements RaftServer.Division,
this.role = new RoleInfo(id);
final RaftProperties properties = proxy.getProperties();
- minTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMin(properties).toIntExact(TimeUnit.MILLISECONDS);
- maxTimeoutMs = RaftServerConfigKeys.Rpc.timeoutMax(properties).toIntExact(TimeUnit.MILLISECONDS);
- this.rpcSlownessTimeout = RaftServerConfigKeys.Rpc.slownessTimeout(properties);
+ this.divisionProperties = new DivisionPropertiesImpl(properties);
+ maxTimeoutMs = properties().maxRpcTimeoutMs();
leaderStepDownWaitTime = RaftServerConfigKeys.LeaderElection.leaderStepDownWaitTime(properties);
this.sleepDeviationThreshold = RaftServerConfigKeys.sleepDeviationThreshold(properties);
installSnapshotEnabled = RaftServerConfigKeys.Log.Appender.installSnapshotEnabled(properties);
- Preconditions.assertTrue(maxTimeoutMs > minTimeoutMs,
- "max timeout: %s, min timeout: %s", maxTimeoutMs, minTimeoutMs);
this.proxy = proxy;
this.state = new ServerState(id, group, properties, this, stateMachine);
@@ -205,6 +202,11 @@ class RaftServerImpl implements RaftServer.Division,
});
}
+ @Override
+ public DivisionProperties properties() {
+ return divisionProperties;
+ }
+
private RetryCache initRetryCache(RaftProperties prop) {
final TimeDuration expireTime = RaftServerConfigKeys.RetryCache.expiryTime(prop);
return new RetryCache(expireTime);
@@ -214,20 +216,13 @@ class RaftServerImpl implements RaftServer.Division,
return getRaftServer().getFactory().newLogAppender(this, leaderState, f);
}
- int getMinTimeoutMs() {
- return minTimeoutMs;
- }
-
int getMaxTimeoutMs() {
return maxTimeoutMs;
}
- TimeDuration getRpcSlownessTimeout() {
- return rpcSlownessTimeout;
- }
-
TimeDuration getRandomElectionTimeout() {
- final long millis = minTimeoutMs + ThreadLocalRandom.current().nextInt(maxTimeoutMs - minTimeoutMs + 1);
+ final int min = properties().minRpcTimeoutMs();
+ final int millis = min + ThreadLocalRandom.current().nextInt(properties().maxRpcTimeoutMs() - min + 1);
return TimeDuration.valueOf(millis, TimeUnit.MILLISECONDS);
}
@@ -1599,7 +1594,8 @@ class RaftServerImpl implements RaftServer.Division,
return leaderElectionMetrics;
}
- RaftServerMetrics getRaftServerMetrics() {
+ @Override
+ public RaftServerMetrics getRaftServerMetrics() {
return raftServerMetrics;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index df702bb..81b010c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -292,11 +292,6 @@ class RaftServerProxy implements RaftServer {
}
@Override
- public RpcType getRpcType() {
- return getFactory().getRpcType();
- }
-
- @Override
public ServerFactory getFactory() {
return factory;
}
@@ -306,7 +301,8 @@ class RaftServerProxy implements RaftServer {
return properties;
}
- RaftServerRpc getServerRpc() {
+ @Override
+ public RaftServerRpc getServerRpc() {
return serverRpc;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index a40e0d1..855f4a0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -429,7 +429,7 @@ class ServerState implements Closeable {
latestInstalledSnapshot.set(lastTermIndexInSnapshot);
}
- SnapshotInfo getLatestSnapshot() {
+ private SnapshotInfo getLatestSnapshot() {
return server.getStateMachine().getLatestSnapshot();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index da635a5..24649d7 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -17,12 +17,19 @@
*/
package org.apache.ratis.server.leader;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.util.JavaUtils;
+import java.util.List;
+
/**
* States for leader only.
*/
public interface LeaderState {
+ /** The reasons that this leader steps down and becomes a follower. */
enum StepDownReason {
HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION, JVM_PAUSE;
@@ -33,4 +40,23 @@ public interface LeaderState {
return longName;
}
}
+
+ /** Restart the given {@link LogAppender}. */
+ void restart(LogAppender appender);
+
+ /** @return a new {@link AppendEntriesRequestProto} object. */
+ AppendEntriesRequestProto newAppendEntriesRequestProto(FollowerInfo follower,
+ List<LogEntryProto> entries, TermIndex previous, long callId);
+
+ /** Check the health of the given follower (e.g. whether its rpc responses are slow). */
+ void checkHealth(FollowerInfo follower);
+
+ /** Handle the event that the follower has replied with a term. */
+ boolean onFollowerTerm(FollowerInfo follower, long followerTerm);
+
+ /** Handle the event that the follower has replied with a commit index. */
+ void onFollowerCommitIndex(FollowerInfo follower, long commitIndex);
+
+ /** Handle the event that the follower has successfully appended entries. */
+ void onFollowerSuccessAppendEntries(FollowerInfo follower);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index cf9307c..f216189 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -159,7 +159,7 @@ public class RaftServerTestUtil {
public static void restartLogAppenders(RaftServer.Division server) {
final LeaderStateImpl leaderState = getLeaderState(server).orElseThrow(
() -> new IllegalStateException(server + " is not the leader"));
- leaderState.getLogAppenders().forEach(leaderState::restartSender);
+ leaderState.getLogAppenders().forEach(leaderState::restart);
}
public static RaftServer.Division getDivision(RaftServer server, RaftGroupId groupId) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 8d94b9b..0533323 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -19,6 +19,8 @@ package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
import org.apache.ratis.server.DivisionInfo;
+import org.apache.ratis.server.DivisionProperties;
+import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
@@ -56,6 +58,7 @@ import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.impl.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.statemachine.StateMachine;
@@ -88,6 +91,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public DivisionProperties properties() {
+ return null;
+ }
+
+ @Override
public RaftGroupMemberId getMemberId() {
return null;
}
@@ -108,6 +116,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public RaftServerMetrics getRaftServerMetrics() {
+ return null;
+ }
+
+ @Override
public MultiDataStreamStateMachine getStateMachine() {
return stateMachine;
}
@@ -236,7 +249,7 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
- public RpcType getRpcType() {
+ public RaftServerRpc getServerRpc() {
return null;
}