You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/09 12:04:52 UTC

[incubator-ratis] branch master updated: RATIS-1223. FileStore write file in parallel (#340)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 582bf58  RATIS-1223. FileStore write file in parallel (#340)
582bf58 is described below

commit 582bf58cf8e37611d49e0147fc5a7166d910426f
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Dec 9 20:04:46 2020 +0800

    RATIS-1223. FileStore write file in parallel (#340)
    
    * RATIS-1223. FileStore write file in parallel
    
    * fix code review
---
 .../ratis/examples/filestore/cli/Client.java       | 15 ++--
 .../ratis/examples/filestore/cli/DataStream.java   | 50 ++++++++-----
 .../ratis/examples/filestore/cli/LoadGen.java      | 82 ++++++++++++++--------
 3 files changed, 96 insertions(+), 51 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 8b45453..cbf86b1 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -18,7 +18,6 @@
 package org.apache.ratis.examples.filestore.cli;
 
 import com.beust.jcommander.Parameter;
-import org.apache.commons.lang3.RandomStringUtils;
 import org.apache.ratis.RaftConfigKeys;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
@@ -42,6 +41,8 @@ import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -59,6 +60,12 @@ public abstract class Client extends SubCommandBase {
   @Parameter(names = {"--numFiles"}, description = "Number of files to be written", required = true)
   private int numFiles;
 
+  private static final int MAX_THREADS_NUM = 100;
+
+  public int getNumThread() {
+    return numFiles < MAX_THREADS_NUM ? numFiles : MAX_THREADS_NUM;
+  }
+
   public int getFileSizeInBytes() {
     return fileSizeInBytes;
   }
@@ -115,10 +122,10 @@ public abstract class Client extends SubCommandBase {
   }
 
   protected List<String> generateFiles() throws IOException {
-    String entropy = RandomStringUtils.randomAlphanumeric(numFiles);
+    UUID uuid = UUID.randomUUID();
     List<String> paths = new ArrayList<>();
     for (int i = 0; i < numFiles; i ++) {
-      String path = "file-" + entropy + "-" + i;
+      String path = "file-" + uuid + "-" + i;
       paths.add(path);
       writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1);
     }
@@ -148,5 +155,5 @@ public abstract class Client extends SubCommandBase {
     }
   }
 
-  protected abstract void operation(RaftClient client) throws IOException;
+  protected abstract void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException;
 }
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index a5b4a72..76c4a8a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -41,6 +41,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.function.BiFunction;
 
 /**
@@ -109,7 +112,7 @@ public class DataStream extends Client {
   }
 
   @Override
-  protected void operation(RaftClient client) throws IOException {
+  protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
     if (!checkParam()) {
       stop(client);
     }
@@ -118,9 +121,11 @@ public class DataStream extends Client {
     FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting DataStream write now ");
 
+    final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+
     long startTime = System.currentTimeMillis();
 
-    long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient));
+    long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient, executor));
 
     long endTime = System.currentTimeMillis();
 
@@ -132,28 +137,39 @@ public class DataStream extends Client {
     stop(client);
   }
 
-  private Map<String, List<CompletableFuture<DataStreamReply>>> streamWrite(
-      List<String> paths, FileStoreClient fileStoreClient) throws IOException {
-    Map<String, List<CompletableFuture<DataStreamReply>>> fileMap = new HashMap<>();
+  private Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> streamWrite(
+      List<String> paths, FileStoreClient fileStoreClient, ExecutorService executor) {
+    Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap = new HashMap<>();
+
     for(String path : paths) {
-      File file = new File(path);
-      final long fileLength = file.length();
-      Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
-          + getFileSizeInBytes() + " but actual size is " + fileLength);
-
-      final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
-          .orElseThrow(IllegalStateException::new);
-      final TransferType writer = type.getConstructor().apply(path, this);
-      fileMap.put(path, writer.transfer(fileStoreClient));
+      final CompletableFuture<List<CompletableFuture<DataStreamReply>>> future = new CompletableFuture<>();
+      CompletableFuture.supplyAsync(() -> {
+        File file = new File(path);
+        final long fileLength = file.length();
+        Preconditions.assertTrue(fileLength == getFileSizeInBytes(), "Unexpected file size: expected size is "
+            + getFileSizeInBytes() + " but actual size is " + fileLength);
+
+        final Type type = Optional.ofNullable(Type.valueOfIgnoreCase(dataStreamType))
+            .orElseThrow(IllegalStateException::new);
+        final TransferType writer = type.getConstructor().apply(path, this);
+        try {
+          future.complete(writer.transfer(fileStoreClient));
+        } catch (IOException e) {
+          future.completeExceptionally(e);
+        }
+        return future;
+      }, executor);
+      fileMap.put(path, future);
     }
     return fileMap;
   }
 
-  private long waitStreamFinish(Map<String, List<CompletableFuture<DataStreamReply>>> fileMap) {
+  private long waitStreamFinish(Map<String, CompletableFuture<List<CompletableFuture<DataStreamReply>>>> fileMap)
+      throws ExecutionException, InterruptedException {
     long totalBytes = 0;
-    for (List<CompletableFuture<DataStreamReply>> futures : fileMap.values()) {
+    for (CompletableFuture<List<CompletableFuture<DataStreamReply>>> futures : fileMap.values()) {
       long writtenLen = 0;
-      for (CompletableFuture<DataStreamReply> future : futures) {
+      for (CompletableFuture<DataStreamReply> future : futures.get()) {
         writtenLen += future.join().getBytesWritten();
       }
 
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index 45ee600..f211f29 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -21,16 +21,21 @@ import com.beust.jcommander.Parameter;
 import com.beust.jcommander.Parameters;
 import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.examples.filestore.FileStoreClient;
+import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
+import org.apache.ratis.thirdparty.io.netty.buffer.PooledByteBufAllocator;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 /**
  * Subcommand to generate load in filestore state machine.
@@ -42,14 +47,16 @@ public class LoadGen extends Client {
   private int sync = 0;
 
   @Override
-  protected void operation(RaftClient client) throws IOException {
+  protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
     List<String> paths = generateFiles();
     FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting Async write now ");
 
+    final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+
     long startTime = System.currentTimeMillis();
 
-    long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient));
+    long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient, executor));
 
     long endTime = System.currentTimeMillis();
 
@@ -61,44 +68,59 @@ public class LoadGen extends Client {
     stop(client);
   }
 
-  private Map<String, List<CompletableFuture<Long>>> writeByHeapByteBuffer(
-      List<String> paths, FileStoreClient fileStoreClient) throws IOException {
-    Map<String, List<CompletableFuture<Long>>> fileMap = new HashMap<>();
+  long write(FileChannel in, long offset, FileStoreClient fileStoreClient, String path,
+      List<CompletableFuture<Long>> futures) throws IOException {
+    final int bufferSize = getBufferSizeInBytes();
+    final ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(bufferSize);
+    final int bytesRead = buf.writeBytes(in, bufferSize);
+
+    if (bytesRead < 0) {
+      throw new IllegalStateException("Failed to read " + bufferSize + " byte(s) from " + this
+          + ". The channel has reached end-of-stream at " + offset);
+    } else if (bytesRead > 0) {
+      final CompletableFuture<Long> f = fileStoreClient.writeAsync(
+          path, offset, offset + bytesRead == getFileSizeInBytes(), buf.nioBuffer(),
+          sync == 1);
+      f.thenRun(buf::release);
+      futures.add(f);
+    }
+    return bytesRead;
+  }
 
-    for(String path : paths) {
-      List<CompletableFuture<Long>> futures = new ArrayList<>();
-      File file = new File(path);
-      FileInputStream fis = new FileInputStream(file);
+  private Map<String, CompletableFuture<List<CompletableFuture<Long>>>> writeByHeapByteBuffer(
+      List<String> paths, FileStoreClient fileStoreClient, ExecutorService executor) {
+    Map<String, CompletableFuture<List<CompletableFuture<Long>>>> fileMap = new HashMap<>();
 
-      int bytesToRead = getBufferSizeInBytes();
-      if (getFileSizeInBytes() > 0L && getFileSizeInBytes() < (long)getBufferSizeInBytes()) {
-        bytesToRead = getFileSizeInBytes();
-      }
-
-      byte[] buffer = new byte[bytesToRead];
-      long offset = 0L;
-      while(fis.read(buffer, 0, bytesToRead) > 0) {
-        ByteBuffer b = ByteBuffer.wrap(buffer);
-        futures.add(fileStoreClient.writeAsync(path, offset, offset + bytesToRead == getFileSizeInBytes(), b,
-            sync == 1));
-        offset += bytesToRead;
-        bytesToRead = (int)Math.min(getFileSizeInBytes() - offset, getBufferSizeInBytes());
-        if (bytesToRead > 0) {
-          buffer = new byte[bytesToRead];
+    for(String path : paths) {
+      final CompletableFuture<List<CompletableFuture<Long>>> future = new CompletableFuture<>();
+      CompletableFuture.supplyAsync(() -> {
+        List<CompletableFuture<Long>> futures = new ArrayList<>();
+        File file = new File(path);
+        try (FileInputStream fis = new FileInputStream(file)) {
+          final FileChannel in = fis.getChannel();
+          for (long offset = 0L; offset < getFileSizeInBytes(); ) {
+            offset += write(in, offset, fileStoreClient, path, futures);
+          }
+        } catch (Throwable e) {
+          future.completeExceptionally(e);
         }
-      }
 
-      fileMap.put(path, futures);
+        future.complete(futures);
+        return future;
+      }, executor);
+
+      fileMap.put(path, future);
     }
 
     return fileMap;
   }
 
-  private long waitWriteFinish(Map<String, List<CompletableFuture<Long>>> fileMap) {
+  private long waitWriteFinish(Map<String, CompletableFuture<List<CompletableFuture<Long>>>> fileMap)
+      throws ExecutionException, InterruptedException {
     long totalBytes = 0;
-    for (List<CompletableFuture<Long>> futures : fileMap.values()) {
+    for (CompletableFuture<List<CompletableFuture<Long>>> futures : fileMap.values()) {
       long writtenLen = 0;
-      for (CompletableFuture<Long> future : futures) {
+      for (CompletableFuture<Long> future : futures.get()) {
         writtenLen += future.join();
       }