You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/09 23:35:31 UTC
[incubator-ratis] branch master updated: RATIS-1226. The primary server should get the other peers from the current RaftConfiguration. (#343)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 6e94069 RATIS-1226. The primary server should get the other peers from the current RaftConfiguration. (#343)
6e94069 is described below
commit 6e940698b30b5a6a2e3970b81cbf23c981dd03b9
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Dec 10 07:35:23 2020 +0800
RATIS-1226. The primary server should get the other peers from the current RaftConfiguration. (#343)
* RATIS-1226. The primary server should get the other peers from the current RaftConfiguration.
* Fix TestNettyDataStreamWithMock.
---
.../ratis/examples/filestore/cli/Server.java | 7 -------
.../org/apache/ratis/grpc/server/GrpcService.java | 2 +-
.../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 4 ++--
.../ratis/hadooprpc/server/HadoopRpcService.java | 2 +-
.../ratis/netty/server/DataStreamManagement.java | 23 ++++++++++++++++------
.../apache/ratis/netty/server/NettyRpcService.java | 2 +-
.../ratis/netty/server/NettyServerStreamRpc.java | 17 ++++------------
.../ratis/netty/MiniRaftClusterWithNetty.java | 4 ++--
.../java/org/apache/ratis/server/RaftServer.java | 4 ++++
.../server/{impl => }/RaftServerRpcWithProxy.java | 3 +--
.../ratis/server/impl/DataStreamServerImpl.java | 4 ++--
.../apache/ratis/server/impl/RaftServerImpl.java | 3 +--
.../apache/ratis/server/impl/RaftServerProxy.java | 10 +++++++++-
.../apache/ratis/server/impl/MiniRaftCluster.java | 12 -----------
.../ratis/server/impl/RaftServerTestUtil.java | 9 +++++++++
.../ratis/datastream/DataStreamBaseTest.java | 21 +++++++++++++-------
.../datastream/TestNettyDataStreamWithMock.java | 4 ----
17 files changed, 68 insertions(+), 63 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 192d721..ac29d96 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -35,7 +35,6 @@ import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
-import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.statemachine.StateMachine;
import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
import org.apache.ratis.util.LifeCycle;
@@ -44,7 +43,6 @@ import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import java.io.File;
-import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
@@ -96,11 +94,6 @@ public class Server extends SubCommandBase {
raftServer.start();
- if (isPrimary(id)) {
- ((RaftServerProxy) raftServer).getDataStreamServerRpc()
- .addRaftPeers(getOtherRaftPeers(Arrays.asList(getPeers())));
- }
-
for (; raftServer.getLifeCycleState() != LifeCycle.State.CLOSED; ) {
TimeUnit.SECONDS.sleep(1);
}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index ce0eef7..0d0f72e 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -27,7 +27,7 @@ import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.server.RaftServerRpcWithProxy;
import org.apache.ratis.thirdparty.io.grpc.ServerInterceptors;
import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
import org.apache.ratis.thirdparty.io.grpc.netty.NettyServerBuilder;
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 2c68ad5..4864183 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.grpc;
-import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.Parameters;
@@ -26,7 +25,8 @@ import org.apache.ratis.grpc.server.GrpcService;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.MiniRaftCluster;
/**
* A {@link MiniRaftCluster} with {{@link SupportedRpcType#GRPC}} and data stream disabled.
diff --git a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
index b75938e..8552765 100644
--- a/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
+++ b/ratis-hadoop/src/main/java/org/apache/ratis/hadooprpc/server/HadoopRpcService.java
@@ -38,7 +38,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.server.RaftServerRpcWithProxy;
import org.apache.ratis.server.protocol.RaftServerProtocol;
import com.google.protobuf.BlockingService;
import com.google.protobuf.ByteString;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 0b6804b..73aa303 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -31,6 +31,7 @@ import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.exceptions.AlreadyExistsException;
import org.apache.ratis.protocol.exceptions.DataStreamException;
import org.apache.ratis.server.RaftServer;
@@ -43,7 +44,7 @@ import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelId;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
-import org.apache.ratis.util.function.CheckedFunction;
+import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -226,13 +227,23 @@ public class DataStreamManagement {
}
private StreamInfo newStreamInfo(ByteBuf buf,
- CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
+ CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
try {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final boolean isPrimary = server.getId().equals(request.getServerId());
- return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request),
- isPrimary? getDataStreamOutput.apply(request): Collections.emptyList());
+ final List<DataStreamOutputRpc> outs;
+ if (isPrimary) {
+ final RaftGroupId groupId = request.getRaftGroupId();
+ // get the other peers from the current configuration
+ final List<RaftPeer> others = server.getDivision(groupId).getRaftConf().getCurrentPeers().stream()
+ .filter(p -> !p.getId().equals(server.getId()))
+ .collect(Collectors.toList());
+ outs = getStreams.apply(request, others);
+ } else {
+ outs = Collections.emptyList();
+ }
+ return new StreamInfo(request, isPrimary, computeDataStreamIfAbsent(request), outs);
} catch (Throwable e) {
throw new CompletionException(e);
}
@@ -341,13 +352,13 @@ public class DataStreamManagement {
}
void read(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
- CheckedFunction<RaftClientRequest, List<DataStreamOutputRpc>, IOException> getDataStreamOutput) {
+ CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
- final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getDataStreamOutput));
+ final MemoizedSupplier<StreamInfo> supplier = JavaUtils.memoize(() -> newStreamInfo(buf, getStreams));
info = streams.computeIfAbsent(key, id -> supplier.get());
if (!supplier.isInitialized()) {
throw new IllegalStateException("Failed to create a new stream for " + request
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index 4b0f02f..7c4b1b3 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -27,7 +27,7 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerRpc;
-import org.apache.ratis.server.impl.RaftServerRpcWithProxy;
+import org.apache.ratis.server.RaftServerRpcWithProxy;
import org.apache.ratis.thirdparty.io.netty.bootstrap.ServerBootstrap;
import org.apache.ratis.thirdparty.io.netty.channel.*;
import org.apache.ratis.thirdparty.io.netty.channel.nio.NioEventLoopGroup;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 9b06e24..82811b7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -58,21 +58,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
- /**
- * Proxies to other peers.
- *
- * Invariant: all the {@link #peers} must exist in the {@link #map}.
- */
+ /** Proxies to other peers. */
static class Proxies {
- private final Set<RaftPeer> peers = new CopyOnWriteArraySet<>();
private final PeerProxyMap<DataStreamClient> map;
Proxies(PeerProxyMap<DataStreamClient> map) {
@@ -82,14 +75,12 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
void addPeers(Collection<RaftPeer> newPeers) {
// add to the map first in order to preserve the invariant.
map.addRaftPeers(newPeers);
- // must use atomic addAll
- peers.addAll(newPeers);
}
- List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request) throws IOException {
+ List<DataStreamOutputRpc> getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers) throws IOException {
final List<DataStreamOutputRpc> outs = new ArrayList<>();
try {
- getDataStreamOutput(outs, request);
+ getDataStreamOutput(request, peers, outs);
} catch (IOException e) {
outs.forEach(DataStreamOutputRpc::closeAsync);
throw e;
@@ -97,7 +88,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return outs;
}
- private void getDataStreamOutput(List<DataStreamOutputRpc> outs, RaftClientRequest request)
+ private void getDataStreamOutput(RaftClientRequest request, List<RaftPeer> peers, List<DataStreamOutputRpc> outs)
throws IOException {
for (RaftPeer peer : peers) {
try {
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index d444549..16ccffd 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -17,7 +17,6 @@
*/
package org.apache.ratis.netty;
-import org.apache.ratis.server.impl.MiniRaftCluster;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.Parameters;
@@ -26,7 +25,8 @@ import org.apache.ratis.netty.server.NettyRpcService;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.DelayLocalExecutionInjection;
+import org.apache.ratis.server.impl.MiniRaftCluster;
/**
* A {@link MiniRaftCluster} with {{@link SupportedRpcType#NETTY}} and data stream disabled.
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
index 838038a..95bd846 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServer.java
@@ -126,6 +126,10 @@ public interface RaftServer extends Closeable, RpcType.Get,
/** @return the rpc service. */
RaftServerRpc getServerRpc();
+ /** @return the data stream rpc service. */
+ DataStreamServerRpc getDataStreamServerRpc();
+
+ /** @return the {@link RpcType}. */
default RpcType getRpcType() {
return getFactory().getRpcType();
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpcWithProxy.java
similarity index 97%
rename from ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
rename to ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpcWithProxy.java
index ba10ddb..35ac277 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerRpcWithProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpcWithProxy.java
@@ -15,11 +15,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.server.impl;
+package org.apache.ratis.server;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.server.RaftServerRpc;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.LifeCycle;
import org.apache.ratis.util.PeerProxyMap;
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
index 7f2e57c..f1b3854 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/DataStreamServerImpl.java
@@ -29,12 +29,12 @@ import org.apache.ratis.server.RaftServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class DataStreamServerImpl implements DataStreamServer {
+class DataStreamServerImpl implements DataStreamServer {
public static final Logger LOG = LoggerFactory.getLogger(DataStreamServerImpl.class);
private final DataStreamServerRpc serverRpc;
- public DataStreamServerImpl(RaftServer server, Parameters parameters) {
+ DataStreamServerImpl(RaftServer server, Parameters parameters) {
final SupportedDataStreamType type = RaftConfigKeys.DataStream.type(server.getProperties(), LOG::info);
this.serverRpc = DataStreamServerFactory.newInstance(type, parameters).newDataStreamServerRpc(server);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 6669216..6cccce0 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -903,8 +903,7 @@ class RaftServerImpl implements RaftServer.Division,
return pending.getFuture();
}
- // add new peers into the rpc service
- getServerRpc().addRaftPeers(peersInNewConf);
+ getRaftServer().addRaftPeers(peersInNewConf);
// add staging state into the leaderState
pending = leaderState.startSetConfiguration(request);
}
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
index f285d57..069386f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerProxy.java
@@ -54,6 +54,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -242,10 +243,16 @@ public class RaftServerProxy implements RaftServer {
raftGroup.ifPresent(this::addGroup);
}
+ void addRaftPeers(Collection<RaftPeer> peers) {
+ final List<RaftPeer> others = peers.stream().filter(p -> !p.getId().equals(getId())).collect(Collectors.toList());
+ getServerRpc().addRaftPeers(others);
+ getDataStreamServerRpc().addRaftPeers(others);
+ }
+
private CompletableFuture<RaftServerImpl> newRaftServerImpl(RaftGroup group) {
return CompletableFuture.supplyAsync(() -> {
try {
- serverRpc.addRaftPeers(group.getPeers());
+ addRaftPeers(group.getPeers());
return new RaftServerImpl(group, stateMachineRegistry.apply(group.getGroupId()), this);
} catch(IOException e) {
throw new CompletionException(getId() + ": Failed to initialize server for " + group, e);
@@ -307,6 +314,7 @@ public class RaftServerProxy implements RaftServer {
return serverRpc;
}
+ @Override
public DataStreamServerRpc getDataStreamServerRpc() {
return dataStreamServerRpc;
}
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
index acf0735..177f5e4 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/MiniRaftCluster.java
@@ -280,17 +280,6 @@ public abstract class MiniRaftCluster implements Closeable {
return this;
}
- private void initDataStreamServer() {
- LOG.info("Setting up data stream servers");
- for (RaftServerProxy serverProxy : servers.values()) {
- serverProxy.getDataStreamServerRpc().addRaftPeers(getOtherRaftPeers(serverProxy.getId()));
- }
- }
-
- private Collection<RaftPeer> getOtherRaftPeers(RaftPeerId id) {
- return peers.values().stream().filter(r -> !r.getId().equals(id)).collect(Collectors.toList());
- }
-
public RaftServerProxy putNewServer(RaftPeerId id, RaftGroup group, boolean format) {
final RaftServerProxy s = newRaftServer(id, group, format);
Preconditions.assertTrue(servers.put(id, s) == null);
@@ -313,7 +302,6 @@ public abstract class MiniRaftCluster implements Closeable {
LOG.info(".............................................................. ");
initServers();
- initDataStreamServer();
startServers(servers.values());
this.timer.updateAndGet(t -> t != null? t
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index 55f650b..9fc50f1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -24,6 +24,7 @@ import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamMap;
+import org.apache.ratis.server.DataStreamServer;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.RaftServer;
@@ -115,6 +116,10 @@ public class RaftServerTestUtil {
return (ConfigurationManager) Whitebox.getInternalState(getState(server), "configurationManager");
}
+ public static RaftConfiguration newRaftConfiguration(Collection<RaftPeer> peers) {
+ return RaftConfigurationImpl.newBuilder().setConf(peers).build();
+ }
+
public static RaftConfiguration getRaftConf(RaftServer.Division server) {
return ((RaftServerImpl)server).getRaftConf();
}
@@ -149,6 +154,10 @@ public class RaftServerTestUtil {
return JavaUtils.callAsUnchecked(() -> server.getDivision(groupId));
}
+ public static DataStreamServer newDataStreamServer(RaftServer server) {
+ return new DataStreamServerImpl(server, null);
+ }
+
public static DataStreamMap newDataStreamMap(Object name) {
return new DataStreamMapImpl(name);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 6694db5..579a965 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -18,6 +18,8 @@
package org.apache.ratis.datastream;
import org.apache.ratis.BaseTest;
+import org.apache.ratis.server.DataStreamServer;
+import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.DivisionInfo;
import org.apache.ratis.server.DivisionProperties;
import org.apache.ratis.server.RaftServerRpc;
@@ -54,7 +56,6 @@ import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.SetConfigurationRequest;
import org.apache.ratis.server.DataStreamMap;
import org.apache.ratis.server.RaftServer;
-import org.apache.ratis.server.impl.DataStreamServerImpl;
import org.apache.ratis.server.RaftConfiguration;
import org.apache.ratis.server.impl.RaftServerTestUtil;
import org.apache.ratis.server.ServerFactory;
@@ -79,7 +80,7 @@ import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
abstract class DataStreamBaseTest extends BaseTest {
- static class MyDivision implements RaftServer.Division {
+ class MyDivision implements RaftServer.Division {
private final RaftServer server;
private final MultiDataStreamStateMachine stateMachine = new MultiDataStreamStateMachine();
private final DataStreamMap streamMap;
@@ -107,7 +108,8 @@ abstract class DataStreamBaseTest extends BaseTest {
@Override
public RaftConfiguration getRaftConf() {
- return null;
+ final List<RaftPeer> peers = servers.stream().map(Server::getPeer).collect(Collectors.toList());
+ return RaftServerTestUtil.newRaftConfiguration(peers);
}
@Override
@@ -161,12 +163,12 @@ abstract class DataStreamBaseTest extends BaseTest {
static class Server {
private final RaftPeer peer;
private final RaftServer raftServer;
- private final DataStreamServerImpl dataStreamServer;
+ private final DataStreamServer dataStreamServer;
Server(RaftPeer peer, RaftServer raftServer) {
this.peer = peer;
this.raftServer = raftServer;
- this.dataStreamServer = new DataStreamServerImpl(raftServer, null);
+ this.dataStreamServer = RaftServerTestUtil.newDataStreamServer(raftServer);
}
RaftPeer getPeer() {
@@ -203,7 +205,7 @@ abstract class DataStreamBaseTest extends BaseTest {
return new MyRaftServer(peer, properties);
}
- static class MyRaftServer implements RaftServer {
+ class MyRaftServer implements RaftServer {
private final RaftPeer peer;
private final RaftProperties properties;
private final ConcurrentMap<RaftGroupId, MyDivision> divisions = new ConcurrentHashMap<>();
@@ -259,6 +261,11 @@ abstract class DataStreamBaseTest extends BaseTest {
}
@Override
+ public DataStreamServerRpc getDataStreamServerRpc() {
+ return null;
+ }
+
+ @Override
public RaftClientReply submitClientRequest(RaftClientRequest request) {
return submitClientRequestAsync(request).join();
}
@@ -277,7 +284,7 @@ abstract class DataStreamBaseTest extends BaseTest {
.thenApply(channel -> buildRaftClientReply(request, channel));
}
- static RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
+ RaftClientReply buildRaftClientReply(RaftClientRequest request, DataChannel channel) {
Assert.assertTrue(channel instanceof MyDataChannel);
final MyDataChannel dataChannel = (MyDataChannel) channel;
return RaftClientReply.newBuilder()
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
index e3076de..10803fa 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithMock.java
@@ -27,10 +27,8 @@ import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.RaftClientReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftGroupMemberId;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.apache.ratis.protocol.exceptions.RaftException;
import org.apache.ratis.protocol.exceptions.StateMachineException;
import org.apache.ratis.server.RaftServer;
@@ -45,8 +43,6 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import static org.mockito.Mockito.mock;