You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2021/11/29 14:48:01 UTC
[ratis] branch master updated: RATIS-1445. NettyClientStreamRpc exception handling (#541)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 2ad0389 RATIS-1445. NettyClientStreamRpc exception handling (#541)
2ad0389 is described below
commit 2ad0389037dd56dda2a609040ec68aa3cb6bdbd0
Author: hao guo <gu...@360.cn>
AuthorDate: Mon Nov 29 22:47:56 2021 +0800
RATIS-1445. NettyClientStreamRpc exception handling (#541)
---
.../java/org/apache/ratis/client/impl/OrderedStreamAsync.java | 5 ++++-
.../org/apache/ratis/netty/client/NettyClientStreamRpc.java | 11 ++++++++++-
2 files changed, 14 insertions(+), 2 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
index 94814ca..107e741 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/OrderedStreamAsync.java
@@ -114,7 +114,10 @@ public class OrderedStreamAsync {
final LongFunction<DataStreamWindowRequest> constructor
= seqNum -> new DataStreamWindowRequest(header, data, seqNum);
return slidingWindow.submitNewRequest(constructor, this::sendRequestToNetwork).
- getReplyFuture().whenComplete((r, e) -> requestSemaphore.release());
+ getReplyFuture().whenComplete((r, e) -> {
+ LOG.error("Failed to send request, header=" + header, e);
+ requestSemaphore.release();
+ });
}
private void sendRequestToNetwork(DataStreamWindowRequest request){
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 8e60a2e..39a73fd 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -41,6 +41,7 @@ import org.apache.ratis.util.NetUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.LinkedList;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
@@ -110,6 +111,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
private ChannelInboundHandler getClientHandler(){
return new ChannelInboundHandlerAdapter(){
+
+ private ClientInvocationId clientInvocationId;
+
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
if (!(msg instanceof DataStreamReply)) {
@@ -118,7 +122,7 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
}
final DataStreamReply reply = (DataStreamReply) msg;
LOG.debug("{}: read {}", this, reply);
- ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
+ clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
Optional.ofNullable(replies.get(clientInvocationId))
.map(Queue::poll)
.ifPresent(f -> f.complete(reply));
@@ -126,6 +130,11 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ Optional.ofNullable(clientInvocationId)
+ .map(replies::remove)
+ .orElseGet(LinkedList::new)
+ .forEach(f -> f.completeExceptionally(cause));
+
LOG.warn(name + ": exceptionCaught", cause);
ctx.close();
}