You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/30 12:08:28 UTC
[incubator-ratis] branch master updated: RATIS-1188. Change
MiniRaftCluster.getLeader() to return RaftServer.Division. (#306)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ebabdbf RATIS-1188. Change MiniRaftCluster.getLeader() to return RaftServer.Division. (#306)
ebabdbf is described below
commit ebabdbf570b50af0d868a5f4e5acf6fe5adc4876
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Nov 30 20:08:20 2020 +0800
RATIS-1188. Change MiniRaftCluster.getLeader() to return RaftServer.Division. (#306)
---
.../examples/arithmetic/TestArithmeticLogDump.java | 6 +--
.../ratis/logservice/server/TestMetaServer.java | 8 +---
.../ratis/InstallSnapshotNotificationTests.java | 23 ++++++-----
.../java/org/apache/ratis/MiniRaftCluster.java | 2 +-
.../org/apache/ratis/OutputStreamBaseTest.java | 26 ++++++-------
.../test/java/org/apache/ratis/RaftAsyncTests.java | 14 +++----
.../java/org/apache/ratis/RetryCacheTests.java | 15 ++++----
.../java/org/apache/ratis/WatchRequestTests.java | 17 ++++----
.../ratis/server/impl/GroupManagementBaseTest.java | 17 ++++----
.../server/impl/RaftReconfigurationBaseTest.java | 28 +++++++-------
.../impl/RaftStateMachineExceptionTests.java | 2 +-
.../server/impl/StateMachineShutdownTests.java | 27 +++++--------
.../statemachine/SimpleStateMachine4Testing.java | 1 -
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 4 +-
.../ratis/retry/TestExceptionDependentRetry.java | 45 +++++++++++-----------
.../apache/ratis/server/ServerRestartTests.java | 44 ++++++++++-----------
.../ratis/server/raftlog/TestRaftLogMetrics.java | 19 ++++-----
.../ratis/statemachine/TestStateMachine.java | 10 ++---
18 files changed, 144 insertions(+), 164 deletions(-)
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java
index 604d05d..4b3f8cc 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/arithmetic/TestArithmeticLogDump.java
@@ -28,7 +28,7 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.server.storage.RaftStorageDirectory;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
@@ -76,9 +76,9 @@ public class TestArithmeticLogDump extends BaseTest {
@Test
public void testLogDump() throws Exception {
- RaftServerImpl leaderServer = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leaderServer = RaftTestUtil.waitForLeader(cluster);
List<RaftStorageDirectory.LogPathAndIndex> files =
- leaderServer.getState().getStorage().getStorageDir().getLogSegmentFiles();
+ RaftServerTestUtil.getRaftStorage(leaderServer).getStorageDir().getLogSegmentFiles();
Assert.assertEquals(1, files.size());
cluster.shutdown();
diff --git a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
index 62e5e8b..ccd6d19 100644
--- a/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
+++ b/ratis-logservice/src/test/java/org/apache/ratis/logservice/server/TestMetaServer.java
@@ -31,8 +31,6 @@ import org.apache.ratis.logservice.proto.MetaServiceProtos;
import org.apache.ratis.logservice.util.LogServiceCluster;
import org.apache.ratis.logservice.util.TestUtils;
import org.apache.ratis.metrics.JVMMetrics;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.AfterClass;
@@ -183,14 +181,12 @@ public class TestMetaServer {
assert (listLogs.stream().filter(log -> log.getLogName().getName().startsWith("testReadWrite")).count() == 1);
List<LogServer> workers = cluster.getWorkers();
for (LogServer worker : workers) {
- RaftServerImpl server = ((RaftServerProxy) worker.getServer())
- .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
+ worker.getServer().getDivision(listLogs.get(0).getRaftGroup().getGroupId());
// TODO: perform all additional checks on state machine level
}
writer.write(testMessage);
for (LogServer worker : workers) {
- RaftServerImpl server = ((RaftServerProxy) worker.getServer())
- .getImpl(listLogs.get(0).getRaftGroup().getGroupId());
+ worker.getServer().getDivision(listLogs.get(0).getRaftGroup().getGroupId());
}
// assert(stream.getSize() > 0); //TODO: Doesn't work
LogReader reader = stream.createReader();
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index c41a284..3771e01 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -23,8 +23,8 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -129,10 +129,10 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
// wait for the snapshot to be done
- RaftStorageDirectory storageDirectory = cluster.getLeader().getState()
- .getStorage().getStorageDir();
+ final RaftServer.Division leader = cluster.getLeader();
+ final RaftStorageDirectory storageDirectory = RaftServerTestUtil.getRaftStorage(leader).getStorageDir();
- final long nextIndex = cluster.getLeader().getState().getLog().getNextIndex();
+ final long nextIndex = RaftServerTestUtil.getRaftLog(leader).getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
@@ -176,10 +176,9 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
// Check the installed snapshot index on each Follower matches with the
// leader snapshot.
- for (RaftServerImpl follower : cluster.getFollowers()) {
- follower.getState().getStorage().getStorageDir().getStateMachineDir();
+ for (RaftServer.Division follower : cluster.getFollowers()) {
Assert.assertEquals(leaderSnapshotInfo.getIndex(),
- follower.getState().getLatestInstalledSnapshotIndex());
+ RaftServerTestUtil.getLatestInstalledSnapshotIndex(follower));
}
// restart the peer and check if it can correctly handle conf change
@@ -198,7 +197,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
private void testRestartFollower(CLUSTER cluster) throws Exception {
leaderSnapshotInfoRef.set(null);
int i = 0;
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = leader.getId();
try (final RaftClient client = cluster.createClient(leaderId)) {
@@ -209,7 +208,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
// wait for the snapshot to be done
- final long oldLeaderNextIndex = leader.getState().getLog().getNextIndex();
+ final long oldLeaderNextIndex = RaftServerTestUtil.getRaftLog(leader).getNextIndex();
{
LOG.info("{}: oldLeaderNextIndex = {}", leaderId, oldLeaderNextIndex);
final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
@@ -230,12 +229,12 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
FIVE_SECONDS.sleep();
cluster.restartServer(followerId, false);
- final RaftServerImpl follower = cluster.getRaftServerImpl(followerId);
+ final RaftServer.Division follower = cluster.getDivision(followerId);
JavaUtils.attempt(() -> {
- final long newLeaderNextIndex = leader.getState().getLog().getNextIndex();
+ final long newLeaderNextIndex = RaftServerTestUtil.getRaftLog(leader).getNextIndex();
LOG.info("{}: newLeaderNextIndex = {}", leaderId, newLeaderNextIndex);
Assert.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
- Assert.assertEquals(newLeaderNextIndex, follower.getState().getLog().getNextIndex());
+ Assert.assertEquals(newLeaderNextIndex, RaftServerTestUtil.getRaftLog(follower).getNextIndex());
}, 10, ONE_SECOND, "followerNextIndex", LOG);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index c40d1d2..4d61d65 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -537,7 +537,7 @@ public abstract class MiniRaftCluster implements Closeable {
* @return the unique leader with the highest term. Or, return null if there is no leader.
* @throws IllegalStateException if there are multiple leaders with the same highest term.
*/
- public RaftServerImpl getLeader() {
+ public RaftServer.Division getLeader() {
return getLeader(getLeaders(null), null, leaders -> {
throw newIllegalStateExceptionForMultipleLeaders(null, leaders);
});
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 99b2659..b68285b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -20,7 +20,8 @@ package org.apache.ratis;
import org.apache.ratis.client.impl.RaftOutputStream;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.LogEntryProto.LogEntryBodyCase;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.util.SizeInBytes;
@@ -84,7 +85,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
}
// check the leader's raft log
- final RaftLog raftLog = cluster.getLeader().getState().getLog();
+ final RaftLog raftLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
final AtomicInteger i = new AtomicInteger();
checkLog(raftLog, numRequests, () -> toBytes(i.getAndIncrement()));
}
@@ -120,7 +121,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
private void runTestWriteAndFlush(CLUSTER cluster) throws Exception {
final int bufferSize = ByteValue.BUFFERSIZE;
- RaftServerImpl leader = waitForLeader(cluster);
+ final RaftServer.Division leader = waitForLeader(cluster);
OutputStream out = newOutputStream(cluster, bufferSize);
int[] lengths = new int[]{1, 500, 1023, 1024, 1025, 2048, 3000, 3072};
@@ -149,19 +150,19 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
LOG.info("Start to check leader's log");
final AtomicInteger index = new AtomicInteger(0);
- checkLog(leader.getState().getLog(), expectedTxs.size(),
+ checkLog(RaftServerTestUtil.getRaftLog(leader), expectedTxs.size(),
() -> expectedTxs.get(index.getAndIncrement()));
}
- private RaftLog assertRaftLog(int expectedEntries, RaftServerImpl server) throws Exception {
- final RaftLog raftLog = server.getState().getLog();
+ private RaftLog assertRaftLog(int expectedEntries, RaftServer.Division server) throws Exception {
+ final RaftLog raftLog = RaftServerTestUtil.getRaftLog(server);
final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(raftLog);
Assert.assertEquals(expectedEntries, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
final LogEntryProto last = RaftTestUtil.getLastEntry(LogEntryBodyCase.STATEMACHINELOGENTRY, raftLog);
Assert.assertNotNull(last);
Assert.assertTrue(raftLog.getLastCommittedIndex() >= last.getIndex());
- Assert.assertTrue(server.getState().getLastAppliedIndex() >= last.getIndex());
+ Assert.assertTrue(RaftServerTestUtil.getLastAppliedIndex(server) >= last.getIndex());
return raftLog;
}
@@ -208,7 +209,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
private void runTestWriteWithOffset(CLUSTER cluster) throws Exception {
final int bufferSize = ByteValue.BUFFERSIZE;
- RaftServerImpl leader = waitForLeader(cluster);
+ final RaftServer.Division leader = waitForLeader(cluster);
final OutputStream out = newOutputStream(cluster, bufferSize);
@@ -269,7 +270,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
private void runTestKillLeader(CLUSTER cluster) throws Exception {
final int bufferSize = 4;
- final RaftServerImpl leader = waitForLeader(cluster);
+ final RaftServer.Division leader = waitForLeader(cluster);
final AtomicBoolean running = new AtomicBoolean(true);
final AtomicReference<Boolean> success = new AtomicReference<>();
@@ -297,7 +298,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
// force change the leader
Thread.sleep(500);
RaftTestUtil.waitAndKillLeader(cluster);
- final RaftServerImpl newLeader = waitForLeader(cluster);
+ final RaftServer.Division newLeader = waitForLeader(cluster);
Assert.assertNotEquals(leader.getId(), newLeader.getId());
Thread.sleep(500);
@@ -309,8 +310,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
// leaders. It may be larger than result+2 because the client may resend
// requests and we do not have retry cache on servers yet.
LOG.info("last applied index: {}. total number of requests: {}",
- newLeader.getState().getLastAppliedIndex(), result.get());
- Assert.assertTrue(
- newLeader.getState().getLastAppliedIndex() >= result.get() + 1);
+ RaftServerTestUtil.getLastAppliedIndex(newLeader), result.get());
+ Assert.assertTrue(RaftServerTestUtil.getLastAppliedIndex(newLeader) >= result.get() + 1);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index fd37db6..342dff9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -38,7 +38,7 @@ import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -418,8 +418,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
LOG.info("Running testCheckLeadershipFailure");
waitForLeader(cluster);
- RaftServerImpl prevLeader = cluster.getLeader();
- long termOfPrevLeader = prevLeader.getState().getCurrentTerm();
+ final RaftServer.Division prevLeader = cluster.getLeader();
+ final long termOfPrevLeader = RaftServerTestUtil.getCurrentTerm(prevLeader);
LOG.info("Previous Leader is elected on term {}", termOfPrevLeader);
try (final RaftClient client = cluster.createClient()) {
@@ -447,8 +447,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
waitForLeader(cluster);
- RaftServerImpl currLeader = cluster.getLeader();
- long termOfCurrLeader = currLeader.getState().getCurrentTerm();
+ final RaftServer.Division currLeader = cluster.getLeader();
+ final long termOfCurrLeader = RaftServerTestUtil.getCurrentTerm(currLeader);
LOG.info("Current Leader is elected on term {}", termOfCurrLeader);
// leader on termOfPrevLeader should step-down.
@@ -463,8 +463,8 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
}
private void runTestNoRetryWaitOnNotLeaderException(MiniRaftCluster cluster) throws Exception {
- final RaftServerImpl leader = waitForLeader(cluster);
- final List<RaftServerImpl> followers = cluster.getFollowers();
+ final RaftServer.Division leader = waitForLeader(cluster);
+ final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
Assert.assertNotNull(followers);
Assert.assertEquals(2, followers.size());
Assert.assertNotSame(leader, followers.get(0));
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 8185ba7..b85e180 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -28,7 +28,6 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -64,7 +63,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
void runTestBasicRetry(CLUSTER cluster) throws Exception {
RaftTestUtil.waitForLeader(cluster);
final RaftPeerId leaderId = cluster.getLeaderAndSendFirstMessage(false).getId();
- long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+ final long oldLastApplied = RaftServerTestUtil.getLastAppliedIndex(cluster.getLeader());
try (final RaftClient client = cluster.createClient(leaderId)) {
final RaftClientRpc rpc = client.getClientRpc();
@@ -90,17 +89,17 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
}
public void assertServer(MiniRaftCluster cluster, ClientId clientId, long callId, long oldLastApplied) throws Exception {
- long leaderApplied = cluster.getLeader().getState().getLastAppliedIndex();
+ final long leaderApplied = RaftServerTestUtil.getLastAppliedIndex(cluster.getLeader());
// make sure retry cache has the entry
- for (RaftServerImpl server : cluster.iterateServerImpls()) {
+ for (RaftServer.Division server : cluster.iterateDivisions()) {
LOG.info("check server " + server.getId());
- if (server.getState().getLastAppliedIndex() < leaderApplied) {
+ if (RaftServerTestUtil.getLastAppliedIndex(server) < leaderApplied) {
Thread.sleep(1000);
}
Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId));
// make sure there is only one log entry committed
- Assert.assertEquals(1, count(server.getState().getLog(), oldLastApplied + 1));
+ Assert.assertEquals(1, count(RaftServerTestUtil.getRaftLog(server), oldLastApplied + 1));
}
}
@@ -133,7 +132,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
callId, new SimpleMessage("message"));
assertReply(rpc.sendRequest(r), client, callId);
- long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+ final long oldLastApplied = RaftServerTestUtil.getLastAppliedIndex(cluster.getLeader());
// trigger the reconfiguration, make sure the original leader is kicked out
PeerChanges change = cluster.addNewPeers(2, true);
@@ -163,7 +162,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
}
// check the new leader and make sure the retry did not get committed
- Assert.assertEquals(0, count(cluster.getLeader().getState().getLog(), oldLastApplied + 1));
+ Assert.assertEquals(0, count(RaftServerTestUtil.getRaftLog(cluster.getLeader()), oldLastApplied + 1));
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 1928915..d6b23a4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -32,7 +32,6 @@ import org.apache.ratis.retry.RetryPolicies;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
@@ -218,13 +217,13 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
final int numMessages = p.numMessages;
// blockStartTransaction of the leader so that no transaction can be committed MAJORITY
- final RaftServerImpl leader = cluster.getLeader();
+ final RaftServer.Division leader = cluster.getLeader();
LOG.info("block leader {}", leader.getId());
SimpleStateMachine4Testing.get(leader).blockStartTransaction();
// blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
- final List<RaftServerImpl> followers = cluster.getFollowers();
- final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
+ final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final RaftServer.Division blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
LOG.info("block follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
@@ -340,8 +339,8 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
final int numMessages = p.numMessages;
// blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
- final List<RaftServerImpl> followers = cluster.getFollowers();
- final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
+ final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final RaftServer.Division blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
LOG.info("block follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
@@ -392,13 +391,13 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
final TimeDuration watchTimeoutDenomination = RaftServerConfigKeys.Watch.timeoutDenomination(properties);
// blockStartTransaction of the leader so that no transaction can be committed MAJORITY
- final RaftServerImpl leader = cluster.getLeader();
+ final RaftServer.Division leader = cluster.getLeader();
LOG.info("block leader {}", leader.getId());
SimpleStateMachine4Testing.get(leader).blockStartTransaction();
// blockFlushStateMachineData a follower so that no transaction can be ALL_COMMITTED
- final List<RaftServerImpl> followers = cluster.getFollowers();
- final RaftServerImpl blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
+ final List<RaftServer.Division> followers = cluster.getFollowerDivisions();
+ final RaftServer.Division blockedFollower = followers.get(ThreadLocalRandom.current().nextInt(followers.size()));
LOG.info("block follower {}", blockedFollower.getId());
SimpleStateMachine4Testing.get(blockedFollower).blockFlushStateMachineData();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
index 4dd543a..124801b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/GroupManagementBaseTest.java
@@ -120,7 +120,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
}
JavaUtils.attempt(() -> {
- RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
Assert.assertTrue(leader.getId() == peers.get(suggestedLeaderIndex).getId());
}, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testMultiGroupWithPriority", LOG);
@@ -134,7 +134,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
cluster.setBlockRequestsFrom(suggestedLeader, true);
JavaUtils.attempt(() -> {
- RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
Assert.assertTrue(leader.getId() != peers.get(suggestedLeaderIndex).getId());
}, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testMultiGroupWithPriority", LOG);
@@ -154,13 +154,13 @@ public abstract class GroupManagementBaseTest extends BaseTest {
// suggested leader with highest priority rejoin cluster, then current leader will yield
// leadership to suggested leader when suggested leader catch up the log.
JavaUtils.attempt(() -> {
- RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
Assert.assertTrue(leader.getId() == peers.get(suggestedLeaderIndex).getId());
}, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testMultiGroupWithPriority", LOG);
cluster.killServer(peers.get(suggestedLeaderIndex).getId());
JavaUtils.attempt(() -> {
- RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster, newGroup.getGroupId());
Assert.assertTrue(leader.getId() != peers.get(suggestedLeaderIndex).getId());
}, 10, TimeDuration.valueOf(1, TimeUnit.SECONDS), "testMultiGroupWithPriority", LOG);
@@ -347,7 +347,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
final RaftPeerId peerId = peer.getId();
final RaftGroup group = RaftGroup.valueOf(cluster.getGroupId(), peer);
try (final RaftClient client = cluster.createClient()) {
- Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
+ Assert.assertEquals(group, cluster.getDivision(peerId).getGroup());
try {
client.getGroupManagementApi(peer.getId()).add(group);
} catch (IOException ex) {
@@ -355,7 +355,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
// the exception is instance of AlreadyExistsException
Assert.assertTrue(ex.toString().contains(AlreadyExistsException.class.getCanonicalName()));
}
- Assert.assertEquals(group, cluster.getRaftServerImpl(peerId).getGroup());
+ Assert.assertEquals(group, cluster.getDivision(peerId).getGroup());
cluster.shutdown();
}
}
@@ -372,8 +372,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
final RaftGroup group1 = RaftGroup.valueOf(cluster1.getGroupId(), peer1);
final RaftGroup group2 = RaftGroup.valueOf(cluster2.getGroupId(), peer1);
try (final RaftClient client = cluster1.createClient()) {
- Assert.assertEquals(group1,
- cluster1.getRaftServerImpl(peerId1).getGroup());
+ Assert.assertEquals(group1, cluster1.getDivision(peerId1).getGroup());
try {
// Group2 is added to one of the peers in Group1
@@ -421,7 +420,7 @@ public abstract class GroupManagementBaseTest extends BaseTest {
final RaftGroup group2 = RaftGroup.valueOf(cluster2.getGroupId(), peer1);
try (final RaftClient client = cluster1.createClient()) {
Assert.assertEquals(group1,
- cluster1.getRaftServerImpl(peerId1).getGroup());
+ cluster1.getDivision(peerId1).getGroup());
try {
// Group2 is added again to one of the peers in Group1
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index fdf8971..cba7000 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -80,8 +80,8 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
for (int i = 0; i < peersWithPriority.size(); i ++) {
RaftPeerId peerId = peersWithPriority.get(i).getId();
- RaftServerImpl server = cluster.getRaftServerImpl(peerId, groupId);
- RaftConfiguration conf = server.getState().getRaftConf();
+ final RaftServer.Division server = cluster.getDivision(peerId, groupId);
+ final RaftConfiguration conf = RaftServerTestUtil.getRaftConf(server);
for (int j = 0; j < peersWithPriority.size(); j ++) {
int priorityInConf = conf.getPeer(peersWithPriority.get(j).getId()).getPriority();
@@ -266,7 +266,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info("Start changing the configuration: {}",
asList(c1.allPeersInNewConf));
- Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional());
+ Assert.assertFalse(RaftServerTestUtil.getRaftConf(cluster.getLeader()).isTransitional());
final RaftClientRpc sender = client.getClientRpc();
final SetConfigurationRequest request = cluster.newSetConfigurationRequest(
@@ -359,11 +359,11 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info(cluster.printServers());
assertSuccess(success);
- final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
for (RaftPeer newPeer : c1.newPeers) {
+ final RaftServer.Division d = cluster.getDivision(newPeer.getId());
Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
- cluster.getRaftServerImpl(newPeer.getId()).getState().getLog()
- .getEntries(0, Long.MAX_VALUE));
+ RaftServerTestUtil.getRaftLog(d).getEntries(0, Long.MAX_VALUE));
}
}
}
@@ -410,11 +410,11 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
TimeUnit.SECONDS.sleep(1);
// the leader cannot generate the (old, new) conf, and it will keep
// bootstrapping the 2 new peers since they have not started yet
- Assert.assertFalse(cluster.getLeader().getRaftConf().isTransitional());
+ Assert.assertFalse(RaftServerTestUtil.getRaftConf(cluster.getLeader()).isTransitional());
// only (0) the first conf entry, (1) the 1st setConf entry and (2) a metadata entry
{
- final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
for(LogEntryProto e : RaftTestUtil.getLogEntryProtos(leaderLog)) {
LOG.info("{}", ServerProtoUtils.toLogEntryString(e));
}
@@ -465,13 +465,13 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
}
void runTestNoChangeRequest(CLUSTER cluster) throws Exception {
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
try(final RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new SimpleMessage("m"));
- final RaftLog leaderLog = leader.getState().getLog();
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
final long committedIndex = leaderLog.getLastCommittedIndex();
- final RaftConfiguration confBefore = cluster.getLeader().getRaftConf();
+ final RaftConfiguration confBefore = RaftServerTestUtil.getRaftConf(cluster.getLeader());
// no real configuration change in the request
final RaftClientReply reply = client.setConfiguration(cluster.getPeers().toArray(RaftPeer.emptyArray()));
@@ -481,7 +481,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
final LogEntryProto e = leaderLog.get(i);
Assert.assertTrue(e.hasMetadataEntry());
}
- Assert.assertSame(confBefore, cluster.getLeader().getRaftConf());
+ Assert.assertSame(confBefore, RaftServerTestUtil.getRaftConf(cluster.getLeader()));
client.close();
}
}
@@ -573,10 +573,10 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
try {
RaftTestUtil.waitForLeader(cluster);
- final RaftServerImpl leader = cluster.getLeader();
+ final RaftServer.Division leader = cluster.getLeader();
final RaftPeerId leaderId = leader.getId();
- final RaftLog log = leader.getState().getLog();
+ final RaftLog log = RaftServerTestUtil.getRaftLog(leader);
log2 = log;
Thread.sleep(1000);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index dc6171e..ae9c5f6 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -106,7 +106,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
cluster.getLeaderAndSendFirstMessage(true);
- long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
+ final long oldLastApplied = RaftServerTestUtil.getLastAppliedIndex(cluster.getLeader());
try (final RaftClient client = cluster.createClient(leaderId)) {
final RaftClientRpc rpc = client.getClientRpc();
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 758cd88..6008f2e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -25,6 +25,7 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.TransactionContext;
@@ -41,12 +42,11 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
protected static class StateMachineWithConditionalWait extends
SimpleStateMachine4Testing {
- Long objectToWait = new Long(0);
+ private final Long objectToWait = 0L;
volatile boolean blockOnApply = true;
@Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
- CompletableFuture<Message> future = new CompletableFuture<Message>();
if (blockOnApply) {
synchronized (objectToWait) {
try {
@@ -59,8 +59,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
}
RaftProtos.LogEntryProto entry = trx.getLogEntry();
updateLastAppliedTermIndex(entry.getTerm(), entry.getIndex());
- future.complete(new RaftTestUtil.SimpleMessage("done"));
- return future;
+ return CompletableFuture.completedFuture(new RaftTestUtil.SimpleMessage("done"));
}
public void unBlockApplyTxn() {
@@ -79,7 +78,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
final MiniRaftCluster cluster = newCluster(3);
cluster.start();
RaftTestUtil.waitForLeader(cluster);
- RaftServerImpl leader = cluster.getLeader();
+ final RaftServer.Division leader = cluster.getLeader();
RaftPeerId leaderId = leader.getId();
//Unblock leader and one follower
@@ -101,20 +100,16 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
logIndex, RaftProtos.ReplicationLevel.ALL_COMMITTED);
watchReply.getCommitInfos().forEach(
val -> Assert.assertTrue(val.getCommitIndex() >= logIndex));
- RaftServerImpl secondFollower = cluster.getFollowers().get(1);
+ final RaftServer.Division secondFollower = cluster.getFollowers().get(1);
// Second follower is blocked in apply transaction
- Assert.assertTrue(
- secondFollower.getState().getLastAppliedIndex()
- < logIndex);
+ Assert.assertTrue(RaftServerTestUtil.getLastAppliedIndex(secondFollower) < logIndex);
// Now shutdown the follower in a separate thread
- Thread t = new Thread(() -> secondFollower.groupRemove(true, false));
+ final Thread t = new Thread(() -> RaftServerTestUtil.shutdown(secondFollower));
t.start();
// The second follower should still be blocked in apply transaction
- Assert.assertTrue(
- secondFollower.getState().getLastAppliedIndex()
- < logIndex);
+ Assert.assertTrue(RaftServerTestUtil.getLastAppliedIndex(secondFollower) < logIndex);
// Now unblock the second follower
((StateMachineWithConditionalWait) secondFollower.getStateMachine())
@@ -122,9 +117,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
// Now wait for the thread
t.join(5000);
- Assert.assertEquals(
- secondFollower.getState().getLastAppliedIndex(),
- logIndex);
+ Assert.assertEquals(logIndex, RaftServerTestUtil.getLastAppliedIndex(secondFollower));
cluster.shutdown();
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index d8e7c0b..e25c9d2 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -30,7 +30,6 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index 915221b..621b6ca 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -83,14 +83,14 @@ public class TestRaftWithGrpc
replyFuture = client.async().send(new RaftTestUtil.SimpleMessage("abc"));
TimeDuration.valueOf(5 , TimeUnit.SECONDS).sleep();
// replyFuture should not be completed until append request is unblocked.
- Assert.assertTrue(!replyFuture.isDone());
+ Assert.assertFalse(replyFuture.isDone());
// unblock append request.
cluster.getServerAliveStream()
.filter(impl -> !impl.isLeader())
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
- final RaftLog leaderLog = cluster.getLeader().getState().getLog();
+ final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
// The entries have been appended in the followers
// although the append entry timed out at the leader
cluster.getServerAliveStream().filter(impl -> !impl.isLeader()).forEach(raftServer ->
diff --git a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
index 66fbd71..00b1e7c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
+++ b/ratis-test/src/test/java/org/apache/ratis/retry/TestExceptionDependentRetry.java
@@ -18,6 +18,7 @@
package org.apache.ratis.retry;
+import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.retry.ClientRetryEvent;
@@ -27,8 +28,8 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
import org.apache.ratis.protocol.exceptions.RaftRetryFailureException;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.TimeDuration;
@@ -45,7 +46,7 @@ import static org.junit.Assert.fail;
/**
* Class to test {@link ExceptionDependentRetry}.
*/
-public class TestExceptionDependentRetry implements MiniRaftClusterWithGrpc.FactoryGet {
+public class TestExceptionDependentRetry extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
@Test
public void testExceptionDependentRetrySuccess() {
@@ -166,38 +167,38 @@ public class TestExceptionDependentRetry implements MiniRaftClusterWithGrpc.Fact
}
@Test
- public void testExceptionRetryAttempts()
- throws InterruptedException, IOException {
- RaftProperties prop = new RaftProperties();
+ public void testExceptionRetryAttempts() throws Exception {
+ final RaftProperties prop = getProperties();
RaftClientConfigKeys.Rpc.setRequestTimeout(prop, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS));
prop.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
SimpleStateMachine4Testing.class, StateMachine.class);
RaftServerConfigKeys.Write.setElementLimit(prop, 1);
- MiniRaftClusterWithGrpc cluster = getFactory().newCluster(1, prop);
- RaftServerImpl leader = null;
- try {
- cluster.start();
- ExceptionDependentRetry.Builder builder =
- ExceptionDependentRetry.newBuilder();
- builder.setExceptionToPolicy(TimeoutIOException.class,
- MultipleLinearRandomRetry.parseCommaSeparated("1ms, 5"));
- builder.setDefaultPolicy(RetryPolicies.retryForeverNoSleep());
- // create a client with the exception dependent policy
- try (final RaftClient client = cluster.createClient(builder.build())) {
- client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
+ runWithNewCluster(1, this::runTestExceptionRetryAttempts);
+ }
+
+ void runTestExceptionRetryAttempts(MiniRaftClusterWithGrpc cluster) throws Exception {
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
+ final ExceptionDependentRetry policy = ExceptionDependentRetry.newBuilder()
+ .setExceptionToPolicy(TimeoutIOException.class, MultipleLinearRandomRetry.parseCommaSeparated("1ms, 5"))
+ .setDefaultPolicy(RetryPolicies.retryForeverNoSleep())
+ .build();
+
+ // create a client with the exception dependent policy
+ try (final RaftClient client = cluster.createClient(policy)) {
+ client.async().send(new RaftTestUtil.SimpleMessage("1")).get();
+ }
- leader = cluster.getLeader();
- ((SimpleStateMachine4Testing) leader.getStateMachine()).blockWriteStateMachineData();
+ try (final RaftClient client = cluster.createClient(policy)) {
+ SimpleStateMachine4Testing.get(leader).blockWriteStateMachineData();
- client.async().send(new RaftTestUtil.SimpleMessage("2")).get();
- }
+ client.async().send(new RaftTestUtil.SimpleMessage("2")).get();
Assert.fail("Test should have failed.");
} catch (ExecutionException e) {
RaftRetryFailureException rrfe = (RaftRetryFailureException) e.getCause();
Assert.assertEquals(16, rrfe.getAttemptCount());
} finally {
- ((SimpleStateMachine4Testing)leader.getStateMachine()).unblockWriteStateMachineData();
+ SimpleStateMachine4Testing.get(leader).unblockWriteStateMachineData();
cluster.shutdown();
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index e1ba5b3..85ab3e7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -30,10 +30,8 @@ import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.StateMachineException;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
-import org.apache.ratis.server.impl.ServerState;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.RaftLogIOException;
@@ -111,20 +109,20 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
final long leaderLastIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getLastEntryTermIndex().getIndex();
// make sure the restarted follower can catchup
- final ServerState followerState = cluster.getRaftServerImpl(followerId).getState();
+ final RaftServer.Division followerState = cluster.getDivision(followerId);
JavaUtils.attemptRepeatedly(() -> {
- Assert.assertTrue(followerState.getLastAppliedIndex() >= leaderLastIndex);
+ Assert.assertTrue(RaftServerTestUtil.getLastAppliedIndex(followerState) >= leaderLastIndex);
return null;
}, 10, ONE_SECOND, "follower catchup", LOG);
// make sure the restarted peer's log segments is correct
- final RaftServerImpl follower = cluster.restartServer(followerId, false);
- final RaftLog followerLog = follower.getState().getLog();
+ final RaftServer.Division follower = cluster.restartServer(followerId, false);
+ final RaftLog followerLog = RaftServerTestUtil.getRaftLog(follower);
final long followerLastIndex = followerLog.getLastEntryTermIndex().getIndex();
Assert.assertTrue(followerLastIndex >= leaderLastIndex);
final File followerOpenLogFile = getOpenLogFile(follower);
- final File leaderOpenLogFile = getOpenLogFile(cluster.getRaftServerImpl(leaderId));
+ final File leaderOpenLogFile = getOpenLogFile(cluster.getDivision(leaderId));
// shutdown all servers
for(RaftServer s : cluster.getServers()) {
@@ -165,20 +163,20 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
static void assertTruncatedLog(RaftPeerId id, File openLogFile, long lastIndex, MiniRaftCluster cluster) throws Exception {
// truncate log
FileUtils.truncateFile(openLogFile, openLogFile.length() - 1);
- final RaftServerImpl server = cluster.restartServer(id, false);
+ final RaftServer.Division server = cluster.restartServer(id, false);
// the last index should be one less than before
- Assert.assertEquals(lastIndex - 1, server.getState().getLog().getLastEntryTermIndex().getIndex());
+ Assert.assertEquals(lastIndex - 1, RaftServerTestUtil.getRaftLog(server).getLastEntryTermIndex().getIndex());
server.getRaftServer().close();
}
- static List<Path> getOpenLogFiles(RaftServerImpl server) throws Exception {
- return server.getState().getStorage().getStorageDir().getLogSegmentFiles().stream()
+ static List<Path> getOpenLogFiles(RaftServer.Division server) throws Exception {
+ return RaftServerTestUtil.getRaftStorage(server).getStorageDir().getLogSegmentFiles().stream()
.filter(LogPathAndIndex::isOpen)
.map(LogPathAndIndex::getPath)
.collect(Collectors.toList());
}
- static File getOpenLogFile(RaftServerImpl server) throws Exception {
+ static File getOpenLogFile(RaftServer.Division server) throws Exception {
final List<Path> openLogs = getOpenLogFiles(server);
Assert.assertEquals(1, openLogs.size());
return openLogs.get(0).toFile();
@@ -191,7 +189,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
void runTestRestartWithCorruptedLogHeader(MiniRaftCluster cluster) throws Exception {
RaftTestUtil.waitForLeader(cluster);
- for(RaftServerImpl impl : cluster.iterateServerImpls()) {
+ for(RaftServer.Division impl : cluster.iterateDivisions()) {
JavaUtils.attemptRepeatedly(() -> getOpenLogFile(impl), 10, TimeDuration.valueOf(100, TimeUnit.MILLISECONDS),
impl.getId() + ": wait for log file creation", LOG);
}
@@ -201,7 +199,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
s.close();
}
- for(RaftServerImpl impl : cluster.iterateServerImpls()) {
+ for(RaftServer.Division impl : cluster.iterateDivisions()) {
final File openLogFile = JavaUtils.attemptRepeatedly(() -> getOpenLogFile(impl),
10, HUNDRED_MILLIS, impl.getId() + "-getOpenLogFile", LOG);
for(int i = 0; i < SegmentedRaftLogFormat.getHeaderLength(); i++) {
@@ -224,7 +222,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
return null;
});
}
- final RaftServerImpl server = cluster.restartServer(id, false);
+ final RaftServer.Division server = cluster.restartServer(id, false);
server.getRaftServer().close();
}
@@ -277,10 +275,10 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
LOG.info("{}: leader lastAppliedTermIndex = {}", leaderId, lastAppliedTermIndex);
// check follower logs
- for(RaftServerImpl s : cluster.iterateServerImpls()) {
+ for(RaftServer.Division s : cluster.iterateDivisions()) {
if (!s.getId().equals(leaderId)) {
ids.add(s.getId());
- RaftTestUtil.assertSameLog(leaderLog, s.getState().getLog());
+ RaftTestUtil.assertSameLog(leaderLog, RaftServerTestUtil.getRaftLog(s));
}
}
@@ -294,18 +292,18 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
// Restart and kill servers one by one so that they won't talk to each other.
for(RaftPeerId id : ids) {
cluster.restartServer(id, false);
- final RaftServerImpl server = cluster.getRaftServerImpl(id);
- final RaftLog raftLog = server.getState().getLog();
+ final RaftServer.Division server = cluster.getDivision(id);
+ final RaftLog raftLog = RaftServerTestUtil.getRaftLog(server);
JavaUtils.attemptRepeatedly(() -> {
Assert.assertTrue(raftLog.getLastCommittedIndex() >= loggedCommitIndex);
return null;
}, 10, HUNDRED_MILLIS, id + "(commitIndex >= loggedCommitIndex)", LOG);
JavaUtils.attemptRepeatedly(() -> {
- Assert.assertTrue(server.getState().getLastAppliedIndex() >= loggedCommitIndex);
+ Assert.assertTrue(RaftServerTestUtil.getLastAppliedIndex(server) >= loggedCommitIndex);
return null;
}, 10, HUNDRED_MILLIS, id + "(lastAppliedIndex >= loggedCommitIndex)", LOG);
LOG.info("{}: commitIndex={}, lastAppliedIndex={}",
- id, raftLog.getLastCommittedIndex(), server.getState().getLastAppliedIndex());
+ id, raftLog.getLastCommittedIndex(), RaftServerTestUtil.getLastAppliedIndex(server));
cluster.killServer(id);
}
}
@@ -352,7 +350,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
private void runTestRestartWithCorruptedLogEntry(CLUSTER cluster) throws Exception {
// this is the only server
- final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+ final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
final RaftPeerId id = leader.getId();
// send a few messages
@@ -367,7 +365,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
Assert.assertTrue(client.io().sendReadOnly(lastMessage).isSuccess());
}
- final RaftLog log = leader.getState().getLog();
+ final RaftLog log = RaftServerTestUtil.getRaftLog(leader);
final long size = TestSegmentedRaftLog.getOpenSegmentSize(log);
leader.getRaftServer().close();
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index dfb8af6..ef1c76b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -40,7 +40,8 @@ import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.metrics.JVMMetrics;
import org.apache.ratis.metrics.RatisMetricRegistry;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
@@ -71,7 +72,7 @@ public class TestRaftLogMetrics extends BaseTest
}
static class MetricsStateMachine extends BaseStateMachine {
- static MetricsStateMachine get(RaftServerImpl s) {
+ static MetricsStateMachine get(RaftServer.Division s) {
return (MetricsStateMachine)s.getStateMachine();
}
@@ -111,7 +112,7 @@ public class TestRaftLogMetrics extends BaseTest
assertRaftLogWritePathMetrics(cluster.getLeader());
// For followers, flush can be lagged behind. Attempt multiple times.
- for(RaftServerImpl f : cluster.getFollowers()) {
+ for(RaftServer.Division f : cluster.getFollowerDivisions()) {
JavaUtils.attempt(() -> assertFlushCount(f), 10, HUNDRED_MILLIS, f.getId() + "-assertFlushCount", null);
// We have already waited enough for follower metrics to populate.
assertRaftLogWritePathMetrics(f);
@@ -121,13 +122,13 @@ public class TestRaftLogMetrics extends BaseTest
JavaUtils.attempt(() -> assertCommitCount(cluster.getLeader(), numMsg), 10, HUNDRED_MILLIS, cluster.getLeader().getId() + "-assertCommitCount", null);
}
- static void assertCommitCount(RaftServerImpl server, int expectedMsgs) throws Exception {
- RatisMetricRegistry rlm = server.getState().getLog().getRaftLogMetrics().getRegistry();
+ static void assertCommitCount(RaftServer.Division server, int expectedMsgs) {
+ final RatisMetricRegistry rlm = RaftServerTestUtil.getRaftLog(server).getRaftLogMetrics().getRegistry();
long stmCount = rlm.counter(STATE_MACHINE_LOG_ENTRY_COUNT).getCount();
- Assert.assertTrue(stmCount == expectedMsgs);
+ Assert.assertEquals(expectedMsgs, stmCount);
}
- static void assertFlushCount(RaftServerImpl server) throws Exception {
+ static void assertFlushCount(RaftServer.Division server) throws Exception {
final String flushTimeMetric = RaftStorageTestUtils.getLogFlushTimeMetric(server.getMemberId().toString());
RatisMetricRegistry ratisMetricRegistry = new RaftLogMetrics(server.getMemberId().toString()).getRegistry();
Timer tm = (Timer) ratisMetricRegistry.get(RAFT_LOG_FLUSH_TIME);
@@ -150,7 +151,7 @@ public class TestRaftLogMetrics extends BaseTest
.intValue());
}
- static void assertRaftLogWritePathMetrics(RaftServerImpl server) throws Exception {
+ static void assertRaftLogWritePathMetrics(RaftServer.Division server) throws Exception {
final String syncTimeMetric = RaftStorageTestUtils.getRaftLogFullMetric(server.getMemberId().toString(), RAFT_LOG_SYNC_TIME);
RatisMetricRegistry ratisMetricRegistry = new RaftLogMetrics(server.getMemberId().toString()).getRegistry();
@@ -169,7 +170,7 @@ public class TestRaftLogMetrics extends BaseTest
.intValue());
long cacheMissCount = ratisMetricRegistry.counter(RAFT_LOG_CACHE_MISS_COUNT).getCount();
- Assert.assertTrue(cacheMissCount == 0);
+ Assert.assertEquals(0, cacheMissCount);
long cacheHitsCount = ratisMetricRegistry.counter(RAFT_LOG_CACHE_HIT_COUNT).getCount();
Assert.assertTrue(cacheHitsCount > 0);
diff --git a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
index 77b5cd2..993118c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
+++ b/ratis-test/src/test/java/org/apache/ratis/statemachine/TestStateMachine.java
@@ -33,9 +33,6 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.util.Log4jUtils;
import org.junit.*;
@@ -152,7 +149,7 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim
// TODO: there eshould be a better way to ensure all data is replicated and applied
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
- for (RaftServerImpl raftServer : cluster.iterateServerImpls()) {
+ for (RaftServer.Division raftServer : cluster.iterateDivisions()) {
final SMTransactionContext sm = SMTransactionContext.get(raftServer);
sm.rethrowIfException();
assertEquals(numTrx, sm.numApplied.get());
@@ -193,10 +190,9 @@ public class TestStateMachine extends BaseTest implements MiniRaftClusterWithSim
}
}
- final RaftServerProxy proxy = cluster.getServer(id);
+ final RaftServer server = cluster.getServer(id);
for(Map.Entry<RaftGroupId, StateMachine> e: registry.entrySet()) {
- final RaftServerImpl impl = RaftServerTestUtil.getRaftServerImpl(proxy, e.getKey());
- Assert.assertSame(e.getValue(), impl.getStateMachine());
+ Assert.assertSame(e.getValue(), server.getDivision(e.getKey()).getStateMachine());
}
}
}