You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/02 11:15:30 UTC
[incubator-ratis] branch master updated: RATIS-1192. Add a public API to get server states. (#313)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 7557104 RATIS-1192. Add a public API to get server states. (#313)
7557104 is described below
commit 7557104b2f94dad8c63799808fe81ffbe09bd97e
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Dec 2 19:15:21 2020 +0800
RATIS-1192. Add a public API to get server states. (#313)
---
.../java/org/apache/ratis/grpc/GrpcFactory.java | 9 +-
.../apache/ratis/grpc/server/GrpcLogAppender.java | 17 ++--
.../java/org/apache/ratis/server/DivisionInfo.java | 49 ++++++++++
.../java/org/apache/ratis/server/RaftServer.java | 2 +
.../apache/ratis/server/impl/FollowerState.java | 5 +-
.../apache/ratis/server/impl/LeaderElection.java | 6 +-
.../{LeaderState.java => LeaderStateImpl.java} | 57 ++++++-----
.../org/apache/ratis/server/impl/LogAppender.java | 21 ++--
.../apache/ratis/server/impl/RaftServerImpl.java | 107 +++++++++------------
.../org/apache/ratis/server/impl/RoleInfo.java | 26 ++---
.../apache/ratis/server/impl/ServerFactory.java | 5 +-
.../apache/ratis/server/leader/LeaderState.java | 36 +++++++
.../test/java/org/apache/ratis/RaftAsyncTests.java | 8 +-
.../test/java/org/apache/ratis/RaftTestUtil.java | 2 +-
.../ratis/server/impl/LeaderElectionTests.java | 7 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 6 +-
.../server/impl/RaftReconfigurationBaseTest.java | 3 +-
.../ratis/server/impl/RaftServerTestUtil.java | 17 +---
.../ratis/server/impl/ServerPauseResumeTest.java | 4 +-
.../ratis/datastream/DataStreamBaseTest.java | 9 +-
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 2 +-
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 6 +-
22 files changed, 240 insertions(+), 164 deletions(-)
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
index e6efebe..c055848 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcFactory.java
@@ -25,9 +25,11 @@ import org.apache.ratis.grpc.server.GrpcLogAppender;
import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.LogAppender;
+import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.leader.FollowerInfo;
+import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -76,8 +78,7 @@ public class GrpcFactory implements ServerFactory, ClientFactory {
}
@Override
- public LogAppender newLogAppender(RaftServerImpl server, LeaderState state,
- FollowerInfo f) {
+ public LogAppender newLogAppender(RaftServer.Division server, LeaderState state, FollowerInfo f) {
return new GrpcLogAppender(server, state, f);
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index 7bd4f5c..9bbf9fe 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -22,11 +22,11 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.grpc.GrpcUtil;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.leader.FollowerInfo;
-import org.apache.ratis.server.impl.LeaderState;
+import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.impl.LogAppender;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
@@ -52,7 +52,6 @@ import com.codahale.metrics.Timer;
public class GrpcLogAppender extends LogAppender {
public static final Logger LOG = LoggerFactory.getLogger(GrpcLogAppender.class);
- private final GrpcService rpcService;
private final RequestMap pendingRequests = new RequestMap();
private final int maxPendingRequestsNum;
private long callId = 0;
@@ -66,11 +65,10 @@ public class GrpcLogAppender extends LogAppender {
private final GrpcServerMetrics grpcServerMetrics;
- public GrpcLogAppender(RaftServerImpl server, LeaderState leaderState,
- FollowerInfo f) {
+ public GrpcLogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
super(server, leaderState, f);
- this.rpcService = (GrpcService) server.getRaftServer().getServerRpc();
+ Preconditions.assertNotNull(getServerRpc(), "getServerRpc()");
final RaftProperties properties = server.getRaftServer().getProperties();
this.maxPendingRequestsNum = GrpcConfigKeys.Server.leaderOutstandingAppendsMax(properties);
@@ -81,8 +79,13 @@ public class GrpcLogAppender extends LogAppender {
grpcServerMetrics.addPendingRequestsCount(getFollowerId().toString(), pendingRequests::logRequestsSize);
}
+ @Override
+ protected GrpcService getServerRpc() {
+ return (GrpcService)super.getServerRpc();
+ }
+
private GrpcServerProtocolClient getClient() throws IOException {
- return rpcService.getProxies().getProxy(getFollowerId());
+ return getServerRpc().getProxies().getProxy(getFollowerId());
}
private synchronized void resetClient(AppendEntriesRequest request, boolean onError) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/DivisionInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/DivisionInfo.java
new file mode 100644
index 0000000..3fd42dc
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/DivisionInfo.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ratis.server;
+
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.util.LifeCycle;
+
+/**
+ * Information of a {@link RaftServer.Division}.
+ */
+public interface DivisionInfo {
+ RaftPeerRole getCurrentRole();
+
+ default boolean isFollower() {
+ return getCurrentRole() == RaftPeerRole.FOLLOWER;
+ }
+
+ default boolean isCandidate() {
+ return getCurrentRole() == RaftPeerRole.CANDIDATE;
+ }
+
+ default boolean isLeader() {
+ return getCurrentRole() == RaftPeerRole.LEADER;
+ }
+
+ boolean isLeaderReady();
+
+ LifeCycle.State getLifeCycleState();
+
+ default boolean isAlive() {
+ return !getLifeCycleState().isClosingOrClosed();
+ }
+}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index e228bed..14ca81c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -62,6 +62,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
.orElseGet(() -> getRaftServer().getPeer());
}
+ DivisionInfo getInfo();
+
/** @return the {@link RaftGroup} for this division. */
RaftGroup getGroup();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
index b3173bb..14bff6d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/FollowerState.java
@@ -17,6 +17,7 @@
*/
package org.apache.ratis.server.impl;
+import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.util.Daemon;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
@@ -110,7 +111,7 @@ class FollowerState extends Daemon {
@Override
public void run() {
final TimeDuration sleepDeviationThreshold = server.getSleepDeviationThreshold();
- while (isRunning && server.isFollower()) {
+ while (isRunning && server.getInfo().isFollower()) {
final TimeDuration electionTimeout = server.getRandomElectionTimeout();
try {
final TimeDuration extraSleep = electionTimeout.sleep();
@@ -120,7 +121,7 @@ class FollowerState extends Daemon {
continue;
}
- final boolean isFollower = server.isFollower();
+ final boolean isFollower = server.getInfo().isFollower();
if (!isRunning || !isFollower) {
LOG.info("{}: Stopping now (isRunning? {}, isFollower? {})", this, isRunning, isFollower);
break;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
index 1a31d66..84c842d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderElection.java
@@ -21,6 +21,7 @@ import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteRequestProto;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.ratis.util.Daemon;
@@ -170,7 +171,7 @@ class LeaderElection implements Runnable {
LOG.info("{}: {} is safely ignored since this is already {}",
this, JavaUtils.getClassSimpleName(e.getClass()), state, e);
} else {
- if (!server.isAlive()) {
+ if (!server.getInfo().isAlive()) {
LOG.info("{}: {} is safely ignored since the server is not alive: {}",
this, JavaUtils.getClassSimpleName(e.getClass()), server, e);
} else {
@@ -187,7 +188,8 @@ class LeaderElection implements Runnable {
}
private boolean shouldRun() {
- return lifeCycle.getCurrentState().isRunning() && server.isCandidate() && server.isAlive();
+ final DivisionInfo info = server.getInfo();
+ return lifeCycle.getCurrentState().isRunning() && info.isCandidate() && info.isAlive();
}
private boolean shouldRun(long electionTerm) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
similarity index 96%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
rename to ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 7002c44..d50b26f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -18,27 +18,47 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
+import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
-import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.NotReplicatedException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
-import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.leader.FollowerInfo;
+import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.metrics.LogAppenderMetrics;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
-import org.apache.ratis.proto.RaftProtos.CommitInfoProto;
-import org.apache.ratis.proto.RaftProtos.AppendEntriesRequestProto;
-import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.server.raftlog.RaftLogIOException;
import org.apache.ratis.statemachine.TransactionContext;
-import org.apache.ratis.util.*;
+import org.apache.ratis.util.CodeInjectionForTesting;
+import org.apache.ratis.util.CollectionUtils;
+import org.apache.ratis.util.Daemon;
+import org.apache.ratis.util.JavaUtils;
+import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.Timestamp;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
@@ -63,24 +83,13 @@ import static org.apache.ratis.server.RaftServer.Division.LOG;
* 3. PendingRequestHandler: a handler sending back responses to clients when
* corresponding log entries are committed
*/
-public class LeaderState {
+class LeaderStateImpl implements LeaderState {
public static final String APPEND_PLACEHOLDER = JavaUtils.getClassSimpleName(LeaderState.class) + ".placeholder";
private enum BootStrapProgress {
NOPROGRESS, PROGRESSING, CAUGHTUP
}
- enum StepDownReason {
- HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION, JVM_PAUSE;
-
- private final String longName = JavaUtils.getClassSimpleName(getClass()) + ":" + name();
-
- @Override
- public String toString() {
- return longName;
- }
- }
-
static class StateUpdateEvent {
private enum Type {
STEP_DOWN, UPDATE_COMMIT, CHECK_STAGING
@@ -238,7 +247,7 @@ public class LeaderState {
private final RaftServerMetrics raftServerMetrics;
private final LogAppenderMetrics logAppenderMetrics;
- LeaderState(RaftServerImpl server) {
+ LeaderStateImpl(RaftServerImpl server) {
this.name = server.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
this.server = server;
@@ -319,7 +328,7 @@ public class LeaderState {
boolean handleResponseTerm(FollowerInfo follower, long followerTerm) {
if (isAttendingVote(follower) && followerTerm > getCurrentTerm()) {
- submitStepDownEvent(followerTerm, LeaderState.StepDownReason.HIGHER_TERM);
+ submitStepDownEvent(followerTerm, StepDownReason.HIGHER_TERM);
return true;
}
return false;
@@ -846,7 +855,7 @@ public class LeaderState {
}
private void yieldLeaderToHigherPriorityPeer() {
- if (!server.getRole().isLeader()) {
+ if (!server.getInfo().isLeader()) {
return;
}
@@ -892,7 +901,7 @@ public class LeaderState {
* round of heartbeats to a majority of its cluster.
*/
private void checkLeadership() {
- if (!server.getRole().isLeader()) {
+ if (!server.getInfo().isLeader()) {
return;
}
@@ -978,7 +987,7 @@ public class LeaderState {
LOG.debug(message);
stopAndRemoveSenders(s -> !isAttendingVote(s.getFollower()));
- LeaderState.this.stagingState = null;
+ stagingState = null;
// send back failure response to client's request
pendingRequests.failSetConfiguration(new ReconfigurationTimeoutException(message));
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
index a784409..6d3ad3b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LogAppender.java
@@ -19,8 +19,11 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.storage.FileInfo;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -139,7 +142,7 @@ public class LogAppender {
private final String name;
private final RaftServerImpl server;
- private final LeaderState leaderState;
+ private final LeaderStateImpl leaderState;
private final RaftLog raftLog;
private final FollowerInfo follower;
@@ -149,16 +152,16 @@ public class LogAppender {
private final AppenderDaemon daemon;
- public LogAppender(RaftServerImpl server, LeaderState leaderState, FollowerInfo f) {
+ public LogAppender(RaftServer.Division server, LeaderState leaderState, FollowerInfo f) {
this.follower = f;
this.name = follower.getName() + "-" + JavaUtils.getClassSimpleName(getClass());
- this.server = server;
- this.leaderState = leaderState;
- this.raftLog = server.getState().getLog();
+ this.server = (RaftServerImpl) server;
+ this.leaderState = (LeaderStateImpl) leaderState;
+ this.raftLog = this.server.getState().getLog();
final RaftProperties properties = server.getRaftServer().getProperties();
this.snapshotChunkMaxSize = RaftServerConfigKeys.Log.Appender.snapshotChunkSizeMax(properties).getSizeInt();
- this.halfMinTimeoutMs = server.getMinTimeoutMs() / 2;
+ this.halfMinTimeoutMs = this.server.getMinTimeoutMs() / 2;
final SizeInBytes bufferByteLimit = RaftServerConfigKeys.Log.Appender.bufferByteLimit(properties);
final int bufferElementLimit = RaftServerConfigKeys.Log.Appender.bufferElementLimit(properties);
@@ -166,10 +169,14 @@ public class LogAppender {
this.daemon = new AppenderDaemon();
}
- public RaftServerImpl getServer() {
+ protected RaftServer.Division getServer() {
return server;
}
+ protected RaftServerRpc getServerRpc() {
+ return server.getRaftServer().getServerRpc();
+ }
+
public RaftLog getRaftLog() {
return raftLog;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b723271..67fb76f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -33,11 +33,13 @@ import org.apache.ratis.protocol.exceptions.ServerNotReadyException;
import org.apache.ratis.protocol.exceptions.StaleReadException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerMXBean;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
@@ -89,8 +91,27 @@ public class RaftServerImpl implements RaftServer.Division,
static final String APPEND_ENTRIES = CLASS_NAME + ".appendEntries";
static final String INSTALL_SNAPSHOT = CLASS_NAME + ".installSnapshot";
+ class Info implements DivisionInfo {
+ @Override
+ public RaftPeerRole getCurrentRole() {
+ return getRole().getCurrentRole();
+ }
+
+ @Override
+ public boolean isLeaderReady() {
+ return isLeader() && getRole().isLeaderReady();
+ }
+
+ @Override
+ public LifeCycle.State getLifeCycleState() {
+ return lifeCycle.getCurrentState();
+ }
+ }
+
private final RaftServerProxy proxy;
private final StateMachine stateMachine;
+ private final Info info = new Info();
+
private final int minTimeoutMs;
private final int maxTimeoutMs;
private final TimeDuration leaderStepDownWaitTime;
@@ -287,8 +308,9 @@ public class RaftServerImpl implements RaftServer.Division,
return getState().getMemberId();
}
- public RaftPeerId getId() {
- return getMemberId().getPeerId();
+ @Override
+ public DivisionInfo getInfo() {
+ return info;
}
RoleInfo getRole() {
@@ -393,34 +415,6 @@ public class RaftServerImpl implements RaftServer.Division,
});
}
- public boolean isAlive() {
- return !lifeCycle.getCurrentState().isClosingOrClosed();
- }
-
- public boolean isFollower() {
- return role.isFollower();
- }
-
- public boolean isCandidate() {
- return role.isCandidate();
- }
-
- public boolean isLeader() {
- return role.isLeader();
- }
-
- public boolean isPausingOrPaused() {
- return lifeCycle.getCurrentState().isPausingOrPaused();
- }
-
- /**
- * return ref to the commit info cache.
- * @return commit info cache
- */
- public CommitInfoCache getCommitInfoCache() {
- return commitInfoCache;
- }
-
/**
* Change the server state to Follower if this server is in a different role or force is true.
* @param newTerm The new term.
@@ -452,7 +446,7 @@ public class RaftServerImpl implements RaftServer.Division,
}
synchronized void changeToLeader() {
- Preconditions.assertTrue(isCandidate());
+ Preconditions.assertTrue(getInfo().isCandidate());
role.shutdownLeaderElection();
setRole(RaftPeerRole.LEADER, "changeToLeader");
state.becomeLeader();
@@ -468,7 +462,7 @@ public class RaftServerImpl implements RaftServer.Division,
infos.add(updateCommitInfoCache());
// add the commit infos of other servers
- if (isLeader()) {
+ if (getInfo().isLeader()) {
role.getLeaderState().ifPresent(
leader -> leader.updateFollowerCommitInfos(commitInfoCache, infos));
} else {
@@ -528,7 +522,7 @@ public class RaftServerImpl implements RaftServer.Division,
}
synchronized void changeToCandidate() {
- Preconditions.assertTrue(isFollower());
+ Preconditions.assertTrue(getInfo().isFollower());
role.shutdownFollowerState();
setRole(RaftPeerRole.CANDIDATE, "changeToCandidate");
if (state.shouldNotifyExtendedNoLeader()) {
@@ -580,13 +574,12 @@ public class RaftServerImpl implements RaftServer.Division,
return RetryCache.failWithException(e, entry);
}
- if (!isLeader()) {
+ if (!getInfo().isLeader()) {
NotLeaderException exception = generateNotLeaderException();
final RaftClientReply reply = newExceptionReply(request, exception);
return RetryCache.failWithReply(reply, entry);
}
- final LeaderState leaderState = role.getLeaderState().orElse(null);
- if (leaderState == null || !leaderState.isReady()) {
+ if (!getInfo().isLeaderReady()) {
final RetryCache.CacheEntry cacheEntry = retryCache.get(ClientInvocationId.valueOf(request));
if (cacheEntry != null && cacheEntry.isCompletedNormally()) {
return cacheEntry.getReplyFuture();
@@ -598,19 +591,6 @@ public class RaftServerImpl implements RaftServer.Division,
return null;
}
- public boolean isLeaderReady() {
- if (!isLeader()) {
- return false;
- }
-
- final LeaderState leaderState = role.getLeaderState().orElse(null);
- if (leaderState == null || !leaderState.isReady()) {
- return false;
- }
-
- return true;
- }
-
NotLeaderException generateNotLeaderException() {
if (lifeCycle.getCurrentState() != RUNNING) {
return new NotLeaderException(getMemberId(), null, null);
@@ -658,7 +638,7 @@ public class RaftServerImpl implements RaftServer.Division,
}
// append the message to its local log
- final LeaderState leaderState = role.getLeaderStateNonNull();
+ final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
final PendingRequests.Permit permit = leaderState.tryAcquirePendingRequest(request.getMessage());
if (permit == null) {
cacheEntry.failWithException(new ResourceUnavailableException(
@@ -673,7 +653,7 @@ public class RaftServerImpl implements RaftServer.Division,
RaftClientReply exceptionReply = newExceptionReply(request, e);
cacheEntry.failWithReply(exceptionReply);
// leader will step down here
- if (isLeader()) {
+ if (getInfo().isLeader()) {
leaderState.submitStepDownEvent(LeaderState.StepDownReason.STATE_MACHINE_EXCEPTION);
}
return CompletableFuture.completedFuture(exceptionReply);
@@ -692,7 +672,7 @@ public class RaftServerImpl implements RaftServer.Division,
}
public void stepDownOnJvmPause() {
- if (isLeader()) {
+ if (getInfo().isLeader()) {
role.getLeaderState().ifPresent(leader -> leader.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE));
}
}
@@ -892,7 +872,7 @@ public class RaftServerImpl implements RaftServer.Division,
}
final RaftConfiguration current = getRaftConf();
- final LeaderState leaderState = role.getLeaderStateNonNull();
+ final LeaderStateImpl leaderState = role.getLeaderStateNonNull();
// make sure there is no other raft reconfiguration in progress
if (!current.isStable() || leaderState.inStagingState() || !state.isConfCommitted()) {
throw new ReconfigurationInProgressException(
@@ -917,11 +897,11 @@ public class RaftServerImpl implements RaftServer.Division,
private boolean shouldWithholdVotes(long candidateTerm) {
if (state.getCurrentTerm() < candidateTerm) {
return false;
- } else if (isLeader()) {
+ } else if (getInfo().isLeader()) {
return true;
} else {
// following a leader and not yet timeout
- return isFollower() && state.hasLeader()
+ return getInfo().isFollower() && state.hasLeader()
&& role.getFollowerState().map(FollowerState::shouldWithholdVotes).orElse(false);
}
}
@@ -936,7 +916,7 @@ public class RaftServerImpl implements RaftServer.Division,
*/
private boolean shouldSendShutdown(RaftPeerId candidateId,
TermIndex candidateLastEntry) {
- return isLeader()
+ return getInfo().isLeader()
&& getRaftConf().isStable()
&& getState().isConfCommitted()
&& !getRaftConf().containsInConf(candidateId)
@@ -1509,7 +1489,7 @@ public class RaftServerImpl implements RaftServer.Division,
}
public void submitUpdateCommitEvent() {
- role.getLeaderState().ifPresent(LeaderState::submitUpdateCommitEvent);
+ role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent);
}
/**
@@ -1525,7 +1505,7 @@ public class RaftServerImpl implements RaftServer.Division,
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
// update the retry cache
final RetryCache.CacheEntry cacheEntry = retryCache.getOrCreateEntry(invocationId);
- if (isLeader()) {
+ if (getInfo().isLeader()) {
Preconditions.assertTrue(cacheEntry != null && !cacheEntry.isCompletedNormally(),
"retry cache entry should be pending: %s", cacheEntry);
}
@@ -1548,9 +1528,8 @@ public class RaftServerImpl implements RaftServer.Division,
// update pending request
synchronized (RaftServerImpl.this) {
- final LeaderState leaderState = role.getLeaderState().orElse(null);
- if (isLeader() && leaderState != null) { // is leader and is running
- leaderState.replyPendingRequest(logIndex, r);
+ if (getInfo().isLeader()) {
+ role.getLeaderState().ifPresent(leader -> leader.replyPendingRequest(logIndex, r));
}
}
cacheEntry.updateResult(r);
@@ -1558,10 +1537,10 @@ public class RaftServerImpl implements RaftServer.Division,
}
public long[] getFollowerNextIndices() {
- if (!isLeader()) {
+ if (!getInfo().isLeader()) {
return null;
}
- return role.getLeaderState().map(LeaderState::getFollowerNextIndices).orElse(null);
+ return role.getLeaderState().map(LeaderStateImpl::getFollowerNextIndices).orElse(null);
}
CompletableFuture<Message> applyLogToStateMachine(LogEntryProto next) {
@@ -1658,7 +1637,7 @@ public class RaftServerImpl implements RaftServer.Division,
@Override
public List<String> getFollowers() {
- return role.getLeaderState().map(LeaderState::getFollowers).orElse(Collections.emptyList())
+ return role.getLeaderState().map(LeaderStateImpl::getFollowers).orElse(Collections.emptyList())
.stream().map(RaftPeer::toString).collect(Collectors.toList());
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 1f057dd..f909a9c 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -39,7 +39,7 @@ class RoleInfo {
private final RaftPeerId id;
private volatile RaftPeerRole role;
/** Used when the peer is leader */
- private final AtomicReference<LeaderState> leaderState = new AtomicReference<>();
+ private final AtomicReference<LeaderStateImpl> leaderState = new AtomicReference<>();
/** Used when the peer is follower, to monitor election timeout */
private final AtomicReference<FollowerState> followerState = new AtomicReference<>();
/** Used when the peer is candidate, to request votes from other peers */
@@ -52,10 +52,6 @@ class RoleInfo {
this.transitionTime = new AtomicReference<>(Timestamp.currentTime());
}
- RaftPeerRole getRaftPeerRole() {
- return role;
- }
-
void transitionRole(RaftPeerRole newRole) {
this.role = newRole;
this.transitionTime.set(Timestamp.currentTime());
@@ -69,32 +65,24 @@ class RoleInfo {
return role;
}
- boolean isFollower() {
- return role == RaftPeerRole.FOLLOWER;
- }
-
- boolean isCandidate() {
- return role == RaftPeerRole.CANDIDATE;
- }
-
- boolean isLeader() {
- return role == RaftPeerRole.LEADER;
+ boolean isLeaderReady() {
+ return getLeaderState().map(LeaderStateImpl::isReady).orElse(false);
}
- Optional<LeaderState> getLeaderState() {
+ Optional<LeaderStateImpl> getLeaderState() {
return Optional.ofNullable(leaderState.get());
}
- LeaderState getLeaderStateNonNull() {
+ LeaderStateImpl getLeaderStateNonNull() {
return Objects.requireNonNull(leaderState.get(), "leaderState is null");
}
LogEntryProto startLeaderState(RaftServerImpl server) {
- return updateAndGet(leaderState, new LeaderState(server)).start();
+ return updateAndGet(leaderState, new LeaderStateImpl(server)).start();
}
void shutdownLeaderState(boolean allowNull) {
- final LeaderState leader = leaderState.getAndSet(null);
+ final LeaderStateImpl leader = leaderState.getAndSet(null);
if (leader == null) {
if (!allowNull) {
throw new NullPointerException("leaderState == null");
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
index 7300ddc..12d5a74 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerFactory.java
@@ -18,9 +18,10 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.rpc.RpcFactory;
-import org.apache.ratis.server.leader.FollowerInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.leader.FollowerInfo;
+import org.apache.ratis.server.leader.LeaderState;
/** A factory interface for creating server components. */
public interface ServerFactory extends RpcFactory {
@@ -34,7 +35,7 @@ public interface ServerFactory extends RpcFactory {
}
/** Create a new {@link LogAppender}. */
- default LogAppender newLogAppender(RaftServerImpl server, LeaderState state, FollowerInfo f) {
+ default LogAppender newLogAppender(RaftServer.Division server, LeaderState state, FollowerInfo f) {
return new LogAppender(server, state, f);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
new file mode 100644
index 0000000..da635a5
--- /dev/null
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.server.leader;
+
+import org.apache.ratis.util.JavaUtils;
+
+/**
+ * States for leader only.
+ */
+public interface LeaderState {
+ enum StepDownReason {
+ HIGHER_TERM, HIGHER_PRIORITY, LOST_MAJORITY_HEARTBEATS, STATE_MACHINE_EXCEPTION, JVM_PAUSE;
+
+ private final String longName = JavaUtils.getClassSimpleName(getClass()) + ":" + name();
+
+ @Override
+ public String toString() {
+ return longName;
+ }
+ }
+}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index a4a8685..e4ed070 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -389,7 +389,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
try (final RaftClient client = cluster.createClient()) {
// block append requests
cluster.getServerAliveStream()
- .filter(impl -> !impl.isLeader())
+ .filter(impl -> !impl.getInfo().isLeader())
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
@@ -399,7 +399,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
Assert.assertFalse(replyFuture.isDone());
// unblock append request.
cluster.getServerAliveStream()
- .filter(impl -> !impl.isLeader())
+ .filter(impl -> !impl.getInfo().isLeader())
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
@@ -427,7 +427,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
try (final RaftClient client = cluster.createClient()) {
// block append entries request
cluster.getServerAliveStream()
- .filter(impl -> !impl.isLeader())
+ .filter(impl -> !impl.getInfo().isLeader())
.map(SimpleStateMachine4Testing::get)
.forEach(peer -> logSyncDelay.setDelayMs(peer.getId().toString(), 1000));
@@ -440,7 +440,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
// previous leader should not there.
cluster.getServerAliveStream()
- .forEach(impl -> Assert.assertTrue(!impl.isLeader()
+ .forEach(impl -> Assert.assertTrue(!impl.getInfo().isLeader()
|| impl.getState().getCurrentTerm() > termOfPrevLeader));
} finally {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 5a2428f..e44e44b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -99,7 +99,7 @@ public interface RaftTestUtil {
final RaftServer.Division leader = JavaUtils.attemptRepeatedly(() -> {
final RaftServer.Division l = cluster.getLeader(groupId, handleNoLeaders, handleMultipleLeaders);
- if (l != null && !RaftServerTestUtil.isLeaderReady(l)) {
+ if (l != null && !l.getInfo().isLeaderReady()) {
throw new IllegalStateException("Leader: "+ l.getMemberId() + " not ready");
}
return l;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index fd0b126..cf9cfda 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -26,6 +26,7 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.metrics.LeaderElectionMetrics;
@@ -265,9 +266,11 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
}
private static RaftServerImpl createMockServer(boolean alive) {
+ final DivisionInfo info = mock(DivisionInfo.class);
+ when(info.isAlive()).thenReturn(alive);
+ when(info.isCandidate()).thenReturn(false);
RaftServerImpl server = mock(RaftServerImpl.class);
- when(server.isAlive()).thenReturn(alive);
- when(server.isCandidate()).thenReturn(false);
+ when(server.getInfo()).thenReturn(info);
final RaftGroupMemberId memberId = RaftGroupMemberId.valueOf(RaftPeerId.valueOf("any"), RaftGroupId.randomId());
when(server.getMemberId()).thenReturn(memberId);
LeaderElectionMetrics leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(memberId, () -> 0);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 5c6acef..9084bf5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -573,7 +573,7 @@ public abstract class MiniRaftCluster implements Closeable {
private List<RaftServer.Division> getLeaders(RaftGroupId groupId) {
final Stream<RaftServerImpl> serverAliveStream = getServerAliveStream(groupId);
final List<RaftServer.Division> leaders = new ArrayList<>();
- serverAliveStream.filter(RaftServerImpl::isLeader).forEach(s -> {
+ serverAliveStream.filter(server -> server.getInfo().isLeader()).forEach(s -> {
if (leaders.isEmpty()) {
leaders.add(s);
} else {
@@ -597,7 +597,7 @@ public abstract class MiniRaftCluster implements Closeable {
public List<RaftServer.Division> getFollowers() {
return getServerAliveStream()
- .filter(RaftServerImpl::isFollower)
+ .filter(server -> server.getInfo().isFollower())
.collect(Collectors.toList());
}
@@ -630,7 +630,7 @@ public abstract class MiniRaftCluster implements Closeable {
}
private Stream<RaftServerImpl> getServerAliveStream(RaftGroupId groupId) {
- return getServerStream(groupId).filter(RaftServerImpl::isAlive);
+ return getServerStream(groupId).filter(server -> server.getInfo().isAlive());
}
private RetryPolicy getDefaultRetryPolicy() {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index d0f99a5..4e0ac9a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.util.JavaUtils;
@@ -65,7 +66,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
private static final DelayLocalExecutionInjection logSyncDelay =
new DelayLocalExecutionInjection(RaftLog.LOG_SYNC);
private static final DelayLocalExecutionInjection leaderPlaceHolderDelay =
- new DelayLocalExecutionInjection(LeaderState.APPEND_PLACEHOLDER);
+ new DelayLocalExecutionInjection(LeaderStateImpl.APPEND_PLACEHOLDER);
static final int STAGING_CATCHUP_GAP = 10;
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index e57dc37..7d3786d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server.impl;
import org.apache.log4j.Level;
-import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupId;
@@ -84,7 +83,7 @@ public class RaftServerTestUtil {
numIncluded++;
Assert.assertTrue(server.getRaftConf().isStable());
Assert.assertTrue(server.getRaftConf().hasNoChange(peers));
- } else if (server.isAlive()) {
+ } else if (server.getInfo().isAlive()) {
// The server is successfully removed from the conf
// It may not be shutdown since it may not be able to talk to the new leader (who is not in its conf).
Assert.assertTrue(server.getRaftConf().isStable());
@@ -94,10 +93,6 @@ public class RaftServerTestUtil {
Assert.assertEquals(peers.size(), numIncluded + deadIncluded);
}
- public static boolean isLeaderReady(RaftServer.Division server) {
- return ((RaftServerImpl)server).isLeaderReady();
- }
-
public static long getCurrentTerm(RaftServer.Division server) {
return ((RaftServerImpl)server).getState().getCurrentTerm();
}
@@ -130,10 +125,6 @@ public class RaftServerTestUtil {
return entry.isFailed();
}
- public static RaftPeerRole getRole(RaftServer.Division server) {
- return ((RaftServerImpl)server).getRole().getRaftPeerRole();
- }
-
public static RaftConfiguration getRaftConf(RaftServer.Division server) {
return ((RaftServerImpl)server).getRaftConf();
}
@@ -154,16 +145,16 @@ public class RaftServerTestUtil {
return ((RaftServerImpl)server).getRaftServer().getServerRpc();
}
- private static Optional<LeaderState> getLeaderState(RaftServer.Division server) {
+ private static Optional<LeaderStateImpl> getLeaderState(RaftServer.Division server) {
return ((RaftServerImpl)server).getRole().getLeaderState();
}
public static Stream<LogAppender> getLogAppenders(RaftServer.Division server) {
- return getLeaderState(server).map(LeaderState::getLogAppenders).orElse(null);
+ return getLeaderState(server).map(LeaderStateImpl::getLogAppenders).orElse(null);
}
public static void restartLogAppenders(RaftServer.Division server) {
- final LeaderState leaderState = getLeaderState(server).orElseThrow(
+ final LeaderStateImpl leaderState = getLeaderState(server).orElseThrow(
() -> new IllegalStateException(server + " is not the leader"));
leaderState.getLogAppenders().forEach(leaderState::restartSender);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
index da41625..49d4878 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
@@ -66,7 +66,7 @@ public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
// pause follower.
boolean isSuccess = follower.pause();
Assert.assertTrue(isSuccess);
- Assert.assertTrue(follower.isPausingOrPaused());
+ Assert.assertTrue(follower.getInfo().getLifeCycleState().isPausingOrPaused());
SimpleMessage[] batch2 = SimpleMessage.create(100, "batch2");
Thread writeThread2 = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch2);
@@ -79,7 +79,7 @@ public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
// resume follower.
isSuccess = follower.resume();
Assert.assertTrue(isSuccess);
- Assert.assertTrue(!follower.isPausingOrPaused());
+ Assert.assertFalse(follower.getInfo().getLifeCycleState().isPausingOrPaused());
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
// follower should contain all logs.
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 70a7e17..b8d7dd3 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -18,6 +18,7 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.impl.ClientProtoUtils;
@@ -60,7 +61,6 @@ import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
-import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
@@ -72,8 +72,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
-import static org.mockito.Mockito.when;
-
abstract class DataStreamBaseTest extends BaseTest {
static class MyDivision implements RaftServer.Division {
private final RaftServer server;
@@ -92,6 +90,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public DivisionInfo getInfo() {
+ return null;
+ }
+
+ @Override
public RaftGroup getGroup() {
return null;
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index eddf1eb..d88980e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -172,7 +172,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
// change leader
RaftTestUtil.changeLeader(cluster, leader.getId());
- Assert.assertNotEquals(RaftPeerRole.LEADER, RaftServerTestUtil.getRole(leader));
+ Assert.assertNotEquals(RaftPeerRole.LEADER, leader.getInfo().getCurrentRole());
// the blocked request should fail
testFailureCase("request should fail", futureBlocked::get,
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index d89ae4a..c145872 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -75,7 +75,7 @@ public class TestRaftWithGrpc
try (final RaftClient client = cluster.createClient()) {
// block append requests
cluster.getServerAliveStream()
- .filter(impl -> !impl.isLeader())
+ .filter(impl -> !impl.getInfo().isLeader())
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockWriteStateMachineData);
@@ -86,14 +86,14 @@ public class TestRaftWithGrpc
Assert.assertFalse(replyFuture.isDone());
// unblock append request.
cluster.getServerAliveStream()
- .filter(impl -> !impl.isLeader())
+ .filter(impl -> !impl.getInfo().isLeader())
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
// The entries have been appended in the followers
// although the append entry timed out at the leader
- cluster.getServerAliveStream().filter(impl -> !impl.isLeader()).forEach(raftServer ->
+ cluster.getServerAliveStream().filter(impl -> !impl.getInfo().isLeader()).forEach(raftServer ->
JavaUtils.runAsUnchecked(() -> JavaUtils.attempt(() -> {
final long leaderNextIndex = leaderLog.getNextIndex();
final TermIndex[] leaderEntries = leaderLog.getEntries(0, Long.MAX_VALUE);