You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/11/29 14:48:01 UTC

[ratis] branch master updated: RATIS-1445. NettyClientStreamRpc exception handling (#541)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ad0389  RATIS-1445. NettyClientStreamRpc exception handling (#541)
2ad0389 is described below

commit 2ad0389037dd56dda2a609040ec68aa3cb6bdbd0
Author: hao guo <gu...@360.cn>
AuthorDate: Mon Nov 29 22:47:56 2021 +0800

    RATIS-1445. NettyClientStreamRpc exception handling (#541)
---
 .../java/org/apache/ratis/client/impl/OrderedStreamAsync.java |  5 ++++-
 .../org/apache/ratis/netty/client/NettyClientStreamRpc.java   | 11 ++++++++++-
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 94814ca..107e741 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -114,7 +114,10 @@ public class OrderedStreamAsync {
     final LongFunction<DataStreamWindowRequest> constructor
         = seqNum -> new DataStreamWindowRequest(header, data, seqNum);
     return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
-           getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
+           getReplyFuture().whenComplete((r, e) -> {
+      LOG.error("Failed to send request, header=" + header, e);
+      requestSemaphore.release();
+    });
   }
 
   private void sendRequestToNetwork(DataStreamWindowRequest request){
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 8e60a2e..39a73fd 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -41,6 +41,7 @@ import org.apache.ratis.util.NetUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Optional;
 import java.util.Queue;
@@ -110,6 +111,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
   private ChannelInboundHandler getClientHandler(){
     return new ChannelInboundHandlerAdapter(){
+
+      private ClientInvocationId clientInvocationId;
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
         if (!(msg instanceof DataStreamReply)) {
@@ -118,7 +122,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         }
         final DataStreamReply reply = (DataStreamReply) msg;
         LOG.debug("{}: read {}", this, reply);
-        ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
+        clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
         Optional.ofNullable(replies.get(clientInvocationId))
             .map(Queue::poll)
             .ifPresent(f -> f.complete(reply));
@@ -126,6 +130,11 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
 
       @Override
       public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+        Optional.ofNullable(clientInvocationId)
+            .map(replies::remove)
+            .orElseGet(LinkedList::new)
+            .forEach(f -> f.completeExceptionally(cause));
+
         LOG.warn(name + ": exceptionCaught", cause);
         ctx.close();
       }