You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/11/26 02:12:50 UTC

[incubator-ratis] branch master updated: RATIS-1179. Add a new API to DataStreamOutput for sending a File. (#299)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 1ec750d  RATIS-1179. Add a new API to DataStreamOutput for sending a File. (#299)
1ec750d is described below

commit 1ec750d3d30a510dd69fba8b3a689e4adc96827b
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Nov 26 10:12:41 2020 +0800

    RATIS-1179. Add a new API to DataStreamOutput for sending a File. (#299)
---
 .../apache/ratis/client/api/DataStreamOutput.java  | 39 +++++++++++--
 .../apache/ratis/client/impl/ClientProtoUtils.java |  7 ++-
 .../ratis/client/impl/DataStreamClientImpl.java    | 32 +++++++----
 .../ratis/client/impl/OrderedStreamAsync.java      | 39 +++++++++----
 .../impl/DataStreamRequestByteBuffer.java          |  8 ++-
 ...ava => DataStreamRequestFilePositionCount.java} | 26 ++++++---
 .../FilePositionCount.java}                        | 42 ++++++++++----
 .../ratis/protocol/DataStreamPacketHeader.java     |  2 +-
 .../ratis/protocol/DataStreamReplyHeader.java      | 19 +++----
 .../ratis/protocol/DataStreamRequestHeader.java    | 16 +++---
 .../apache/ratis/netty/NettyDataStreamUtils.java   | 23 ++++++--
 .../ratis/netty/client/NettyClientStreamRpc.java   | 11 ++++
 .../ratis/datastream/DataStreamClusterTests.java   | 64 +++++++++++++++++-----
 .../ratis/datastream/DataStreamTestUtils.java      | 15 +++--
 14 files changed, 249 insertions(+), 94 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
index 579367a..c054d35 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/api/DataStreamOutput.java
@@ -18,9 +18,11 @@
 package org.apache.ratis.client.api;
 
 import org.apache.ratis.io.CloseAsync;
+import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 
+import java.io.File;
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
 import java.util.concurrent.CompletableFuture;
@@ -28,21 +30,46 @@ import java.util.concurrent.CompletableFuture;
 /** An asynchronous output stream supporting zero buffer copying. */
 public interface DataStreamOutput extends CloseAsync<DataStreamReply> {
   /**
-   * This method is the same as writeAsync(buffer, sync_default),
+   * This method is the same as writeAsync(src, sync_default),
    * where sync_default depends on the underlying implementation.
    */
-  default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer) {
-    return writeAsync(buffer, false);
+  default CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src) {
+    return writeAsync(src, false);
   }
 
   /**
-   * Send out the data in the buffer asynchronously.
+   * Send out the data in the source buffer asynchronously.
    *
-   * @param buffer the data to be sent.
+   * @param src the source buffer to be sent.
    * @param sync Should the data be sync'ed to the underlying storage?
    * @return a future of the reply.
    */
-  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buffer, boolean sync);
+  CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync);
+
+
+  /**
+   * The same as writeAsync(src, 0, src.length(), sync_default),
+   * where sync_default depends on the underlying implementation.
+   */
+  default CompletableFuture<DataStreamReply> writeAsync(File src) {
+    return writeAsync(src, 0, src.length(), false);
+  }
+
+  /**
+   * The same as writeAsync(FilePositionCount.valueOf(src, position, count), sync).
+   */
+  default CompletableFuture<DataStreamReply> writeAsync(File src, long position, long count, boolean sync) {
+    return writeAsync(FilePositionCount.valueOf(src, position, count), sync);
+  }
+
+  /**
+   * Send out the data in the source file asynchronously.
+   *
+   * @param src the source file with the starting position and the number of bytes.
+   * @param sync Should the data be sync'ed to the underlying storage?
+   * @return a future of the reply.
+   */
+  CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync);
 
   /**
    * Return the future of the {@link RaftClientReply}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 0807b3b..1a8ebc9 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -121,8 +121,11 @@ public interface ClientProtoUtils {
         request.getSlidingWindowEntry());
   }
 
-  static RaftClientRequestProto toRaftClientRequestProto(
-      RaftClientRequest request) {
+  static ByteBuffer toRaftClientRequestProtoByteBuffer(RaftClientRequest request) {
+    return toRaftClientRequestProto(request).toByteString().asReadOnlyByteBuffer();
+  }
+
+  static RaftClientRequestProto toRaftClientRequestProto(RaftClientRequest request) {
     final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request));
     if (request.getMessage() != null) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
index ffae192..e6d5851 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/DataStreamClientImpl.java
@@ -21,11 +21,12 @@ import org.apache.ratis.client.DataStreamClient;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.DataStreamOutputRpc;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftGroupId;
@@ -95,35 +96,44 @@ public class DataStreamClientImpl implements DataStreamClient {
 
     private DataStreamOutputImpl(RaftClientRequest request) {
       this.header = request;
-      this.headerFuture = orderedStreamAsync.sendRequest(request.getCallId(), -1,
-          ClientProtoUtils.toRaftClientRequestProto(header).toByteString().asReadOnlyByteBuffer(), Type.STREAM_HEADER);
+      final ByteBuffer buffer = ClientProtoUtils.toRaftClientRequestProtoByteBuffer(header);
+      this.headerFuture = send(Type.STREAM_HEADER, buffer, buffer.remaining());
     }
 
-    private CompletableFuture<DataStreamReply> send(Type type, ByteBuffer buffer) {
-      return orderedStreamAsync.sendRequest(header.getCallId(), streamOffset, buffer, type);
+    private CompletableFuture<DataStreamReply> send(Type type, Object data, long length) {
+      final DataStreamRequestHeader h = new DataStreamRequestHeader(type, header.getCallId(), streamOffset, length);
+      return orderedStreamAsync.sendRequest(h, data);
     }
 
     private CompletableFuture<DataStreamReply> send(Type type) {
-      return combineHeader(send(type, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER));
+      return combineHeader(send(type, null, 0));
     }
 
     private CompletableFuture<DataStreamReply> combineHeader(CompletableFuture<DataStreamReply> future) {
       return future.thenCombine(headerFuture, (reply, headerReply) -> headerReply.isSuccess()? reply : headerReply);
     }
 
-    // send to the attached dataStreamClientRpc
-    @Override
-    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer buf, boolean sync) {
+    private CompletableFuture<DataStreamReply> writeAsyncImpl(Object data, long length, boolean sync) {
       if (isClosed()) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(
             clientId + ": stream already closed, request=" + header));
       }
-      final CompletableFuture<DataStreamReply> f = send(sync ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA, buf);
-      streamOffset += buf.remaining();
+      final CompletableFuture<DataStreamReply> f = send(sync ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA, data, length);
+      streamOffset += length;
       return combineHeader(f);
     }
 
     @Override
+    public CompletableFuture<DataStreamReply> writeAsync(ByteBuffer src, boolean sync) {
+      return writeAsyncImpl(src, src.remaining(), sync);
+    }
+
+    @Override
+    public CompletableFuture<DataStreamReply> writeAsync(FilePositionCount src, boolean sync) {
+      return writeAsyncImpl(src, src.getCount(), sync);
+    }
+
+    @Override
     public CompletableFuture<DataStreamReply> closeAsync() {
       return closeSupplier.get();
     }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 1318d4b..10c4db4 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -20,10 +20,14 @@ package org.apache.ratis.client.impl;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.datastream.impl.DataStreamPacketByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.DataStreamReply;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.protocol.DataStreamRequest;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.util.IOUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.SlidingWindow;
@@ -38,16 +42,29 @@ import java.util.function.LongFunction;
 public class OrderedStreamAsync {
   public static final Logger LOG = LoggerFactory.getLogger(OrderedStreamAsync.class);
 
-  static class DataStreamWindowRequest extends DataStreamRequestByteBuffer
-      implements SlidingWindow.ClientSideRequest<DataStreamReply> {
+  static class DataStreamWindowRequest implements SlidingWindow.ClientSideRequest<DataStreamReply> {
+    private final DataStreamRequestHeader header;
+    private final Object data;
     private final long seqNum;
     private final CompletableFuture<DataStreamReply> replyFuture = new CompletableFuture<>();
 
-    DataStreamWindowRequest(Type type, long streamId, long offset, ByteBuffer data, long seqNum) {
-      super(type, streamId, offset, data);
+    DataStreamWindowRequest(DataStreamRequestHeader header, Object data, long seqNum) {
+      this.header = header;
+      this.data = data;
       this.seqNum = seqNum;
     }
 
+    DataStreamRequest getDataStreamRequest() {
+      if (header.getDataLength() == 0) {
+        return new DataStreamRequestByteBuffer(header, DataStreamPacketByteBuffer.EMPTY_BYTE_BUFFER);
+      } else if (data instanceof ByteBuffer) {
+        return new DataStreamRequestByteBuffer(header, (ByteBuffer)data);
+      } else if (data instanceof FilePositionCount) {
+        return new DataStreamRequestFilePositionCount(header, (FilePositionCount)data);
+      }
+      throw new IllegalStateException("Unexpected " + data.getClass());
+    }
+
     @Override
     public void setFirstRequest() {
     }
@@ -87,16 +104,15 @@ public class OrderedStreamAsync {
     this.requestSemaphore = new Semaphore(RaftClientConfigKeys.DataStream.outstandingRequestsMax(properties));
   }
 
-  CompletableFuture<DataStreamReply> sendRequest(long streamId, long offset, ByteBuffer data, Type type){
-    final int length = data.remaining();
+  CompletableFuture<DataStreamReply> sendRequest(DataStreamRequestHeader header, Object data) {
     try {
       requestSemaphore.acquire();
     } catch (InterruptedException e){
       return JavaUtils.completeExceptionally(IOUtils.toInterruptedIOException(
-          "Interrupted when sending streamId=" + streamId + ", offset= " + offset + ", length=" + length, e));
+          "Interrupted when sending " + JavaUtils.getClassSimpleName(data.getClass()) + ", header= " + header, e));
     }
     final LongFunction<DataStreamWindowRequest> constructor
-        = seqNum -> new DataStreamWindowRequest(type, streamId, offset, data.slice(), seqNum);
+        = seqNum -> new DataStreamWindowRequest(header, data, seqNum);
     return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
            getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
   }
@@ -109,7 +125,8 @@ public class OrderedStreamAsync {
     if(slidingWindow.isFirst(request.getSeqNum())){
       request.setFirstRequest();
     }
-    final CompletableFuture<DataStreamReply> requestFuture = dataStreamClientRpc.streamAsync(request);
+    final CompletableFuture<DataStreamReply> requestFuture = dataStreamClientRpc.streamAsync(
+        request.getDataStreamRequest());
     requestFuture.thenApply(reply -> {
       slidingWindow.receiveReply(
           request.getSeqNum(), reply, this::sendRequestToNetwork);
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
index 584f567..0de14fc 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
@@ -18,7 +18,8 @@
 package org.apache.ratis.datastream.impl;
 
 import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
+import org.apache.ratis.util.Preconditions;
 
 import java.nio.ByteBuffer;
 
@@ -28,7 +29,8 @@ import java.nio.ByteBuffer;
  * This class is immutable.
  */
 public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
-  public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
-    super(type, streamId, streamOffset, buffer);
+  public DataStreamRequestByteBuffer(DataStreamRequestHeader header, ByteBuffer buffer) {
+    super(header.getType(), header.getStreamId(), header.getStreamOffset(), buffer);
+    Preconditions.assertTrue(header.getDataLength() == buffer.remaining());
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
similarity index 56%
copy from ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
copy to ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
index 584f567..8dd3b8f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestFilePositionCount.java
@@ -17,18 +17,30 @@
  */
 package org.apache.ratis.datastream.impl;
 
+import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-
-import java.nio.ByteBuffer;
+import org.apache.ratis.protocol.DataStreamRequestHeader;
 
 /**
- * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
+ * Implements {@link DataStreamRequest} with {@link FilePositionCount}.
  *
  * This class is immutable.
  */
-public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
-  public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
-    super(type, streamId, streamOffset, buffer);
+public class DataStreamRequestFilePositionCount extends DataStreamPacketImpl implements DataStreamRequest {
+  private final FilePositionCount file;
+
+  public DataStreamRequestFilePositionCount(DataStreamRequestHeader header, FilePositionCount file) {
+    super(header.getType(), header.getStreamId(), header.getStreamOffset());
+    this.file = file;
+  }
+
+  @Override
+  public long getDataLength() {
+    return file.getCount();
+  }
+
+  /** @return the file with the starting position and the byte count. */
+  public FilePositionCount getFile() {
+    return file;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java b/ratis-common/src/main/java/org/apache/ratis/io/FilePositionCount.java
similarity index 50%
copy from ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
copy to ratis-common/src/main/java/org/apache/ratis/io/FilePositionCount.java
index 584f567..150e042 100644
--- a/ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamRequestByteBuffer.java
+++ b/ratis-common/src/main/java/org/apache/ratis/io/FilePositionCount.java
@@ -15,20 +15,42 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.ratis.datastream.impl;
+package org.apache.ratis.io;
 
-import org.apache.ratis.protocol.DataStreamRequest;
-import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
-
-import java.nio.ByteBuffer;
+import java.io.File;
 
 /**
- * Implements {@link DataStreamRequest} with {@link ByteBuffer}.
+ * Encapsulate a {@link File} with a starting position and a byte count.
  *
- * This class is immutable.
+ * The class is immutable.
  */
-public class DataStreamRequestByteBuffer extends DataStreamPacketByteBuffer implements DataStreamRequest {
-  public DataStreamRequestByteBuffer(Type type, long streamId, long streamOffset, ByteBuffer buffer) {
-    super(type, streamId, streamOffset, buffer);
+public final class FilePositionCount {
+  public static FilePositionCount valueOf(File file, long position, long count) {
+    return new FilePositionCount(file, position, count);
+  }
+
+  private final File file;
+  private final long position;
+  private final long count;
+
+  private FilePositionCount(File file, long position, long count) {
+    this.file = file;
+    this.position = position;
+    this.count = count;
+  }
+
+  /** @return the file. */
+  public File getFile() {
+    return file;
+  }
+
+  /** @return the starting position. */
+  public long getPosition() {
+    return position;
+  }
+
+  /** @return the byte count. */
+  public long getCount() {
+    return count;
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
index e93b178..18a2b6c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamPacketHeader.java
@@ -28,7 +28,7 @@ public class DataStreamPacketHeader extends DataStreamPacketImpl {
 
   private final long dataLength;
 
-  public DataStreamPacketHeader(long streamId, long streamOffset, long dataLength, Type type) {
+  public DataStreamPacketHeader(Type type, long streamId, long streamOffset, long dataLength) {
     super(type, streamId, streamOffset);
     this.dataLength = dataLength;
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
index e829b9f..d02c933 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamReplyHeader.java
@@ -18,8 +18,9 @@
 
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.slf4j.Logger;
@@ -44,15 +45,11 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
       ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
       DataStreamReplyHeaderProto header = DataStreamReplyHeaderProto.parseFrom(headerBuf.nioBuffer());
 
+      final DataStreamPacketHeaderProto h = header.getPacketHeader();
       if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
         buf.readerIndex(buf.readerIndex() + headerBufLen);
-        return new DataStreamReplyHeader(
-            header.getPacketHeader().getStreamId(),
-            header.getPacketHeader().getStreamOffset(),
-            header.getPacketHeader().getDataLength(),
-            header.getPacketHeader().getType(),
-            header.getBytesWritten(),
-            header.getSuccess());
+        return new DataStreamReplyHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength(),
+            header.getBytesWritten(), header.getSuccess());
       } else {
         buf.resetReaderIndex();
         return null;
@@ -67,9 +64,9 @@ public class DataStreamReplyHeader extends DataStreamPacketHeader implements Dat
   private final long bytesWritten;
   private final boolean success;
 
-  public DataStreamReplyHeader(
-      long streamId, long streamOffset, long dataLength, Type type, long bytesWritten, boolean success) {
-    super(streamId, streamOffset, dataLength, type);
+  public DataStreamReplyHeader(Type type, long streamId, long streamOffset, long dataLength,
+      long bytesWritten, boolean success) {
+    super(type, streamId, streamOffset, dataLength);
     this.bytesWritten = bytesWritten;
     this.success = success;
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
index b87d68f..7781cd6 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/DataStreamRequestHeader.java
@@ -18,8 +18,9 @@
 
 package org.apache.ratis.protocol;
 
-import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
+import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.slf4j.Logger;
@@ -47,13 +48,10 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
       ByteBuf headerBuf = buf.slice(buf.readerIndex(), headerBufLen);
       DataStreamRequestHeaderProto header = DataStreamRequestHeaderProto.parseFrom(headerBuf.nioBuffer());
 
-      if (header.getPacketHeader().getDataLength() + headerBufLen <= buf.readableBytes()) {
+      final DataStreamPacketHeaderProto h = header.getPacketHeader();
+      if (h.getDataLength() + headerBufLen <= buf.readableBytes()) {
         buf.readerIndex(buf.readerIndex() + headerBufLen);
-        return new DataStreamRequestHeader(
-            header.getPacketHeader().getStreamId(),
-            header.getPacketHeader().getStreamOffset(),
-            header.getPacketHeader().getDataLength(),
-            header.getPacketHeader().getType());
+        return new DataStreamRequestHeader(h.getType(), h.getStreamId(), h.getStreamOffset(), h.getDataLength());
       } else {
         buf.resetReaderIndex();
         return null;
@@ -65,7 +63,7 @@ public class DataStreamRequestHeader extends DataStreamPacketHeader implements D
     }
   }
 
-  public DataStreamRequestHeader(long streamId, long streamOffset, long dataLength, Type type) {
-    super(streamId, streamOffset, dataLength, type);
+  public DataStreamRequestHeader(Type type, long streamId, long streamOffset, long dataLength) {
+    super(type, streamId, streamOffset, dataLength);
   }
 }
\ No newline at end of file
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
index a3f850d..ccf25c7 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyDataStreamUtils.java
@@ -19,16 +19,20 @@ package org.apache.ratis.netty;
 
 import org.apache.ratis.datastream.impl.DataStreamReplyByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
+import org.apache.ratis.io.FilePositionCount;
 import org.apache.ratis.netty.server.DataStreamRequestByteBuf;
 import org.apache.ratis.proto.RaftProtos.DataStreamReplyHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamRequestHeaderProto;
 import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto;
 import org.apache.ratis.protocol.DataStreamPacketHeader;
 import org.apache.ratis.protocol.DataStreamReplyHeader;
+import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.DataStreamRequestHeader;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
 import org.apache.ratis.thirdparty.io.netty.buffer.Unpooled;
+import org.apache.ratis.thirdparty.io.netty.channel.DefaultFileRegion;
 
 import java.nio.ByteBuffer;
 import java.util.Optional;
@@ -36,8 +40,7 @@ import java.util.function.Consumer;
 import java.util.function.Function;
 
 public interface NettyDataStreamUtils {
-
-  static ByteBuffer getDataStreamRequestHeaderProtoByteBuf(DataStreamRequestByteBuffer request) {
+  static ByteBuffer getDataStreamRequestHeaderProtoByteBuffer(DataStreamRequest request) {
     DataStreamPacketHeaderProto.Builder b = DataStreamPacketHeaderProto
         .newBuilder()
         .setStreamId(request.getStreamId())
@@ -69,16 +72,28 @@ public interface NettyDataStreamUtils {
         .asReadOnlyByteBuffer();
   }
 
-  static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer request, Consumer<ByteBuf> out,
+  static void encodeDataStreamRequestHeader(DataStreamRequest request, Consumer<Object> out,
       ByteBufAllocator allocator) {
-    ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuf(request);
+    final ByteBuffer headerBuf = getDataStreamRequestHeaderProtoByteBuffer(request);
     final ByteBuf headerLenBuf = allocator.directBuffer(DataStreamPacketHeader.getSizeOfHeaderLen());
     headerLenBuf.writeInt(headerBuf.remaining());
     out.accept(headerLenBuf);
     out.accept(Unpooled.wrappedBuffer(headerBuf));
+  }
+
+  static void encodeDataStreamRequestByteBuffer(DataStreamRequestByteBuffer request, Consumer<Object> out,
+      ByteBufAllocator allocator) {
+    encodeDataStreamRequestHeader(request, out, allocator);
     out.accept(Unpooled.wrappedBuffer(request.slice()));
   }
 
+  static void encodeDataStreamRequestFilePositionCount(
+      DataStreamRequestFilePositionCount request, Consumer<Object> out, ByteBufAllocator allocator) {
+    encodeDataStreamRequestHeader(request, out, allocator);
+    final FilePositionCount f = request.getFile();
+    out.accept(new DefaultFileRegion(f.getFile(), f.getPosition(), f.getCount()));
+  }
+
   static void encodeDataStreamReplyByteBuffer(DataStreamReplyByteBuffer reply, Consumer<ByteBuf> out,
       ByteBufAllocator allocator) {
     ByteBuffer headerBuf = getDataStreamReplyHeaderProtoByteBuf(reply);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 17b240b..0c369f5 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -21,6 +21,7 @@ package org.apache.ratis.netty.client;
 import org.apache.ratis.client.DataStreamClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
+import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.netty.NettyDataStreamUtils;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
@@ -94,6 +95,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
       public void initChannel(SocketChannel ch) {
         ChannelPipeline p = ch.pipeline();
         p.addLast(newEncoder());
+        p.addLast(newEncoderDataStreamRequestFilePositionCount());
         p.addLast(newDecoder());
         p.addLast(getClientHandler());
       }
@@ -109,6 +111,15 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
     };
   }
 
+  MessageToMessageEncoder<DataStreamRequestFilePositionCount> newEncoderDataStreamRequestFilePositionCount() {
+    return new MessageToMessageEncoder<DataStreamRequestFilePositionCount>() {
+      @Override
+      protected void encode(ChannelHandlerContext ctx, DataStreamRequestFilePositionCount request, List<Object> out) {
+        NettyDataStreamUtils.encodeDataStreamRequestFilePositionCount(request, out::add, ctx.alloc());
+      }
+    };
+  }
+
   ByteToMessageDecoder newDecoder() {
     return new ByteToMessageDecoder() {
       {
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
index c93c294..f46015e 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamClusterTests.java
@@ -23,11 +23,15 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.impl.DataStreamClientImpl.DataStreamOutputImpl;
 import org.apache.ratis.datastream.DataStreamTestUtils.MultiDataStreamStateMachine;
 import org.apache.ratis.datastream.DataStreamTestUtils.SingleDataStream;
+import org.apache.ratis.proto.RaftProtos.DataStreamPacketHeaderProto.Type;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.util.Timestamp;
+import org.apache.ratis.util.function.CheckedConsumer;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -54,7 +58,16 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
   void testStreamWrites(CLUSTER cluster) throws Exception {
     waitForLeader(cluster);
     runTestDataStreamOutput(cluster);
-    runTestTransferTo(cluster);
+
+    // create one data file, with a random size in [10_000_000, 11_000_000), reused by every run below
+    final int size = 10_000_000 + ThreadLocalRandom.current().nextInt(1_000_000);
+    final File f = new File(getTestDir(), "a.txt");
+    DataStreamTestUtils.createFile(f, size);
+
+    for(int i = 0; i < 3; i++) { // three rounds of both write paths; i is passed along for the timing log
+      runTestWriteFile(cluster, i, writeAsyncDefaultFileRegion(f, size));
+      runTestWriteFile(cluster, i, transferToWritableByteChannel(f, size));
+    }
   }
 
   void runTestDataStreamOutput(CLUSTER cluster) throws Exception {
@@ -74,13 +87,8 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
     assertLogEntry(cluster, request);
   }
 
-  void runTestTransferTo(CLUSTER cluster) throws Exception {
-    final int size = 4_000_000 + ThreadLocalRandom.current().nextInt(1_000_000);
-
-    // create data file
-    final File f = new File(getTestDir(), "a.txt");
-    DataStreamTestUtils.createFile(f, size);
-
+  void runTestWriteFile(CLUSTER cluster, int i,
+      CheckedConsumer<DataStreamOutputImpl, Exception> testCase) throws Exception {
     final RaftClientRequest request;
     final CompletableFuture<RaftClientReply> reply;
     try (RaftClient client = cluster.createClient()) {
@@ -88,11 +96,9 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
         request = out.getHeader();
         reply = out.getRaftClientReplyFuture();
 
-        // write using transferTo WritableByteChannel
-        try(FileInputStream in = new FileInputStream(f)) {
-          final long transferred = in.getChannel().transferTo(0, size, out.getWritableByteChannel());
-          Assert.assertEquals(size, transferred);
-        }
+        final Timestamp start = Timestamp.currentTime();
+        testCase.accept(out);
+        LOG.info("{}: {} elapsed {}ms", i, testCase, start.elapsedTimeMs());
       }
     }
 
@@ -100,6 +106,38 @@ public abstract class DataStreamClusterTests<CLUSTER extends MiniRaftCluster> ex
     assertLogEntry(cluster, request);
   }
 
+  static CheckedConsumer<DataStreamOutputImpl, Exception> transferToWritableByteChannel(File f, int size) {
+    return new CheckedConsumer<DataStreamOutputImpl, Exception>() {
+      @Override
+      public void accept(DataStreamOutputImpl out) throws Exception {
+        try (FileInputStream in = new FileInputStream(f)) { // closed automatically after the transfer
+          final long transferred = in.getChannel().transferTo(0, size, out.getWritableByteChannel());
+          Assert.assertEquals(size, transferred); // the single transferTo call must move all size bytes
+        }
+      }
+
+      @Override // the name is what runTestWriteFile prints in its elapsed-time log line
+      public String toString() {
+        return "transferToWritableByteChannel";
+      }
+    };
+  }
+
+  static CheckedConsumer<DataStreamOutputImpl, Exception> writeAsyncDefaultFileRegion(File f, int size) {
+    return new CheckedConsumer<DataStreamOutputImpl, Exception>() {
+      @Override
+      public void accept(DataStreamOutputImpl out) {
+        final DataStreamReply dataStreamReply = out.writeAsync(f).join(); // pass the File; block for the reply
+        DataStreamTestUtils.assertSuccessReply(Type.STREAM_DATA, size, dataStreamReply);
+      }
+
+      @Override // the name is what runTestWriteFile prints in its elapsed-time log line
+      public String toString() {
+        return "writeAsyncDefaultFileRegion";
+      }
+    };
+  }
+
   void watchOrSleep(CLUSTER cluster, long index) throws Exception {
     try (RaftClient client = cluster.createClient()) {
       client.async().watch(index, ReplicationLevel.ALL).join();
diff --git a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
index 73d4b24..47d88d4 100644
--- a/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
+++ b/ratis-test/src/test/java/org/apache/ratis/datastream/DataStreamTestUtils.java
@@ -265,21 +265,24 @@ public interface DataStreamTestUtils {
 
     { // check header
       final DataStreamReply reply = out.getHeaderFuture().join();
-      Assert.assertTrue(reply.isSuccess());
-      Assert.assertEquals(0, reply.getBytesWritten());
-      Assert.assertEquals(reply.getType(), Type.STREAM_HEADER);
+      assertSuccessReply(Type.STREAM_HEADER, 0, reply);
     }
 
     // check writeAsync requests
     for (int i = 0; i < futures.size(); i++) {
       final DataStreamReply reply = futures.get(i).join();
-      Assert.assertTrue(reply.isSuccess());
-      Assert.assertEquals(sizes.get(i).longValue(), reply.getBytesWritten());
-      Assert.assertEquals(reply.getType(), i == futures.size() - 1 ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA);
+      final Type expectedType = i == futures.size() - 1 ? Type.STREAM_DATA_SYNC : Type.STREAM_DATA;
+      assertSuccessReply(expectedType, sizes.get(i).longValue(), reply);
     }
     return dataSize;
   }
 
+  static void assertSuccessReply(Type expectedType, long expectedBytesWritten, DataStreamReply reply) {
+    Assert.assertTrue(reply.isSuccess()); // checks in order: success flag, byte count, packet type
+    Assert.assertEquals(expectedBytesWritten, reply.getBytesWritten());
+    Assert.assertEquals(expectedType, reply.getType()); // expected value first, actual second
+  }
+
   static CompletableFuture<RaftClientReply> writeAndCloseAndAssertReplies(
       Iterable<RaftServer> servers, RaftPeerId leader, DataStreamOutputImpl out, int bufferSize, int bufferNum) {
     LOG.info("start Stream{}", out.getHeader().getCallId());