You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/04/29 06:16:17 UTC

[ratis] branch master updated: RATIS-1373. Use async model for StreamInfo#applyToRemotes (#476)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 8bfe8a2  RATIS-1373. Use async model for StreamInfo#applyToRemotes (#476)
8bfe8a2 is described below

commit 8bfe8a22bdf6a9ba4a7d2118fe21104e4920f823
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Apr 29 14:16:09 2021 +0800

    RATIS-1373. Use async model for StreamInfo#applyToRemotes (#476)
---
 .../java/org/apache/ratis/netty/server/DataStreamManagement.java | 9 ++++++---
 .../main/java/org/apache/ratis/server/RaftServerConfigKeys.java  | 2 +-
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index b034771..415ac29 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -90,13 +90,16 @@ public class DataStreamManagement {
 
   static class RemoteStream {
     private final DataStreamOutputRpc out;
+    private final AtomicReference<CompletableFuture<DataStreamReply>> sendFuture
+        = new AtomicReference<>(CompletableFuture.completedFuture(null));
 
     RemoteStream(DataStreamOutputRpc out) {
       this.out = out;
     }
 
-    CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
-      return out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions());
+    CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
+      return composeAsync(sendFuture, executor,
+          n -> out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions()));
     }
   }
 
@@ -369,7 +372,7 @@ public class DataStreamManagement {
       remoteWrites = Collections.emptyList();
     } else if (request.getType() == Type.STREAM_DATA) {
       localWrite = info.getLocal().write(buf, request.getWriteOptions(), writeExecutor);
-      remoteWrites = info.applyToRemotes(out -> out.write(request));
+      remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor));
     } else {
       throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
     }
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9b5d86b..7e888ff 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -421,7 +421,7 @@ public interface RaftServerConfigKeys {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
 
     String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + ".async.request.thread.pool.size";
-    int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 16;
+    int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32;
 
     static int asyncRequestThreadPoolSize(RaftProperties properties) {
       return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY,