You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/10/19 09:07:03 UTC
[ratis] branch master updated: RATIS-1157. Buffer packets when the size of packets are too small. (#748)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new cf0dce981 RATIS-1157. Buffer packets when the size of packets are too small. (#748)
cf0dce981 is described below
commit cf0dce981cb3dccb0501e7648f348b6f8634c1f2
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Oct 19 17:06:57 2022 +0800
RATIS-1157. Buffer packets when the size of packets are too small. (#748)
---
.../apache/ratis/client/RaftClientConfigKeys.java | 22 +++++++-
.../ratis/client/impl/DataStreamClientImpl.java | 5 --
.../ratis/client/impl/OrderedStreamAsync.java | 2 +-
.../impl/DataStreamRequestByteBuffer.java | 10 ++--
.../impl/DataStreamRequestFilePositionCount.java | 8 +--
.../org/apache/ratis/io/StandardWriteOption.java | 4 +-
.../main/java/org/apache/ratis/io/WriteOption.java | 4 +-
.../apache/ratis/protocol/DataStreamRequest.java | 8 ++-
.../ratis/protocol/DataStreamRequestHeader.java | 2 +-
.../apache/ratis/netty/NettyDataStreamUtils.java | 11 +++-
.../ratis/netty/client/NettyClientStreamRpc.java | 66 +++++++++++++++++++++-
.../ratis/netty/server/DataStreamManagement.java | 10 +---
.../netty/server/DataStreamRequestByteBuf.java | 8 +--
.../datastream/DataStreamAsyncClusterTests.java | 2 +-
...NettyDataStreamStarTopologyWithGrpcCluster.java | 22 ++++++--
15 files changed, 141 insertions(+), 43 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 0e950ee2e..d52fcab4f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -95,7 +95,7 @@ public interface RaftClientConfigKeys {
String PREFIX = RaftClientConfigKeys.PREFIX + ".data-stream";
String OUTSTANDING_REQUESTS_MAX_KEY = PREFIX + ".outstanding-requests.max";
- int OUTSTANDING_REQUESTS_MAX_DEFAULT = 10;
+ int OUTSTANDING_REQUESTS_MAX_DEFAULT = 100;
static int outstandingRequestsMax(RaftProperties properties) {
return getInt(properties::getInt, OUTSTANDING_REQUESTS_MAX_KEY,
OUTSTANDING_REQUESTS_MAX_DEFAULT, getDefaultLog(), requireMin(2));
@@ -104,6 +104,26 @@ public interface RaftClientConfigKeys {
setInt(properties::setInt, OUTSTANDING_REQUESTS_MAX_KEY, outstandingRequests);
}
+ String FLUSH_REQUEST_COUNT_MIN_KEY = PREFIX + ".flush.request.count.min";
+ int FLUSH_REQUEST_COUNT_MIN_DEFAULT = 0;
+ static int flushRequestCountMin(RaftProperties properties) {
+ return getInt(properties::getInt, FLUSH_REQUEST_COUNT_MIN_KEY,
+ FLUSH_REQUEST_COUNT_MIN_DEFAULT, getDefaultLog(), requireMin(0));
+ }
+ static void setFlushRequestCountMin(RaftProperties properties, int flushRequestCountMin) {
+ setInt(properties::setInt, FLUSH_REQUEST_COUNT_MIN_KEY, flushRequestCountMin);
+ }
+
+ String FLUSH_REQUEST_BYTES_MIN_KEY = PREFIX + ".flush.request.bytes.min";
+ SizeInBytes FLUSH_REQUEST_BYTES_MIN_DEFAULT = SizeInBytes.ONE_MB;
+ static SizeInBytes flushRequestBytesMin(RaftProperties properties) {
+ return getSizeInBytes(properties::getSizeInBytes, FLUSH_REQUEST_COUNT_MIN_KEY,
+ FLUSH_REQUEST_BYTES_MIN_DEFAULT, getDefaultLog(), requireMinSizeInByte(SizeInBytes.ZERO));
+ }
+ static void setFlushRequestBytesMin(RaftProperties properties, SizeInBytes flushRequestBytesMin) {
+ setSizeInBytes(properties::set, FLUSH_REQUEST_BYTES_MIN_KEY, flushRequestBytesMin);
+ }
+
String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(10000, TimeUnit.MILLISECONDS);
static TimeDuration requestTimeout(RaftProperties properties) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index cd1e1f256..7a25a8e4b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -153,11 +153,6 @@ public class DataStreamClientImpl implements DataStreamClient {
return f;
}
- @Override
- public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options) {
- return writeAsync(src, Arrays.asList(options));
- }
-
@Override
public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options) {
return writeAsyncImpl(src, src.remaining(), options);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index cf203a03b..fe51359b2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -149,7 +149,7 @@ public class OrderedStreamAsync {
request.getDataStreamRequest());
long seqNum = request.getSeqNum();
- final boolean isClose = StandardWriteOption.CLOSE.isOneOf(request.getDataStreamRequest().getWriteOptions());
+ final boolean isClose = request.getDataStreamRequest().getWriteOptionList().contains(StandardWriteOption.CLOSE);
scheduleWithTimeout(request, isClose? closeTimeout: requestTimeout);
requestFuture.thenApply(reply -> {
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 282b4f928..938ed793b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -20,29 +20,27 @@ package org.apache.ratis.datastream.impl;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
import org.apache.ratis.util.Preconditions;
import java.nio.ByteBuffer;
-import java.util.Collections;
import java.util.List;
/**
* Implements {@link DataStreamRequest} with {@link ByteBuffer}.
- *
+ * <p>
* This class is immutable.
*/
public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
- private List<WriteOption> options;
+ private final List<WriteOption> options;
public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
- this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
+ this.options = header.getWriteOptionList();
Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
}
@Override
- public List<WriteOption> getWriteOptions() {
+ public List<WriteOption> getWriteOptionList() {
return options;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index b1fb73620..ceb84ff6e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -21,14 +21,12 @@ import org.apache.ratis.io.FilePositionCount;
import org.apache.ratis.io.WriteOption;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.DataStreamRequestHeader;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
-import java.util.Collections;
import java.util.List;
/**
* Implements {@link DataStreamRequest} with {@link FilePositionCount}.
- *
+ * <p>
* This class is immutable.
*/
public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest {
@@ -37,7 +35,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset());
- this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
+ this.options = header.getWriteOptionList();
this.file = file;
}
@@ -52,7 +50,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
}
@Override
- public List<WriteOption> getWriteOptions() {
+ public List<WriteOption> getWriteOptionList() {
return options;
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
index 0aae8f9b9..27c91a5d8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
@@ -21,5 +21,7 @@ public enum StandardWriteOption implements WriteOption {
/** Sync the data to the underlying storage. */
SYNC,
/** Close the data to the underlying storage. */
- CLOSE
+ CLOSE,
+ /** Flush the data out to the network. */
+ FLUSH,
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
index b5f29a2f2..a734f0dab 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
@@ -20,6 +20,8 @@ package org.apache.ratis.io;
import java.util.Arrays;
public interface WriteOption {
+ WriteOption[] EMPTY_ARRAY = {};
+
static boolean containsOption(Iterable<WriteOption> options,
WriteOption target) {
for (WriteOption option : options) {
@@ -36,7 +38,7 @@ public interface WriteOption {
return containsOption(Arrays.asList(options), target);
}
- default boolean isOneOf(Iterable<WriteOption> options) {
+ default boolean isOneOf(WriteOption... options) {
return containsOption(options, this);
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index cde07c415..36d0d8b63 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -22,5 +22,11 @@ import org.apache.ratis.io.WriteOption;
import java.util.List;
public interface DataStreamRequest extends DataStreamPacket {
- List<WriteOption> getWriteOptions();
+ List<WriteOption> getWriteOptionList();
+
+ /** @deprecated use {@link #getWriteOptionList()}. */
+ @Deprecated
+ default WriteOption[] getWriteOptions() {
+ return getWriteOptionList().toArray(WriteOption.EMPTY_ARRAY);
+ }
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index a0c68eff1..7cbf17ea6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -45,7 +45,7 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
}
@Override
- public List<WriteOption> getWriteOptions() {
+ public List<WriteOption> getWriteOptionList() {
return options;
}
}
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 0f595c7b4..10f35157c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -56,7 +56,7 @@ public interface NettyDataStreamUtils {
.setStreamOffset(request.getStreamOffset())
.setType(request.getType())
.setDataLength(request.getDataLength());
- for (WriteOption option : request.getWriteOptions()) {
+ for (WriteOption option : request.getWriteOptionList()) {
b.addOptions(DataStreamPacketHeaderProto.Option.forNumber(
((StandardWriteOption) option).ordinal()));
}
@@ -105,7 +105,14 @@ public interface NettyDataStreamUtils {
static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer request, Consumer<Object> out,
ByteBufAllocator allocator) {
encodeDataStreamRequestHeader(request, out, allocator);
- out.accept(Unpooled.wrappedBuffer(request.slice()));
+ encodeByteBuffer(request.slice(), out);
+ }
+
+ static void encodeByteBuffer(ByteBuffer buffer, Consumer<Object> out) {
+ if (buffer.remaining() == 0) {
+ return;
+ }
+ out.accept(Unpooled.wrappedBuffer(buffer));
}
static void encodeDataStreamRequestFilePositionCount(
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index a03013748..51326d13e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -19,9 +19,12 @@
package org.apache.ratis.netty.client;
import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
import org.apache.ratis.netty.NettyConfigKeys;
import org.apache.ratis.netty.NettyDataStreamUtils;
import org.apache.ratis.netty.NettyUtils;
@@ -51,6 +54,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.SizeInBytes;
import org.apache.ratis.util.TimeDuration;
import org.apache.ratis.util.TimeoutExecutor;
import org.slf4j.Logger;
@@ -58,6 +62,7 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
@@ -68,8 +73,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
import java.util.function.Supplier;
+import static org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER;
+
public class NettyClientStreamRpc implements DataStreamClientRpc {
public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
@@ -238,9 +246,39 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
}
}
+ class OutstandingRequests {
+ private int count;
+ private long bytes;
+
+ synchronized boolean write(DataStreamRequest request) {
+ count++;
+ bytes += request.getDataLength();
+ final List<WriteOption> options = request.getWriteOptionList();
+ final boolean isClose = options.contains(StandardWriteOption.CLOSE);
+ final boolean isFlush = options.contains(StandardWriteOption.FLUSH);
+ final boolean flush = shouldFlush(isClose || isFlush, flushRequestCountMin, flushRequestBytesMin);
+ LOG.debug("Stream{} outstanding: count={}, bytes={}, options={}, flush? {}",
+ request.getStreamId(), count, bytes, options, flush);
+ return flush;
+ }
+
+ synchronized boolean shouldFlush(boolean force, int countMin, SizeInBytes bytesMin) {
+ if (force || count >= countMin || bytes >= bytesMin.getSize()) {
+ count = 0;
+ bytes = 0;
+ return true;
+ }
+ return false;
+ }
+ }
+
private final String name;
private final Connection connection;
+ private final int flushRequestCountMin;
+ private final SizeInBytes flushRequestBytesMin;
+ private final OutstandingRequests outstandingRequests = new OutstandingRequests();
+
private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>();
private final TimeDuration replyQueueGracePeriod;
private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance();
@@ -248,6 +286,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
this.replyQueueGracePeriod = NettyConfigKeys.DataStream.Client.replyQueueGracePeriod(properties);
+ this.flushRequestCountMin = RaftClientConfigKeys.DataStream.flushRequestCountMin(properties);
+ this.flushRequestBytesMin = RaftClientConfigKeys.DataStream.flushRequestBytesMin(properties);
final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress());
final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
@@ -307,7 +347,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
}
@Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ public void channelInactive(ChannelHandlerContext ctx) {
if (!connection.isClosed()) {
connection.scheduleReconnect("channel is inactive", null);
}
@@ -326,6 +366,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
}
p.addLast(newEncoder());
p.addLast(newEncoderDataStreamRequestFilePositionCount());
+ p.addLast(newEncoderByteBuffer());
p.addLast(newDecoder());
p.addLast(handler);
}
@@ -350,6 +391,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
};
}
+ static MessageToMessageEncoder<ByteBuffer> newEncoderByteBuffer() {
+ return new MessageToMessageEncoder<ByteBuffer>() {
+ @Override
+ protected void encode(ChannelHandlerContext ctx, ByteBuffer request, List<Object> out) {
+ NettyDataStreamUtils.encodeByteBuffer(request, out::add);
+ }
+ };
+ }
+
static ByteToMessageDecoder newDecoder() {
return new ByteToMessageDecoder() {
{
@@ -378,7 +428,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
return f;
}
LOG.debug("{}: write {}", this, request);
- channel.writeAndFlush(request).addListener(future -> {
+ final Function<DataStreamRequest, ChannelFuture> writeMethod = outstandingRequests.write(request)?
+ channel::writeAndFlush: channel::write;
+ writeMethod.apply(request).addListener(future -> {
if (!future.isSuccess()) {
final IOException e = new IOException(this + ": Failed to send " + request, future.cause());
LOG.error("Channel write failed", e);
@@ -390,7 +442,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
@Override
public void close() {
- connection.close();
+ final boolean flush = outstandingRequests.shouldFlush(true, 0, SizeInBytes.ZERO);
+ LOG.debug("flush? {}", flush);
+ if (flush) {
+ Optional.ofNullable(connection.getChannelUninterruptibly())
+ .map(c -> c.writeAndFlush(EMPTY_BYTE_BUFFER))
+ .ifPresent(f -> f.addListener(dummy -> connection.close()));
+ } else {
+ connection.close();
+ }
}
@Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 8cdf15fe7..362199323 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -112,8 +112,7 @@ public class DataStreamManagement {
CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
final Timekeeper.Context context = metrics.start();
return composeAsync(sendFuture, executor,
- n -> out.writeAsync(request.slice().nioBuffer(),
- request.getWriteOptions())
+ n -> out.writeAsync(request.slice().nioBuffer(), request.getWriteOptionList())
.whenComplete((l, e) -> metrics.stop(context, e == null)));
}
}
@@ -392,8 +391,7 @@ public class DataStreamManagement {
private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf,
CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
- boolean close = WriteOption.containsOption(request.getWriteOptions(),
- StandardWriteOption.CLOSE);
+ final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
ClientInvocationId key = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final StreamInfo info;
if (request.getType() == Type.STREAM_HEADER) {
@@ -422,9 +420,7 @@ public class DataStreamManagement {
localWrite = CompletableFuture.completedFuture(0L);
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
- localWrite = info.getLocal().write(buf,
- request.getWriteOptions(),
- writeExecutor);
+ localWrite = info.getLocal().write(buf, request.getWriteOptionList(), writeExecutor);
remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor));
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 84803eb0e..0dcd46e02 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -33,7 +33,7 @@ import java.util.List;
/**
* Implements {@link DataStreamRequest} with {@link ByteBuf}.
- *
+ * <p>
* This class is immutable.
*/
public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest {
@@ -41,7 +41,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
private final List<WriteOption> options;
public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset,
- List<WriteOption> options, ByteBuf buf) {
+ Iterable<WriteOption> options, ByteBuf buf) {
super(clientId, type, streamId, streamOffset);
this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
this.options = Collections.unmodifiableList(Lists.newArrayList(options));
@@ -49,7 +49,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
this(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(),
- header.getWriteOptions(), buf);
+ header.getWriteOptionList(), buf);
}
@Override
@@ -62,7 +62,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
}
@Override
- public List<WriteOption> getWriteOptions() {
+ public List<WriteOption> getWriteOptionList() {
return options;
}
}
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index e18448748..c18f7dea6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -90,7 +90,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
RaftTestUtil.waitForLeader(cluster);
final List<CompletableFuture<Long>> futures = new ArrayList<>();
- futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10, stepDownLeader), executor));
+ futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 100_000, 10, stepDownLeader), executor));
futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 5_000, stepDownLeader), executor));
final long maxIndex = futures.stream()
.map(CompletableFuture::join)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
index cd6bbc75f..14c62b74f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
@@ -17,9 +17,14 @@
*/
package org.apache.ratis.datastream;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.protocol.RoutingTable;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Before;
import java.util.Collection;
import java.util.List;
@@ -29,13 +34,22 @@ public class TestNettyDataStreamStarTopologyWithGrpcCluster
extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
+ @Before
+ public void setup() {
+ final RaftProperties p = getProperties();
+ RaftClientConfigKeys.DataStream.setRequestTimeout(p, TimeDuration.ONE_MINUTE);
+ RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
+ RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB"));
+ RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);
+ }
+
@Override
public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
- RoutingTable.Builder builder = RoutingTable.newBuilder();
final List<RaftPeerId> others = peers.stream()
- .filter(p -> !p.getId().equals(primary.getId())).map(v -> v.getId())
+ .map(RaftPeer::getId).filter(id -> !id.equals(primary.getId()))
.collect(Collectors.toList());
- builder.addSuccessors(primary.getId(), others);
- return builder.build();
+ return RoutingTable.newBuilder()
+ .addSuccessors(primary.getId(), others)
+ .build();
}
}