You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/04/29 06:16:17 UTC
[ratis] branch master updated: RATIS-1373. Use async model for
StreamInfo#applyToRemotes (#476)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 8bfe8a2 RATIS-1373. Use async model for StreamInfo#applyToRemotes (#476)
8bfe8a2 is described below
commit 8bfe8a22bdf6a9ba4a7d2118fe21104e4920f823
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Apr 29 14:16:09 2021 +0800
RATIS-1373. Use async model for StreamInfo#applyToRemotes (#476)
---
.../java/org/apache/ratis/netty/server/DataStreamManagement.java | 9 ++++++---
.../main/java/org/apache/ratis/server/RaftServerConfigKeys.java | 2 +-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index b034771..415ac29 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -90,13 +90,16 @@ public class DataStreamManagement {
static class RemoteStream {
private final DataStreamOutputRpc out;
+ private final AtomicReference<CompletableFuture<DataStreamReply>> sendFuture
+ = new AtomicReference<>(CompletableFuture.completedFuture(null));
RemoteStream(DataStreamOutputRpc out) {
this.out = out;
}
- CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request) {
- return out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions());
+ CompletableFuture<DataStreamReply> write(DataStreamRequestByteBuf request, Executor executor) {
+ return composeAsync(sendFuture, executor,
+ n -> out.writeAsync(request.slice().nioBuffer(), request.getWriteOptions()));
}
}
@@ -369,7 +372,7 @@ public class DataStreamManagement {
remoteWrites = Collections.emptyList();
} else if (request.getType() == Type.STREAM_DATA) {
localWrite = info.getLocal().write(buf, request.getWriteOptions(), writeExecutor);
- remoteWrites = info.applyToRemotes(out -> out.write(request));
+ remoteWrites = info.applyToRemotes(out -> out.write(request, requestExecutor));
} else {
throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
}
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index 9b5d86b..7e888ff 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -421,7 +421,7 @@ public interface RaftServerConfigKeys {
String PREFIX = RaftServerConfigKeys.PREFIX + ".data-stream";
String ASYNC_REQUEST_THREAD_POOL_SIZE_KEY = PREFIX + ".async.request.thread.pool.size";
- int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 16;
+ int ASYNC_REQUEST_THREAD_POOL_SIZE_DEFAULT = 32;
static int asyncRequestThreadPoolSize(RaftProperties properties) {
return getInt(properties::getInt, ASYNC_REQUEST_THREAD_POOL_SIZE_KEY,