Posted to issues@ratis.apache.org by GitBox <gi...@apache.org> on 2021/12/29 06:16:32 UTC

[GitHub] [ratis] szetszwo commented on a change in pull request #571: RATIS-1478 NettyClientStreamRpc error reply handling

szetszwo commented on a change in pull request #571:
URL: https://github.com/apache/ratis/pull/571#discussion_r776167118



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -158,9 +196,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
         clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
         final ReplyQueue queue = replies.get(clientInvocationId);

Review comment:
       Let's call remove(..) instead of get(..) when the reply is NOT a success.
   ```
           final ReplyQueue queue = reply.isSuccess()? replies.get(clientInvocationId): replies.remove(clientInvocationId);
   ```
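
A minimal, self-contained sketch of the get-versus-remove lookup suggested above. The class name `ReplyLookupSketch`, the `String` keys, and the `String` reply type are hypothetical stand-ins for `ClientInvocationId` and `DataStreamReply`; this is not the actual Ratis code.

```java
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

public class ReplyLookupSketch {
  // Hypothetical stand-in for the "replies" map; key and value types are simplified.
  static final Map<String, Queue<CompletableFuture<String>>> replies = new ConcurrentHashMap<>();

  /** On success keep the queue in the map; on failure take it out of the map. */
  static Queue<CompletableFuture<String>> lookup(String id, boolean success) {
    return success ? replies.get(id) : replies.remove(id);
  }

  public static void main(String[] args) {
    replies.put("stream-1", new ConcurrentLinkedQueue<>());
    // A successful reply leaves the queue registered for later replies.
    System.out.println(lookup("stream-1", true) != null);
    System.out.println(replies.containsKey("stream-1"));
    // A failed reply still returns the queue, but the map entry is gone afterwards.
    System.out.println(lookup("stream-1", false) != null);
    System.out.println(replies.containsKey("stream-1"));
  }
}
```

With this shape, the caller still holds the queue after a failure, so it can fail the remaining futures without a second map operation.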

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -89,35 +90,72 @@ void shutdownGracefully() {
     }
   }
 
-  static class ReplyQueue implements Iterable<CompletableFuture<DataStreamReply>> {
+  static class ReplyQueue implements Iterable<DataStreamRequestEntry> {
     static final ReplyQueue EMPTY = new ReplyQueue();
 
-    private final Queue<CompletableFuture<DataStreamReply>> queue = new ConcurrentLinkedQueue<>();
+    private final Queue<DataStreamRequestEntry> queue = new ConcurrentLinkedQueue<>();
     private int emptyId;
 
     /** @return an empty ID if the queue is empty; otherwise, the queue is non-empty, return null. */
     synchronized Integer getEmptyId() {
       return queue.isEmpty()? emptyId: null;
     }
 
-    synchronized boolean offer(CompletableFuture<DataStreamReply> f) {
+    synchronized boolean offer(DataStreamRequestEntry f) {
       if (queue.offer(f)) {
         emptyId++;
         return true;
       }
       return false;
     }
 
-    CompletableFuture<DataStreamReply> poll() {
+    DataStreamRequestEntry poll() {
       return queue.poll();
     }
 
+    int size() {
+      return queue.size();
+    }
+
     @Override
-    public Iterator<CompletableFuture<DataStreamReply>> iterator() {
+    public Iterator<DataStreamRequestEntry> iterator() {
       return queue.iterator();
     }
   }
 
+  static class DataStreamRequestEntry {
+    private final CompletableFuture<DataStreamReply> replyFuture;
+    private final DataStreamRequest request;

Review comment:
       The request is kept here only for building the reply.  The reply should not be built on the client side.  Just complete the future exceptionally.
   
   Also, the request carries data, so holding on to it costs memory.
   
   Since we don't need the request, we don't have to create the new DataStreamRequestEntry class.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
##########
@@ -158,9 +196,18 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
         clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
         final ReplyQueue queue = replies.get(clientInvocationId);
         if (queue != null) {
-          final CompletableFuture<DataStreamReply> f = queue.poll();
+          final DataStreamRequestEntry f = queue.poll();
           if (f != null) {
-            f.complete(reply);
+            f.getReplyFuture().complete(reply);
+
+            if (!reply.isSuccess() && queue.size() > 0) {
+              replies.remove(clientInvocationId).forEach(cf -> {
+                if (!cf.getReplyFuture().isDone()) {
+                  cf.getReplyFuture().complete(cf.getErrorDataStreamReply());
+                }
+              });
+            }

Review comment:
       Just complete the futures exceptionally and don't build a reply.
   ```
               if (!reply.isSuccess() && queue.size() > 0) {
                 final IllegalStateException e = new IllegalStateException(
                     this + ": an earlier request failed with " + reply);
                 queue.forEach(future -> future.completeExceptionally(e));
               }
   ```
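
A small runnable sketch of the same idea outside Netty, assuming a plain queue of futures. The class `FailPendingSketch`, the helper `failAll`, and the `String` reply type are hypothetical names for illustration only. `CompletableFuture.completeExceptionally` returns false for a future that is already done, so no separate `isDone()` check is needed.

```java
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

public class FailPendingSketch {
  /** Fail every future still pending in the queue; returns how many were actually failed. */
  static <T> int failAll(Queue<CompletableFuture<T>> pending, String failedReply) {
    final IllegalStateException e = new IllegalStateException(
        "an earlier request failed with " + failedReply);
    int failed = 0;
    for (CompletableFuture<T> f : pending) {
      // Returns true only if this call moved the future to a completed state.
      if (f.completeExceptionally(e)) {
        failed++;
      }
    }
    return failed;
  }

  public static void main(String[] args) {
    final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();
    final CompletableFuture<String> done = CompletableFuture.completedFuture("ok");
    final CompletableFuture<String> waiting = new CompletableFuture<>();
    pending.offer(done);
    pending.offer(waiting);
    System.out.println(failAll(pending, "reply#1"));
    System.out.println(waiting.isCompletedExceptionally());
  }
}
```

Callers waiting on a failed future see the `IllegalStateException` as the cause, so no error reply has to be constructed on the client and no request data has to be retained.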
   



