You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ru...@apache.org on 2020/12/14 07:02:41 UTC

[incubator-ratis] branch master updated: RATIS-1238. Separate request and write executor (#353)

This is an automated email from the ASF dual-hosted git repository.

runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 89c886f  RATIS-1238. Separate request and write executor (#353)
89c886f is described below

commit 89c886f2976c5af8a43c3509433b6842ddf2360b
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Mon Dec 14 15:00:24 2020 +0800

    RATIS-1238. Separate request and write executor (#353)
---
 .../ratis/examples/common/SubCommandBase.java      |  3 ---
 .../ratis/examples/filestore/cli/Client.java       |  8 +++----
 .../ratis/examples/filestore/cli/Server.java       |  2 ++
 .../ratis/netty/server/DataStreamManagement.java   | 19 +++++++++-------
 .../apache/ratis/server/RaftServerConfigKeys.java  | 26 +++++++++++++++++-----
 5 files changed, 36 insertions(+), 22 deletions(-)

diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
index 15f1c37..f168e79 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/common/SubCommandBase.java
@@ -54,9 +54,6 @@ public abstract class SubCommandBase {
     return parsePeers(peers)[0];
   }
 
-  public boolean isPrimary(String id) {
-    return getPrimary().getId().toString().equals(id);
-  }
   public abstract void run() throws Exception;
 
   public String getRaftGroupId() {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
index 48c3f1e..814236b 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Client.java
@@ -43,7 +43,6 @@ import java.io.RandomAccessFile;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
@@ -140,7 +139,7 @@ public abstract class Client extends SubCommandBase {
     return new File(storageDir.get(Math.abs(hash)), fileName).getAbsolutePath();
   }
 
-  protected void dropCache() throws InterruptedException, IOException {
+  protected void dropCache() {
     String[] cmds = {"/bin/sh","-c","echo 3 > /proc/sys/vm/drop_caches"};
     try {
       Process pro = Runtime.getRuntime().exec(cmds);
@@ -154,8 +153,7 @@ public abstract class Client extends SubCommandBase {
     final CompletableFuture<Long> future = new CompletableFuture<>();
     CompletableFuture.supplyAsync(() -> {
       try {
-        future.complete(
-            writeFile(path, fileSizeInBytes, bufferSizeInBytes, new Random().nextInt(127) + 1));
+        future.complete(writeFile(path, fileSizeInBytes, bufferSizeInBytes));
       } catch (IOException e) {
         future.completeExceptionally(e);
       }
@@ -185,7 +183,7 @@ public abstract class Client extends SubCommandBase {
     return paths;
   }
 
-  protected long writeFile(String path, long fileSize, long bufferSize, int random) throws IOException {
+  protected long writeFile(String path, long fileSize, long bufferSize) throws IOException {
     final byte[] buffer = new byte[Math.toIntExact(bufferSize)];
     long offset = 0;
     try(RandomAccessFile raf = new RandomAccessFile(path, "rw")) {
diff --git a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
index b94b6c8..c8f4f23 100644
--- a/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
+++ b/ratis-examples/src/main/java/org/apache/ratis/examples/filestore/cli/Server.java
@@ -92,6 +92,8 @@ public class Server extends SubCommandBase {
     RaftServerConfigKeys.Write.setElementLimit(properties, 40960);
     RaftServerConfigKeys.Write.setByteLimit(properties, SizeInBytes.valueOf("1000MB"));
     ConfUtils.setFiles(properties::setFiles, FileStoreCommon.STATEMACHINE_DIR_KEY, storageDir);
+    RaftServerConfigKeys.DataStream.setAsyncRequestThreadPoolSize(properties, writeThreadNum);
+    RaftServerConfigKeys.DataStream.setAsyncWriteThreadPoolSize(properties, writeThreadNum);
     ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_WRITE_THREAD_NUM, writeThreadNum);
     ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_READ_THREAD_NUM, readThreadNum);
     ConfUtils.setInt(properties::setInt, FileStoreCommon.STATEMACHINE_COMMIT_THREAD_NUM, commitThreadNum);
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 73aa303..e5ede5e 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -202,15 +202,18 @@ public class DataStreamManagement {
   private final String name;
 
   private final StreamMap streams = new StreamMap();
-  private final Executor executor;
+  private final Executor requestExecutor;
+  private final Executor writeExecutor;
 
   DataStreamManagement(RaftServer server) {
     this.server = server;
     this.name = server.getId() + "-" + JavaUtils.getClassSimpleName(getClass());
 
     final RaftProperties properties = server.getProperties();
-    this.executor = Executors.newFixedThreadPool(
-        RaftServerConfigKeys.DataStream.asyncThreadPoolSize(properties));
+    this.requestExecutor = Executors.newFixedThreadPool(
+        RaftServerConfigKeys.DataStream.asyncRequestThreadPoolSize(properties));
+    this.writeExecutor = Executors.newFixedThreadPool(
+        RaftServerConfigKeys.DataStream.asyncWriteThreadPoolSize(properties));
   }
 
   private CompletableFuture<DataStream> computeDataStreamIfAbsent(RaftClientRequest request) throws IOException {
@@ -316,7 +319,7 @@ public class DataStreamManagement {
           .getRaftClient()
           .async());
       return asyncRpcApi.sendForward(info.request)
-          .thenAcceptAsync(reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)), executor);
+          .thenAcceptAsync(reply -> ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply)), requestExecutor);
     } catch (IOException e) {
       throw new CompletionException(e);
     }
@@ -378,16 +381,16 @@ public class DataStreamManagement {
       localWrite = CompletableFuture.completedFuture(0L);
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
-      localWrite = info.getLocal().write(buf, request.getType() == Type.STREAM_DATA_SYNC, executor);
+      localWrite = info.getLocal().write(buf, request.getType() == Type.STREAM_DATA_SYNC, writeExecutor);
       remoteWrites = info.applyToRemotes(out -> out.write(request));
     } else if (request.getType() == Type.STREAM_CLOSE) {
-      localWrite = info.getLocal().close(executor);
+      localWrite = info.getLocal().close(writeExecutor);
       remoteWrites = info.isPrimary()? info.applyToRemotes(RemoteStream::close): Collections.emptyList();
     } else {
       throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
     }
 
-    composeAsync(info.getPrevious(), executor, n -> JavaUtils.allOf(remoteWrites)
+    composeAsync(info.getPrevious(), requestExecutor, n -> JavaUtils.allOf(remoteWrites)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           if (request.getType() == Type.STREAM_HEADER
               || request.getType() == Type.STREAM_DATA || request.getType() == Type.STREAM_DATA_SYNC) {
@@ -403,7 +406,7 @@ public class DataStreamManagement {
             throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
           }
           return null;
-        }, executor)).whenComplete((v, exception) -> {
+        }, requestExecutor)).whenComplete((v, exception) -> {
       try {
         if (exception != null) {
           replyDataStreamException(server, exception, info.getRequest(), request, ctx);
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index c2b38e7..f2f0596 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -420,16 +420,30 @@ public interface RaftServerConfigKeys {
   interface DataStream {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
 
-    String ASYNC_THREAD_POOL_SIZE_KEY = PREFIX + ".async.thread.pool.size";
-    int ASYNC_THREAD_POOL_SIZE_DEFAULT = 16;
+    String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + ".async.request.thread.pool.size";
+    int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 16;
 
-    static int asyncThreadPoolSize(RaftProperties properties) {
-      return getInt(properties::getInt, ASYNC_THREAD_POOL_SIZE_KEY, ASYNC_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
+    static int asyncRequestThreadPoolSize(RaftProperties properties) {
+      return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY,
+          ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
           requireMin(0), requireMax(65536));
     }
 
-    static void setAsyncThreadPoolSize(RaftProperties properties, int port) {
-      setInt(properties::setInt, ASYNC_THREAD_POOL_SIZE_KEY, port);
+    static void setAsyncRequestThreadPoolSize(RaftProperties properties, int port) {
+      setInt(properties::setInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY, port);
+    }
+
+    String ASYNC_WRITE_THREAD_POOL_SIZE_KEY = PREFIX + ".async.write.thread.pool.size";
+    int ASYNC_WRITE_THREAD_POOL_SIZE_DEFAULT = 16;
+
+    static int asyncWriteThreadPoolSize(RaftProperties properties) {
+      return getInt(properties::getInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY,
+          ASYNC_WRITE_THREAD_POOL_SIZE_DEFAULT, getDefaultLog(),
+          requireMin(0), requireMax(65536));
+    }
+
+    static void setAsyncWriteThreadPoolSize(RaftProperties properties, int port) {
+      setInt(properties::setInt, ASYNC_WRITE_THREAD_POOL_SIZE_KEY, port);
     }
   }