You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/03 01:17:40 UTC
[incubator-ratis] branch master updated: RATIS-1193. Add
getRaftLog() to RaftServer.Division. (#314)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new feed58a RATIS-1193. Add getRaftLog() to RaftServer.Division. (#314)
feed58a is described below
commit feed58a86822fdeef0bee8fe8ee4ff2f7a0f9aef
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Dec 3 09:17:16 2020 +0800
RATIS-1193. Add getRaftLog() to RaftServer.Division. (#314)
* RATIS-1193. Add getRaftLog() to RaftServer.Division.
* Fix checkstyle
---
.../ratis/logservice/server/LogStateMachine.java | 9 ++----
.../java/org/apache/ratis/server/RaftServer.java | 5 +++
.../apache/ratis/server/impl/RaftServerImpl.java | 5 +++
.../org/apache/ratis/server/impl/ServerState.java | 2 +-
.../ratis/InstallSnapshotNotificationTests.java | 8 ++---
.../java/org/apache/ratis/LogAppenderTests.java | 2 +-
.../org/apache/ratis/OutputStreamBaseTest.java | 6 ++--
.../test/java/org/apache/ratis/RaftBasicTests.java | 11 ++++---
.../test/java/org/apache/ratis/RaftTestUtil.java | 5 ++-
.../java/org/apache/ratis/RetryCacheTests.java | 4 +--
.../apache/ratis/server/impl/MiniRaftCluster.java | 2 +-
.../server/impl/RaftReconfigurationBaseTest.java | 16 ++++------
.../ratis/server/impl/RaftServerTestUtil.java | 14 +++++---
.../impl/RaftStateMachineExceptionTests.java | 2 +-
.../ratis/server/impl/ServerPauseResumeTest.java | 4 +--
.../ratis/statemachine/RaftSnapshotBaseTest.java | 6 ++--
.../ratis/datastream/DataStreamBaseTest.java | 6 ++++
.../ratis/datastream/DataStreamTestUtils.java | 8 ++---
.../org/apache/ratis/grpc/TestRaftWithGrpc.java | 4 +--
.../apache/ratis/server/ServerRestartTests.java | 16 +++++-----
.../ratis/server/raftlog/TestRaftLogMetrics.java | 37 +++++++++++-----------
21 files changed, 90 insertions(+), 82 deletions(-)
diff --git a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
index 8373a44..59db307 100644
--- a/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
+++ b/ratis-logservice/src/main/java/org/apache/ratis/logservice/server/LogStateMachine.java
@@ -69,8 +69,6 @@ import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.logservice.server.ArchivalInfo.ArchivalStatus;
-import org.apache.ratis.server.impl.RaftServerProxy;
-import org.apache.ratis.server.impl.ServerState;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
@@ -121,7 +119,7 @@ public class LogStateMachine extends BaseStateMachine {
private RaftLog log;
- private RaftServerProxy proxy ;
+ private RaftServer proxy;
private ExecutorService executorService;
private boolean isArchivalRequest;
private ArchivalInfo archivalInfo;
@@ -155,7 +153,7 @@ public class LogStateMachine extends BaseStateMachine {
RaftStorage raftStorage) throws IOException {
super.initialize(server, groupId, raftStorage);
this.storage.init(raftStorage);
- this.proxy = (RaftServerProxy) server;
+ this.proxy = server;
//TODO: using groupId for metric now but better to tag it with LogName
this.logServiceMetrics = new LogServiceMetrics(groupId.toString(),
server.getId().toString());
@@ -181,8 +179,7 @@ public class LogStateMachine extends BaseStateMachine {
private void checkInitialization() throws IOException {
if (this.log == null) {
- ServerState serverState = proxy.getImpl(getGroupId()).getState();
- this.log = serverState.getLog();
+ this.log = proxy.getDivision(getGroupId()).getRaftLog();
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 14ca81c..7c7d426 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -26,6 +26,7 @@ import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.server.impl.ServerImplUtils;
import org.apache.ratis.server.protocol.RaftServerAsynchronousProtocol;
import org.apache.ratis.server.protocol.RaftServerProtocol;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.util.LifeCycle;
import org.slf4j.Logger;
@@ -73,6 +74,10 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the {@link StateMachine} for this division. */
StateMachine getStateMachine();
+ /** @return the raft log of this division. */
+ RaftLog getRaftLog();
+
+ /** @return the data stream map of this division. */
DataStreamMap getDataStreamMap();
RaftClient getRaftClient();
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 67fb76f..5499e03 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -223,6 +223,11 @@ public class RaftServerImpl implements RaftServer.Division,
}
@Override
+ public RaftLog getRaftLog() {
+ return getState().getLog();
+ }
+
+ @Override
public DataStreamMap getDataStreamMap() {
return dataStreamMap;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index f1117de..e192902 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -274,7 +274,7 @@ public class ServerState implements Closeable {
setLeader(getMemberId().getPeerId(), "becomeLeader");
}
- public RaftLog getLog() {
+ RaftLog getLog() {
return log;
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
index 80bb73e..6f72b54 100644
--- a/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/InstallSnapshotNotificationTests.java
@@ -133,7 +133,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
final RaftServer.Division leader = cluster.getLeader();
final RaftStorageDirectory storageDirectory = RaftServerTestUtil.getRaftStorage(leader).getStorageDir();
- final long nextIndex = RaftServerTestUtil.getRaftLog(leader).getNextIndex();
+ final long nextIndex = leader.getRaftLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
@@ -209,7 +209,7 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
}
// wait for the snapshot to be done
- final long oldLeaderNextIndex = RaftServerTestUtil.getRaftLog(leader).getNextIndex();
+ final long oldLeaderNextIndex = leader.getRaftLog().getNextIndex();
{
LOG.info("{}: oldLeaderNextIndex = {}", leaderId, oldLeaderNextIndex);
final List<File> snapshotFiles = RaftSnapshotBaseTest.getSnapshotFiles(cluster,
@@ -232,10 +232,10 @@ public abstract class InstallSnapshotNotificationTests<CLUSTER extends MiniRaftC
cluster.restartServer(followerId, false);
final RaftServer.Division follower = cluster.getDivision(followerId);
JavaUtils.attempt(() -> {
- final long newLeaderNextIndex = RaftServerTestUtil.getRaftLog(leader).getNextIndex();
+ final long newLeaderNextIndex = leader.getRaftLog().getNextIndex();
LOG.info("{}: newLeaderNextIndex = {}", leaderId, newLeaderNextIndex);
Assert.assertTrue(newLeaderNextIndex > oldLeaderNextIndex);
- Assert.assertEquals(newLeaderNextIndex, RaftServerTestUtil.getRaftLog(follower).getNextIndex());
+ Assert.assertEquals(newLeaderNextIndex, follower.getRaftLog().getNextIndex());
}, 10, ONE_SECOND, "followerNextIndex", LOG);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
index 8d2362d..d526e10 100644
--- a/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/LogAppenderTests.java
@@ -209,7 +209,7 @@ public abstract class LogAppenderTests<CLUSTER extends MiniRaftCluster>
}
final RaftServer.Division leader = cluster.getLeader();
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
+ final RaftLog leaderLog = cluster.getLeader().getRaftLog();
final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(leaderLog);
LOG.info("counts = " + counts);
Assert.assertEquals(6 * numMsgs * numClients, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
diff --git a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
index 2042093..1bede3e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/OutputStreamBaseTest.java
@@ -86,7 +86,7 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
}
// check the leader's raft log
- final RaftLog raftLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
+ final RaftLog raftLog = cluster.getLeader().getRaftLog();
final AtomicInteger i = new AtomicInteger();
checkLog(raftLog, numRequests, () -> toBytes(i.getAndIncrement()));
}
@@ -151,12 +151,12 @@ public abstract class OutputStreamBaseTest<CLUSTER extends MiniRaftCluster>
LOG.info("Start to check leader's log");
final AtomicInteger index = new AtomicInteger(0);
- checkLog(RaftServerTestUtil.getRaftLog(leader), expectedTxs.size(),
+ checkLog(leader.getRaftLog(), expectedTxs.size(),
() -> expectedTxs.get(index.getAndIncrement()));
}
private RaftLog assertRaftLog(int expectedEntries, RaftServer.Division server) throws Exception {
- final RaftLog raftLog = RaftServerTestUtil.getRaftLog(server);
+ final RaftLog raftLog = server.getRaftLog();
final EnumMap<LogEntryBodyCase, AtomicLong> counts = RaftTestUtil.countEntries(raftLog);
Assert.assertEquals(expectedEntries, counts.get(LogEntryBodyCase.STATEMACHINELOGENTRY).get());
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index dbbbed4..881cbf0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -196,7 +196,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
for (RaftServer.Division followerToSendLog : followersToSendLog) {
- RaftLog followerLog = RaftServerTestUtil.getRaftLog(followerToSendLog);
+ RaftLog followerLog = followerToSendLog.getRaftLog();
Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, messages));
}
@@ -217,7 +217,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
Assert.assertTrue(followersToSendLogIds.contains(newLeaderId));
- cluster.getServerAliveStream().map(s -> s.getState().getLog())
+ cluster.getServerAliveStream()
+ .map(RaftServer.Division::getRaftLog)
.forEach(log -> RaftTestUtil.assertLogEntries(log, term, messages));
}
@@ -244,7 +245,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
RaftTestUtil.sendMessageInNewThread(cluster, leaderId, messages);
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) + 100);
- RaftTestUtil.logEntriesContains(RaftServerTestUtil.getRaftLog(followerToCommit), messages);
+ RaftTestUtil.logEntriesContains(followerToCommit.getRaftLog(), messages);
cluster.killServer(leaderId);
cluster.killServer(followerToCommit.getId());
@@ -257,8 +258,8 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
final Predicate<LogEntryProto> predicate = l -> l.getTerm() != 1;
cluster.getServerAliveStream()
- .map(s -> s.getState().getLog())
- .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
+ .map(RaftServer.Division::getRaftLog)
+ .forEach(log -> RaftTestUtil.checkLogEntries(log, messages, predicate));
}
static class Client4TestWithLoad extends Thread {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index e44e44b..fe924e9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -31,7 +31,6 @@ import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.BlockRequestHandlingInjection;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -223,7 +222,7 @@ public interface RaftTestUtil {
static void assertLogEntries(MiniRaftCluster cluster, SimpleMessage expectedMessage) {
final int size = cluster.getNumServers();
final long count = cluster.getServerAliveStream()
- .map(s -> s.getState().getLog())
+ .map(RaftServer.Division::getRaftLog)
.filter(log -> logEntriesContains(log, expectedMessage))
.count();
if (2*count <= size) {
@@ -234,7 +233,7 @@ public interface RaftTestUtil {
static void assertLogEntries(RaftServer.Division server, long expectedTerm, SimpleMessage... expectedMessages) {
LOG.info("checking raft log for {}", server.getMemberId());
- final RaftLog log = RaftServerTestUtil.getRaftLog(server);
+ final RaftLog log = server.getRaftLog();
try {
RaftTestUtil.assertLogEntries(log, expectedTerm, expectedMessages);
} catch (AssertionError e) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 990c0d2..f29b386 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -100,7 +100,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
Assert.assertEquals(2, RaftServerTestUtil.getRetryCacheSize(server));
Assert.assertNotNull(RaftServerTestUtil.getRetryEntry(server, clientId, callId));
// make sure there is only one log entry committed
- Assert.assertEquals(1, count(RaftServerTestUtil.getRaftLog(server), oldLastApplied + 1));
+ Assert.assertEquals(1, count(server.getRaftLog(), oldLastApplied + 1));
}
}
@@ -163,7 +163,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
}
// check the new leader and make sure the retry did not get committed
- Assert.assertEquals(0, count(RaftServerTestUtil.getRaftLog(cluster.getLeader()), oldLastApplied + 1));
+ Assert.assertEquals(0, count(cluster.getLeader().getRaftLog(), oldLastApplied + 1));
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 9084bf5..e757335 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -496,7 +496,7 @@ public abstract class MiniRaftCluster implements Closeable {
b.append(" ");
b.append(s).append("\n");
- final RaftLog log = RaftServerTestUtil.getRaftLog(s);
+ final RaftLog log = s.getRaftLog();
if (log instanceof MemoryRaftLog) {
b.append(" ");
b.append(((MemoryRaftLog) log).getEntryString());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index 4e0ac9a..9c6a226 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -31,7 +31,6 @@ import org.apache.ratis.protocol.exceptions.ReconfigurationInProgressException;
import org.apache.ratis.protocol.exceptions.ReconfigurationTimeoutException;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.leader.LeaderState;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
import org.apache.ratis.util.JavaUtils;
@@ -39,7 +38,6 @@ import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
import org.junit.Test;
-import org.mockito.internal.util.reflection.Whitebox;
import java.io.IOException;
import java.util.ArrayList;
@@ -241,9 +239,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
// check configuration manager's internal state
// each reconf will generate two configurations: (old, new) and (new)
cluster.getServerAliveStream().forEach(server -> {
- ConfigurationManager confManager =
- (ConfigurationManager) Whitebox.getInternalState(server.getState(),
- "configurationManager");
+ final ConfigurationManager confManager = RaftServerTestUtil.getConfigurationManager(server);
// each reconf will generate two configurations: (old, new) and (new)
// each leader change generates one configuration.
// expectedConf = 1 (init) + 2*2 (two conf changes) + #leader
@@ -359,11 +355,11 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
LOG.info(cluster.printServers());
assertSuccess(success);
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
+ final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for (RaftPeer newPeer : c1.newPeers) {
final RaftServer.Division d = cluster.getDivision(newPeer.getId());
Assert.assertArrayEquals(leaderLog.getEntries(0, Long.MAX_VALUE),
- RaftServerTestUtil.getRaftLog(d).getEntries(0, Long.MAX_VALUE));
+ d.getRaftLog().getEntries(0, Long.MAX_VALUE));
}
}
}
@@ -414,7 +410,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
// only (0) the first conf entry, (1) the 1st setConf entry and (2) a metadata entry
{
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
+ final RaftLog leaderLog = cluster.getLeader().getRaftLog();
for(LogEntryProto e : RaftTestUtil.getLogEntryProtos(leaderLog)) {
LOG.info("{}", ServerProtoUtils.toLogEntryString(e));
}
@@ -469,7 +465,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
try(final RaftClient client = cluster.createClient(leader.getId())) {
client.io().send(new SimpleMessage("m"));
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
+ final RaftLog leaderLog = leader.getRaftLog();
final long committedIndex = leaderLog.getLastCommittedIndex();
final RaftConfiguration confBefore = RaftServerTestUtil.getRaftConf(cluster.getLeader());
@@ -576,7 +572,7 @@ public abstract class RaftReconfigurationBaseTest<CLUSTER extends MiniRaftCluste
final RaftServer.Division leader = cluster.getLeader();
final RaftPeerId leaderId = leader.getId();
- final RaftLog log = RaftServerTestUtil.getRaftLog(leader);
+ final RaftLog log = leader.getRaftLog();
log2 = log;
Thread.sleep(1000);
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 7d3786d..10336b9 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -27,12 +27,12 @@ import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.metrics.RaftServerMetrics;
-import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
+import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -125,12 +125,16 @@ public class RaftServerTestUtil {
return entry.isFailed();
}
- public static RaftConfiguration getRaftConf(RaftServer.Division server) {
- return ((RaftServerImpl)server).getRaftConf();
+ static ServerState getState(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getState();
+ }
+
+ public static ConfigurationManager getConfigurationManager(RaftServer.Division server) {
+ return (ConfigurationManager) Whitebox.getInternalState(getState(server), "configurationManager");
}
- public static RaftLog getRaftLog(RaftServer.Division server) {
- return ((RaftServerImpl)server).getState().getLog();
+ public static RaftConfiguration getRaftConf(RaftServer.Division server) {
+ return ((RaftServerImpl)server).getRaftConf();
}
public static RaftStorage getRaftStorage(RaftServer.Division server) {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index 9c3649d..ffd554f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -134,7 +134,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
return null;
}, 5, BaseTest.ONE_SECOND, "GetRetryEntry", LOG);
- final RaftLog log = RaftServerTestUtil.getRaftLog(server);
+ final RaftLog log = server.getRaftLog();
RaftTestUtil.logEntriesContains(log, oldLastApplied + 1, log.getNextIndex(), message);
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
index 49d4878..9f2ae56 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/ServerPauseResumeTest.java
@@ -56,10 +56,10 @@ public abstract class ServerPauseResumeTest <CLUSTER extends MiniRaftCluster>
writeThread.join();
Thread.sleep(cluster.getTimeoutMax().toLong(TimeUnit.MILLISECONDS) * 5);
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
+ final RaftLog leaderLog = leader.getRaftLog();
// leader should contain all logs.
Assert.assertTrue(RaftTestUtil.logEntriesContains(leaderLog, batch1));
- RaftLog followerLog = follower.getState().getLog();
+ RaftLog followerLog = follower.getRaftLog();
// follower should contain all logs.
Assert.assertTrue(RaftTestUtil.logEntriesContains(followerLog, batch1));
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index 3c55e16..96b0b4e 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -86,7 +86,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
public static void assertLeaderContent(MiniRaftCluster cluster) throws Exception {
final RaftServer.Division leader = RaftTestUtil.waitForLeader(cluster);
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
+ final RaftLog leaderLog = leader.getRaftLog();
final long lastIndex = leaderLog.getLastEntryTermIndex().getIndex();
final LogEntryProto e = leaderLog.get(lastIndex);
Assert.assertTrue(e.hasMetadataEntry());
@@ -150,7 +150,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
}
}
- final long nextIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getNextIndex();
+ final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
// wait for the snapshot to be done
final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
@@ -200,7 +200,7 @@ public abstract class RaftSnapshotBaseTest extends BaseTest {
// wait for the snapshot to be done
RaftStorageDirectory storageDirectory = RaftServerTestUtil.getRaftStorage(cluster.getLeader()).getStorageDir();
- final long nextIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getNextIndex();
+ final long nextIndex = cluster.getLeader().getRaftLog().getNextIndex();
LOG.info("nextIndex = {}", nextIndex);
final List<File> snapshotFiles = getSnapshotFiles(cluster, nextIndex - SNAPSHOT_TRIGGER_THRESHOLD, nextIndex);
JavaUtils.attemptRepeatedly(() -> {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index c504d95..50068d6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -55,6 +55,7 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerFactory;
+import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.statemachine.StateMachine.DataChannel;
import org.apache.ratis.util.CollectionUtils;
@@ -110,6 +111,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public RaftLog getRaftLog() {
+ return null;
+ }
+
+ @Override
public DataStreamMap getDataStreamMap() {
return streamMap;
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 673eab8..51a028c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -18,8 +18,6 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
-import org.apache.ratis.RaftTestUtil;
-import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.client.impl.ClientProtoUtils;
import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
@@ -37,7 +35,7 @@ import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.AlreadyClosedException;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -66,7 +64,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
public interface DataStreamTestUtils {
Logger LOG = LoggerFactory.getLogger(DataStreamTestUtils.class);
@@ -386,8 +383,7 @@ public interface DataStreamTestUtils {
final LogEntryProto entryFromStream = stream.getLogEntry();
assertLogEntry(entryFromStream, request);
- final LogEntryProto entryFromLog = searchLogEntry(ClientInvocationId.valueOf(request),
- RaftServerTestUtil.getRaftLog(division));
+ final LogEntryProto entryFromLog = searchLogEntry(ClientInvocationId.valueOf(request), division.getRaftLog());
Assert.assertEquals(entryFromStream, entryFromLog);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
index c145872..4e92efd 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftWithGrpc.java
@@ -90,7 +90,7 @@ public class TestRaftWithGrpc
.map(SimpleStateMachine4Testing::get)
.forEach(SimpleStateMachine4Testing::unblockWriteStateMachineData);
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(cluster.getLeader());
+ final RaftLog leaderLog = cluster.getLeader().getRaftLog();
// The entries have been appended in the followers
// although the append entry timed out at the leader
cluster.getServerAliveStream().filter(impl -> !impl.getInfo().isLeader()).forEach(raftServer ->
@@ -98,7 +98,7 @@ public class TestRaftWithGrpc
final long leaderNextIndex = leaderLog.getNextIndex();
final TermIndex[] leaderEntries = leaderLog.getEntries(0, Long.MAX_VALUE);
- final RaftLog followerLog = raftServer.getState().getLog();
+ final RaftLog followerLog = raftServer.getRaftLog();
Assert.assertEquals(leaderNextIndex, followerLog.getNextIndex());
final TermIndex[] serverEntries = followerLog.getEntries(0, Long.MAX_VALUE);
Assert.assertArrayEquals(serverEntries, leaderEntries);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
index ec82692..5e3d079 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/ServerRestartTests.java
@@ -107,7 +107,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
writeSomething(newMessage, cluster);
final int truncatedMessageIndex = messageCount.get() - 1;
- final long leaderLastIndex = RaftServerTestUtil.getRaftLog(cluster.getLeader()).getLastEntryTermIndex().getIndex();
+ final long leaderLastIndex = cluster.getLeader().getRaftLog().getLastEntryTermIndex().getIndex();
// make sure the restarted follower can catchup
final RaftServer.Division followerState = cluster.getDivision(followerId);
JavaUtils.attemptRepeatedly(() -> {
@@ -117,7 +117,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
// make sure the restarted peer's log segments is correct
final RaftServer.Division follower = cluster.restartServer(followerId, false);
- final RaftLog followerLog = RaftServerTestUtil.getRaftLog(follower);
+ final RaftLog followerLog = follower.getRaftLog();
final long followerLastIndex = followerLog.getLastEntryTermIndex().getIndex();
Assert.assertTrue(followerLastIndex >= leaderLastIndex);
@@ -165,7 +165,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
FileUtils.truncateFile(openLogFile, openLogFile.length() - 1);
final RaftServer.Division server = cluster.restartServer(id, false);
// the last index should be one less than before
- Assert.assertEquals(lastIndex - 1, RaftServerTestUtil.getRaftLog(server).getLastEntryTermIndex().getIndex());
+ Assert.assertEquals(lastIndex - 1, server.getRaftLog().getLastEntryTermIndex().getIndex());
server.getRaftServer().close();
}
@@ -252,7 +252,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
final List<RaftPeerId> ids = new ArrayList<>();
final RaftServer.Division leader = cluster.getLeader();
- final RaftLog leaderLog = RaftServerTestUtil.getRaftLog(leader);
+ final RaftLog leaderLog = leader.getRaftLog();
final RaftPeerId leaderId = leader.getId();
ids.add(leaderId);
@@ -278,7 +278,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
for(RaftServer.Division s : cluster.iterateDivisions()) {
if (!s.getId().equals(leaderId)) {
ids.add(s.getId());
- RaftTestUtil.assertSameLog(leaderLog, RaftServerTestUtil.getRaftLog(s));
+ RaftTestUtil.assertSameLog(leaderLog, s.getRaftLog());
}
}
@@ -293,7 +293,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
for(RaftPeerId id : ids) {
cluster.restartServer(id, false);
final RaftServer.Division server = cluster.getDivision(id);
- final RaftLog raftLog = RaftServerTestUtil.getRaftLog(server);
+ final RaftLog raftLog = server.getRaftLog();
JavaUtils.attemptRepeatedly(() -> {
Assert.assertTrue(raftLog.getLastCommittedIndex() >= loggedCommitIndex);
return null;
@@ -309,7 +309,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
}
static void assertLastLogEntry(RaftServer.Division server) throws RaftLogIOException {
- final RaftLog raftLog = RaftServerTestUtil.getRaftLog(server);
+ final RaftLog raftLog = server.getRaftLog();
final long lastIndex = raftLog.getLastEntryTermIndex().getIndex();
final LogEntryProto lastEntry = raftLog.get(lastIndex);
Assert.assertTrue(lastEntry.hasMetadataEntry());
@@ -365,7 +365,7 @@ public abstract class ServerRestartTests<CLUSTER extends MiniRaftCluster>
Assert.assertTrue(client.io().sendReadOnly(lastMessage).isSuccess());
}
- final RaftLog log = RaftServerTestUtil.getRaftLog(leader);
+ final RaftLog log = leader.getRaftLog();
final long size = TestSegmentedRaftLog.getOpenSegmentSize(log);
leader.getRaftServer().close();
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
index 7891141..e3f42cc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/TestRaftLogMetrics.java
@@ -17,31 +17,14 @@
*/
package org.apache.ratis.server.raftlog;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_APPEND_ENTRY_COUNT;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_APPEND_ENTRY_LATENCY;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_CACHE_HIT_COUNT;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_CACHE_MISS_COUNT;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_DATA_QUEUE_SIZE;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_FLUSH_COUNT;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_FLUSH_TIME;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_SYNC_BATCH_SIZE;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_SYNC_TIME;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_ENQUEUE_DELAY;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_EXECUTION_TIME;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_QUEUE_TIME;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_WORKER_QUEUE_SIZE;
-import static org.apache.ratis.server.metrics.RaftLogMetrics.STATE_MACHINE_LOG_ENTRY_COUNT;
-import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
-
import com.codahale.metrics.Timer;
import org.apache.ratis.BaseTest;
-import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.metrics.JVMMetrics;
import org.apache.ratis.metrics.RatisMetricRegistry;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
+import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.simulation.MiniRaftClusterWithSimulatedRpc;
import org.apache.ratis.server.storage.RaftStorageTestUtils;
@@ -58,6 +41,22 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static org.apache.ratis.metrics.RatisMetrics.RATIS_APPLICATION_NAME_METRICS;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_APPEND_ENTRY_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_APPEND_ENTRY_LATENCY;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_CACHE_HIT_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_CACHE_MISS_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_DATA_QUEUE_SIZE;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_FLUSH_COUNT;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_FLUSH_TIME;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_SYNC_BATCH_SIZE;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_SYNC_TIME;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_ENQUEUE_DELAY;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_EXECUTION_TIME;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_TASK_QUEUE_TIME;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.RAFT_LOG_WORKER_QUEUE_SIZE;
+import static org.apache.ratis.server.metrics.RaftLogMetrics.STATE_MACHINE_LOG_ENTRY_COUNT;
+
public class TestRaftLogMetrics extends BaseTest
implements MiniRaftClusterWithSimulatedRpc.FactoryGet {
static {
@@ -123,7 +122,7 @@ public class TestRaftLogMetrics extends BaseTest
}
static void assertCommitCount(RaftServer.Division server, int expectedMsgs) {
- final RatisMetricRegistry rlm = RaftServerTestUtil.getRaftLog(server).getRaftLogMetrics().getRegistry();
+ final RatisMetricRegistry rlm = server.getRaftLog().getRaftLogMetrics().getRegistry();
long stmCount = rlm.counter(STATE_MACHINE_LOG_ENTRY_COUNT).getCount();
Assert.assertEquals(expectedMsgs, stmCount);
}