You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text copy.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/11 07:51:04 UTC

[incubator-ratis] branch master updated: RATIS-1229. FileStore client generate files in parallel (#347)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fff9c5  RATIS-1229. FileStore client generate files in parallel (#347)
5fff9c5 is described below

commit 5fff9c597f44d2731605e23a758f00030700c5f8
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Fri Dec 11 15:50:57 2020 +0800

    RATIS-1229. FileStore client generate files in parallel (#347)
    
    * RATIS-1229. FileStore client generate files in parallel
    
    * drop system cache
    
    * fix code review
---
 .../apache/ratis/examples/filestore/FileStore.java | 29 ++++++--
 .../ratis/examples/filestore/FileStoreCommon.java  | 12 ++-
 .../examples/filestore/FileStoreStateMachine.java  |  9 +--
 .../ratis/examples/filestore/cli/Client.java       | 87 ++++++++++++++++------
 .../ratis/examples/filestore/cli/DataStream.java   |  8 +-
 .../ratis/examples/filestore/cli/LoadGen.java      |  8 +-
 .../ratis/examples/filestore/cli/Server.java       | 26 ++++---
 7 files changed, 127 insertions(+), 52 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
index c0af78e..1ecff52 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStore.java
@@ -17,6 +17,8 @@
  */
 package org.apache.ratis.examples.filestore;
 
+import org.apache.ratis.conf.ConfUtils;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.examples.filestore.FileInfo.ReadOnly;
 import org.apache.ratis.examples.filestore.FileInfo.UnderConstruction;
 import org.apache.ratis.proto.ExamplesProtos.ReadReplyProto;
@@ -101,14 +103,31 @@ public class FileStore implements Closeable {
   private final List<Supplier<Path>> rootSuppliers;
   private final FileMap files;
 
-  private final ExecutorService writer = Executors.newFixedThreadPool(10);
-  private final ExecutorService committer = Executors.newFixedThreadPool(3);
-  private final ExecutorService reader = Executors.newFixedThreadPool(10);
-  private final ExecutorService deleter = Executors.newFixedThreadPool(3);
+  private final ExecutorService writer;
+  private final ExecutorService committer;
+  private final ExecutorService reader;
+  private final ExecutorService deleter;
 
-  public FileStore(Supplier<RaftPeerId> idSupplier, List<File> dirs) {
+  public FileStore(Supplier<RaftPeerId> idSupplier, RaftProperties properties) {
     this.idSupplier = idSupplier;
     this.rootSuppliers = new ArrayList<>();
+
+    int writeThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_WRITE_THREAD_NUM,
+        1, LOG::info);
+    int readThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_READ_THREAD_NUM,
+        1, LOG::info);
+    int commitThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_COMMIT_THREAD_NUM,
+        1, LOG::info);
+    int deleteThreadNum = ConfUtils.getInt(properties::getInt, FileStoreCommon.STATEMACHINE_DELETE_THREAD_NUM,
+        1, LOG::info);
+    writer = Executors.newFixedThreadPool(writeThreadNum);
+    reader = Executors.newFixedThreadPool(readThreadNum);
+    committer = Executors.newFixedThreadPool(commitThreadNum);
+    deleter = Executors.newFixedThreadPool(deleteThreadNum);
+
+    final List<File> dirs = ConfUtils.getFiles(properties::getFiles, FileStoreCommon.STATEMACHINE_DIR_KEY,
+        null, LOG::info);
+    Objects.requireNonNull(dirs, FileStoreCommon.STATEMACHINE_DIR_KEY + " is not set.");
     for (File dir : dirs) {
       this.rootSuppliers.add(
           JavaUtils.memoize(() -> dir.toPath().resolve(getId().toString()).normalize().toAbsolutePath()));
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
index cc769fc..152bc5a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreCommon.java
@@ -25,7 +25,17 @@ import java.nio.file.Path;
 import java.util.concurrent.CompletableFuture;
 
 public interface FileStoreCommon {
-  String STATEMACHINE_DIR_KEY = "example.filestore.statemachine.dir";
+  String STATEMACHINE_PREFIX = "example.filestore.statemachine";
+
+  String STATEMACHINE_DIR_KEY = STATEMACHINE_PREFIX + ".dir";
+
+  String STATEMACHINE_WRITE_THREAD_NUM = STATEMACHINE_PREFIX + ".write.thread.num";
+
+  String STATEMACHINE_READ_THREAD_NUM = STATEMACHINE_PREFIX + ".read.thread.num";
+
+  String STATEMACHINE_COMMIT_THREAD_NUM = STATEMACHINE_PREFIX + ".commit.thread.num";
+
+  String STATEMACHINE_DELETE_THREAD_NUM = STATEMACHINE_PREFIX + ".delete.thread.num";
 
   SizeInBytes MAX_CHUNK_SIZE = SizeInBytes.valueOf(64, TraditionalBinaryPrefix.MEGA);
 
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
index 65833e0..64e1efa 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/FileStoreStateMachine.java
@@ -17,7 +17,6 @@
  */
 package org.apache.ratis.examples.filestore;
 
-import org.apache.ratis.conf.ConfUtils;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.ExamplesProtos;
 import org.apache.ratis.proto.ExamplesProtos.DeleteReplyProto;
@@ -42,11 +41,8 @@ import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
 import org.apache.ratis.thirdparty.com.google.protobuf.InvalidProtocolBufferException;
 import org.apache.ratis.util.FileUtils;
 
-import java.io.File;
 import java.io.IOException;
 import java.nio.file.Path;
-import java.util.List;
-import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
 
 public class FileStoreStateMachine extends BaseStateMachine {
@@ -55,10 +51,7 @@ public class FileStoreStateMachine extends BaseStateMachine {
   private final FileStore files;
 
   public FileStoreStateMachine(RaftProperties properties) {
-    final List<File> dirs = ConfUtils.getFiles(properties::getFiles, FileStoreCommon.STATEMACHINE_DIR_KEY,
-        null, LOG::info);
-    Objects.requireNonNull(dirs, FileStoreCommon.STATEMACHINE_DIR_KEY + " is not set.");
-    this.files = new FileStore(this::getId, dirs);
+    this.files = new FileStore(this::getId, properties);
   }
 
   @Override
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index cbf86b1..48c3f1e 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -33,16 +33,22 @@ import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.rpc.SupportedRpcType;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.FileUtils;
 import org.apache.ratis.util.SizeInBytes;
 import org.apache.ratis.util.TimeDuration;
 
+import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
 
 /**
@@ -51,7 +57,7 @@ import java.util.concurrent.TimeUnit;
 public abstract class Client extends SubCommandBase {
 
   @Parameter(names = {"--size"}, description = "Size of each file in bytes", required = true)
-  private int fileSizeInBytes;
+  private long fileSizeInBytes;
 
   @Parameter(names = {"--bufferSize"}, description = "Size of buffer in bytes, should less than 4MB, " +
       "i.e BUFFER_BYTE_LIMIT_DEFAULT", required = true)
@@ -60,13 +66,17 @@ public abstract class Client extends SubCommandBase {
   @Parameter(names = {"--numFiles"}, description = "Number of files to be written", required = true)
   private int numFiles;
 
+  @Parameter(names = {"--storage", "-s"}, description = "Storage dir, eg. --storage dir1 --storage dir2",
+      required = true)
+  private List<File> storageDir = new ArrayList<>();
+
   private static final int MAX_THREADS_NUM = 100;
 
   public int getNumThread() {
     return numFiles < MAX_THREADS_NUM ? numFiles : MAX_THREADS_NUM;
   }
 
-  public int getFileSizeInBytes() {
+  public long getFileSizeInBytes() {
     return fileSizeInBytes;
   }
 
@@ -112,6 +122,10 @@ public abstract class Client extends SubCommandBase {
     builder.setPrimaryDataStreamServer(getPrimary());
     RaftClient client = builder.build();
 
+    for (File dir : storageDir) {
+      FileUtils.createDirectories(dir);
+    }
+
     operation(client);
   }
 
@@ -121,38 +135,69 @@ public abstract class Client extends SubCommandBase {
     System.exit(0);
   }
 
-  protected List<String> generateFiles() throws IOException {
+  public String getPath(String fileName) {
+    int hash = fileName.hashCode() % storageDir.size();
+    return new File(storageDir.get(Math.abs(hash)), fileName).getAbsolutePath();
+  }
+
+  protected void dropCache() throws InterruptedException, IOException {
+    String[] cmds = {"/bin/sh","-c","echo 3 > /proc/sys/vm/drop_caches"};
+    try {
+      Process pro = Runtime.getRuntime().exec(cmds);
+      pro.waitFor();
+    } catch (Throwable t) {
+      System.err.println("Failed to run command:" + Arrays.toString(cmds) + ":" + t.getMessage());
+    }
+  }
+
+  private CompletableFuture<Long> writeFileAsync(String path, ExecutorService executor) {
+    final CompletableFuture<Long> future = new CompletableFuture<>();
+    CompletableFuture.supplyAsync(() -> {
+      try {
+        future.complete(
+            writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1));
+      } catch (IOException e) {
+        future.completeExceptionally(e);
+      }
+      return future;
+    }, executor);
+    return future;
+  }
+
+  protected List<String> generateFiles(ExecutorService executor) {
     UUID uuid = UUID.randomUUID();
     List<String> paths = new ArrayList<>();
+    List<CompletableFuture<Long>> futures = new ArrayList<>();
     for (int i = 0; i < numFiles; i ++) {
-      String path = "file-" + uuid + "-" + i;
+      String path = getPath("file-" + uuid + "-" + i);
       paths.add(path);
-      writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1);
+      futures.add(writeFileAsync(path, executor));
+    }
+
+    for (int i = 0; i < futures.size(); i ++) {
+      long size = futures.get(i).join();
+      if (size != fileSizeInBytes) {
+        System.err.println("Error: path:" + paths.get(i) + " write:" + size +
+            " mismatch expected size:" + fileSizeInBytes);
+      }
     }
 
     return paths;
   }
 
-  protected void writeFile(String path, int fileSize, int bufferSize, int random) throws IOException {
-    RandomAccessFile raf = null;
-    try {
-      raf = new RandomAccessFile(path, "rw");
-      int offset = 0;
+  protected long writeFile(String path, long fileSize, long bufferSize, int random) throws IOException {
+    final byte[] buffer = new byte[Math.toIntExact(bufferSize)];
+    long offset = 0;
+    try(RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
       while (offset < fileSize) {
-        final int remaining = fileSize - offset;
-        final int chunkSize = Math.min(remaining, bufferSize);
-        byte[] buffer = new byte[chunkSize];
-        for (int i = 0; i < chunkSize; i ++) {
-          buffer[i]= (byte) (i % random);
-        }
-        raf.write(buffer);
+        final long remaining = fileSize - offset;
+        final long chunkSize = Math.min(remaining, bufferSize);
+        ThreadLocalRandom.current().nextBytes(buffer);
+        raf.write(buffer, 0, Math.toIntExact(chunkSize));
         offset += chunkSize;
       }
-    } finally {
-      if (raf != null) {
-        raf.close();
-      }
     }
+    return offset;
   }
 
   protected abstract void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException;
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
index 76c4a8a..9afc4bd 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/DataStream.java
@@ -117,12 +117,12 @@ public class DataStream extends Client {
       stop(client);
     }
 
-    List<String> paths = generateFiles();
+    final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+    List<String> paths = generateFiles(executor);
+    dropCache();
     FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting DataStream write now ");
 
-    final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
-
     long startTime = System.currentTimeMillis();
 
     long totalWrittenBytes = waitStreamFinish(streamWrite(paths, fileStoreClient, executor));
@@ -230,7 +230,7 @@ public class DataStream extends Client {
       }
 
       final List<CompletableFuture<DataStreamReply>> futures = new ArrayList<>();
-      final DataStreamOutput out = client.getStreamOutput(path, fileSize);
+      final DataStreamOutput out = client.getStreamOutput(file.getName(), fileSize);
       try (FileInputStream fis = new FileInputStream(file)) {
         final FileChannel in = fis.getChannel();
         for (long offset = 0L; offset < fileSize; ) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
index f211f29..77cec3a 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/LoadGen.java
@@ -48,12 +48,12 @@ public class LoadGen extends Client {
 
   @Override
   protected void operation(RaftClient client) throws IOException, ExecutionException, InterruptedException {
-    List<String> paths = generateFiles();
+    final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
+    List<String> paths = generateFiles(executor);
+    dropCache();
     FileStoreClient fileStoreClient = new FileStoreClient(client);
     System.out.println("Starting Async write now ");
 
-    final ExecutorService executor = Executors.newFixedThreadPool(getNumThread());
-
     long startTime = System.currentTimeMillis();
 
     long totalWrittenBytes = waitWriteFinish(writeByHeapByteBuffer(paths, fileStoreClient, executor));
@@ -99,7 +99,7 @@ public class LoadGen extends Client {
         try (FileInputStream fis = new FileInputStream(file)) {
           final FileChannel in = fis.getChannel();
           for (long offset = 0L; offset < getFileSizeInBytes(); ) {
-            offset += write(in, offset, fileStoreClient, path, futures);
+            offset += write(in, offset, fileStoreClient, file.getName(), futures);
           }
         } catch (Throwable e) {
           future.completeExceptionally(e);
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index 2ab348b..b94b6c8 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -31,7 +31,6 @@ import org.apache.ratis.metrics.JVMMetrics;
 import org.apache.ratis.netty.NettyConfigKeys;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
-import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
@@ -44,10 +43,8 @@ import org.apache.ratis.util.TimeDuration;
 
 import java.io.File;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 /**
  * Class to start a ratis arithmetic example server.
@@ -62,6 +59,18 @@ public class Server extends SubCommandBase {
       required = true)
   private List<File> storageDir = new ArrayList<>();
 
+  @Parameter(names = {"--writeThreadNum"}, description = "Number of write thread")
+  private int writeThreadNum = 20;
+
+  @Parameter(names = {"--readThreadNum"}, description = "Number of read thread")
+  private int readThreadNum = 20;
+
+  @Parameter(names = {"--commitThreadNum"}, description = "Number of commit thread")
+  private int commitThreadNum = 3;
+
+  @Parameter(names = {"--deleteThreadNum"}, description = "Number of delete thread")
+  private int deleteThreadNum = 3;
+
   @Override
   public void run() throws Exception {
     JVMMetrics.initJvmMetrics(TimeDuration.valueOf(10, TimeUnit.SECONDS));
@@ -82,8 +91,11 @@ public class Server extends SubCommandBase {
     RaftServerConfigKeys.setStorageDir(properties, storageDir);
     RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
     RaftServerConfigKeys.Write.setByteLimit(properties, SizeInBytes.valueOf("1000MB"));
-    ConfUtils.setFiles(properties::setFiles, FileStoreCommon.STATEMACHINE_DIR_KEY,
-        storageDir);
+    ConfUtils.setFiles(properties::setFiles, FileStoreCommon.STATEMACHINE_DIR_KEY, storageDir);
+    ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_WRITE_THREAD_NUM, writeThreadNum);
+    ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_READ_THREAD_NUM, readThreadNum);
+    ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_COMMIT_THREAD_NUM, commitThreadNum);
+    ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_DELETE_THREAD_NUM, deleteThreadNum);
     StateMachine stateMachine = new FileStoreStateMachine(properties);
 
     final RaftGroup raftGroup = RaftGroup.valueOf(RaftGroupId.valueOf(ByteString.copyFromUtf8(getRaftGroupId())),
@@ -100,8 +112,4 @@ public class Server extends SubCommandBase {
       TimeUnit.SECONDS.sleep(1);
     }
   }
-
-  private Collection<RaftPeer> getOtherRaftPeers(Collection<RaftPeer> peers) {
-    return peers.stream().filter(r -> !r.getId().toString().equals(id)).collect(Collectors.toList());
-  }
 }