You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/12/08 23:44:15 UTC

[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #338: RATIS-1220. FileStore stream to send small packets for MappedByteBuffer and NettyFileRegion.

runzhiwang commented on a change in pull request #338:
URL: https://github.com/apache/incubator-ratis/pull/338#discussion_r538894193



##########
File path: ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
##########
@@ -134,46 +166,124 @@ private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply
     return totalBytes;
   }
 
-  private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
-      FileChannel fileChannel) throws IOException {
-    final int fileSize = getFileSizeInBytes();
-    final int bufferSize = getBufferSizeInBytes();
-    if (fileSize <= 0) {
-      return Collections.emptyList();
+  abstract static class TransferType {
+    private final String path;
+    private final File file;
+    private final long fileSize;
+    private final int bufferSize;
+    private final long syncSize;
+    private long syncPosition = 0;
+
+    TransferType(String path, DataStream cli) {
+      this.path = path;
+      this.file = new File(path);
+      this.fileSize = cli.getFileSizeInBytes();
+      this.bufferSize = cli.getBufferSizeInBytes();
+      this.syncSize = cli.getSyncSize();
+
+      final long actualSize = file.length();
+      Preconditions.assertTrue(actualSize == fileSize, () -> "Unexpected file size: expected size is "
+          + fileSize + " but actual size is " + actualSize + ", path=" + path);
+    }
+
+    File getFile() {
+      return file;
+    }
+
+    int getBufferSize() {
+      return bufferSize;
+    }
+
+    long getPacketSize(long offset) {
+      return Math.min(bufferSize, fileSize - offset);
     }
-    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-    final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
 
-    for(long offset = 0L; offset < fileSize;) {
-      final ByteBuf buf = alloc.directBuffer(bufferSize);
-      final int bytesRead = buf.writeBytes(fileChannel, bufferSize);
+    boolean isSync(long position) {
+      if (syncSize > 0) {
+        if (position >= fileSize || syncPosition - position >= syncSize) {

Review comment:
       position  - syncPosition >= syncSize ?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org