You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/06 11:58:02 UTC
[incubator-ratis] 01/01: RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch RATIS-1209
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
commit 20998ee4c36685bc5f32fee3c17ac887d8352448
Author: Tsz-Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Sun Dec 6 19:55:41 2020 +0800
RATIS-1209. Compare the performance between DataStreamApi and AsyncApi.
---
.../ratis/examples/filestore/cli/DataStream.java | 47 +++++++++++++++-------
1 file changed, 32 insertions(+), 15 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 5ca07d2..7857cd4 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -23,6 +23,10 @@ import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.api.DataStreamOutput;
import org.apache.ratis.examples.filestore.FileStoreClient;
import org.apache.ratis.protocol.DataStreamReply;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBufAllocator;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.ratis.util.Preconditions;
import java.io.File;
import java.io.FileInputStream;
@@ -31,6 +35,7 @@ import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -71,6 +76,10 @@ public class DataStream extends Client {
Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>();
for(String path : paths) {
File file = new File(path);
+ final long fileLength = file.length();
+ Preconditions.assertTrue(fileLength == getFileSizeInBytes(),
+ "Unexpected file size: expected size is " + getFileSizeInBytes()
+ + " but actual size is " + fileLength);
FileInputStream fis = new FileInputStream(file);
final DataStreamOutput dataStreamOutput = fileStoreClient.getStreamOutput(path, (int) file.length());
@@ -106,23 +115,31 @@ public class DataStream extends Client {
private List<CompletableFuture<DataStreamReply>> writeByDirectByteBuffer(DataStreamOutput dataStreamOutput,
FileChannel fileChannel) throws IOException {
- List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-
- int bytesToRead = getBufferSizeInBytes();
- if (getFileSizeInBytes() > 0L && getFileSizeInBytes() < getBufferSizeInBytes()) {
- bytesToRead = getFileSizeInBytes();
+ final int fileSize = getFileSizeInBytes();
+ final int bufferSize = getBufferSizeInBytes();
+ if (fileSize <= 0) {
+ return Collections.emptyList();
}
- ByteBuffer byteBuffer = ByteBuffer.allocateDirect(bytesToRead);
- long offset = 0L;
-
- while (fileChannel.read(byteBuffer) > 0) {
- byteBuffer.flip();
- futures.add(dataStreamOutput.writeAsync(byteBuffer, offset + bytesToRead == getFileSizeInBytes()));
- offset += bytesToRead;
- bytesToRead = (int) Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes());
- if (bytesToRead > 0) {
- byteBuffer = ByteBuffer.allocateDirect(bytesToRead);
+ List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
+ final ByteBufAllocator alloc = PooledByteBufAllocator.DEFAULT;
+
+ for(long offset = 0L; offset < fileSize;) {
+ final ByteBuf buf = alloc.directBuffer(bufferSize);
+ final ByteBuffer byteBuffer = buf.nioBuffers()[0];
+ Preconditions.assertTrue(byteBuffer.remaining() > 0);
+
+ final int bytesRead = fileChannel.read(byteBuffer);
+ if (bytesRead < 0) {
+ throw new IllegalStateException("Failed to read " + fileSize
+ + " byte(s). The channel has reached end-of-stream at " + offset);
+ } else if (bytesRead > 0) {
+ offset += bytesRead;
+
+ byteBuffer.flip();
+ final CompletableFuture<DataStreamReply> f = dataStreamOutput.writeAsync(byteBuffer, offset == fileSize);
+ f.thenRun(buf::release);
+ futures.add(f);
}
}