You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/11 01:21:01 UTC
[incubator-ratis] branch master updated: RATIS-1145. In
NettyServerStreamRpc,
the local/remote writes should only wait for the previous write. (#267)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new d289643 RATIS-1145. In NettyServerStreamRpc, the local/remote writes should only wait for the previous write. (#267)
d289643 is described below
commit d2896437e8a28d89de92618ca4041fb5e89ab674
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Nov 11 09:18:12 2020 +0800
RATIS-1145. In NettyServerStreamRpc, the local/remote writes should only wait for the previous write. (#267)
---
.../ratis/netty/server/NettyServerStreamRpc.java | 214 ++++++++++++---------
.../apache/ratis/server/RaftServerConfigKeys.java | 2 +-
.../ratis/datastream/DataStreamBaseTest.java | 64 +++---
.../ratis/datastream/TestDataStreamNetty.java | 6 +-
4 files changed, 166 insertions(+), 120 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index dff6a87..c8528e5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -32,7 +32,6 @@ import org.apache.ratis.protocol.ClientId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftPeer;
-import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.server.DataStreamServerRpc;
import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.RaftServerConfigKeys;
@@ -68,11 +67,12 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.stream.Collectors;
public class NettyServerStreamRpc implements DataStreamServerRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyServerStreamRpc.class);
@@ -124,33 +124,92 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
}
+ static class LocalStream {
+ private final CompletableFuture<DataStream> streamFuture;
+ private final AtomicReference<CompletableFuture<Long>> writeFuture;
+
+ LocalStream(CompletableFuture<DataStream> streamFuture) {
+ this.streamFuture = streamFuture;
+ this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 0L));
+ }
+
+ CompletableFuture<Long> write(ByteBuf buf, Executor executor) {
+ return composeAsync(writeFuture, executor,
+ n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, stream), executor));
+ }
+
+ CompletableFuture<Long> close(Executor executor) {
+ return composeAsync(writeFuture, executor,
+ n -> streamFuture.thenApplyAsync(NettyServerStreamRpc::close, executor));
+ }
+ }
+
+ static class RemoteStream {
+ private final DataStreamOutputRpc out;
+ private final AtomicReference<CompletableFuture<DataStreamReply>> writeFuture;
+
+ RemoteStream(DataStreamOutputRpc out) {
+ this.out = out;
+ this.writeFuture = new AtomicReference<>(out.getHeaderFuture());
+ }
+
+ CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
+ return composeAsync(writeFuture, executor, v -> out.writeAsync(request.slice().nioBuffer()));
+ }
+
+ CompletableFuture<Boolean> startTransaction(DataStreamRequestByteBuf request, ChannelHandlerContext ctx,
+ Executor executor) {
+ return out.startTransactionAsync().thenApplyAsync(reply -> {
+ if (reply.isSuccess()) {
+ final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
+ ((DataStreamReplyByteBuffer)reply).slice(): null;
+ sendReplySuccess(request, buffer, -1, ctx);
+ return true;
+ } else {
+ return false;
+ }
+ }, executor);
+ }
+
+ CompletableFuture<DataStreamReply> close(Executor executor) {
+ return composeAsync(writeFuture, executor, v -> out.closeAsync());
+ }
+ }
+
static class StreamInfo {
private final RaftClientRequest request;
- private final CompletableFuture<DataStream> stream;
- private final List<DataStreamOutputRpc> outs;
- private final AtomicReference<CompletableFuture<?>> previous
+ private final boolean primary;
+ private final LocalStream local;
+ private final List<RemoteStream> remotes;
+ private final AtomicReference<CompletableFuture<Void>> previous
= new AtomicReference<>(CompletableFuture.completedFuture(null));
- StreamInfo(RaftClientRequest request, CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
+ StreamInfo(RaftClientRequest request, boolean primary,
+ CompletableFuture<DataStream> stream, List<DataStreamOutputRpc> outs) {
this.request = request;
- this.stream = stream;
- this.outs = outs;
+ this.primary = primary;
+ this.local = new LocalStream(stream);
+ this.remotes = outs.stream().map(RemoteStream::new).collect(Collectors.toList());
}
- CompletableFuture<DataStream> getStream() {
- return stream;
+ AtomicReference<CompletableFuture<Void>> getPrevious() {
+ return previous;
}
- List<DataStreamOutputRpc> getDataStreamOutputs() {
- return outs;
+ RaftClientRequest getRequest() {
+ return request;
}
- AtomicReference<CompletableFuture<?>> getPrevious() {
- return previous;
+ boolean isPrimary() {
+ return primary;
}
- RaftClientRequest getRequest() {
- return request;
+ LocalStream getLocal() {
+ return local;
+ }
+
+ <T> List<T> applyToRemotes(Function<RemoteStream, T> function) {
+ return remotes.isEmpty()?Collections.emptyList(): remotes.stream().map(function).collect(Collectors.toList());
}
@Override
@@ -213,10 +272,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
private final ChannelFuture channelFuture;
private final StreamMap streams = new StreamMap();
-
private final Proxies proxies;
- private final ExecutorService executorService;
+ private final Executor executor;
public NettyServerStreamRpc(RaftServer server) {
this.server = server;
@@ -232,7 +290,7 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
.childOption(ChannelOption.SO_KEEPALIVE, true)
.bind(port);
this.proxies = new Proxies(new PeerProxyMap<>(name, peer -> newClient(peer, properties)));
- this.executorService = Executors.newFixedThreadPool(
+ this.executor = Executors.newFixedThreadPool(
RaftServerConfigKeys.DataStream.asyncThreadPoolSize(server.getProperties()));
}
@@ -254,19 +312,22 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
final RaftClientRequest request = ClientProtoUtils.toRaftClientRequest(
RaftClientRequestProto.parseFrom(buf.nioBuffer()));
final StateMachine stateMachine = server.getStateMachine(request.getRaftGroupId());
- return new StreamInfo(request, stateMachine.data().stream(request),
- isPrimary(request.getServerId())?
- proxies.getDataStreamOutput(request) : Collections.EMPTY_LIST);
+ final boolean isPrimary = server.getId().equals(request.getServerId());
+ return new StreamInfo(request, isPrimary, stateMachine.data().stream(request),
+ isPrimary? proxies.getDataStreamOutput(request): Collections.emptyList());
} catch (Throwable e) {
throw new CompletionException(e);
}
}
- private long writeTo(ByteBuf buf, DataStream stream) {
- if (stream == null) {
- return 0;
- }
+ static <T> CompletableFuture<T> composeAsync(AtomicReference<CompletableFuture<T>> future, Executor executor,
+ Function<T, CompletableFuture<T>> function) {
+ final CompletableFuture<T> composed = future.get().thenComposeAsync(function, executor);
+ future.set(composed);
+ return composed;
+ }
+ static long writeTo(ByteBuf buf, DataStream stream) {
final WritableByteChannel channel = stream.getWritableByteChannel();
long byteWritten = 0;
for (ByteBuffer buffer : buf.nioBuffers()) {
@@ -279,17 +340,25 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
return byteWritten;
}
+ static long close(DataStream stream) {
+ try {
+ stream.getWritableByteChannel().close();
+ return 0L;
+ } catch (IOException e) {
+ throw new CompletionException("Failed to close " + stream, e);
+ }
+ }
+
private void sendReplyNotSuccess(DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
request.getStreamId(), request.getStreamOffset(), null, -1, false, request.getType());
ctx.writeAndFlush(reply);
}
- private void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
+ static void sendReplySuccess(DataStreamRequestByteBuf request, ByteBuffer buffer, long bytesWritten,
ChannelHandlerContext ctx) {
final DataStreamReplyByteBuffer reply = new DataStreamReplyByteBuffer(
request.getStreamId(), request.getStreamOffset(), buffer, bytesWritten, true, request.getType());
- LOG.debug("{}: write {}", this, reply);
ctx.writeAndFlush(reply);
}
@@ -311,9 +380,10 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
};
}
- private int startTransaction(StreamInfo info, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+ private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
+ ChannelHandlerContext ctx) {
try {
- server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
+ return server.submitClientRequestAsync(info.getRequest()).thenAcceptAsync(reply -> {
if (reply.isSuccess()) {
ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
sendReplySuccess(request, buffer, -1, ctx);
@@ -326,30 +396,17 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
- }, executorService);
+ }, executor);
} catch (IOException e) {
sendReplyNotSuccess(request, ctx);
+ return CompletableFuture.completedFuture(null);
}
- return 0;
}
private void forwardStartTransaction(
final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
- final List<CompletableFuture<Boolean>> results = new ArrayList<>();
- for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
- final CompletableFuture<Boolean> f = out.startTransactionAsync().thenApplyAsync(reply -> {
- if (reply.isSuccess()) {
- final ByteBuffer buffer = reply instanceof DataStreamReplyByteBuffer?
- ((DataStreamReplyByteBuffer)reply).slice(): null;
- sendReplySuccess(request, buffer, -1, ctx);
- return true;
- } else {
- return false;
- }
- }, executorService);
-
- results.add(f);
- }
+ final List<CompletableFuture<Boolean>> results = info.applyToRemotes(
+ out -> out.startTransaction(request, ctx, executor));
JavaUtils.allOf(results).thenAccept(v -> {
if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
@@ -358,68 +415,46 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
});
}
- private boolean isPrimary(RaftPeerId primaryId) {
- return server.getId().equals(primaryId);
- }
-
private void read(ChannelHandlerContext ctx, DataStreamRequestByteBuf request) {
LOG.debug("{}: read {}", this, request);
final ByteBuf buf = request.slice();
+ final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
+
+ if (request.getType() == Type.START_TRANSACTION) {
+ // for peers to start transaction
+ final StreamInfo info = streams.get(key);
+ composeAsync(info.getPrevious(), executor, v -> startTransaction(info, request, ctx))
+ .thenAccept(v -> buf.release());
+ return;
+ }
final StreamInfo info;
final CompletableFuture<Long> localWrite;
- final List<CompletableFuture<DataStreamReply>> remoteWrites = new ArrayList<>();
- final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
+ final List<CompletableFuture<DataStreamReply>> remoteWrites;
if (request.getType() == Type.STREAM_HEADER) {
info = streams.computeIfAbsent(key, id -> newStreamInfo(buf));
localWrite = CompletableFuture.completedFuture(0L);
- for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
- remoteWrites.add(out.getHeaderFuture());
- }
+ remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
info = streams.get(key);
- final CompletableFuture<?> previous = info.getPrevious().get();
-
- localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, stream), executorService);
- for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
- remoteWrites.add(previous.thenComposeAsync(v -> out.writeAsync(request.slice().nioBuffer()), executorService));
- }
+ localWrite = info.getLocal().write(buf, executor);
+ remoteWrites = info.applyToRemotes(out -> out.write(request, executor));
} else if (request.getType() == Type.STREAM_CLOSE) {
info = streams.get(key);
- final CompletableFuture<?> previous = info.getPrevious().get();
-
- localWrite = previous.thenCombineAsync(info.getStream(), (u, stream) -> {
- try {
- stream.getWritableByteChannel().close();
- return 0L;
- } catch (IOException e) {
- throw new CompletionException("Failed to close " + stream, e);
- }
- }, executorService);
-
- if (isPrimary(info.getRequest().getServerId())) {
- for (DataStreamOutputRpc out : info.getDataStreamOutputs()) {
- remoteWrites.add(previous.thenComposeAsync(v -> out.closeAsync(), executorService));
- }
- }
- } else if (request.getType() == Type.START_TRANSACTION) {
- // peer server start transaction
- info = streams.get(key);
- final CompletableFuture<?> previous = info.getPrevious().get();
- previous.thenApplyAsync(v -> startTransaction(streams.get(key), request, ctx), executorService);
- return;
+ localWrite = info.getLocal().close(executor);
+ remoteWrites = info.isPrimary()? info.applyToRemotes(out -> out.close(executor)): Collections.emptyList();
} else {
+ buf.release();
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
- final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
+ composeAsync(info.getPrevious(), executor, n -> JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
- buf.release();
if (request.getType() == Type.STREAM_HEADER
|| request.getType() == Type.STREAM_DATA) {
sendReply(remoteWrites, request, bytesWritten, ctx);
} else if (request.getType() == Type.STREAM_CLOSE) {
- if (isPrimary(info.getRequest().getServerId())) {
+ if (info.isPrimary()) {
// after all server close stream, primary server start transaction
// TODO(runzhiwang): send start transaction to leader directly
startTransaction(info, request, ctx);
@@ -429,10 +464,9 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
+ buf.release();
return null;
- }, executorService);
-
- info.getPrevious().set(current);
+ }, executor));
}
private boolean checkSuccessRemoteWrite(
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 5abb03f..472785a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -410,7 +410,7 @@ public interface RaftServerConfigKeys {
String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
String ASYNC_THREAD_POOL_SIZE_KEY = PREFIX + ".async.thread.pool.size";
- int ASYNC_THREAD_POOL_SIZE_DEFAULT = 4;
+ int ASYNC_THREAD_POOL_SIZE_DEFAULT = 16;
static int asyncThreadPoolSize(RaftProperties properties) {
return getInt(properties::getInt, ASYNC_THREAD_POOL_SIZE_KEY, ASYNC_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
index 010eca8..1b8d261 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamBaseTest.java
@@ -52,7 +52,6 @@ import org.apache.ratis.util.NetUtils;
import org.junit.Assert;
import java.io.IOException;
-import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.util.ArrayList;
@@ -63,6 +62,8 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
@@ -73,6 +74,8 @@ abstract class DataStreamBaseTest extends BaseTest {
return (byte) ('A' + pos%MODULUS);
}
+ private final Executor executor = Executors.newFixedThreadPool(16);
+
static class MultiDataStreamStateMachine extends BaseStateMachine {
final ConcurrentMap<Long, SingleDataStream> streams = new ConcurrentHashMap<>();
@@ -361,37 +364,42 @@ abstract class DataStreamBaseTest extends BaseTest {
}
}
- protected void runTestDataStream(int numServers, int numClients, int numStreams, int bufferSize, int bufferNum)
- throws Exception {
+ void runTestDataStream(int numServers) throws Exception {
try {
setup(numServers);
- runTestDataStream(numClients, numStreams, bufferSize, bufferNum);
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(5, 10, 1_000_000, 10), executor));
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(2, 20, 1_000, 10_000), executor));
+ futures.forEach(CompletableFuture::join);
} finally {
shutdown();
}
}
- private void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) throws Exception {
+ void runTestDataStream(int numClients, int numStreams, int bufferSize, int bufferNum) {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
- final List<RaftClient> clients = new ArrayList<>();
- try {
- for (int j = 0; j < numClients; j++) {
- final RaftClient client = newRaftClientForDataStream();
- clients.add(client);
- for (int i = 0; i < numStreams; i++) {
- futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
- (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum)));
- }
+ for (int j = 0; j < numClients; j++) {
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(numStreams, bufferSize, bufferNum), executor));
+ }
+ Assert.assertEquals(numClients, futures.size());
+ futures.forEach(CompletableFuture::join);
+ }
+
+ void runTestDataStream(int numStreams, int bufferSize, int bufferNum) {
+ final List<CompletableFuture<Void>> futures = new ArrayList<>();
+ try(RaftClient client = newRaftClientForDataStream()) {
+ for (int i = 0; i < numStreams; i++) {
+ futures.add(CompletableFuture.runAsync(() -> runTestDataStream(
+ (DataStreamOutputImpl) client.getDataStreamApi().stream(), bufferSize, bufferNum), executor));
}
- Assert.assertEquals(numClients*numStreams, futures.size());
+ Assert.assertEquals(numStreams, futures.size());
futures.forEach(CompletableFuture::join);
- } finally {
- for (int j = 0; j < clients.size(); j++) {
- clients.get(j).close();
- }
+ } catch (IOException e) {
+ throw new CompletionException(e);
}
}
+
void runTestCloseStream(int bufferSize, int bufferNum, RaftClientReply expectedClientReply)
throws IOException {
try (final RaftClient client = newRaftClientForDataStream()) {
@@ -409,6 +417,7 @@ abstract class DataStreamBaseTest extends BaseTest {
}
private void runTestDataStream(DataStreamOutputImpl out, int bufferSize, int bufferNum) {
+ LOG.info("start stream {}", out.getHeader().getCallId());
final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
final List<Integer> sizes = new ArrayList<>();
@@ -450,14 +459,19 @@ abstract class DataStreamBaseTest extends BaseTest {
void assertHeader(Server server, RaftClientRequest header, int dataSize) throws Exception {
final MultiDataStreamStateMachine s = server.getStateMachine(header.getRaftGroupId());
final SingleDataStream stream = s.getSingleDataStream(header.getCallId());
- final RaftClientRequest writeRequest = stream.getWriteRequest();
- Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
- Assert.assertEquals(writeRequest.getRaftGroupId(), header.getRaftGroupId());
Assert.assertEquals(raftGroup.getGroupId(), header.getRaftGroupId());
Assert.assertEquals(dataSize, stream.getByteWritten());
- Assert.assertEquals(writeRequest.getCallId(), header.getCallId());
- Assert.assertEquals(writeRequest.getClientId(), header.getClientId());
- Assert.assertEquals(writeRequest.getServerId(), header.getServerId());
+
+ final RaftClientRequest writeRequest = stream.getWriteRequest();
+ assertRaftClientRequest(header, writeRequest);
+ }
+
+ static void assertRaftClientRequest(RaftClientRequest expected, RaftClientRequest computed) {
+ Assert.assertNotNull(computed);
+ Assert.assertEquals(expected.getClientId(), computed.getClientId());
+ Assert.assertEquals(expected.getServerId(), computed.getServerId());
+ Assert.assertEquals(expected.getRaftGroupId(), computed.getRaftGroupId());
+ Assert.assertEquals(expected.getCallId(), computed.getCallId());
}
static ByteBuffer initBuffer(int offset, int size) {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
index 3a99815..f9fce8b 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestDataStreamNetty.java
@@ -70,14 +70,12 @@ public class TestDataStreamNetty extends DataStreamBaseTest {
@Test
public void testDataStreamSingleServer() throws Exception {
- runTestDataStream(1, 5, 10, 1_000_000, 10);
- runTestDataStream(1, 2, 20, 1_000, 10_000);
+ runTestDataStream(1);
}
@Test
public void testDataStreamMultipleServer() throws Exception {
- runTestDataStream(3, 5, 10, 1_000_000, 10);
- runTestDataStream(3, 2, 20, 1_000, 10_000);
+ runTestDataStream(3);
}
private void testCloseStream(int leaderIndex, int numServers) throws Exception {