You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/04 11:25:17 UTC
[incubator-ratis] branch master updated: RATIS-1122. Use thenApplyAsync instead of thenApply (#250)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new daa7afb RATIS-1122. Use thenApplyAsync instead of thenApply (#250)
daa7afb is described below
commit daa7afba3ce0b76b5fd9ec31dc49320fddaeb965
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Nov 4 19:25:09 2020 +0800
RATIS-1122. Use thenApplyAsync instead of thenApply (#250)
---
.../org/apache/ratis/netty/server/NettyServerStreamRpc.java | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 041d64d..0f24b0b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -318,21 +318,21 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
}
} else {
info = streams.get(key);
- localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+ localWrite = info.getPrevious().get()
+ .thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, stream), executorService);
for (DataStreamOutput out : info.getDataStreamOutputs()) {
remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
}
}
- final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
- final CompletableFuture<?> current = previous.get()
- .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
+ final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
.thenCombineAsync(localWrite, (v, bytesWritten) -> {
buf.release();
sendReply(remoteWrites, request, bytesWritten, ctx);
return null;
}, executorService);
- previous.set(current);
+
+ info.getPrevious().set(current);
}
private boolean checkSuccessRemoteWrite(