You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/01 05:36:06 UTC
[incubator-ratis] branch master updated: RATIS-1189. Change other
MiniRaftCluster methods to return RaftServer.Division. (#307)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 0672157 RATIS-1189. Change other MiniRaftCluster methods to return RaftServer.Division. (#307)
0672157 is described below
commit 06721577c75a95640d486b3ab8720359425fe8bc
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Dec 1 13:35:58 2020 +0800
RATIS-1189. Change other MiniRaftCluster methods to return RaftServer.Division. (#307)
---
.../java/org/apache/ratis/MiniRaftCluster.java | 82 +++++++++-------------
.../org/apache/ratis/RaftAsyncExceptionTests.java | 9 +--
.../test/java/org/apache/ratis/RaftAsyncTests.java | 19 ++---
.../test/java/org/apache/ratis/RaftBasicTests.java | 6 +-
.../test/java/org/apache/ratis/RaftTestUtil.java | 28 ++++----
.../apache/ratis/RequestLimitAsyncBaseTest.java | 3 +-
.../java/org/apache/ratis/WatchRequestTests.java | 6 +-
.../ratis/server/impl/GroupManagementBaseTest.java | 4 +-
.../ratis/server/impl/LeaderElectionTests.java | 4 +-
.../server/impl/RaftReconfigurationBaseTest.java | 2 +-
.../ratis/server/impl/RaftServerTestUtil.java | 24 ++++---
.../impl/RaftStateMachineExceptionTests.java | 8 +--
.../ratis/server/impl/ServerPauseResumeTest.java | 9 +--
.../server/impl/TestRatisServerMetricsBase.java | 8 +--
.../MiniRaftClusterWithSimulatedRpc.java | 6 +-
.../ratis/TestRaftServerNoLeaderTimeout.java | 5 +-
.../datastream/DataStreamAsyncClusterTests.java | 6 +-
.../ratis/datastream/DataStreamClusterTests.java | 7 +-
.../ratis/datastream/DataStreamTestUtils.java | 7 +-
.../apache/ratis/grpc/TestGrpcOutputStream.java | 2 -
.../apache/ratis/grpc/TestLogAppenderWithGrpc.java | 20 +++---
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 12 ++--
.../ratis/server/raftlog/TestRaftLogMetrics.java | 2 +-
23 files changed, 134 insertions(+), 145 deletions(-)
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 4d61d65..d0a1510 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -300,7 +300,7 @@ public abstract class MiniRaftCluster implements Closeable {
return s;
}
- private Collection<RaftServerProxy> putNewServers(
+ private Collection<RaftServer> putNewServers(
Iterable<RaftPeerId> peers, boolean format) {
return StreamSupport.stream(peers.spliterator(), false)
.map(id -> putNewServer(id, group, format))
@@ -325,11 +325,11 @@ public abstract class MiniRaftCluster implements Closeable {
/**
* start a stopped server again.
*/
- public RaftServerImpl restartServer(RaftPeerId serverId, boolean format) throws IOException {
+ public RaftServer.Division restartServer(RaftPeerId serverId, boolean format) throws IOException {
return restartServer(serverId, group, format);
}
- public RaftServerImpl restartServer(RaftPeerId serverId, RaftGroup group, boolean format) throws IOException {
+ public RaftServer.Division restartServer(RaftPeerId serverId, RaftGroup group, boolean format) throws IOException {
killServer(serverId);
servers.remove(serverId);
@@ -404,8 +404,8 @@ public abstract class MiniRaftCluster implements Closeable {
};
}
- private static List<RaftPeer> toRaftPeers(Collection<RaftServerProxy> servers) {
- return servers.stream()
+ private static List<RaftPeer> toRaftPeers(Iterable<RaftServer> servers) {
+ return StreamSupport.stream(servers.spliterator(), false)
.map(RaftServer::getPeer)
.collect(Collectors.toList());
}
@@ -419,13 +419,15 @@ public abstract class MiniRaftCluster implements Closeable {
LOG.info("Add new peers {}", Arrays.asList(ids));
// create and add new RaftServers
- final Collection<RaftServerProxy> newServers = putNewServers(
+ final Collection<RaftServer> newServers = putNewServers(
CollectionUtils.as(Arrays.asList(ids), RaftPeerId::valueOf), true);
startServers(newServers);
if (!startNewPeer) {
// start and then close, in order to bind the port
- newServers.forEach(RaftServerProxy::close);
+ for(RaftServer s : newServers) {
+ s.close();
+ }
}
final Collection<RaftPeer> newPeers = toRaftPeers(newServers);
@@ -455,7 +457,7 @@ public abstract class MiniRaftCluster implements Closeable {
peers.remove(leader);
removedPeers.add(leader);
}
- List<RaftServerImpl> followers = getFollowers();
+ final List<RaftServer.Division> followers = getFollowers();
for (int i = 0, removed = 0; i < followers.size() &&
removed < (removeLeader ? number - 1 : number); i++) {
RaftPeer toRemove = followers.get(i).getPeer();
@@ -492,11 +494,11 @@ public abstract class MiniRaftCluster implements Closeable {
public String printAllLogs() {
StringBuilder b = new StringBuilder("\n#servers = " + servers.size() + "\n");
- for (RaftServerImpl s : iterateServerImpls()) {
+ for (RaftServer.Division s : iterateDivisions()) {
b.append(" ");
b.append(s).append("\n");
- final RaftLog log = s.getState().getLog();
+ final RaftLog log = RaftServerTestUtil.getRaftLog(s);
if (log instanceof MemoryRaftLog) {
b.append(" ");
b.append(((MemoryRaftLog) log).getEntryString());
@@ -522,10 +524,10 @@ public abstract class MiniRaftCluster implements Closeable {
return new IllegalStateException("No leader yet " + g + ": " + printServers(groupId));
}
- IllegalStateException newIllegalStateExceptionForMultipleLeaders(RaftGroupId groupId, List<RaftServerImpl> leaders) {
+ IllegalStateException newIllegalStateExceptionForMultipleLeaders(RaftGroupId groupId, List<RaftServer.Division> leaders) {
final String g = groupId == null? "": " for " + groupId;
return new IllegalStateException("Found multiple leaders" + g
- + " at the same term (=" + leaders.get(0).getState().getCurrentTerm()
+ + " at the same term (=" + RaftServerTestUtil.getCurrentTerm(leaders.get(0))
+ "), leaders.size() = " + leaders.size() + " > 1, leaders = " + leaders
+ ": " + printServers(groupId));
}
@@ -543,13 +545,13 @@ public abstract class MiniRaftCluster implements Closeable {
});
}
- RaftServerImpl getLeader(RaftGroupId groupId, Runnable handleNoLeaders,
- Consumer<List<RaftServerImpl>> handleMultipleLeaders) {
+ RaftServer.Division getLeader(RaftGroupId groupId, Runnable handleNoLeaders,
+ Consumer<List<RaftServer.Division>> handleMultipleLeaders) {
return getLeader(getLeaders(groupId), handleNoLeaders, handleMultipleLeaders);
}
- static RaftServerImpl getLeader(List<RaftServerImpl> leaders, Runnable handleNoLeaders,
- Consumer<List<RaftServerImpl>> handleMultipleLeaders) {
+ static RaftServer.Division getLeader(List<RaftServer.Division> leaders, Runnable handleNoLeaders,
+ Consumer<List<RaftServer.Division>> handleMultipleLeaders) {
if (leaders.isEmpty()) {
if (handleNoLeaders != null) {
handleNoLeaders.run();
@@ -569,14 +571,14 @@ public abstract class MiniRaftCluster implements Closeable {
* @return the list of leaders with the highest term (i.e. leaders with a lower term are not included).
* from the given group.
*/
- private List<RaftServerImpl> getLeaders(RaftGroupId groupId) {
+ private List<RaftServer.Division> getLeaders(RaftGroupId groupId) {
final Stream<RaftServerImpl> serverAliveStream = getServerAliveStream(groupId);
- final List<RaftServerImpl> leaders = new ArrayList<>();
+ final List<RaftServer.Division> leaders = new ArrayList<>();
serverAliveStream.filter(RaftServerImpl::isLeader).forEach(s -> {
if (leaders.isEmpty()) {
leaders.add(s);
} else {
- final long leaderTerm = leaders.get(0).getState().getCurrentTerm();
+ final long leaderTerm = RaftServerTestUtil.getCurrentTerm(leaders.get(0));
final long term = s.getState().getCurrentTerm();
if (term >= leaderTerm) {
if (term > leaderTerm) {
@@ -594,40 +596,34 @@ public abstract class MiniRaftCluster implements Closeable {
return leader != null && leader.getId().toString().equals(leaderId);
}
- public List<RaftServerImpl> getFollowers() {
+ public List<RaftServer.Division> getFollowers() {
return getServerAliveStream()
.filter(RaftServerImpl::isFollower)
.collect(Collectors.toList());
}
- public List<RaftServer.Division> getFollowerDivisions() {
- return getServerAliveStream()
- .filter(RaftServerImpl::isFollower)
- .collect(Collectors.toList());
+ public int getNumServers() {
+ return servers.size();
}
- public Collection<RaftServerProxy> getServers() {
- return servers.values();
+ public Iterable<RaftServer> getServers() {
+ return CollectionUtils.as(servers.values(), s -> s);
}
private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) {
- return getServers().stream()
+ return servers.values().stream()
.filter(s -> groupId == null || s.containsGroup(groupId));
}
- public Iterable<RaftServerImpl> iterateServerImpls() {
- return CollectionUtils.as(getServers(), this::getRaftServerImpl);
- }
-
public Iterable<RaftServer.Division> iterateDivisions() {
- return CollectionUtils.as(getServers(), this::getRaftServerImpl);
+ return CollectionUtils.as(getServers(), this::getDivision);
}
private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
return groupId != null?
- stream.filter(s -> s.containsGroup(groupId)).map(s -> RaftServerTestUtil.getRaftServerImpl(s, groupId))
- : stream.flatMap(s -> RaftServerTestUtil.getRaftServerImpls(s).stream());
+ stream.map(s -> JavaUtils.callAsUnchecked(() -> s.getImpl(groupId)))
+ : stream.flatMap(s -> JavaUtils.callAsUnchecked(s::getImpls).stream());
}
public Stream<RaftServerImpl> getServerAliveStream() {
@@ -647,23 +643,15 @@ public abstract class MiniRaftCluster implements Closeable {
}
public RaftServer.Division getDivision(RaftPeerId id) {
- return getRaftServerImpl(servers.get(id));
+ return getDivision(servers.get(id));
}
public RaftServer.Division getDivision(RaftPeerId id, RaftGroupId groupId) {
- return RaftServerTestUtil.getRaftServerImpl(servers.get(id), groupId);
- }
-
- public RaftServerImpl getRaftServerImpl(RaftPeerId id) {
- return getRaftServerImpl(servers.get(id));
- }
-
- public RaftServerImpl getRaftServerImpl(RaftPeerId id, RaftGroupId groupId) {
- return RaftServerTestUtil.getRaftServerImpl(servers.get(id), groupId);
+ return RaftServerTestUtil.getDivision(servers.get(id), groupId);
}
- public RaftServerImpl getRaftServerImpl(RaftServerProxy proxy) {
- return RaftServerTestUtil.getRaftServerImpl(proxy, getGroupId());
+ public RaftServer.Division getDivision(RaftServer server) {
+ return RaftServerTestUtil.getDivision(server, getGroupId());
}
public List<RaftPeer> getPeers() {
@@ -762,7 +750,7 @@ public abstract class MiniRaftCluster implements Closeable {
ExitUtils.setTerminateOnUncaughtException(false);
final ExecutorService executor = Executors.newFixedThreadPool(servers.size(), Daemon::new);
- getServers().forEach(proxy -> executor.submit(proxy::close));
+ getServers().forEach(proxy -> executor.submit(() -> JavaUtils.runAsUnchecked(proxy::close)));
try {
executor.shutdown();
// just wait for a few seconds
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
index 2a47164..e267584 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncExceptionTests.java
@@ -38,6 +38,7 @@ import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
+import java.util.stream.StreamSupport;
public abstract class RaftAsyncExceptionTests<CLUSTER extends MiniRaftCluster>
extends BaseTest
@@ -104,15 +105,15 @@ public abstract class RaftAsyncExceptionTests<CLUSTER extends MiniRaftCluster>
RaftClientConfigKeys.Rpc.setRequestTimeout(properties.get(), ONE_SECOND);
// Block StartTransaction
- cluster.getServers().stream()
- .map(cluster::getRaftServerImpl)
+ StreamSupport.stream(cluster.getServers().spliterator(), false)
+ .map(cluster::getDivision)
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::blockStartTransaction);
final CompletableFuture<RaftClientReply> replyFuture = client.async().send(new SimpleMessage("m1"));
FIVE_SECONDS.sleep();
// Unblock StartTransaction
- cluster.getServers().stream()
- .map(cluster::getRaftServerImpl)
+ StreamSupport.stream(cluster.getServers().spliterator(), false)
+ .map(cluster::getDivision)
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockStartTransaction);
// The request should succeed after start transaction is unblocked
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index 342dff9..23ff350 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -62,6 +62,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.StreamSupport;
import static org.apache.ratis.RaftTestUtil.waitForLeader;
@@ -208,10 +209,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
final SimpleMessage[] messages = SimpleMessage.create(numMessages);
try (final RaftClient client = cluster.createClient()) {
//Set blockTransaction flag so that transaction blocks
- cluster.getServers().stream()
- .map(cluster::getRaftServerImpl)
- .map(SimpleStateMachine4Testing::get)
- .forEach(SimpleStateMachine4Testing::blockStartTransaction);
+ StreamSupport.stream(cluster.getServers().spliterator(), false)
+ .map(cluster::getDivision)
+ .map(SimpleStateMachine4Testing::get)
+ .forEach(SimpleStateMachine4Testing::blockStartTransaction);
//Send numMessages which are blocked and do not release the client semaphore permits
AtomicInteger blockedRequestsCount = new AtomicInteger();
@@ -238,10 +239,10 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
RaftClientTestUtil.assertAsyncRequestSemaphore(client, 0, 1);
//Unset the blockTransaction flag so that semaphore permits can be released
- cluster.getServers().stream()
- .map(cluster::getRaftServerImpl)
- .map(SimpleStateMachine4Testing::get)
- .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
+ StreamSupport.stream(cluster.getServers().spliterator(), false)
+ .map(cluster::getDivision)
+ .map(SimpleStateMachine4Testing::get)
+ .forEach(SimpleStateMachine4Testing::unblockStartTransaction);
for (int i = 0; i <= numMessages; i++) {
futures[i].join();
@@ -464,7 +465,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
private void runTestNoRetryWaitOnNotLeaderException(MiniRaftCluster cluster) throws Exception {
final RaftServer.Division leader = waitForLeader(cluster);
- final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
Assert.assertNotNull(followers);
Assert.assertEquals(2, followers.size());
Assert.assertNotSame(leader, followers.get(0));
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index e183dce..8d07d1f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -184,7 +184,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
final RaftPeerId leaderId = leader.getId();
final long term = RaftServerTestUtil.getCurrentTerm(leader);
- final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
final List<RaftServer.Division> followersToSendLog = followers.subList(0, followers.size() / 2);
for (int i = followers.size() / 2; i < NUM_SERVERS - 1; i++) {
cluster.killServer(followers.get(i).getId());
@@ -228,7 +228,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
void runTestOldLeaderNotCommit(CLUSTER cluster) throws Exception {
final RaftPeerId leaderId = waitForLeader(cluster).getId();
- final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
final RaftServer.Division followerToCommit = followers.get(0);
try {
for (int i = 1; i < NUM_SERVERS - 1; i++) {
@@ -484,7 +484,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
}
private static void checkFollowerCommitLagsLeader(MiniRaftCluster cluster) {
- final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
final RaftGroupMemberId leader = cluster.getLeader().getMemberId();
Gauge leaderCommitGauge = RaftServerMetrics.getPeerCommitIndexGauge(leader, leader.getPeerId());
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index 118293e..b664fd7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -30,7 +30,6 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
@@ -50,7 +49,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.EnumMap;
import java.util.List;
import java.util.Objects;
@@ -66,23 +64,23 @@ import java.util.function.Function;
import java.util.function.IntSupplier;
import java.util.function.Predicate;
import java.util.function.Supplier;
+import java.util.stream.StreamSupport;
public interface RaftTestUtil {
Logger LOG = LoggerFactory.getLogger(RaftTestUtil.class);
- static RaftServerImpl waitForLeader(MiniRaftCluster cluster)
+ static RaftServer.Division waitForLeader(MiniRaftCluster cluster)
throws InterruptedException {
return waitForLeader(cluster, null);
}
- static RaftServerImpl waitForLeader(MiniRaftCluster cluster, RaftGroupId groupId)
+ static RaftServer.Division waitForLeader(MiniRaftCluster cluster, RaftGroupId groupId)
throws InterruptedException {
return waitForLeader(cluster, groupId, true);
}
- static RaftServerImpl waitForLeader(
- MiniRaftCluster cluster, RaftGroupId groupId, boolean expectLeader)
+ static RaftServer.Division waitForLeader(MiniRaftCluster cluster, RaftGroupId groupId, boolean expectLeader)
throws InterruptedException {
final String name = "waitForLeader-" + groupId + "-(expectLeader? " + expectLeader + ")";
final int numAttempts = expectLeader? 100: 10;
@@ -93,14 +91,14 @@ public interface RaftTestUtil {
final Runnable handleNoLeaders = () -> {
throw cluster.newIllegalStateExceptionForNoLeaders(groupId);
};
- final Consumer<List<RaftServerImpl>> handleMultipleLeaders = leaders -> {
+ final Consumer<List<RaftServer.Division>> handleMultipleLeaders = leaders -> {
final IllegalStateException ise = cluster.newIllegalStateExceptionForMultipleLeaders(groupId, leaders);
exception.set(ise);
};
- final RaftServerImpl leader = JavaUtils.attemptRepeatedly(() -> {
- RaftServerImpl l = cluster.getLeader(groupId, handleNoLeaders, handleMultipleLeaders);
- if (l != null && !l.isLeaderReady()) {
+ final RaftServer.Division leader = JavaUtils.attemptRepeatedly(() -> {
+ final RaftServer.Division l = cluster.getLeader(groupId, handleNoLeaders, handleMultipleLeaders);
+ if (l != null && !RaftServerTestUtil.isLeaderReady(l)) {
throw new IllegalStateException("Leader: "+ l.getMemberId() + " not ready");
}
return l;
@@ -119,7 +117,7 @@ public interface RaftTestUtil {
}
static RaftPeerId waitAndKillLeader(MiniRaftCluster cluster) throws InterruptedException {
- final RaftServerImpl leader = waitForLeader(cluster);
+ final RaftServer.Division leader = waitForLeader(cluster);
Assert.assertNotNull(leader);
LOG.info("killing leader = " + leader);
@@ -222,7 +220,7 @@ public interface RaftTestUtil {
}
static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage expectedMessage) {
- final int size = cluster.getServers().size();
+ final int size = cluster.getNumServers();
final long count = cluster.getServerAliveStream()
.map(s -> s.getState().getLog())
.filter(log -> logEntriesContains(log, expectedMessage))
@@ -426,8 +424,7 @@ public interface RaftTestUtil {
}
}
- static <SERVER extends RaftServer> void blockQueueAndSetDelay(
- Collection<SERVER> servers,
+ static void blockQueueAndSetDelay(Iterable<RaftServer> servers,
DelayLocalExecutionInjection injection, String leaderId, int delayMs,
TimeDuration maxTimeout) throws InterruptedException {
// block reqeusts sent to leader if delayMs > 0
@@ -441,7 +438,8 @@ public interface RaftTestUtil {
}
// delay RaftServerRequest for other servers
- servers.stream().filter(s -> !s.getId().toString().equals(leaderId))
+ StreamSupport.stream(servers.spliterator(), false)
+ .filter(s -> !s.getId().toString().equals(leaderId))
.forEach(s -> {
if (block) {
injection.setDelayMs(s.getId().toString(), delayMs);
diff --git a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
index 5328873..9b5a378 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RequestLimitAsyncBaseTest.java
@@ -28,7 +28,6 @@ import org.apache.ratis.protocol.exceptions.ResourceUnavailableException;
import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.util.Log4jUtils;
@@ -67,7 +66,7 @@ public abstract class RequestLimitAsyncBaseTest<CLUSTER extends MiniRaftCluster>
}
void runTestWriteElementLimit(CLUSTER cluster) throws Exception {
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
try (RaftClient c1 = cluster.createClient(leader.getId())) {
{ // send first message to make sure the cluster is working
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index d6b23a4..ddc649b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -222,7 +222,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
SimpleStateMachine4Testing.get(leader).blockStartTransaction();
// blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
- final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
final RaftServer.Division blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
LOG.info("block follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
@@ -339,7 +339,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
final int numMessages = p.numMessages;
// blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
- final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
final RaftServer.Division blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
LOG.info("block follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
@@ -396,7 +396,7 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
SimpleStateMachine4Testing.get(leader).blockStartTransaction();
// blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
- final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
final RaftServer.Division blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
LOG.info("block follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 124801b..5531def 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -291,8 +291,8 @@ public abstract class GroupManagementBaseTest extends BaseTest {
final RaftGroup g = groups[i];
LOG.info(i + ") close " + cluster.printServers(g.getGroupId()));
for(RaftPeer p : g.getPeers()) {
- final File root = cluster.getServer(p.getId()).getImpl(g.getGroupId())
- .getState().getStorage().getStorageDir().getRoot();
+ final RaftServer.Division d = cluster.getDivision(p.getId(), g.getGroupId());
+ final File root = RaftServerTestUtil.getRaftStorage(d).getStorageDir().getRoot();
Assert.assertTrue(root.exists());
Assert.assertTrue(root.isDirectory());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
index c8da02f..880ddac 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/LeaderElectionTests.java
@@ -155,7 +155,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
cluster.initServers();
// start all except one servers
- final Iterator<RaftServerProxy> i = cluster.getServers().iterator();
+ final Iterator<RaftServer> i = cluster.getServers().iterator();
for(int j = 1; j < numServer; j++) {
i.next().start();
}
@@ -166,7 +166,7 @@ public abstract class LeaderElectionTests<CLUSTER extends MiniRaftCluster>
sleepTime.sleep();
// start the last server
- final RaftServerProxy lastServer = i.next();
+ final RaftServerProxy lastServer = (RaftServerProxy) i.next();
lastServer.start();
final RaftPeerId lastServerLeaderId = JavaUtils.attemptRepeatedly(
() -> Optional.ofNullable(lastServer.getImpls().iterator().next().getState().getLeaderId())
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index cba7000..351434b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -321,7 +321,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
void runTestBootstrapReconf(int numNewPeer, boolean startNewPeer, CLUSTER cluster) throws Exception {
LOG.info("Originally {} peer(s), add {} more, startNewPeer={}",
- cluster.getServers().size(), numNewPeer, startNewPeer);
+ cluster.getNumServers(), numNewPeer, startNewPeer);
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeader().getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index f0e527f..4df2680 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -73,7 +73,7 @@ public class RaftServerTestUtil {
int deadIncluded = 0;
final RaftConfiguration current = RaftConfiguration.newBuilder()
.setConf(peers).setLogEntryIndex(0).build();
- for (RaftServer.Division d : cluster.iterateServerImpls()) {
+ for (RaftServer.Division d : cluster.iterateDivisions()) {
final RaftServerImpl server = (RaftServerImpl)d;
LOG.info("checking {}", server);
if (deadPeers != null && deadPeers.contains(server.getId())) {
@@ -108,6 +108,14 @@ public class RaftServerTestUtil {
return ((RaftServerImpl)server).getState().getLastAppliedIndex();
}
+ public static long getNextIndex(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getState().getNextIndex();
+ }
+
+ public static long[] getFollowerNextIndices(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getFollowerNextIndices();
+ }
+
public static long getLatestInstalledSnapshotIndex(RaftServer.Division server) {
return ((RaftServerImpl)server).getState().getLatestInstalledSnapshotIndex();
}
@@ -124,8 +132,8 @@ public class RaftServerTestUtil {
return entry.isFailed();
}
- public static RaftPeerRole getRole(RaftServerImpl server) {
- return server.getRole().getRaftPeerRole();
+ public static RaftPeerRole getRole(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getRole().getRaftPeerRole();
}
public static RaftConfiguration getRaftConf(RaftServer.Division server) {
@@ -156,18 +164,14 @@ public class RaftServerTestUtil {
return getLeaderState(server).map(LeaderState::getLogAppenders).orElse(null);
}
- public static void restartLogAppenders(RaftServerImpl server) {
+ public static void restartLogAppenders(RaftServer.Division server) {
final LeaderState leaderState = getLeaderState(server).orElseThrow(
() -> new IllegalStateException(server + " is not the leader"));
leaderState.getLogAppenders().forEach(leaderState::restartSender);
}
- public static List<RaftServerImpl> getRaftServerImpls(RaftServerProxy proxy) {
- return JavaUtils.callAsUnchecked(proxy::getImpls);
- }
-
- public static RaftServerImpl getRaftServerImpl(RaftServerProxy proxy, RaftGroupId groupId) {
- return JavaUtils.callAsUnchecked(() -> proxy.getImpl(groupId));
+ public static RaftServer.Division getDivision(RaftServer server, RaftGroupId groupId) {
+ return JavaUtils.callAsUnchecked(() -> server.getDivision(groupId));
}
public static DataStreamMap newDataStreamMap(Object name) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index ae9c5f6..e48888c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -126,7 +126,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
Assert.assertNotNull(reply.getStateMachineException());
}
- for (RaftServerImpl server : cluster.iterateServerImpls()) {
+ for (RaftServer.Division server : cluster.iterateDivisions()) {
LOG.info("check server " + server.getId());
JavaUtils.attemptRepeatedly(() -> {
@@ -135,7 +135,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
return null;
}, 5, BaseTest.ONE_SECOND, "GetRetryEntry", LOG);
- final RaftLog log = server.getState().getLog();
+ final RaftLog log = RaftServerTestUtil.getRaftLog(server);
RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
}
@@ -149,7 +149,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
}
private void runTestRetryOnExceptionDuringReplication(CLUSTER cluster) throws Exception {
- final RaftServerImpl oldLeader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division oldLeader = RaftTestUtil.waitForLeader(cluster);
cluster.getLeaderAndSendFirstMessage(true);
// turn on the preAppend failure switch
failPreAppend = true;
@@ -166,7 +166,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
Assert.assertTrue(RaftServerTestUtil.isRetryCacheEntryFailed(oldEntry));
// At this point of time the old leader would have stepped down. wait for leader election to complete
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
// retry
r = cluster.newRaftClientRequest(client.getId(), leader.getId(), callId, message);
reply = rpc.sendRequest(r);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
index ff3aa5c..216d2c5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
@@ -27,6 +27,7 @@ import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.raftlog.RaftLog;
import org.junit.Assert;
import org.junit.Test;
@@ -45,18 +46,18 @@ public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
void runTestPauseResume(CLUSTER cluster) throws InterruptedException, IOException {
// wait leader be elected.
- RaftServerImpl leader = waitForLeader(cluster);
+ final RaftServer.Division leader = waitForLeader(cluster);
RaftPeerId leaderId = leader.getId();
- List<RaftServerImpl> followers = cluster.getFollowers();
+ final List<RaftServer.Division> followers = cluster.getFollowers();
Assert.assertTrue(followers.size() >= 1);
- RaftServerImpl follower = followers.get(0);
+ final RaftServerImpl follower = (RaftServerImpl)followers.get(0);
SimpleMessage[] batch1 = SimpleMessage.create(100, "batch1");
Thread writeThread = RaftTestUtil.sendMessageInNewThread(cluster, leaderId, batch1);
writeThread.join();
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
- RaftLog leaderLog = leader.getState().getLog();
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
// leader should contain all logs.
Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
RaftLog followerLog = follower.getState().getLog();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
index 7a1334d..de94d28 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/TestRatisServerMetricsBase.java
@@ -55,14 +55,14 @@ public abstract class TestRatisServerMetricsBase<CLUSTER extends MiniRaftCluster
void runTestClientFailedRequest(CLUSTER cluster)
throws InterruptedException, IOException, ExecutionException {
- RaftServerImpl leaderImpl = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leaderImpl = RaftTestUtil.waitForLeader(cluster);
ClientId clientId = ClientId.randomId();
// StaleRead with Long.MAX_VALUE minIndex will fail.
RaftClientRequest r = new RaftClientRequest(clientId, leaderImpl.getId(), cluster.getGroupId(),
0, Message.EMPTY, RaftClientRequest.staleReadRequestType(Long.MAX_VALUE), null);
- CompletableFuture<RaftClientReply> f = leaderImpl.submitClientRequestAsync(r);
+ final CompletableFuture<RaftClientReply> f = leaderImpl.getRaftServer().submitClientRequestAsync(r);
Assert.assertTrue(!f.get().isSuccess());
- assertEquals(1L,
- leaderImpl.getRaftServerMetrics().getRegistry().counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT).getCount());
+ assertEquals(1L, RaftServerTestUtil.getRaftServerMetrics(leaderImpl).getRegistry()
+ .counter(RATIS_SERVER_FAILED_CLIENT_STALE_READ_COUNT).getCount());
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
index 0403456..ea36a4a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/simulation/MiniRaftClusterWithSimulatedRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.StreamSupport;
import static org.apache.ratis.conf.ConfUtils.requireMin;
@@ -106,7 +107,8 @@ public class MiniRaftClusterWithSimulatedRpc extends MiniRaftCluster {
serverRequestReply.getQueue(leaderId).blockSendRequestTo.set(block);
// set delay takeRequest for the other queues
- getServers().stream().filter(s -> !s.getId().toString().equals(leaderId))
+ StreamSupport.stream(getServers().spliterator(), false)
+ .filter(s -> !s.getId().toString().equals(leaderId))
.map(s -> serverRequestReply.getQueue(s.getId().toString()))
.forEach(q -> q.delayTakeRequestTo.set(delayMs));
diff --git a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerNoLeaderTimeout.java b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerNoLeaderTimeout.java
index 6d497e8..f2bf587 100644
--- a/ratis-test/src/test/java/org/apache/ratis/TestRaftServerNoLeaderTimeout.java
+++ b/ratis-test/src/test/java/org/apache/ratis/TestRaftServerNoLeaderTimeout.java
@@ -22,7 +22,6 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -78,8 +77,8 @@ public class TestRaftServerNoLeaderTimeout extends BaseTest {
RaftTestUtil.waitForLeader(cluster);
final TimeDuration noLeaderTimeout = RaftServerConfigKeys.Notification.noLeaderTimeout(cluster.getProperties());
- RaftServerImpl healthyFollower = cluster.getFollowers().get(1);
- RaftServerImpl failedFollower = cluster.getFollowers().get(0);
+ final RaftServer.Division healthyFollower = cluster.getFollowers().get(1);
+ final RaftServer.Division failedFollower = cluster.getFollowers().get(0);
// fail the leader and one of the followers to that quorum is not present
// for next leader election to succeed.
cluster.killServer(failedFollower.getId());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index ae5ac90..a4121ff 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -28,8 +28,6 @@ import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.util.CollectionUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
@@ -84,8 +82,8 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
client.async().watch(maxIndex, ReplicationLevel.ALL).join();
}
// assert all streams are linked
- for (RaftServerProxy proxy : cluster.getServers()) {
- final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
+ for (RaftServer proxy : cluster.getServers()) {
+ final RaftServer.Division impl = proxy.getDivision(cluster.getGroupId());
final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) impl.getStateMachine();
for (SingleDataStream s : stateMachine.getStreams()) {
Assert.assertFalse(s.getDataChannel().isOpen());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index f46015e..2bc8c20 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -28,8 +28,7 @@ import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.util.Timestamp;
import org.apache.ratis.util.function.CheckedConsumer;
import org.junit.Assert;
@@ -148,8 +147,8 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
}
void assertLogEntry(CLUSTER cluster, RaftClientRequest request) throws Exception {
- for (RaftServerProxy proxy : cluster.getServers()) {
- final RaftServerImpl impl = proxy.getImpl(cluster.getGroupId());
+ for (RaftServer proxy : cluster.getServers()) {
+ final RaftServer.Division impl = proxy.getDivision(cluster.getGroupId());
final MultiDataStreamStateMachine stateMachine = (MultiDataStreamStateMachine) impl.getStateMachine();
final SingleDataStream s = stateMachine.getSingleDataStream(request);
Assert.assertFalse(s.getDataChannel().isOpen());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 47d88d4..ececc33 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -34,7 +34,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -370,12 +370,13 @@ public interface DataStreamTestUtils {
Assert.assertEquals(request.getClientId().toByteString(), s.getClientId());
}
- static void assertLogEntry(RaftServerImpl impl, SingleDataStream stream) throws Exception {
+ static void assertLogEntry(RaftServer.Division division, SingleDataStream stream) throws Exception {
final RaftClientRequest request = stream.getWriteRequest();
final LogEntryProto entryFromStream = stream.getLogEntry();
assertLogEntry(entryFromStream, request);
- final LogEntryProto entryFromLog = searchLogEntry(ClientInvocationId.valueOf(request), impl.getState().getLog());
+ final LogEntryProto entryFromLog = searchLogEntry(ClientInvocationId.valueOf(request),
+ RaftServerTestUtil.getRaftLog(division));
Assert.assertSame(entryFromStream, entryFromLog);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
index 125fe4c..77c311c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestGrpcOutputStream.java
@@ -23,10 +23,8 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.client.GrpcClientStreamer;
import org.apache.ratis.grpc.client.GrpcOutputStream;
import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.SizeInBytes;
-import org.junit.Ignore;
import java.io.OutputStream;
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
index 8a47f5d..1477a94 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestLogAppenderWithGrpc.java
@@ -25,9 +25,9 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.metrics.GrpcServerMetrics;
import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.FollowerInfo;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -64,12 +64,12 @@ public class TestLogAppenderWithGrpc
// client and leader setup
try (final RaftClient client = cluster.createClient(cluster.getGroup())) {
client.io().send(new RaftTestUtil.SimpleMessage("m"));
- RaftServerImpl leader = waitForLeader(cluster);
- long initialNextIndex = leader.getState().getNextIndex();
+ final RaftServer.Division leader = waitForLeader(cluster);
+ long initialNextIndex = RaftServerTestUtil.getNextIndex(leader);
- for (RaftServerImpl server : cluster.getFollowers()) {
+ for (RaftServer.Division server : cluster.getFollowers()) {
// block the appends in the follower
- ((SimpleStateMachine4Testing) server.getStateMachine()).blockWriteStateMachineData();
+ SimpleStateMachine4Testing.get(server).blockWriteStateMachineData();
}
Collection<CompletableFuture<RaftClientReply>> futures = new ArrayList<>(maxAppends * 2);
for (int i = 0; i < maxAppends * 2; i++) {
@@ -77,14 +77,14 @@ public class TestLogAppenderWithGrpc
}
FIVE_SECONDS.sleep();
- for (long nextIndex : leader.getFollowerNextIndices()) {
+ for (long nextIndex : RaftServerTestUtil.getFollowerNextIndices(leader)) {
// Verify nextIndex does not progress due to pendingRequests limit
Assert.assertEquals(initialNextIndex + maxAppends, nextIndex);
}
ONE_SECOND.sleep();
- for (RaftServerImpl server : cluster.getFollowers()) {
+ for (RaftServer.Division server : cluster.getFollowers()) {
// unblock the appends in the follower
- ((SimpleStateMachine4Testing) server.getStateMachine()).unblockWriteStateMachineData();
+ SimpleStateMachine4Testing.get(server).unblockWriteStateMachineData();
}
JavaUtils.allOf(futures).join();
@@ -98,7 +98,7 @@ public class TestLogAppenderWithGrpc
}
private void runTestRestartLogAppender(MiniRaftClusterWithGrpc cluster) throws Exception {
- final RaftServerImpl leader = waitForLeader(cluster);
+ final RaftServer.Division leader = waitForLeader(cluster);
int messageCount = 0;
// Send some messages
@@ -126,7 +126,7 @@ public class TestLogAppenderWithGrpc
}
}
- final RaftServerImpl newLeader = waitForLeader(cluster);
+ final RaftServer.Division newLeader = waitForLeader(cluster);
if (leader == newLeader) {
final GrpcServerMetrics newleaderMetrics = new GrpcServerMetrics(leader.getMemberId().toString());
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index da05fe0..ebee514 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -48,9 +48,9 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.metrics.RaftServerMetrics;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerImplUtils;
@@ -94,11 +94,11 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
}
void runTestServerRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exception {
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
final RaftProperties p = getProperties();
- GrpcConfigKeys.Server.setPort(p, leader.getServerRpc().getInetSocketAddress().getPort());
+ GrpcConfigKeys.Server.setPort(p, RaftServerTestUtil.getServerRpc(leader).getInetSocketAddress().getPort());
// Create a raft server proxy with server rpc bound to a different address
// compared to leader. This helps in locking the raft storage directory to
@@ -145,7 +145,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
}
void runTestLeaderRestart(MiniRaftClusterWithGrpc cluster) throws Exception {
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
try (final RaftClient client = cluster.createClient()) {
// send a request to make sure leader is ready
@@ -269,8 +269,8 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
void testRaftClientRequestMetrics(MiniRaftClusterWithGrpc cluster) throws IOException,
ExecutionException, InterruptedException {
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
- RaftServerMetrics raftServerMetrics = leader.getRaftServerMetrics();
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ RaftServerMetrics raftServerMetrics = RaftServerTestUtil.getRaftServerMetrics(leader);
try (final RaftClient client = cluster.createClient()) {
final CompletableFuture<RaftClientReply> f1 = client.async().send(new SimpleMessage("testing"));
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index ef1c76b..6714e1c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -112,7 +112,7 @@ public class TestRaftLogMetrics extends BaseTest
assertRaftLogWritePathMetrics(cluster.getLeader());
// For followers, flush can be lagged behind. Attempt multiple times.
- for(RaftServer.Division f : cluster.getFollowerDivisions()) {
+ for(RaftServer.Division f : cluster.getFollowers()) {
JavaUtils.attempt(() -> assertFlushCount(f), 10, HUNDRED_MILLIS, f.getId() + "-assertFlushCount", null);
// We have already waited enough for follower metrics to populate.
assertRaftLogWritePathMetrics(f);