You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/09 12:04:52 UTC
[incubator-ratis] branch master updated: RATIS-1223. FileStore
write file in parallel (#340)
This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 582bf58 RATIS-1223. FileStore write file in parallel (#340)
582bf58 is described below
commit 582bf58cf8e37611d49e0147fc5a7166d910426f
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 9 20:04:46 2020 +0800
RATIS-1223. FileStore write file in parallel (#340)
* RATIS-1223. FileStore write file in parallel
* fix code review
---
.../ratis/examples/filestore/cli/Client.java | 15 ++--
.../ratis/examples/filestore/cli/DataStream.java | 50 ++++++++-----
.../ratis/examples/filestore/cli/LoadGen.java | 82 ++++++++++++++--------
3 files changed, 96 insertions(+), 51 deletions(-)
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 8b45453..cbf86b1 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -18,7 +18,6 @@
package org.apache.ratis.examples.filestore.cli;
import com.beust.jcommander.Parameter;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.client.RaftClientConfigKeys;
@@ -42,6 +41,8 @@ import java.io.RandomAccessFile;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
/**
@@ -59,6 +60,12 @@ public abstract class Client extends SubCommandBase {
@Parameter(names = {"--numFiles"}, description = "Number of files to be written", required = true)
private int numFiles;
+ private static final int MAX_THREADS_NUM = 100;
+
+ public int getNumThread() {
+ return numFiles < MAX_THREADS_NUM ? numFiles : MAX_THREADS_NUM;
+ }
+
public int getFileSizeInBytes() {
return fileSizeInBytes;
}
@@ -115,10 +122,10 @@ public abstract class Client extends SubCommandBase {
}
protected List<String> generateFiles() throws IOException {
- String entropy = RandomStringUtils.randomAlphanumeric(numFiles);
+ UUID uuid = UUID.randomUUID();
List<String> paths = new ArrayList<>();
for (int i = 0; i < numFiles; i ++) {
- String path = "file-" + entropy + "-" + i;
+ String path = "file-" + uuid + "-" + i;
paths.add(path);
writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1);
}
@@ -148,5 +155,5 @@ public abstract class Client extends SubCommandBase {
}
}
- protected abstract void operation(RaftClient client) throws IOException;
+ protected abstract void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException;
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index a5b4a72..76c4a8a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -41,6 +41,9 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.function.BiFunction;
/**
@@ -109,7 +112,7 @@ public class DataStream extends Client {
}
@Override
- protected void operation(RaftClient client) throws IOException {
+ protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
if (!checkParam()) {
stop(client);
}
@@ -118,9 +121,11 @@ public class DataStream extends Client {
FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting DataStream write now ");
+ final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+
long startTime = System.currentTimeMillis();
- long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient));
+ long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient, executor));
long endTime = System.currentTimeMillis();
@@ -132,28 +137,39 @@ public class DataStream extends Client {
stop(client);
}
- private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
- List<String> paths, FileStoreClient fileStoreClient) throws IOException {
- Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>();
+ private Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(
+ List<String> paths, FileStoreClient fileStoreClient, ExecutorService executor) {
+ Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap = new HashMap<>();
+
for(String path : paths) {
- File file = new File(path);
- final long fileLength = file.length();
- Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
- + getFileSizeInBytes() + " but actual size is " + fileLength);
-
- final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
- .orElseThrow(IllegalStateException::new);
- final TransferType writer = type.getConstructor().apply(path, this);
- fileMap.put(path, writer.transfer(fileStoreClient));
+ final CompletableFuture<List<CompletableFuture<DataStreamReply>>> future = new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
+ File file = new File(path);
+ final long fileLength = file.length();
+ Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
+ + getFileSizeInBytes() + " but actual size is " + fileLength);
+
+ final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
+ .orElseThrow(IllegalStateException::new);
+ final TransferType writer = type.getConstructor().apply(path, this);
+ try {
+ future.complete(writer.transfer(fileStoreClient));
+ } catch (IOException e) {
+ future.completeExceptionally(e);
+ }
+ return future;
+ }, executor);
+ fileMap.put(path, future);
}
return fileMap;
}
- private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply>>> fileMap) {
+ private long waitStreamFinish(Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap)
+ throws ExecutionException, InterruptedException {
long totalBytes = 0;
- for (List<CompletableFuture<DataStreamReply>> futures : fileMap.values()) {
+ for (CompletableFuture<List<CompletableFuture<DataStreamReply>>> futures : fileMap.values()) {
long writtenLen = 0;
- for (CompletableFuture<DataStreamReply> future : futures) {
+ for (CompletableFuture<DataStreamReply> future : futures.get()) {
writtenLen += future.join().getBytesWritten();
}
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index 45ee600..f211f29 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -21,16 +21,21 @@ import com.beust.jcommander.Parameter;
import com.beust.jcommander.Parameters;
import org.apache.ratis.client.RaftClient;
import org.apache.ratis.examples.filestore.FileStoreClient;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* Subcommand to generate load in filestore state machine.
@@ -42,14 +47,16 @@ public class LoadGen extends Client {
private int sync = 0;
@Override
- protected void operation(RaftClient client) throws IOException {
+ protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
List<String> paths = generateFiles();
FileStoreClient fileStoreClient = new FileStoreClient(client);
System.out.println("Starting Async write now ");
+ final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+
long startTime = System.currentTimeMillis();
- long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient));
+ long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient, executor));
long endTime = System.currentTimeMillis();
@@ -61,44 +68,59 @@ public class LoadGen extends Client {
stop(client);
}
- private Map<String, List<CompletableFuture<Long>>> writeByHeapByteBuffer(
- List<String> paths, FileStoreClient fileStoreClient) throws IOException {
- Map<String, List<CompletableFuture<Long>>> fileMap = new HashMap<>();
+ long write(FileChannel in, long offset, FileStoreClient fileStoreClient, String path,
+ List<CompletableFuture<Long>> futures) throws IOException {
+ final int bufferSize = getBufferSizeInBytes();
+ final ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(bufferSize);
+ final int bytesRead = buf.writeBytes(in, bufferSize);
+
+ if (bytesRead < 0) {
+ throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
+ + ". The channel has reached end-of-stream at " + offset);
+ } else if (bytesRead > 0) {
+ final CompletableFuture<Long> f = fileStoreClient.writeAsync(
+ path, offset, offset + bytesRead == getFileSizeInBytes(), buf.nioBuffer(),
+ sync == 1);
+ f.thenRun(buf::release);
+ futures.add(f);
+ }
+ return bytesRead;
+ }
- for(String path : paths) {
- List<CompletableFuture<Long>> futures = new ArrayList<>();
- File file = new File(path);
- FileInputStream fis = new FileInputStream(file);
+ private Map<String, CompletableFuture<List<CompletableFuture<Long>>>> writeByHeapByteBuffer(
+ List<String> paths, FileStoreClient fileStoreClient, ExecutorService executor) {
+ Map<String, CompletableFuture<List<CompletableFuture<Long>>>> fileMap = new HashMap<>();
- int bytesToRead = getBufferSizeInBytes();
- if (getFileSizeInBytes() > 0L && getFileSizeInBytes() < (long)getBufferSizeInBytes()) {
- bytesToRead = getFileSizeInBytes();
- }
-
- byte[] buffer = new byte[bytesToRead];
- long offset = 0L;
- while(fis.read(buffer, 0, bytesToRead) > 0) {
- ByteBuffer b = ByteBuffer.wrap(buffer);
- futures.add(fileStoreClient.writeAsync(path, offset, offset + bytesToRead == getFileSizeInBytes(), b,
- sync == 1));
- offset += bytesToRead;
- bytesToRead = (int)Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes());
- if (bytesToRead > 0) {
- buffer = new byte[bytesToRead];
+ for(String path : paths) {
+ final CompletableFuture<List<CompletableFuture<Long>>> future = new CompletableFuture<>();
+ CompletableFuture.supplyAsync(() -> {
+ List<CompletableFuture<Long>> futures = new ArrayList<>();
+ File file = new File(path);
+ try (FileInputStream fis = new FileInputStream(file)) {
+ final FileChannel in = fis.getChannel();
+ for (long offset = 0L; offset < getFileSizeInBytes(); ) {
+ offset += write(in, offset, fileStoreClient, path, futures);
+ }
+ } catch (Throwable e) {
+ future.completeExceptionally(e);
}
- }
- fileMap.put(path, futures);
+ future.complete(futures);
+ return future;
+ }, executor);
+
+ fileMap.put(path, future);
}
return fileMap;
}
- private long waitWriteFinish(Map<String, List<CompletableFuture<Long>>> fileMap) {
+ private long waitWriteFinish(Map<String, CompletableFuture<List<CompletableFuture<Long>>>> fileMap)
+ throws ExecutionException, InterruptedException {
long totalBytes = 0;
- for (List<CompletableFuture<Long>> futures : fileMap.values()) {
+ for (CompletableFuture<List<CompletableFuture<Long>>> futures : fileMap.values()) {
long writtenLen = 0;
- for (CompletableFuture<Long> future : futures) {
+ for (CompletableFuture<Long> future : futures.get()) {
writtenLen += future.join();
}