You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/06 11:56:39 UTC

[incubator-ratis] 02/02: RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch RATIS-1209
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git

commit c140982b3a402d41df7d35754e3877511b3d9b3a
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 19:55:41 2020 +0800

    RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.
---
 .../ratis/examples/filestore/cli/DataStream.java   | 47 +++++++++++++++-------
 1 file changed, 32 insertions(+), 15 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 5ca07d2..7857cd4 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -23,6 +23,10 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.api.DataStreamOutput;
 import org.apache.ratis.examples.filestore.FileStoreClient;
 import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.util.Preconditions;
 
 import java.io.File;
 import java.io.FileInputStream;
@@ -31,6 +35,7 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -71,6 +76,10 @@ public class DataStream extends Client {
     Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>();
     for(String path : paths) {
       File file = new File(path);
+      final long fileLength = file.length();
+      Preconditions.assertTrue(fileLength == getFileSizeInBytes(),
+          "Unexpected file size: expected size is " + getFileSizeInBytes()
+              + " but actual size is " + fileLength);
       FileInputStream fis = new FileInputStream(file);
       final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
 
@@ -106,23 +115,31 @@ public class DataStream extends Client {
 
   private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
       FileChannel fileChannel) throws IOException {
-    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-
-    int bytesToRead = getBufferSizeInBytes();
-    if (getFileSizeInBytes() > 0L && getFileSizeInBytes() < getBufferSizeInBytes()) {
-      bytesToRead = getFileSizeInBytes();
+    final int fileSize = getFileSizeInBytes();
+    final int bufferSize = getBufferSizeInBytes();
+    if (fileSize <= 0) {
+      return Collections.emptyList();
     }
 
-    ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytesToRead);
-    long offset = 0L;
-
-    while (fileChannel.read(byteBuffer) > 0) {
-      byteBuffer.flip();
-      futures.add(dataStreamOutput.writeAsync(byteBuffer, offset + bytesToRead == getFileSizeInBytes()));
-      offset += bytesToRead;
-      bytesToRead = (int) Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes());
-      if (bytesToRead > 0) {
-        byteBuffer = ByteBuffer.allocateDirect(bytesToRead);
+    List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+    final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
+
+    for(long offset = 0L; offset < fileSize;) {
+      final ByteBuf buf = alloc.directBuffer(bufferSize);
+      final ByteBuffer byteBuffer = buf.nioBuffers()[0];
+      Preconditions.assertTrue(byteBuffer.remaining() > 0);
+
+      final int bytesRead = fileChannel.read(byteBuffer);
+      if (bytesRead < 0) {
+        throw new IllegalStateException("Failed to read " + fileSize
+            + " byte(s). The channel has reached end-of-stream at " + offset);
+      } else if (bytesRead > 0) {
+        offset += bytesRead;
+
+        byteBuffer.flip();
+        final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(byteBuffer, offset == fileSize);
+        f.thenRun(buf::release);
+        futures.add(f);
       }
     }