You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/18 04:00:23 UTC
[incubator-ratis] branch master updated: RATIS-1162. Enable testing
clusters with Netty data stream and gRPC. (#282)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new dc60590 RATIS-1162. Enable testing clusters with Netty data stream and gRPC. (#282)
dc60590 is described below
commit dc6059064b783686d72a3d2b8bf42bc374db095b
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Nov 18 12:00:13 2020 +0800
RATIS-1162. Enable testing clusters with Netty data stream and gRPC. (#282)
* RATIS-1162. Enable testing clusters with Netty data stream and gRPC.
* Add MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.
---
.../apache/ratis/grpc/MiniRaftClusterWithGrpc.java | 10 ++--
.../ratis/netty/MiniRaftClusterWithNetty.java | 8 +--
.../ratis/datastream/DataStreamBaseTest.java | 59 +++++++++++++++++-----
...treamTests.java => DataStreamClusterTests.java} | 43 ++++------------
...usterWithRpcTypeGrpcAndDataStreamTypeNetty.java | 50 ++++++++----------
...sterWithRpcTypeNettyAndDataStreamTypeNetty.java | 48 ++++++++----------
...ava => TestNettyDataStreamWithGrpcCluster.java} | 11 ++--
...va => TestNettyDataStreamWithNettyCluster.java} | 11 ++--
8 files changed, 116 insertions(+), 124 deletions(-)
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
index 0174860..9a2cee3 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -30,12 +30,14 @@ import org.apache.ratis.statemachine.StateMachine;
import java.io.IOException;
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#GRPC}} and data stream disabled.
+ */
public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
public static final Factory<MiniRaftClusterWithGrpc> FACTORY
= new Factory<MiniRaftClusterWithGrpc>() {
@Override
- public MiniRaftClusterWithGrpc newCluster(
- String[] ids, RaftProperties prop) {
+ public MiniRaftClusterWithGrpc newCluster(String[] ids, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
return new MiniRaftClusterWithGrpc(ids, prop);
}
@@ -51,7 +53,7 @@ public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
public static final DelayLocalExecutionInjection sendServerRequestInjection =
new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
- private MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
+ protected MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
super(ids, properties, null);
}
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
index 657dd11..f5a9b15 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -30,6 +30,9 @@ import org.apache.ratis.statemachine.StateMachine;
import java.io.IOException;
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#NETTY}} and data stream disabled.
+ */
public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
public static final Factory<MiniRaftClusterWithNetty> FACTORY
= new Factory<MiniRaftClusterWithNetty>() {
@@ -50,7 +53,7 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
public static final DelayLocalExecutionInjection sendServerRequest
= new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
- private MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
+ protected MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
super(ids, properties, null);
}
@@ -59,7 +62,6 @@ public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
RaftProperties properties) throws IOException {
NettyConfigKeys.Server.setPort(properties, getPort(id, group));
- NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));
return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null);
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index f075048..b206b0c 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -63,6 +63,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
@@ -88,17 +89,43 @@ abstract class DataStreamBaseTest extends BaseTest {
private final Executor executor = Executors.newFixedThreadPool(16);
static class MultiDataStreamStateMachine extends BaseStateMachine {
- final ConcurrentMap<Long, SingleDataStream> streams = new ConcurrentHashMap<>();
+ static class Key {
+ private final ClientId clientId;
+ private final long callId;
+
+ Key(RaftClientRequest request) {
+ this.clientId = request.getClientId();
+ this.callId = request.getCallId();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ } else if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final Key that = (Key) obj;
+ return this.callId == that.callId && Objects.equals(this.clientId, that.clientId);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(clientId, callId);
+ }
+ }
+
+ private final ConcurrentMap<Key, SingleDataStream> streams = new ConcurrentHashMap<>();
@Override
public CompletableFuture<DataStream> stream(RaftClientRequest request) {
final SingleDataStream s = new SingleDataStream(request);
- streams.put(request.getCallId(), s);
+ streams.put(new Key(request), s);
return CompletableFuture.completedFuture(s);
}
- SingleDataStream getSingleDataStream(long callId) {
- return streams.get(callId);
+ SingleDataStream getSingleDataStream(RaftClientRequest request) {
+ return streams.get(new Key(request));
}
}
@@ -260,7 +287,7 @@ abstract class DataStreamBaseTest extends BaseTest {
@Override
public CompletableFuture<RaftClientReply> submitClientRequestAsync(RaftClientRequest request) {
final MultiDataStreamStateMachine stateMachine = getStateMachine(request.getRaftGroupId());
- final SingleDataStream stream = stateMachine.getSingleDataStream(request.getCallId());
+ final SingleDataStream stream = stateMachine.getSingleDataStream(request);
Assert.assertFalse(stream.getWritableByteChannel().isOpen());
return CompletableFuture.completedFuture(RaftClientReply.newBuilder()
.setRequest(request)
@@ -449,8 +476,7 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
- CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
- LOG.info("start stream {}", out.getHeader().getCallId());
+ static int writeAndAssertReplies(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
final List<Integer> sizes = new ArrayList<>();
@@ -480,20 +506,25 @@ abstract class DataStreamBaseTest extends BaseTest {
Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
Assert.assertEquals(reply.getType(), Type.STREAM_DATA);
}
+ return dataSize;
+ }
+
+ CompletableFuture<RaftClientReply> runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+ LOG.info("start Stream{}", out.getHeader().getCallId());
+ final int bytesWritten = writeAndAssertReplies(out, bufferSize, bufferNum);
try {
for (Server s : servers) {
- assertHeader(s, out.getHeader(), dataSize);
+ assertHeader(s, out.getHeader(), bytesWritten);
}
} catch (Throwable e) {
throw new CompletionException(e);
}
- final long byteWritten = dataSize;
- return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, byteWritten));
+ return out.closeAsync().thenCompose(reply -> assertCloseReply(out, reply, bytesWritten));
}
- CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
- long byteWritten) {
+ static CompletableFuture<RaftClientReply> assertCloseReply(DataStreamOutputImpl out, DataStreamReply dataStreamReply,
+ long bytesWritten) {
// Test close idempotent
Assert.assertSame(dataStreamReply, out.closeAsync().join());
testFailureCase("writeAsync should fail",
@@ -507,7 +538,7 @@ abstract class DataStreamBaseTest extends BaseTest {
if (reply.isSuccess()) {
final ByteString bytes = reply.getMessage().getContent();
if (!bytes.equals(MOCK)) {
- Assert.assertEquals(bytesWritten2ByteString(byteWritten), bytes);
+ Assert.assertEquals(bytesWritten2ByteString(bytesWritten), bytes);
}
}
@@ -519,7 +550,7 @@ abstract class DataStreamBaseTest extends BaseTest {
void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
- final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
+ final SingleDataStream stream = s.getSingleDataStream(header);
Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
Assert.assertEquals(dataSize, stream.getByteWritten());
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
similarity index 67%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index 1e6459f..4b9cac2 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -17,17 +17,13 @@
*/
package org.apache.ratis.datastream;
-import static org.apache.ratis.RaftTestUtil.waitForLeader;
-
import org.apache.ratis.BaseTest;
import org.apache.ratis.MiniRaftCluster;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.client.DataStreamOutputRpc;
import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.proto.RaftProtos.LogEntryProto;
import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
-import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.server.impl.RaftServerImpl;
@@ -36,21 +32,17 @@ import org.apache.ratis.server.raftlog.RaftLog;
import org.junit.Assert;
import org.junit.Test;
-import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.concurrent.ThreadLocalRandom;
-public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends BaseTest
+import static org.apache.ratis.RaftTestUtil.waitForLeader;
+
+public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> extends BaseTest
implements MiniRaftCluster.Factory.Get<CLUSTER> {
{
- RaftConfigKeys.DataStream.setType(getProperties(), SupportedDataStreamType.NETTY);
setStateMachine(MultiDataStreamStateMachine.class);
}
public static final int NUM_SERVERS = 3;
- // TODO: change bufferSize and bufferNum configurable
- private static int bufferSize = 1_000_000;
- private static int bufferNum = 10;
@Test
public void testStreamWrites() throws Exception {
@@ -66,31 +58,18 @@ public abstract class DataStreamTests <CLUSTER extends MiniRaftCluster> extends
RaftPeer raftPeer = peers.iterator().next();
try (RaftClient client = cluster.createClient(raftPeer)) {
- // send header
- DataStreamOutputRpc dataStreamOutputRpc = (DataStreamOutputRpc) client.getDataStreamApi().stream();
-
- // send data
- final int halfBufferSize = bufferSize / 2;
- int dataSize = 0;
- for(int i = 0; i < bufferNum; i++) {
- final int size = halfBufferSize + ThreadLocalRandom.current().nextInt(halfBufferSize);
-
- final ByteBuffer bf = DataStreamBaseTest.initBuffer(dataSize, size);
- dataStreamOutputRpc.writeAsync(bf);
- dataSize += size;
+ // send a stream request
+ final long callId;
+ try(final DataStreamOutputImpl out = (DataStreamOutputImpl) client.getDataStreamApi().stream()) {
+ DataStreamBaseTest.writeAndAssertReplies(out, 1000, 10);
+ callId = out.getHeader().getCallId();
}
- // send close
- dataStreamOutputRpc.closeAsync().join();
-
- // get request call id
- long callId = dataStreamOutputRpc.getHeaderFuture().get().getStreamId();
-
// verify the write request is in the Raft log.
RaftLog log = leader.getState().getLog();
boolean transactionFound = false;
for (TermIndex termIndex : log.getEntries(0, Long.MAX_VALUE)) {
- RaftProtos.LogEntryProto entryProto = log.get(termIndex.getIndex());
+ final LogEntryProto entryProto = log.get(termIndex.getIndex());
if (entryProto.hasStateMachineLogEntry()) {
StateMachineLogEntryProto stateMachineEntryProto = entryProto.getStateMachineLogEntry();
if (stateMachineEntryProto.getCallId() == callId) {
diff --git a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
similarity index 50%
copy from ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
index 0174860..a68b2e1 100644
--- a/ratis-grpc/src/test/java/org/apache/ratis/grpc/MiniRaftClusterWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,58 +15,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.grpc;
+package org.apache.ratis.datastream;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.grpc.server.GrpcService;
+import org.apache.ratis.grpc.MiniRaftClusterWithGrpc;
+import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.statemachine.StateMachine;
import java.io.IOException;
-public class MiniRaftClusterWithGrpc extends MiniRaftCluster.RpcBase {
- public static final Factory<MiniRaftClusterWithGrpc> FACTORY
- = new Factory<MiniRaftClusterWithGrpc>() {
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#GRPC}} and {@link SupportedDataStreamType#NETTY}.
+ */
+public class MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty extends MiniRaftClusterWithGrpc {
+ public static final Factory<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty> FACTORY
+ = new Factory<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>() {
@Override
- public MiniRaftClusterWithGrpc newCluster(
- String[] ids, RaftProperties prop) {
+ public MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.GRPC);
- return new MiniRaftClusterWithGrpc(ids, prop);
+ RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
+ return new MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(ids, prop);
}
};
- public interface FactoryGet extends Factory.Get<MiniRaftClusterWithGrpc> {
+ public interface FactoryGet extends Factory.Get<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty> {
@Override
- default Factory<MiniRaftClusterWithGrpc> getFactory() {
+ default Factory<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty> getFactory() {
return FACTORY;
}
}
- public static final DelayLocalExecutionInjection sendServerRequestInjection =
- new DelayLocalExecutionInjection(GrpcService.GRPC_SEND_SERVER_REQUEST);
-
- private MiniRaftClusterWithGrpc(String[] ids, RaftProperties properties) {
- super(ids, properties, null);
+ private MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty(String[] ids, RaftProperties properties) {
+ super(ids, properties);
}
@Override
- protected RaftServerProxy newRaftServer(
- RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group,
+ protected RaftServerProxy newRaftServer(RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group,
RaftProperties properties) throws IOException {
- GrpcConfigKeys.Server.setPort(properties, getPort(id, group));
- return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null);
- }
-
- @Override
- protected void blockQueueAndSetDelay(String leaderId, int delayMs)
- throws InterruptedException {
- RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequestInjection,
- leaderId, delayMs, getTimeoutMax());
+ NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));
+ return super.newRaftServer(id, stateMachineRegistry, group, properties);
}
}
diff --git a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
similarity index 52%
copy from ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
index 657dd11..1a493e4 100644
--- a/ratis-netty/src/test/java/org/apache/ratis/netty/MiniRaftClusterWithNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -15,58 +15,50 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.ratis.netty;
+package org.apache.ratis.datastream;
import org.apache.ratis.MiniRaftCluster;
import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.RaftTestUtil;
import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.netty.server.NettyRpcService;
+import org.apache.ratis.netty.MiniRaftClusterWithNetty;
+import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.SupportedRpcType;
-import org.apache.ratis.server.impl.*;
+import org.apache.ratis.server.impl.RaftServerProxy;
import org.apache.ratis.statemachine.StateMachine;
import java.io.IOException;
-public class MiniRaftClusterWithNetty extends MiniRaftCluster.RpcBase {
- public static final Factory<MiniRaftClusterWithNetty> FACTORY
- = new Factory<MiniRaftClusterWithNetty>() {
+/**
+ * A {@link MiniRaftCluster} with {{@link SupportedRpcType#NETTY}} and {@link SupportedDataStreamType#NETTY}.
+ */
+public class MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty extends MiniRaftClusterWithNetty {
+ public static final Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> FACTORY
+ = new Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty>() {
@Override
- public MiniRaftClusterWithNetty newCluster(String[] ids, RaftProperties prop) {
+ public MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty newCluster(String[] ids, RaftProperties prop) {
RaftConfigKeys.Rpc.setType(prop, SupportedRpcType.NETTY);
- return new MiniRaftClusterWithNetty(ids, prop);
+ RaftConfigKeys.DataStream.setType(prop, SupportedDataStreamType.NETTY);
+ return new MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(ids, prop);
}
};
- public interface FactoryGet extends Factory.Get<MiniRaftClusterWithNetty> {
+ public interface FactoryGet extends Factory.Get<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> {
@Override
- default Factory<MiniRaftClusterWithNetty> getFactory() {
+ default Factory<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty> getFactory() {
return FACTORY;
}
}
- public static final DelayLocalExecutionInjection sendServerRequest
- = new DelayLocalExecutionInjection(NettyRpcService.SEND_SERVER_REQUEST);
-
- private MiniRaftClusterWithNetty(String[] ids, RaftProperties properties) {
- super(ids, properties, null);
+ private MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty(String[] ids, RaftProperties properties) {
+ super(ids, properties);
}
@Override
- protected RaftServerProxy newRaftServer(
- RaftPeerId id, StateMachine.Registry stateMachineRegistry , RaftGroup group,
+ protected RaftServerProxy newRaftServer(RaftPeerId id, StateMachine.Registry stateMachineRegistry, RaftGroup group,
RaftProperties properties) throws IOException {
- NettyConfigKeys.Server.setPort(properties, getPort(id, group));
NettyConfigKeys.DataStream.setPort(properties, getDataStreamPort(id, group));
- return ServerImplUtils.newRaftServer(id, group, stateMachineRegistry, properties, null);
- }
-
- @Override
- protected void blockQueueAndSetDelay(String leaderId, int delayMs)
- throws InterruptedException {
- RaftTestUtil.blockQueueAndSetDelay(getServers(), sendServerRequest,
- leaderId, delayMs, getTimeoutMax());
+ return super.newRaftServer(id, stateMachineRegistry, group, properties);
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
similarity index 71%
copy from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
copy to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
index 8156820..ce5b25d 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithGrpcCluster.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,10 +17,7 @@
*/
package org.apache.ratis.datastream;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
-import org.apache.ratis.netty.MiniRaftClusterWithNetty;
-
-public class TestDataStreamWithNettyMiniRaftCluster extends DataStreamTests<MiniRaftClusterWithNetty>
- implements MiniRaftClusterWithNetty.FactoryGet {
+public class TestNettyDataStreamWithGrpcCluster
+ extends DataStreamClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
+ implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithNettyCluster.java
similarity index 71%
rename from ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
rename to ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithNettyCluster.java
index 8156820..dd27924 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamWithNettyMiniRaftCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamWithNettyCluster.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -17,10 +17,7 @@
*/
package org.apache.ratis.datastream;
-import org.apache.ratis.RaftConfigKeys;
-import org.apache.ratis.datastream.DataStreamBaseTest.MultiDataStreamStateMachine;
-import org.apache.ratis.netty.MiniRaftClusterWithNetty;
-
-public class TestDataStreamWithNettyMiniRaftCluster extends DataStreamTests<MiniRaftClusterWithNetty>
- implements MiniRaftClusterWithNetty.FactoryGet {
+public class TestNettyDataStreamWithNettyCluster
+ extends DataStreamClusterTests<MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty>
+ implements MiniRaftClusterWithRpcTypeNettyAndDataStreamTypeNetty.FactoryGet {
}