You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/11/04 11:25:17 UTC

[incubator-ratis] branch master updated: RATIS-1122. Use thenApplyAsync instead of thenApply (#250)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new daa7afb  RATIS-1122. Use thenApplyAsync instead of thenApply (#250)
daa7afb is described below

commit daa7afba3ce0b76b5fd9ec31dc49320fddaeb965
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Wed Nov 4 19:25:09 2020 +0800

    RATIS-1122. Use thenApplyAsync instead of thenApply (#250)
---
 .../org/apache/ratis/netty/server/NettyServerStreamRpc.java    | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
index 041d64d..0f24b0b 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
@@ -318,21 +318,21 @@ public class NettyServerStreamRpc implements DataStreamServerRpc {
       }
     } else {
       info = streams.get(key);
-      localWrite = info.getStream().thenApply(stream -> writeTo(buf, stream));
+      localWrite = info.getPrevious().get()
+          .thenCombineAsync(info.getStream(), (u, stream) -> writeTo(buf, stream), executorService);
       for (DataStreamOutput out : info.getDataStreamOutputs()) {
         remoteWrites.add(out.writeAsync(request.slice().nioBuffer()));
       }
     }
 
-    final AtomicReference<CompletableFuture<?>> previous = info.getPrevious();
-    final CompletableFuture<?> current = previous.get()
-        .thenCombineAsync(JavaUtils.allOf(remoteWrites), (u, v) -> null, executorService)
+    final CompletableFuture<?> current = JavaUtils.allOf(remoteWrites)
         .thenCombineAsync(localWrite, (v, bytesWritten) -> {
           buf.release();
           sendReply(remoteWrites, request, bytesWritten, ctx);
           return null;
         }, executorService);
-    previous.set(current);
+
+    info.getPrevious().set(current);
   }
 
   private boolean checkSuccessRemoteWrite(