You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/10/19 09:07:03 UTC

[ratis] branch master updated: RATIS-1157. Buffer packets when the size of packets are too small. (#748)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new cf0dce981 RATIS-1157. Buffer packets when the size of packets are too small. (#748)
cf0dce981 is described below

commit cf0dce981cb3dccb0501e7648f348b6f8634c1f2
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Wed Oct 19 17:06:57 2022 +0800

    RATIS-1157. Buffer packets when the size of packets are too small. (#748)
---
 .../apache/ratis/client/RaftClientConfigKeys.java  | 22 +++++++-
 .../ratis/client/impl/DataStreamClientImpl.java    |  5 --
 .../ratis/client/impl/OrderedStreamAsync.java      |  2 +-
 .../impl/DataStreamRequestByteBuffer.java          | 10 ++--
 .../impl/DataStreamRequestFilePositionCount.java   |  8 +--
 .../org/apache/ratis/io/StandardWriteOption.java   |  4 +-
 .../main/java/org/apache/ratis/io/WriteOption.java |  4 +-
 .../apache/ratis/protocol/DataStreamRequest.java   |  8 ++-
 .../ratis/protocol/DataStreamRequestHeader.java    |  2 +-
 .../apache/ratis/netty/NettyDataStreamUtils.java   | 11 +++-
 .../ratis/netty/client/NettyClientStreamRpc.java   | 66 +++++++++++++++++++++-
 .../ratis/netty/server/DataStreamManagement.java   | 10 +---
 .../netty/server/DataStreamRequestByteBuf.java     |  8 +--
 .../datastream/DataStreamAsyncClusterTests.java    |  2 +-
 ...NettyDataStreamStarTopologyWithGrpcCluster.java | 22 ++++++--
 15 files changed, 141 insertions(+), 43 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 0e950ee2e..d52fcab4f 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -95,7 +95,7 @@ public interface RaftClientConfigKeys {
     String PREFIX = RaftClientConfigKeys.PREFIX + ".data-stream";
 
     String OUTSTANDING_REQUESTS_MAX_KEY = PREFIX + ".outstanding-requests.max";
-    int OUTSTANDING_REQUESTS_MAX_DEFAULT = 10;
+    int OUTSTANDING_REQUESTS_MAX_DEFAULT = 100;
     static int outstandingRequestsMax(RaftProperties properties) {
       return getInt(properties::getInt, OUTSTANDING_REQUESTS_MAX_KEY,
           OUTSTANDING_REQUESTS_MAX_DEFAULT, getDefaultLog(), requireMin(2));
@@ -104,6 +104,26 @@ public interface RaftClientConfigKeys {
       setInt(properties::setInt, OUTSTANDING_REQUESTS_MAX_KEY, outstandingRequests);
     }
 
+    String FLUSH_REQUEST_COUNT_MIN_KEY = PREFIX + ".flush.request.count.min";
+    int FLUSH_REQUEST_COUNT_MIN_DEFAULT = 0;
+    static int flushRequestCountMin(RaftProperties properties) {
+      return getInt(properties::getInt, FLUSH_REQUEST_COUNT_MIN_KEY,
+          FLUSH_REQUEST_COUNT_MIN_DEFAULT, getDefaultLog(), requireMin(0));
+    }
+    static void setFlushRequestCountMin(RaftProperties properties, int flushRequestCountMin) {
+      setInt(properties::setInt, FLUSH_REQUEST_COUNT_MIN_KEY, flushRequestCountMin);
+    }
+
+    String FLUSH_REQUEST_BYTES_MIN_KEY = PREFIX + ".flush.request.bytes.min";
+    SizeInBytes FLUSH_REQUEST_BYTES_MIN_DEFAULT = SizeInBytes.ONE_MB;
+    static SizeInBytes flushRequestBytesMin(RaftProperties properties) {
+      return getSizeInBytes(properties::getSizeInBytes, FLUSH_REQUEST_COUNT_MIN_KEY,
+          FLUSH_REQUEST_BYTES_MIN_DEFAULT, getDefaultLog(), requireMinSizeInByte(SizeInBytes.ZERO));
+    }
+    static void setFlushRequestBytesMin(RaftProperties properties, SizeInBytes flushRequestBytesMin) {
+      setSizeInBytes(properties::set, FLUSH_REQUEST_BYTES_MIN_KEY, flushRequestBytesMin);
+    }
+
     String REQUEST_TIMEOUT_KEY = PREFIX + ".request.timeout";
     TimeDuration REQUEST_TIMEOUT_DEFAULT = TimeDuration.valueOf(10000, TimeUnit.MILLISECONDS);
     static TimeDuration requestTimeout(RaftProperties properties) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index cd1e1f256..7a25a8e4b 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -153,11 +153,6 @@ public class DataStreamClientImpl implements DataStreamClient {
       return f;
     }
 
-    @Override
-    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options) {
-      return writeAsync(src, Arrays.asList(options));
-    }
-
     @Override
     public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, Iterable<WriteOption> options) {
       return writeAsyncImpl(src, src.remaining(), options);
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index cf203a03b..fe51359b2 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -149,7 +149,7 @@ public class OrderedStreamAsync {
         request.getDataStreamRequest());
     long seqNum = request.getSeqNum();
 
-    final boolean isClose = StandardWriteOption.CLOSE.isOneOf(request.getDataStreamRequest().getWriteOptions());
+    final boolean isClose = request.getDataStreamRequest().getWriteOptionList().contains(StandardWriteOption.CLOSE);
     scheduleWithTimeout(request, isClose? closeTimeout: requestTimeout);
 
     requestFuture.thenApply(reply -> {
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 282b4f928..938ed793b 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -20,29 +20,27 @@ package org.apache.ratis.datastream.impl;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
 import org.apache.ratis.util.Preconditions;
 
 import java.nio.ByteBuffer;
-import java.util.Collections;
 import java.util.List;
 
 /**
  * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
- *
+ * <p>
  * This class is immutable.
  */
 public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
-  private List<WriteOption> options;
+  private final List<WriteOption> options;
 
   public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
     super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
-    this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
+    this.options = header.getWriteOptionList();
     Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
   }
 
   @Override
-  public List<WriteOption> getWriteOptions() {
+  public List<WriteOption> getWriteOptionList() {
     return options;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index b1fb73620..ceb84ff6e 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -21,14 +21,12 @@ import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
-import org.apache.ratis.thirdparty.com.google.common.collect.Lists;
 
-import java.util.Collections;
 import java.util.List;
 
 /**
  * Implements {@link DataStreamRequest} with {@link FilePositionCount}.
- *
+ * <p>
  * This class is immutable.
  */
 public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest {
@@ -37,7 +35,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
 
   public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
     super(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset());
-    this.options = Collections.unmodifiableList(Lists.newArrayList(header.getWriteOptions()));
+    this.options = header.getWriteOptionList();
     this.file = file;
   }
 
@@ -52,7 +50,7 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
   }
 
   @Override
-  public List<WriteOption> getWriteOptions() {
+  public List<WriteOption> getWriteOptionList() {
     return options;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
index 0aae8f9b9..27c91a5d8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
@@ -21,5 +21,7 @@ public enum StandardWriteOption implements WriteOption {
   /** Sync the data to the underlying storage. */
   SYNC,
   /** Close the data to the underlying storage. */
-  CLOSE
+  CLOSE,
+  /** Flush the data out to the network. */
+  FLUSH,
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
index b5f29a2f2..a734f0dab 100644
--- a/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
@@ -20,6 +20,8 @@ package org.apache.ratis.io;
 import java.util.Arrays;
 
 public interface WriteOption {
+  WriteOption[] EMPTY_ARRAY = {};
+
   static boolean containsOption(Iterable<WriteOption> options,
                                 WriteOption target) {
     for (WriteOption option : options) {
@@ -36,7 +38,7 @@ public interface WriteOption {
     return containsOption(Arrays.asList(options), target);
   }
 
-  default boolean isOneOf(Iterable<WriteOption> options) {
+  default boolean isOneOf(WriteOption... options) {
     return containsOption(options, this);
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index cde07c415..36d0d8b63 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -22,5 +22,11 @@ import org.apache.ratis.io.WriteOption;
 import java.util.List;
 
 public interface DataStreamRequest extends DataStreamPacket {
-  List<WriteOption> getWriteOptions();
+  List<WriteOption> getWriteOptionList();
+
+  /** @deprecated use {@link #getWriteOptionList()}. */
+  @Deprecated
+  default WriteOption[] getWriteOptions() {
+    return getWriteOptionList().toArray(WriteOption.EMPTY_ARRAY);
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index a0c68eff1..7cbf17ea6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -45,7 +45,7 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
   }
 
   @Override
-  public List<WriteOption> getWriteOptions() {
+  public List<WriteOption> getWriteOptionList() {
     return options;
   }
 }
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index 0f595c7b4..10f35157c 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -56,7 +56,7 @@ public interface NettyDataStreamUtils {
         .setStreamOffset(request.getStreamOffset())
         .setType(request.getType())
         .setDataLength(request.getDataLength());
-    for (WriteOption option : request.getWriteOptions()) {
+    for (WriteOption option : request.getWriteOptionList()) {
       b.addOptions(DataStreamPacketHeaderProto.Option.forNumber(
           ((StandardWriteOption) option).ordinal()));
     }
@@ -105,7 +105,14 @@ public interface NettyDataStreamUtils {
   static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer request, Consumer<Object> out,
       ByteBufAllocator allocator) {
     encodeDataStreamRequestHeader(request, out, allocator);
-    out.accept(Unpooled.wrappedBuffer(request.slice()));
+    encodeByteBuffer(request.slice(), out);
+  }
+
+  static void encodeByteBuffer(ByteBuffer buffer, Consumer<Object> out) {
+    if (buffer.remaining() == 0) {
+      return;
+    }
+    out.accept(Unpooled.wrappedBuffer(buffer));
   }
 
   static void encodeDataStreamRequestFilePositionCount(
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index a03013748..51326d13e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -19,9 +19,12 @@
 package org.apache.ratis.netty.client;
 
 import org.apache.ratis.client.DataStreamClientRpc;
+import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.netty.NettyUtils;
@@ -51,6 +54,7 @@ import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContext;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
 import org.apache.ratis.util.NetUtils;
+import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
@@ -58,6 +62,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Optional;
@@ -68,8 +73,11 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
+import static org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER;
+
 public class NettyClientStreamRpc implements DataStreamClientRpc {
   public static final Logger LOG = LoggerFactory.getLogger(NettyClientStreamRpc.class);
 
@@ -238,9 +246,39 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     }
   }
 
+  class OutstandingRequests {
+    private int count;
+    private long bytes;
+
+    synchronized boolean write(DataStreamRequest request) {
+      count++;
+      bytes += request.getDataLength();
+      final List<WriteOption> options = request.getWriteOptionList();
+      final boolean isClose = options.contains(StandardWriteOption.CLOSE);
+      final boolean isFlush = options.contains(StandardWriteOption.FLUSH);
+      final boolean flush = shouldFlush(isClose || isFlush, flushRequestCountMin, flushRequestBytesMin);
+      LOG.debug("Stream{} outstanding: count={}, bytes={}, options={}, flush? {}",
+          request.getStreamId(), count, bytes, options, flush);
+      return flush;
+    }
+
+    synchronized boolean shouldFlush(boolean force, int countMin, SizeInBytes bytesMin) {
+      if (force || count >= countMin || bytes >= bytesMin.getSize()) {
+        count = 0;
+        bytes = 0;
+        return true;
+      }
+      return false;
+    }
+  }
+
   private final String name;
   private final Connection connection;
 
+  private final int flushRequestCountMin;
+  private final SizeInBytes flushRequestBytesMin;
+  private final OutstandingRequests outstandingRequests = new OutstandingRequests();
+
   private final ConcurrentMap<ClientInvocationId, ReplyQueue> replies = new ConcurrentHashMap<>();
   private final TimeDuration replyQueueGracePeriod;
   private final TimeoutExecutor timeoutScheduler = TimeoutExecutor.getInstance();
@@ -248,6 +286,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   public NettyClientStreamRpc(RaftPeer server, TlsConf tlsConf, RaftProperties properties) {
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
     this.replyQueueGracePeriod = NettyConfigKeys.DataStream.Client.replyQueueGracePeriod(properties);
+    this.flushRequestCountMin = RaftClientConfigKeys.DataStream.flushRequestCountMin(properties);
+    this.flushRequestBytesMin = RaftClientConfigKeys.DataStream.flushRequestBytesMin(properties);
 
     final InetSocketAddress address = NetUtils.createSocketAddr(server.getDataStreamAddress());
     final SslContext sslContext = NettyUtils.buildSslContextForClient(tlsConf);
@@ -307,7 +347,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
       }
 
       @Override
-      public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+      public void channelInactive(ChannelHandlerContext ctx) {
         if (!connection.isClosed()) {
           connection.scheduleReconnect("channel is inactive", null);
         }
@@ -326,6 +366,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         }
         p.addLast(newEncoder());
         p.addLast(newEncoderDataStreamRequestFilePositionCount());
+        p.addLast(newEncoderByteBuffer());
         p.addLast(newDecoder());
         p.addLast(handler);
       }
@@ -350,6 +391,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     };
   }
 
+  static MessageToMessageEncoder<ByteBuffer> newEncoderByteBuffer() {
+    return new MessageToMessageEncoder<ByteBuffer>() {
+      @Override
+      protected void encode(ChannelHandlerContext ctx, ByteBuffer request, List<Object> out) {
+        NettyDataStreamUtils.encodeByteBuffer(request, out::add);
+      }
+    };
+  }
+
   static ByteToMessageDecoder newDecoder() {
     return new ByteToMessageDecoder() {
       {
@@ -378,7 +428,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
       return f;
     }
     LOG.debug("{}: write {}", this, request);
-    channel.writeAndFlush(request).addListener(future -> {
+    final Function<DataStreamRequest, ChannelFuture> writeMethod = outstandingRequests.write(request)?
+        channel::writeAndFlush: channel::write;
+    writeMethod.apply(request).addListener(future -> {
       if (!future.isSuccess()) {
         final IOException e = new IOException(this + ": Failed to send " + request, future.cause());
         LOG.error("Channel write failed", e);
@@ -390,7 +442,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
   @Override
   public void close() {
-    connection.close();
+    final boolean flush = outstandingRequests.shouldFlush(true, 0, SizeInBytes.ZERO);
+    LOG.debug("flush? {}", flush);
+    if (flush) {
+      Optional.ofNullable(connection.getChannelUninterruptibly())
+          .map(c -> c.writeAndFlush(EMPTY_BYTE_BUFFER))
+          .ifPresent(f -> f.addListener(dummy -> connection.close()));
+    } else {
+      connection.close();
+    }
   }
 
   @Override
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 8cdf15fe7..362199323 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -112,8 +112,7 @@ public class DataStreamManagement {
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
       final Timekeeper.Context context = metrics.start();
       return composeAsync(sendFuture, executor,
-          n -> out.writeAsync(request.slice().nioBuffer(),
-                          request.getWriteOptions())
+          n -> out.writeAsync(request.slice().nioBuffer(), request.getWriteOptionList())
               .whenComplete((l, e) -> metrics.stop(context, e == null)));
     }
   }
@@ -392,8 +391,7 @@ public class DataStreamManagement {
 
   private void readImpl(DataStreamRequestByteBuf request, ChannelHandlerContext ctx, ByteBuf buf,
       CheckedBiFunction<RaftClientRequest, Set<RaftPeer>, Set<DataStreamOutputRpc>, IOException> getStreams) {
-    boolean close = WriteOption.containsOption(request.getWriteOptions(),
-            StandardWriteOption.CLOSE);
+    final boolean close = request.getWriteOptionList().contains(StandardWriteOption.CLOSE);
     ClientInvocationId key =  ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
@@ -422,9 +420,7 @@ public class DataStreamManagement {
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
-      localWrite = info.getLocal().write(buf,
-              request.getWriteOptions(),
-              writeExecutor);
+      localWrite = info.getLocal().write(buf, request.getWriteOptionList(), writeExecutor);
       remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor));
     } else {
       throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index 84803eb0e..0dcd46e02 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -33,7 +33,7 @@ import java.util.List;
 
 /**
  * Implements {@link DataStreamRequest} with {@link ByteBuf}.
- *
+ * <p>
  * This class is immutable.
  */
 public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest {
@@ -41,7 +41,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
   private final List<WriteOption> options;
 
   public DataStreamRequestByteBuf(ClientId clientId, Type type, long streamId, long streamOffset,
-                                  List<WriteOption> options, ByteBuf buf) {
+                                  Iterable<WriteOption> options, ByteBuf buf) {
     super(clientId, type, streamId, streamOffset);
     this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
     this.options = Collections.unmodifiableList(Lists.newArrayList(options));
@@ -49,7 +49,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
 
   public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
     this(header.getClientId(), header.getType(), header.getStreamId(), header.getStreamOffset(),
-         header.getWriteOptions(), buf);
+         header.getWriteOptionList(), buf);
   }
 
   @Override
@@ -62,7 +62,7 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
   }
 
   @Override
-  public List<WriteOption> getWriteOptions() {
+  public List<WriteOption> getWriteOptionList() {
     return options;
   }
 }
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
index e18448748..c18f7dea6 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamAsyncClusterTests.java
@@ -90,7 +90,7 @@ public abstract class DataStreamAsyncClusterTests<CLUSTER extends MiniRaftCluste
     RaftTestUtil.waitForLeader(cluster);
 
     final List<CompletableFuture<Long>> futures = new ArrayList<>();
-    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 1_000_000, 10, stepDownLeader), executor));
+    futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 5, 10, 100_000, 10, stepDownLeader), executor));
     futures.add(CompletableFuture.supplyAsync(() -> runTestDataStream(cluster, 2, 20, 1_000, 5_000, stepDownLeader), executor));
     final long maxIndex = futures.stream()
         .map(CompletableFuture::join)
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
index cd6bbc75f..14c62b74f 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/TestNettyDataStreamStarTopologyWithGrpcCluster.java
@@ -17,9 +17,14 @@
  */
 package org.apache.ratis.datastream;
 
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.protocol.RoutingTable;
+import org.apache.ratis.util.SizeInBytes;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Before;
 
 import java.util.Collection;
 import java.util.List;
@@ -29,13 +34,22 @@ public class TestNettyDataStreamStarTopologyWithGrpcCluster
     extends DataStreamAsyncClusterTests<MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty>
     implements MiniRaftClusterWithRpcTypeGrpcAndDataStreamTypeNetty.FactoryGet {
 
+  @Before
+  public void setup() {
+    final RaftProperties p = getProperties();
+    RaftClientConfigKeys.DataStream.setRequestTimeout(p, TimeDuration.ONE_MINUTE);
+    RaftClientConfigKeys.DataStream.setFlushRequestCountMin(p, 4);
+    RaftClientConfigKeys.DataStream.setFlushRequestBytesMin(p, SizeInBytes.valueOf("10MB"));
+    RaftClientConfigKeys.DataStream.setOutstandingRequestsMax(p, 2 << 16);
+  }
+
   @Override
   public RoutingTable getRoutingTable(Collection<RaftPeer> peers, RaftPeer primary) {
-    RoutingTable.Builder builder = RoutingTable.newBuilder();
     final List<RaftPeerId> others = peers.stream()
-        .filter(p -> !p.getId().equals(primary.getId())).map(v -> v.getId())
+        .map(RaftPeer::getId).filter(id -> !id.equals(primary.getId()))
         .collect(Collectors.toList());
-    builder.addSuccessors(primary.getId(), others);
-    return builder.build();
+    return RoutingTable.newBuilder()
+        .addSuccessors(primary.getId(), others)
+        .build();
   }
 }