You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/17 15:24:52 UTC

[incubator-ratis] branch master updated: RATIS-1196. Save STREAM_CLOSE RPC call (#339)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1c0646d  RATIS-1196. Save STREAM_CLOSE RPC call (#339)
1c0646d is described below

commit 1c0646d46fba1647ceddc3bcd30d09dd67b46476
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Dec 17 23:24:43 2020 +0800

    RATIS-1196. Save STREAM_CLOSE RPC call (#339)
---
 .../apache/ratis/client/api/DataStreamOutput.java  | 25 ++++------
 .../ratis/client/impl/DataStreamClientImpl.java    | 53 ++++++++++++----------
 .../impl/DataStreamRequestByteBuffer.java          |  9 ++++
 .../impl/DataStreamRequestFilePositionCount.java   |  8 ++++
 .../StandardWriteOption.java}                      |  9 ++--
 .../DataStreamRequest.java => io/WriteOption.java} | 12 ++++-
 .../apache/ratis/protocol/DataStreamRequest.java   |  3 ++
 .../ratis/protocol/DataStreamRequestHeader.java    | 20 +++++++-
 .../ratis/examples/filestore/cli/DataStream.java   | 11 +++--
 .../ratis/examples/filestore/FileStoreWriter.java  |  6 ++-
 .../apache/ratis/netty/NettyDataStreamUtils.java   |  6 +++
 .../ratis/netty/server/DataStreamManagement.java   | 43 ++++++++++--------
 .../netty/server/DataStreamRequestByteBuf.java     | 12 ++++-
 ratis-proto/src/main/proto/Raft.proto              | 10 ++--
 .../ratis/datastream/DataStreamTestUtils.java      |  5 +-
 15 files changed, 154 insertions(+), 78 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index c054d35..be6d13e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -19,6 +19,7 @@ package org.apache.ratis.client.api;
 
 import org.apache.ratis.io.CloseAsync;
 import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 
@@ -30,21 +31,13 @@ import java.util.concurrent.CompletableFuture;
 /** An asynchronous output stream supporting zero buffer copying. */
 public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
   /**
-   * This method is the same as writeAsync(src, sync_default),
-   * where sync_default depends on the underlying implementation.
-   */
-  default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src) {
-    return writeAsync(src, false);
-  }
-
-  /**
    * Send out the data in the source buffer asynchronously.
    *
    * @param src the source buffer to be sent.
-   * @param sync Should the data be sync'ed to the underlying storage?
+   * @param options options specifying how the data is written
    * @return a future of the reply.
    */
-  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync);
+  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options);
 
 
   /**
@@ -52,24 +45,24 @@ public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
    * where sync_default depends on the underlying implementation.
    */
   default CompletableFuture<DataStreamReply> writeAsync(File src) {
-    return writeAsync(src, 0, src.length(), false);
+    return writeAsync(src, 0, src.length());
   }
 
   /**
-   * The same as writeAsync(FilePositionCount.valueOf(src, position, count), sync).
+   * The same as writeAsync(FilePositionCount.valueOf(src, position, count), options).
    */
-  default CompletableFuture<DataStreamReply> writeAsync(File src, long position, long count, boolean sync) {
-    return writeAsync(FilePositionCount.valueOf(src, position, count), sync);
+  default CompletableFuture<DataStreamReply> writeAsync(File src, long position, long count, WriteOption... options) {
+    return writeAsync(FilePositionCount.valueOf(src, position, count), options);
   }
 
   /**
    * Send out the data in the source file asynchronously.
    *
    * @param src the source file with the starting position and the number of bytes.
-   * @param sync Should the data be sync'ed to the underlying storage?
+   * @param options options specifying how the data is written
    * @return a future of the reply.
    */
-  CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync);
+  CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, WriteOption... options);
 
   /**
    * Return the future of the {@link RaftClientReply}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index 55506b9..a893d85 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -21,7 +21,10 @@ import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
 import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.ClientInvocationId;
@@ -70,11 +73,7 @@ public class DataStreamClientImpl implements DataStreamClient {
     private final RaftClientRequest header;
     private final CompletableFuture<DataStreamReply> headerFuture;
     private final CompletableFuture<RaftClientReply> raftClientReplyFuture = new CompletableFuture<>();
-    private final MemoizedSupplier<CompletableFuture<DataStreamReply>> closeSupplier = JavaUtils.memoize(() -> {
-      final CompletableFuture<DataStreamReply> f = send(Type.STREAM_CLOSE);
-      f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
-      return f;
-    });
+    private CompletableFuture<DataStreamReply> closeFuture;
     private final MemoizedSupplier<WritableByteChannel> writableByteChannelSupplier
         = JavaUtils.memoize(() -> new WritableByteChannel() {
       @Override
@@ -92,7 +91,11 @@ public class DataStreamClientImpl implements DataStreamClient {
 
       @Override
       public void close() throws IOException {
-        IOUtils.getFromFuture(closeAsync(), () -> "close(" + ClientInvocationId.valueOf(header) + ")");
+        if (isClosed()) {
+          return;
+        }
+        IOUtils.getFromFuture(writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE),
+            () -> "close(" + ClientInvocationId.valueOf(header) + ")");
       }
     });
 
@@ -104,46 +107,48 @@ public class DataStreamClientImpl implements DataStreamClient {
       this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
     }
 
-    private CompletableFuture<DataStreamReply> send(Type type, Object data, long length) {
-      final DataStreamRequestHeader h = new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length);
+    private CompletableFuture<DataStreamReply> send(Type type, Object data, long length, WriteOption... options) {
+      final DataStreamRequestHeader h =
+          new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length, options);
       return orderedStreamAsync.sendRequest(h, data);
     }
 
-    private CompletableFuture<DataStreamReply> send(Type type) {
-      return combineHeader(send(type, null, 0));
-    }
-
     private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> future) {
       return future.thenCombine(headerFuture, (reply, headerReply) -> headerReply.isSuccess()? reply : headerReply);
     }
 
-    private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, boolean sync) {
+    private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, WriteOption... options) {
       if (isClosed()) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(
             clientId + ": stream already closed, request=" + header));
       }
-      final CompletableFuture<DataStreamReply> f = send(sync ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA, data, length);
+      final CompletableFuture<DataStreamReply> f = combineHeader(send(Type.STREAM_DATA, data, length, options));
+      if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
+        closeFuture = f;
+        f.thenApply(ClientProtoUtils::getRaftClientReply).whenComplete(JavaUtils.asBiConsumer(raftClientReplyFuture));
+      }
       streamOffset += length;
-      return combineHeader(f);
+      return f;
     }
 
     @Override
-    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync) {
-      return writeAsyncImpl(src, src.remaining(), sync);
+    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, WriteOption... options) {
+      return writeAsyncImpl(src, src.remaining(), options);
     }
 
     @Override
-    public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync) {
-      return writeAsyncImpl(src, src.getCount(), sync);
+    public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, WriteOption... options) {
+      return writeAsyncImpl(src, src.getCount(), options);
     }
 
-    @Override
-    public CompletableFuture<DataStreamReply> closeAsync() {
-      return closeSupplier.get();
+    boolean isClosed() {
+      return closeFuture != null;
     }
 
-    boolean isClosed() {
-      return closeSupplier.isInitialized();
+    @Override
+    public CompletableFuture<DataStreamReply> closeAsync() {
+      return isClosed() ? closeFuture :
+          writeAsync(DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER, StandardWriteOption.CLOSE);
     }
 
     public RaftClientRequest getHeader() {
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 0de14fc..3dc9137 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.ratis.datastream.impl;
 
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.util.Preconditions;
@@ -29,8 +30,16 @@ import java.nio.ByteBuffer;
  * This class is immutable.
  */
 public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
+  private WriteOption[] options;
+
   public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
     super(header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
+    this.options = header.getWriteOptions();
     Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
   }
+
+  @Override
+  public WriteOption[] getWriteOptions() {
+    return options;
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index 8dd3b8f..fd60bf8 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.datastream.impl;
 
 import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
 
@@ -28,9 +29,11 @@ import org.apache.ratis.protocol.DataStreamRequestHeader;
  */
 public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest {
   private final FilePositionCount file;
+  private WriteOption[] options;
 
   public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
     super(header.getType(), header.getStreamId(), header.getStreamOffset());
+    this.options = header.getWriteOptions();
     this.file = file;
   }
 
@@ -43,4 +46,9 @@ public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl imp
   public FilePositionCount getFile() {
     return file;
   }
+
+  @Override
+  public WriteOption[] getWriteOptions() {
+    return options;
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
similarity index 79%
copy from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
copy to ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
index 8db10fe..0aae8f9 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/StandardWriteOption.java
@@ -15,8 +15,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ratis.io;
 
-package org.apache.ratis.protocol;
-
-public interface DataStreamRequest extends DataStreamPacket {
+public enum StandardWriteOption implements WriteOption {
+  /** Sync the data to the underlying storage. */
+  SYNC,
+  /** Close the stream after the data is written. */
+  CLOSE
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
similarity index 75%
copy from ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
copy to ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
index 8db10fe..11760c4 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/WriteOption.java
@@ -15,8 +15,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.ratis.io;
 
-package org.apache.ratis.protocol;
+public interface WriteOption {
+  static boolean containsOption(WriteOption[] options, WriteOption target) {
+    for (WriteOption option : options) {
+      if (option == target) {
+        return true;
+      }
+    }
 
-public interface DataStreamRequest extends DataStreamPacket {
+    return false;
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
index 8db10fe..cf09208 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequest.java
@@ -18,5 +18,8 @@
 
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.io.WriteOption;
+
 public interface DataStreamRequest extends DataStreamPacket {
+  WriteOption[] getWriteOptions();
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index 7781cd6..561f4ba 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,6 +18,8 @@
 
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
@@ -51,7 +53,13 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
       final DataStreamPacketHeaderProto h = header.getPacketHeader();
       if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
         buf.readerIndex(buf.readerIndex() + headerBufLen);
-        return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength());
+        WriteOption[] options = new WriteOption[h.getOptionsCount()];
+        for (int i = 0; i < options.length; i++) {
+          options[i] = StandardWriteOption.values()[h.getOptions(i).ordinal()];
+        }
+
+        return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
+            options);
       } else {
         buf.resetReaderIndex();
         return null;
@@ -63,7 +71,15 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
     }
   }
 
-  public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength) {
+  private final WriteOption[] options;
+
+  public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength, WriteOption... options) {
     super(type, streamId, streamOffset, dataLength);
+    this.options = options;
+  }
+
+  @Override
+  public WriteOption[] getWriteOptions() {
+    return options;
   }
 }
\ No newline at end of file
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 9afc4bd..c7443f6 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -22,6 +22,7 @@ import com.beust.jcommander.Parameters;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.examples.filestore.FileStoreClient;
+import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
@@ -268,7 +269,8 @@ public class DataStream extends Client {
         throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
             + ". The channel has reached end-of-stream at " + offset);
       } else if (bytesRead > 0) {
-        final CompletableFuture<DataStreamReply> f = out.writeAsync(buf.nioBuffer(), isSync(offset + bytesRead));
+        final CompletableFuture<DataStreamReply> f = isSync(offset + bytesRead) ?
+            out.writeAsync(buf.nioBuffer(), StandardWriteOption.SYNC) : out.writeAsync(buf.nioBuffer());
         f.thenRun(buf::release);
         futures.add(f);
       }
@@ -287,7 +289,8 @@ public class DataStream extends Client {
       final long packetSize = getPacketSize(offset);
       final MappedByteBuffer mappedByteBuffer = in.map(FileChannel.MapMode.READ_ONLY, offset, packetSize);
       final int remaining = mappedByteBuffer.remaining();
-      futures.add(out.writeAsync(mappedByteBuffer, isSync(offset + remaining)));
+      futures.add(isSync(offset + remaining) ?
+          out.writeAsync(mappedByteBuffer, StandardWriteOption.SYNC) : out.writeAsync(mappedByteBuffer));
       return remaining;
     }
   }
@@ -300,7 +303,9 @@ public class DataStream extends Client {
     @Override
     long write(FileChannel in, DataStreamOutput out, long offset, List<CompletableFuture<DataStreamReply>> futures) {
       final long packetSize = getPacketSize(offset);
-      futures.add(out.writeAsync(getFile(), offset, packetSize, isSync(offset + packetSize)));
+      futures.add(isSync(offset + packetSize) ?
+          out.writeAsync(getFile(), offset, packetSize, StandardWriteOption.SYNC) :
+          out.writeAsync(getFile(), offset, packetSize));
       return packetSize;
     }
   }
diff --git a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
index 1caa59f..991ce82 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/examples/filestore/FileStoreWriter.java
@@ -19,6 +19,7 @@ package org.apache.ratis.examples.filestore;
 
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.datastream.DataStreamTestUtils;
+import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.proto.RaftProtos;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
@@ -147,7 +148,8 @@ class FileStoreWriter implements Closeable {
       LOG.trace("write {}, offset={}, length={}, close? {}",
           fileName, offset, length, close);
       final ByteBuffer bf = DataStreamTestUtils.initBuffer(0, length);
-      futures.add(dataStreamOutput.writeAsync(bf, close));
+      futures.add(close ?
+          dataStreamOutput.writeAsync(bf, StandardWriteOption.CLOSE) : dataStreamOutput.writeAsync(bf));
       sizes.add(length);
       offset += length;
     }
@@ -161,7 +163,7 @@ class FileStoreWriter implements Closeable {
       reply = futures.get(i).join();
       Assert.assertTrue(reply.isSuccess());
       Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
-      Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA_SYNC : RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA);
+      Assert.assertEquals(reply.getType(), RaftProtos.DataStreamPacketHeaderProto.Type.STREAM_DATA);
     }
 
     return this;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index ccf25c7..e6111f5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -21,6 +21,8 @@ import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.io.FilePositionCount;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
 import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
@@ -47,6 +49,10 @@ public interface NettyDataStreamUtils {
         .setStreamOffset(request.getStreamOffset())
         .setType(request.getType())
         .setDataLength(request.getDataLength());
+    for (WriteOption option : request.getWriteOptions()) {
+      b.addOptions(DataStreamPacketHeaderProto.Option.forNumber(
+          ((StandardWriteOption) option).ordinal()));
+    }
     return DataStreamRequestHeaderProto
         .newBuilder()
         .setPacketHeader(b)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index e5ede5e..121c69e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -23,6 +23,8 @@ import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
+import org.apache.ratis.io.StandardWriteOption;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.protocol.ClientId;
@@ -76,9 +78,9 @@ public class DataStreamManagement {
       this.writeFuture = new AtomicReference<>(streamFuture.thenApply(s -> 0L));
     }
 
-    CompletableFuture<Long> write(ByteBuf buf, boolean sync, Executor executor) {
+    CompletableFuture<Long> write(ByteBuf buf, WriteOption[] options, Executor executor) {
       return composeAsync(writeFuture, executor,
-          n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, sync, stream), executor));
+          n -> streamFuture.thenApplyAsync(stream -> writeTo(buf, options, stream), executor));
     }
 
     CompletableFuture<Long> close(Executor executor) {
@@ -95,7 +97,7 @@ public class DataStreamManagement {
     }
 
     CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
-      return out.writeAsync(request.slice().nioBuffer(), request.getType() == Type.STREAM_DATA_SYNC);
+      return out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions());
     }
 
     CompletableFuture<DataStreamReply> close() {
@@ -259,7 +261,7 @@ public class DataStreamManagement {
     return composed;
   }
 
-  static long writeTo(ByteBuf buf, boolean sync, DataStream stream) {
+  static long writeTo(ByteBuf buf, WriteOption[] options, DataStream stream) {
     final DataChannel channel = stream.getDataChannel();
     long byteWritten = 0;
     for (ByteBuffer buffer : buf.nioBuffers()) {
@@ -270,13 +272,17 @@ public class DataStreamManagement {
       }
     }
 
-    if (sync) {
+    if (WriteOption.containsOption(options, StandardWriteOption.SYNC)) {
       try {
         channel.force(false);
       } catch (IOException e) {
         throw new CompletionException(e);
       }
     }
+
+    if (WriteOption.containsOption(options, StandardWriteOption.CLOSE)) {
+      close(stream);
+    }
     return byteWritten;
   }
 
@@ -290,12 +296,13 @@ public class DataStreamManagement {
   }
 
   static DataStreamReplyByteBuffer newDataStreamReplyByteBuffer(
-      DataStreamRequestByteBuf request, RaftClientReply reply) {
+      DataStreamRequestByteBuf request, RaftClientReply reply, long bytesWritten) {
     final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
     return DataStreamReplyByteBuffer.newBuilder()
         .setDataStreamPacket(request)
         .setBuffer(buffer)
         .setSuccess(reply.isSuccess())
+        .setBytesWritten(bytesWritten)
         .build();
   }
 
@@ -311,15 +318,15 @@ public class DataStreamManagement {
     ctx.writeAndFlush(builder.build());
   }
 
-  private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request,
+  private CompletableFuture<Void> startTransaction(StreamInfo info, DataStreamRequestByteBuf request, long bytesWritten,
       ChannelHandlerContext ctx) {
     try {
       AsyncRpcApi asyncRpcApi = (AsyncRpcApi) (server.getDivision(info.getRequest()
           .getRaftGroupId())
           .getRaftClient()
           .async());
-      return asyncRpcApi.sendForward(info.request)
-          .thenAcceptAsync(reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)), requestExecutor);
+      return asyncRpcApi.sendForward(info.request).thenAcceptAsync(
+          reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, bytesWritten)), requestExecutor);
     } catch (IOException e) {
       throw new CompletionException(e);
     }
@@ -348,7 +355,7 @@ public class DataStreamManagement {
       ChannelHandlerContext ctx) {
     LOG.warn("Failed to process {}",  request, throwable);
     try {
-      ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+      ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply, 0));
     } catch (Throwable t) {
       LOG.warn("Failed to sendDataStreamException {} for {}", throwable, request, t);
     }
@@ -358,6 +365,7 @@ public class DataStreamManagement {
       CheckedBiFunction<RaftClientRequest, List<RaftPeer>, List<DataStreamOutputRpc>, IOException> getStreams) {
     LOG.debug("{}: read {}", this, request);
     final ByteBuf buf = request.slice();
+    boolean close = WriteOption.containsOption(request.getWriteOptions(), StandardWriteOption.CLOSE);
     final StreamMap.Key key = new StreamMap.Key(ctx.channel().id(), request.getStreamId());
     final StreamInfo info;
     if (request.getType() == Type.STREAM_HEADER) {
@@ -367,7 +375,7 @@ public class DataStreamManagement {
         throw new IllegalStateException("Failed to create a new stream for " + request
             + " since a stream already exists: " + info);
       }
-    } else if (request.getType() == Type.STREAM_CLOSE) {
+    } else if (close) {
       info = Optional.ofNullable(streams.remove(key)).orElseThrow(
           () -> new IllegalStateException("Failed to remove StreamInfo for " + request));
     } else {
@@ -380,12 +388,9 @@ public class DataStreamManagement {
     if (request.getType() == Type.STREAM_HEADER) {
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
-    } else if (request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
-      localWrite = info.getLocal().write(buf, request.getType() == Type.STREAM_DATA_SYNC, writeExecutor);
+    } else if (request.getType() == Type.STREAM_DATA) {
+      localWrite = info.getLocal().write(buf, request.getWriteOptions(), writeExecutor);
       remoteWrites = info.applyToRemotes(out -> out.write(request));
-    } else if (request.getType() == Type.STREAM_CLOSE) {
-      localWrite = info.getLocal().close(writeExecutor);
-      remoteWrites = info.isPrimary()? info.applyToRemotes(RemoteStream::close): Collections.emptyList();
     } else {
       throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
     }
@@ -393,12 +398,12 @@ public class DataStreamManagement {
     composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           if (request.getType() == Type.STREAM_HEADER
-              || request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
+              || (request.getType() == Type.STREAM_DATA && !close)) {
             sendReply(remoteWrites, request, bytesWritten, ctx);
-          } else if (request.getType() == Type.STREAM_CLOSE) {
+          } else if (close) {
             if (info.isPrimary()) {
               // after all server close stream, primary server start transaction
-              startTransaction(info, request, ctx);
+              startTransaction(info, request, bytesWritten, ctx);
             } else {
               sendReply(remoteWrites, request, bytesWritten, ctx);
             }
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
index afe9465..40cb8e7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamRequestByteBuf.java
@@ -19,6 +19,7 @@
 package org.apache.ratis.netty.server;
 
 import org.apache.ratis.datastream.impl.DataStreamPacketImpl;
+import org.apache.ratis.io.WriteOption;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
@@ -32,14 +33,16 @@ import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
  */
 public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements DataStreamRequest {
   private final ByteBuf buf;
+  private final WriteOption[] options;
 
-  public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, ByteBuf buf) {
+  public DataStreamRequestByteBuf(Type type, long streamId, long streamOffset, WriteOption[] options, ByteBuf buf) {
     super(type, streamId, streamOffset);
     this.buf = buf != null? buf.asReadOnly(): Unpooled.EMPTY_BUFFER;
+    this.options = options;
   }
 
   public DataStreamRequestByteBuf(DataStreamRequestHeader header, ByteBuf buf) {
-    this(header.getType(), header.getStreamId(), header.getStreamOffset(), buf);
+    this(header.getType(), header.getStreamId(), header.getStreamOffset(), header.getWriteOptions(), buf);
   }
 
   @Override
@@ -50,4 +53,9 @@ public class DataStreamRequestByteBuf extends DataStreamPacketImpl implements Da
   public ByteBuf slice() {
     return buf.slice();
   }
+
+  @Override
+  public WriteOption[] getWriteOptions() {
+    return options;
+  }
 }
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 5964f2f..f380581 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -300,14 +300,18 @@ message DataStreamPacketHeaderProto {
   enum Type {
     STREAM_HEADER = 0;
     STREAM_DATA = 1;
-    STREAM_DATA_SYNC = 2;
-    STREAM_CLOSE = 3;
+  }
+
+  enum Option {
+    SYNC = 0;
+    CLOSE = 1;
   }
 
   uint64 streamId = 1;
   uint64 streamOffset = 2;
   Type type = 3;
-  uint64 dataLength = 4;
+  repeated Option options = 4;
+  uint64 dataLength = 5;
 }
 
 message DataStreamRequestHeaderProto {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 2de9332..f7cdc36 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.io.StandardWriteOption;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto;
@@ -260,7 +261,7 @@ public interface DataStreamTestUtils {
       sizes.add(size);
 
       final ByteBuffer bf = initBuffer(dataSize, size);
-      futures.add(out.writeAsync(bf, i == bufferNum - 1));
+      futures.add(i == bufferNum - 1 ? out.writeAsync(bf, StandardWriteOption.SYNC) : out.writeAsync(bf));
       dataSize += size;
     }
 
@@ -272,7 +273,7 @@ public interface DataStreamTestUtils {
     // check writeAsync requests
     for (int i = 0; i < futures.size(); i++) {
       final DataStreamReply reply = futures.get(i).join();
-      final Type expectedType = i == futures.size() - 1 ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA;
+      final Type expectedType = Type.STREAM_DATA;
       assertSuccessReply(expectedType, sizes.get(i).longValue(), reply);
     }
     return dataSize;