You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/03 10:50:37 UTC
[incubator-ratis] branch master updated: RATIS-1199. Change
RaftServerProxy/RaftServerImpl to package private. (#318)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new ef95e2c RATIS-1199. Change RaftServerProxy/RaftServerImpl to package private. (#318)
ef95e2c is described below
commit ef95e2cad987d66cc094f4f5966f2594ab78d04d
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Dec 3 18:50:27 2020 +0800
RATIS-1199. Change RaftServerProxy/RaftServerImpl to package private. (#318)
* RATIS-1199. Change RaftServerProxy/RaftServerImpl to package private.
* Add @SuppressWarnings("parameternumber") for a checkstyle warning
---
.../java/org/apache/ratis/server/RaftServer.java | 9 ++++--
.../apache/ratis/server/impl/LeaderStateImpl.java | 2 +-
.../apache/ratis/server/impl/RaftServerImpl.java | 25 +++++++--------
.../apache/ratis/server/impl/RaftServerProxy.java | 34 ++++++++++++--------
.../org/apache/ratis/server/impl/ServerState.java | 6 +++-
.../ratis/server/impl/StateMachineUpdater.java | 2 +-
.../server/raftlog/segmented/SegmentedRaftLog.java | 34 ++++++--------------
.../raftlog/segmented/SegmentedRaftLogWorker.java | 23 +++++---------
.../test/java/org/apache/ratis/RaftAsyncTests.java | 1 -
.../test/java/org/apache/ratis/RaftBasicTests.java | 2 +-
.../apache/ratis/server/impl/MiniRaftCluster.java | 20 +++++++-----
.../ratis/server/impl/RaftServerTestUtil.java | 36 +++++++++++++++++++---
.../ratis/server/impl/RetryCacheTestUtil.java | 5 +--
.../server/impl/StateMachineShutdownTests.java | 2 +-
.../segmented/SegmentedRaftLogTestUtils.java | 1 -
.../ratis/datastream/DataStreamBaseTest.java | 3 ++
.../apache/ratis/grpc/TestRaftServerWithGrpc.java | 13 ++++++--
.../raftlog/segmented/TestCacheEviction.java | 7 ++---
.../raftlog/segmented/TestSegmentedRaftLog.java | 33 ++++++++------------
19 files changed, 143 insertions(+), 115 deletions(-)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 7c7d426..c4b9b0f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -44,8 +44,8 @@ public interface RaftServer extends Closeable, RpcType.Get,
AdminProtocol, AdminAsynchronousProtocol {
Logger LOG = LoggerFactory.getLogger(RaftServer.class);
- /** A division of a {@link RaftServer} for a particular group. */
- interface Division {
+ /** A division of a {@link RaftServer} for a particular {@link RaftGroup}. */
+ interface Division extends Closeable {
Logger LOG = LoggerFactory.getLogger(Division.class);
/** @return the {@link RaftGroupMemberId} for this division. */
@@ -63,6 +63,7 @@ public interface RaftServer extends Closeable, RpcType.Get,
.orElseGet(() -> getRaftServer().getPeer());
}
+ /** @return the information about this division. */
DivisionInfo getInfo();
/** @return the {@link RaftGroup} for this division. */
@@ -80,7 +81,11 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the data stream map of this division. */
DataStreamMap getDataStreamMap();
+ /** @return the internal {@link RaftClient} of this division. */
RaftClient getRaftClient();
+
+ @Override
+ void close();
}
/** @return the server ID. */
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index d50b26f..1543dc3 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -773,7 +773,7 @@ class LeaderStateImpl implements LeaderState {
}
// the pending request handler will send NotLeaderException for
// pending client requests when it stops
- server.shutdown();
+ server.close();
}
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 8206f6e..b90fe57 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -83,7 +83,7 @@ import static org.apache.ratis.util.LifeCycle.State.STARTING;
import com.codahale.metrics.Timer;
-public class RaftServerImpl implements RaftServer.Division,
+class RaftServerImpl implements RaftServer.Division,
RaftServerProtocol, RaftServerAsynchronousProtocol,
RaftClientProtocol, RaftClientAsynchronousProtocol {
private static final String CLASS_NAME = JavaUtils.getClassSimpleName(RaftServerImpl.class);
@@ -270,7 +270,7 @@ public class RaftServerImpl implements RaftServer.Division,
return proxy;
}
- public RaftServerRpc getServerRpc() {
+ RaftServerRpc getServerRpc() {
return proxy.getServerRpc();
}
@@ -327,7 +327,7 @@ public class RaftServerImpl implements RaftServer.Division,
// do not start FollowerState
}
- public ServerState getState() {
+ ServerState getState() {
return state;
}
@@ -365,7 +365,7 @@ public class RaftServerImpl implements RaftServer.Division,
final RaftStorageDirectory dir = state.getStorage().getStorageDir();
/* Shutdown is triggered here inorder to avoid any locked files. */
- shutdown();
+ close();
getStateMachine().event().notifyGroupRemove();
if (deleteDirectory) {
for (int i = 0; i < FileUtils.NUM_ATTEMPTS; i ++) {
@@ -398,7 +398,8 @@ public class RaftServerImpl implements RaftServer.Division,
}
}
- public void shutdown() {
+ @Override
+ public void close() {
lifeCycle.checkStateAndClose(() -> {
LOG.info("{}: shutdown", getMemberId());
try {
@@ -699,7 +700,7 @@ public class RaftServerImpl implements RaftServer.Division,
return pending.getFuture();
}
- public void stepDownOnJvmPause() {
+ void stepDownOnJvmPause() {
if (getInfo().isLeader()) {
role.getLeaderState().ifPresent(leader -> leader.submitStepDownEvent(LeaderState.StepDownReason.JVM_PAUSE));
}
@@ -1282,7 +1283,7 @@ public class RaftServerImpl implements RaftServer.Division,
return reply;
}
- public boolean pause() throws IOException {
+ boolean pause() throws IOException {
// TODO: should pause() be limited on only working for a follower?
// Now the state of lifeCycle should be PAUSING, which will prevent future other operations.
@@ -1299,7 +1300,7 @@ public class RaftServerImpl implements RaftServer.Division,
return true;
}
- public boolean resume() throws IOException {
+ boolean resume() throws IOException {
synchronized (this) {
if (!lifeCycle.compareAndTransition(PAUSED, STARTING)) {
return false;
@@ -1516,7 +1517,7 @@ public class RaftServerImpl implements RaftServer.Division,
return ServerProtoUtils.toRequestVoteRequestProto(getMemberId(), targetId, term, lastEntry);
}
- public void submitUpdateCommitEvent() {
+ void submitUpdateCommitEvent() {
role.getLeaderState().ifPresent(LeaderStateImpl::submitUpdateCommitEvent);
}
@@ -1605,7 +1606,7 @@ public class RaftServerImpl implements RaftServer.Division,
*
* @param logEntry the log entry being truncated
*/
- public void notifyTruncatedLogEntry(LogEntryProto logEntry) {
+ void notifyTruncatedLogEntry(LogEntryProto logEntry) {
if (logEntry.hasStateMachineLogEntry()) {
final ClientInvocationId invocationId = ClientInvocationId.valueOf(logEntry.getStateMachineLogEntry());
final RetryCache.CacheEntry cacheEntry = getRetryCache().get(invocationId);
@@ -1617,11 +1618,11 @@ public class RaftServerImpl implements RaftServer.Division,
}
}
- public LeaderElectionMetrics getLeaderElectionMetrics() {
+ LeaderElectionMetrics getLeaderElectionMetrics() {
return leaderElectionMetrics;
}
- public RaftServerMetrics getRaftServerMetrics() {
+ RaftServerMetrics getRaftServerMetrics() {
return raftServerMetrics;
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index b18b07b..df702bb 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -63,7 +63,7 @@ import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
-public class RaftServerProxy implements RaftServer {
+class RaftServerProxy implements RaftServer {
/**
* A map: {@link RaftGroupId} -> {@link RaftServerImpl} futures.
*
@@ -110,8 +110,22 @@ public class RaftServerProxy implements RaftServer {
return;
}
isClosed = true;
- map.values().parallelStream().map(CompletableFuture::join)
- .forEach(RaftServerImpl::shutdown);
+ map.entrySet().parallelStream().forEach(entry -> close(entry.getKey(), entry.getValue()));
+ }
+
+ private void close(RaftGroupId groupId, CompletableFuture<RaftServerImpl> future) {
+ final RaftServerImpl impl;
+ try {
+ impl = future.join();
+ } catch (Throwable t) {
+ LOG.warn("{}: Failed to join the division for {}", getId(), groupId, t);
+ return;
+ }
+ try {
+ impl.close();
+ } catch (Throwable t) {
+ LOG.warn("{}: Failed to close the division for {}", getId(), groupId, t);
+ }
}
synchronized List<RaftGroupId> getGroupIds() {
@@ -292,19 +306,15 @@ public class RaftServerProxy implements RaftServer {
return properties;
}
- public RaftServerRpc getServerRpc() {
+ RaftServerRpc getServerRpc() {
return serverRpc;
}
- public DataStreamServerRpc getDataStreamServerRpc() {
+ DataStreamServerRpc getDataStreamServerRpc() {
return dataStreamServerRpc;
}
- public boolean containsGroup(RaftGroupId groupId) {
- return impls.containsGroup(groupId);
- }
-
- public CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
+ private CompletableFuture<RaftServerImpl> addGroup(RaftGroup group) {
return impls.addNew(group);
}
@@ -316,12 +326,12 @@ public class RaftServerProxy implements RaftServer {
return getImpl(ProtoUtils.toRaftGroupId(proto.getRaftGroupId()));
}
- public RaftServerImpl getImpl(RaftGroupId groupId) throws IOException {
+ private RaftServerImpl getImpl(RaftGroupId groupId) throws IOException {
Objects.requireNonNull(groupId, "groupId == null");
return IOUtils.getFromFuture(getImplFuture(groupId), this::getId);
}
- public List<RaftServerImpl> getImpls() throws IOException {
+ List<RaftServerImpl> getImpls() throws IOException {
final List<RaftServerImpl> list = new ArrayList<>();
for(CompletableFuture<RaftServerImpl> f : impls.getAll()) {
list.add(IOUtils.getFromFuture(f, this::getId));
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index 6dd551b..a40e0d1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -187,7 +187,11 @@ class ServerState implements Closeable {
if (RaftServerConfigKeys.Log.useMemory(prop)) {
log = new MemoryRaftLog(memberId, lastIndexInSnapshot, prop);
} else {
- log = new SegmentedRaftLog(memberId, server, storage, lastIndexInSnapshot, prop);
+ log = new SegmentedRaftLog(memberId, server,
+ server.getStateMachine(),
+ server::notifyTruncatedLogEntry,
+ server::submitUpdateCommitEvent,
+ storage, lastIndexInSnapshot, prop);
}
log.open(lastIndexInSnapshot, logConsumer);
return log;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index 367c777..671f736 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -186,7 +186,7 @@ class StateMachineUpdater implements Runnable {
} else {
state = State.EXCEPTION;
LOG.error(this + " caught a Throwable.", t);
- server.shutdown();
+ server.close();
}
}
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
index dbdbced..1aa9616 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLog.java
@@ -20,8 +20,8 @@ package org.apache.ratis.server.raftlog.segmented;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.protocol.TermIndex;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -125,14 +125,10 @@ public class SegmentedRaftLog extends RaftLog {
}
}
- /** The methods defined in {@link RaftServerImpl} which are used in {@link SegmentedRaftLog}. */
+ /** The server methods used in {@link SegmentedRaftLog}. */
interface ServerLogMethods {
ServerLogMethods DUMMY = new ServerLogMethods() {};
- default boolean shouldEvictCache() {
- return false;
- }
-
default long[] getFollowerNextIndices() {
return null;
}
@@ -150,18 +146,14 @@ public class SegmentedRaftLog extends RaftLog {
* When the server is null, return the dummy instance of {@link ServerLogMethods}.
* Otherwise, the server is non-null, return the implementation using the given server.
*/
- private ServerLogMethods newServerLogMethods(RaftServerImpl impl) {
+ private ServerLogMethods newServerLogMethods(RaftServer.Division impl,
+ Consumer<LogEntryProto> notifyTruncatedLogEntry) {
if (impl == null) {
return ServerLogMethods.DUMMY;
}
return new ServerLogMethods() {
@Override
- public boolean shouldEvictCache() {
- return cache.shouldEvict();
- }
-
- @Override
public long[] getFollowerNextIndices() {
return impl.getInfo().getFollowerNextIndices();
}
@@ -175,7 +167,7 @@ public class SegmentedRaftLog extends RaftLog {
public void notifyTruncatedLogEntry(TermIndex ti) {
try {
final LogEntryProto entry = get(ti.getIndex());
- impl.notifyTruncatedLogEntry(entry);
+ notifyTruncatedLogEntry.accept(entry);
} catch (RaftLogIOException e) {
LOG.error("{}: Failed to read log {}", getName(), ti, e);
}
@@ -191,18 +183,12 @@ public class SegmentedRaftLog extends RaftLog {
private final long segmentMaxSize;
private final boolean stateMachineCachingEnabled;
- public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
- RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
- this(memberId, server, server != null? server.getStateMachine(): null,
- server != null? server::submitUpdateCommitEvent: null,
- storage, lastIndexInSnapshot, properties);
- }
-
- SegmentedRaftLog(RaftGroupMemberId memberId, RaftServerImpl server,
- StateMachine stateMachine, Runnable submitUpdateCommitEvent,
+ @SuppressWarnings("parameternumber")
+ public SegmentedRaftLog(RaftGroupMemberId memberId, RaftServer.Division server,
+ StateMachine stateMachine, Consumer<LogEntryProto> notifyTruncatedLogEntry, Runnable submitUpdateCommitEvent,
RaftStorage storage, long lastIndexInSnapshot, RaftProperties properties) {
super(memberId, lastIndexInSnapshot, properties);
- this.server = newServerLogMethods(server);
+ this.server = newServerLogMethods(server, notifyTruncatedLogEntry);
this.storage = storage;
this.stateMachine = stateMachine;
segmentMaxSize = RaftServerConfigKeys.Log.segmentSizeMax(properties).getSize();
@@ -317,7 +303,7 @@ public class SegmentedRaftLog extends RaftLog {
}
private void checkAndEvictCache() {
- if (server.shouldEvictCache()) {
+ if (cache.shouldEvict()) {
// TODO if the cache is hitting the maximum size and we cannot evict any
// segment's cache, should block the new entry appending or new segment
// allocation.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
index 63dd439..d7f7d35 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogWorker.java
@@ -24,8 +24,8 @@ import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.raftlog.RaftLog;
@@ -58,7 +58,7 @@ import java.util.function.Supplier;
* This class takes the responsibility of all the raft log related I/O ops for a
* raft peer.
*/
-class SegmentedRaftLogWorker implements Runnable {
+class SegmentedRaftLogWorker {
static final Logger LOG = LoggerFactory.getLogger(SegmentedRaftLogWorker.class);
static final TimeDuration ONE_SECOND = TimeDuration.valueOf(1, TimeUnit.SECONDS);
@@ -169,13 +169,13 @@ class SegmentedRaftLogWorker implements Runnable {
private final long segmentMaxSize;
private final long preallocatedSize;
- private final RaftServerImpl server;
+ private final RaftServer.Division server;
private int flushBatchSize;
private final StateMachineDataPolicy stateMachineDataPolicy;
SegmentedRaftLogWorker(RaftGroupMemberId memberId, StateMachine stateMachine, Runnable submitUpdateCommitEvent,
- RaftServerImpl server, RaftStorage storage, RaftProperties properties,
+ RaftServer.Division server, RaftStorage storage, RaftProperties properties,
RaftLogMetrics metricRegistry) {
this.name = memberId + "-" + JavaUtils.getClassSimpleName(getClass());
LOG.info("new {} for {}", name, storage);
@@ -197,7 +197,7 @@ class SegmentedRaftLogWorker implements Runnable {
this.stateMachineDataPolicy = new StateMachineDataPolicy(properties);
- this.workerThread = new Thread(this, name);
+ this.workerThread = new Thread(this::run, name);
// Server Id can be null in unit tests
metricRegistry.addDataQueueSizeGauge(queue);
@@ -272,9 +272,7 @@ class SegmentedRaftLogWorker implements Runnable {
+ ". The SegmentedRaftLogWorker already stopped.");
} else {
LOG.error("Failed to add IO task {}", task, e);
- if (server != null) {
- server.shutdown();
- }
+ Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
}
}
return task;
@@ -284,9 +282,7 @@ class SegmentedRaftLogWorker implements Runnable {
return running && workerThread.isAlive();
}
- @Override
- public void run() {
-
+ private void run() {
// if and when a log task encounters an exception
RaftLogIOException logIOException = null;
@@ -337,10 +333,7 @@ class SegmentedRaftLogWorker implements Runnable {
Thread.currentThread().getName(), e);
} else {
LOG.error("{} hit exception", Thread.currentThread().getName(), e);
- // Shutdown raft group instead of terminating jvm.
- if (server != null) {
- server.shutdown();
- }
+ Optional.ofNullable(server).ifPresent(RaftServer.Division::close);
}
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index fb891a0..f614a9f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -39,7 +39,6 @@ import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
import org.apache.ratis.server.impl.MiniRaftCluster;
-import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
import org.apache.ratis.statemachine.StateMachine;
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
index e473bc9..2fe6291 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftBasicTests.java
@@ -434,7 +434,7 @@ public abstract class RaftBasicTests<CLUSTER extends MiniRaftCluster>
// Create an entry corresponding to the callId and clientId
// in each server's retry cache.
cluster.getServerAliveStream().forEach(
- raftServer -> RetryCacheTestUtil.getOrCreateEntry(raftServer.getRetryCache(), invocationId));
+ raftServer -> RetryCacheTestUtil.getOrCreateEntry(raftServer, invocationId));
// Client request for the callId now waits
// as there is already a cache entry in the server for the request.
// Ideally the client request should timeout and the client should retry.
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index 8b85dc5..3a52585 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -331,9 +331,9 @@ public abstract class MiniRaftCluster implements Closeable {
killServer(serverId);
servers.remove(serverId);
- final RaftServerProxy proxy = putNewServer(serverId, group, format);
+ final RaftServer proxy = putNewServer(serverId, group, format);
proxy.start();
- return group == null? null: proxy.getImpl(group.getGroupId());
+ return group == null? null: proxy.getDivision(group.getGroupId());
}
public void restart(boolean format) throws IOException {
@@ -571,7 +571,7 @@ public abstract class MiniRaftCluster implements Closeable {
* from the given group.
*/
private List<RaftServer.Division> getLeaders(RaftGroupId groupId) {
- final Stream<RaftServerImpl> serverAliveStream = getServerAliveStream(groupId);
+ final Stream<RaftServer.Division> serverAliveStream = getServerAliveStream(groupId);
final List<RaftServer.Division> leaders = new ArrayList<>();
serverAliveStream.filter(server -> server.getInfo().isLeader()).forEach(s -> {
if (leaders.isEmpty()) {
@@ -611,25 +611,25 @@ public abstract class MiniRaftCluster implements Closeable {
private Stream<RaftServerProxy> getRaftServerProxyStream(RaftGroupId groupId) {
return servers.values().stream()
- .filter(s -> groupId == null || s.containsGroup(groupId));
+ .filter(s -> groupId == null || s.getGroupIds().contains(groupId));
}
public Iterable<RaftServer.Division> iterateDivisions() {
return CollectionUtils.as(getServers(), this::getDivision);
}
- private Stream<RaftServerImpl> getServerStream(RaftGroupId groupId) {
+ private Stream<RaftServer.Division> getServerStream(RaftGroupId groupId) {
final Stream<RaftServerProxy> stream = getRaftServerProxyStream(groupId);
return groupId != null?
- stream.map(s -> JavaUtils.callAsUnchecked(() -> s.getImpl(groupId)))
+ stream.map(s -> JavaUtils.callAsUnchecked(() -> s.getDivision(groupId)))
: stream.flatMap(s -> JavaUtils.callAsUnchecked(s::getImpls).stream());
}
- public Stream<RaftServerImpl> getServerAliveStream() {
+ public Stream<RaftServer.Division> getServerAliveStream() {
return getServerAliveStream(getGroupId());
}
- private Stream<RaftServerImpl> getServerAliveStream(RaftGroupId groupId) {
+ private Stream<RaftServer.Division> getServerAliveStream(RaftGroupId groupId) {
return getServerStream(groupId).filter(server -> server.getInfo().isAlive());
}
@@ -641,6 +641,10 @@ public abstract class MiniRaftCluster implements Closeable {
return servers.get(id);
}
+ public ServerFactory getServerFactory(RaftPeerId id) {
+ return servers.get(id).getFactory();
+ }
+
public RaftServer.Division getDivision(RaftPeerId id) {
return getDivision(servers.get(id));
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 0897c80..cf9307c 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -18,20 +18,26 @@
package org.apache.ratis.server.impl;
import org.apache.log4j.Level;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.server.metrics.RaftServerMetrics;
+import org.apache.ratis.server.raftlog.segmented.SegmentedRaftLog;
import org.apache.ratis.server.storage.RaftStorage;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.Log4jUtils;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
+import org.mockito.Mockito;
import org.mockito.internal.util.reflection.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,6 +47,11 @@ import java.util.Collection;
import java.util.Optional;
import java.util.stream.Stream;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doCallRealMethod;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
public class RaftServerTestUtil {
static final Logger LOG = LoggerFactory.getLogger(RaftServerTestUtil.class);
@@ -159,13 +170,30 @@ public class RaftServerTestUtil {
return new DataStreamMapImpl(name);
}
- public static void shutdown(RaftServer.Division server) {
- ((RaftServerImpl)server).shutdown();
- }
-
public static void assertLostMajorityHeartbeatsRecently(RaftServer.Division leader) {
final FollowerState f = ((RaftServerImpl)leader).getRole().getFollowerState().orElse(null);
Assert.assertNotNull(f);
Assert.assertTrue(f.lostMajorityHeartbeatsRecently());
}
+
+ public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, DivisionInfo info,
+ RaftStorage storage, RaftProperties properties) {
+ final RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
+ Mockito.when(server.getInfo()).thenReturn(info);
+
+ return new SegmentedRaftLog(memberId, server, null,
+ server::notifyTruncatedLogEntry,
+ server::submitUpdateCommitEvent,
+ storage, -1, properties);
+ }
+
+ public static SegmentedRaftLog newSegmentedRaftLog(RaftGroupMemberId memberId, RetryCache retryCache,
+ RaftStorage storage, RaftProperties properties) {
+ final RaftServerImpl server = mock(RaftServerImpl.class);
+ when(server.getRetryCache()).thenReturn(retryCache);
+ when(server.getMemberId()).thenReturn(memberId);
+ doCallRealMethod().when(server).notifyTruncatedLogEntry(any(RaftProtos.LogEntryProto.class));
+ return new SegmentedRaftLog(memberId, server, null,
+ server::notifyTruncatedLogEntry, server::submitUpdateCommitEvent, storage, -1, properties);
+ }
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
index 2e6875f..70308ef 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RetryCacheTestUtil.java
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.protocol.ClientInvocationId;
+import org.apache.ratis.server.RaftServer;
import org.apache.ratis.util.TimeDuration;
import org.junit.Assert;
@@ -43,7 +44,7 @@ public class RetryCacheTestUtil {
}
}
- public static void getOrCreateEntry(RetryCache cache, ClientInvocationId invocationId) {
- cache.getOrCreateEntry(invocationId);
+ public static void getOrCreateEntry(RaftServer.Division server, ClientInvocationId invocationId) {
+ ((RaftServerImpl)server).getRetryCache().getOrCreateEntry(invocationId);
}
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
index 4775e07..9692eef 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/StateMachineShutdownTests.java
@@ -104,7 +104,7 @@ public abstract class StateMachineShutdownTests<CLUSTER extends MiniRaftCluster>
Assert.assertTrue(secondFollower.getInfo().getLastAppliedIndex() < logIndex);
// Now shutdown the follower in a separate thread
- final Thread t = new Thread(() -> RaftServerTestUtil.shutdown(secondFollower));
+ final Thread t = new Thread(secondFollower::close);
t.start();
// The second follower should still be blocked in apply transaction
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
index 98f3a77..04527e7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/raftlog/segmented/SegmentedRaftLogTestUtils.java
@@ -18,7 +18,6 @@
package org.apache.ratis.server.raftlog.segmented;
import org.apache.log4j.Level;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.util.Log4jUtils;
public interface SegmentedRaftLogTestUtils {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 50068d6..b91fb77 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -128,6 +128,9 @@ abstract class DataStreamBaseTest extends BaseTest {
public RaftClient getRaftClient() {
return this.client;
}
+
+ @Override
+ public void close() {}
}
static class Server {
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index d88980e..e652e86 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -30,6 +30,7 @@ import com.codahale.metrics.Gauge;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.log4j.Level;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.RaftTestUtil.SimpleMessage;
@@ -115,10 +116,16 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
// the rpc server on failure.
RaftServerConfigKeys.setStorageDir(p, Collections.singletonList(cluster.getStorageDir(leaderId)));
testFailureCase("start a new server with the same address",
- () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null).start(),
+ () -> startNewServer(leaderId, cluster.getGroup(), stateMachine, p),
IOException.class, OverlappingFileLockException.class);
// Try to start a raft server rpc at the leader address.
- cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
+ cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId));
+ }
+
+ static void startNewServer(RaftPeerId id, RaftGroup group, StateMachine stateMachine, RaftProperties properties)
+ throws Exception {
+ final RaftServer d = ServerImplUtils.newRaftServer(id, group, gid -> stateMachine, properties, null);
+ d.start();
}
@Test
@@ -128,7 +135,7 @@ public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterW
void runTestUnsupportedMethods(MiniRaftClusterWithGrpc cluster) throws Exception {
final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
- final RaftServerRpc rpc = cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
+ final RaftServerRpc rpc = cluster.getServerFactory(leaderId).newRaftServerRpc(cluster.getServer(leaderId));
testFailureCase("appendEntries",
() -> rpc.appendEntries(null),
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
index 3e545b4..22fabc0 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestCacheEviction.java
@@ -28,7 +28,7 @@ import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.server.impl.RaftServerConstants;
-import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.raftlog.RaftLog;
import org.apache.ratis.server.raftlog.segmented.CacheInvalidationPolicy.CacheInvalidationPolicyDefault;
@@ -165,13 +165,10 @@ public class TestCacheEviction extends BaseTest {
RaftServerConfigKeys.setStorageDir(prop, Collections.singletonList(storageDir));
RaftStorage storage = new RaftStorage(storageDir, RaftServerConstants.StartupOption.REGULAR);
- RaftServerImpl server = Mockito.mock(RaftServerImpl.class);
final DivisionInfo info = Mockito.mock(DivisionInfo.class);
Mockito.when(info.getLastAppliedIndex()).thenReturn(0L);
Mockito.when(info.getFollowerNextIndices()).thenReturn(new long[]{});
- Mockito.when(server.getInfo()).thenReturn(info);
-
- SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, prop);
+ final SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, info, storage, prop);
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
List<SegmentRange> slist = TestSegmentedRaftLog.prepareRanges(0, maxCachedNum, 7, 0);
LogEntryProto[] entries = generateEntries(slist);
diff --git a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
index ce81bb9..e2cd6acc 100644
--- a/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
+++ b/ratis-test/src/test/java/org/apache/ratis/server/raftlog/segmented/TestSegmentedRaftLog.java
@@ -28,9 +28,9 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.exceptions.TimeoutIOException;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.impl.RaftServerConstants;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.impl.RetryCacheTestUtil;
import org.apache.ratis.server.impl.RetryCache;
-import org.apache.ratis.server.impl.RaftServerImpl;
import org.apache.ratis.server.impl.ServerProtoUtils;
import org.apache.ratis.server.metrics.RaftLogMetrics;
import org.apache.ratis.server.protocol.TermIndex;
@@ -64,12 +64,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doCallRealMethod;
-import static org.mockito.Mockito.doNothing;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import com.codahale.metrics.Timer;
public class TestSegmentedRaftLog extends BaseTest {
@@ -109,7 +103,11 @@ public class TestSegmentedRaftLog extends BaseTest {
private int bufferSize;
SegmentedRaftLog newSegmentedRaftLog() {
- return new SegmentedRaftLog(memberId, null, storage, -1, properties);
+ return newSegmentedRaftLog(storage, properties);
+ }
+
+ static SegmentedRaftLog newSegmentedRaftLog(RaftStorage storage, RaftProperties properties) {
+ return new SegmentedRaftLog(memberId, null, null, null, null, storage, -1, properties);
}
@Before
@@ -430,7 +428,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final RaftProperties p = new RaftProperties();
RaftServerConfigKeys.Log.setPurgeGap(p, purgeGap);
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, storage, -1, p)) {
+ try (SegmentedRaftLog raftLog = newSegmentedRaftLog(storage, p)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.stream().map(raftLog::appendEntry).forEach(CompletableFuture::join);
final CompletableFuture<Long> f = raftLog.purge(purgeIndex);
@@ -449,13 +447,8 @@ public class TestSegmentedRaftLog extends BaseTest {
List<SegmentRange> ranges = prepareRanges(0, 5, 200, 0);
List<LogEntryProto> entries = prepareLogEntries(ranges, null);
- RaftServerImpl server = mock(RaftServerImpl.class);
RetryCache retryCache = RetryCacheTestUtil.createRetryCache();
- when(server.getRetryCache()).thenReturn(retryCache);
- final RaftGroupMemberId id = RaftGroupMemberId.valueOf(RaftPeerId.valueOf("s0"), RaftGroupId.randomId());
- when(server.getMemberId()).thenReturn(id);
- doCallRealMethod().when(server).notifyTruncatedLogEntry(any(LogEntryProto.class));
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
entries.forEach(entry -> RetryCacheTestUtil.createEntry(retryCache, entry));
// append entries to the raftlog
@@ -470,7 +463,7 @@ public class TestSegmentedRaftLog extends BaseTest {
List<LogEntryProto> newEntries = prepareLogEntries(
Arrays.asList(r1, r2, r3), null);
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
LOG.info("newEntries[0] = {}", newEntries.get(0));
final int last = newEntries.size() - 1;
@@ -487,7 +480,7 @@ public class TestSegmentedRaftLog extends BaseTest {
}
// load the raftlog again and check
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = RaftServerTestUtil.newSegmentedRaftLog(memberId, retryCache, storage, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
checkEntries(raftLog, entries, 0, 650);
checkEntries(raftLog, newEntries, 100, 100);
@@ -507,7 +500,7 @@ public class TestSegmentedRaftLog extends BaseTest {
final List<LogEntryProto> entries = prepareLogEntries(range, null, true, new ArrayList<>());
final SimpleStateMachine4Testing sm = new SimpleStateMachine4Testing();
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, -1, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
int next = 0;
@@ -571,10 +564,8 @@ public class TestSegmentedRaftLog extends BaseTest {
}
};
- RaftServerImpl server = mock(RaftServerImpl.class);
- doNothing().when(server).shutdown();
Throwable ex = null; // TimeoutIOException
- try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, server, sm, null, storage, -1, properties)) {
+ try (SegmentedRaftLog raftLog = new SegmentedRaftLog(memberId, null, sm, null, null, storage, -1, properties)) {
raftLog.open(RaftLog.INVALID_LOG_INDEX, null);
// SegmentedRaftLogWorker should catch TimeoutIOException
CompletableFuture<Long> f = raftLog.appendEntry(entry);