Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/15 03:37:47 UTC

[GitHub] [incubator-ratis] szetszwo opened a new pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

szetszwo opened a new pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278


   See https://issues.apache.org/jira/browse/RATIS-1153


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523929700



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
##########
@@ -331,7 +333,24 @@ static void replyDataStreamException(RaftServer server, Throwable cause, RaftCli
       DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
     DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
     RaftClientReply reply = new RaftClientReply(raftClientRequest, dataStreamException, null);
-    ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
+    RaftClientReply reply = new RaftClientReply(ClientId.emptyClientId(), server.getId(), RaftGroupId.emptyGroupId(),
+        -1, false, null, dataStreamException, 0L, null);
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBuf request, RaftClientReply reply,
+      ChannelHandlerContext ctx) {
+    LOG.warn("Failed to process {}",  request, throwable);

Review comment:
       Try this:
   ```
       LOG.info("abc {}", 123, new Throwable("testing"));
   ```
   It will show the stack trace.
   ```
   2020-11-16 14:57:21,463 [Time-limited test] INFO  datastream.TestDataStreamNetty (TestDataStreamNetty.java:testDataStreamExceptionGetStateMachine(199)) - abc 123
   java.lang.Throwable: testing
   	at org.apache.ratis.datastream.TestDataStreamNetty.testDataStreamExceptionGetStateMachine(TestDataStreamNetty.java:199)
   	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
   	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
   	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
   	at java.lang.reflect.Method.invoke(Method.java:498)
   	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
   	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
   	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
   	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
   	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
   	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
   	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:298)
   	at org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:292)
   	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
   	at java.lang.Thread.run(Thread.java:748)
   ```
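
The point of the comment above can be reproduced without a logging backend. The following is a minimal sketch, not the SLF4J implementation: `TrailingThrowableDemo` and its `format` method are hypothetical names, and the rule it encodes (a `Throwable` passed as the last argument is kept out of the `{}` substitution and printed as a stack trace instead) is a simplified assumption about the convention the comment relies on. Real logging libraries may differ in detail between versions.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical stand-in for a logger; this is not the SLF4J implementation. */
final class TrailingThrowableDemo {

  /** Formats a message, treating a trailing Throwable as the exception to print. */
  static String format(String pattern, Object... args) {
    Throwable thrown = null;
    int count = args.length;
    // Assumed convention: a Throwable in the last position is not substituted
    // into the pattern; it is set aside so its stack trace can be appended.
    if (count > 0 && args[count - 1] instanceof Throwable) {
      thrown = (Throwable) args[count - 1];
      count--;
    }

    StringBuilder out = new StringBuilder();
    int from = 0;
    for (int i = 0; i < count; i++) {
      int at = pattern.indexOf("{}", from);
      if (at < 0) {
        break; // more arguments than placeholders: ignore the extras
      }
      out.append(pattern, from, at).append(args[i]);
      from = at + 2;
    }
    out.append(pattern.substring(from));

    if (thrown != null) {
      StringWriter trace = new StringWriter();
      thrown.printStackTrace(new PrintWriter(trace, true));
      out.append(System.lineSeparator()).append(trace.toString());
    }
    return out.toString();
  }

  public static void main(String[] args) {
    // Mirrors the call in the review comment: one placeholder, two arguments.
    System.out.println(format("abc {}", 123, new Throwable("testing")));
  }
}
```

Under that assumption, `LOG.warn("Failed to process {}", request, throwable)` needs no second placeholder for the stack trace to appear.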
   







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523703178



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -151,11 +152,23 @@ public void addRaftPeers(Collection<RaftPeer> newPeers) {
 
   private ChannelInboundHandler newChannelInboundHandlerAdapter(){
     return new ChannelInboundHandlerAdapter(){
+      private final AtomicReference<DataStreamRequestByteBuf> currentRef = new AtomicReference<>();
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof DataStreamRequestByteBuf) {
-          requests.read((DataStreamRequestByteBuf)msg, ctx, proxies::getDataStreamOutput);
+        if (!(msg instanceof DataStreamRequestByteBuf)) {
+          LOG.error("Unexpected message class {}, ignoring ...", msg.getClass().getName());
+          return;
         }
+
+        final DataStreamRequestByteBuf current = (DataStreamRequestByteBuf) msg;
+        currentRef.set(current);
+        requests.read(current, ctx, proxies::getDataStreamOutput);
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
+        DataStreamManagement.sendException(currentRef.get(), throwable, ctx);

Review comment:
       @szetszwo Hi, I'm not sure this reply will go to the right request in a multi-threaded environment.
   The exception may have been caused by request1 while currentRef has already been set to request2.







[GitHub] [incubator-ratis] runzhiwang edited a comment on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
runzhiwang edited a comment on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727660896


   @amaliujia Could you treat the failing unit test `TestDataStreamWithNettyMiniRaftCluster>DataStreamTests.testStreamWrites:57->DataStreamTests.testStreamWrites:84 » TestTimedOut` as high priority? It makes it hard for CI to pass. Thanks a lot.





[GitHub] [incubator-ratis] runzhiwang commented on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727692859


   @szetszwo It seems the commit is wrong?
   ![image](https://user-images.githubusercontent.com/51938049/99205886-d260e100-27f4-11eb-98d2-bd1babc6fb48.png)
   





[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523943833



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
##########
@@ -331,7 +333,24 @@ static void replyDataStreamException(RaftServer server, Throwable cause, RaftCli
       DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
     DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
     RaftClientReply reply = new RaftClientReply(raftClientRequest, dataStreamException, null);
-    ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
+    RaftClientReply reply = new RaftClientReply(ClientId.emptyClientId(), server.getId(), RaftGroupId.emptyGroupId(),
+        -1, false, null, dataStreamException, 0L, null);
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBuf request, RaftClientReply reply,
+      ChannelHandlerContext ctx) {
+    LOG.warn("Failed to process {}",  request, throwable);

Review comment:
       Got it! Thanks!







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523703693



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -151,11 +152,23 @@ public void addRaftPeers(Collection<RaftPeer> newPeers) {
 
   private ChannelInboundHandler newChannelInboundHandlerAdapter(){
     return new ChannelInboundHandlerAdapter(){
+      private final AtomicReference<DataStreamRequestByteBuf> currentRef = new AtomicReference<>();
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof DataStreamRequestByteBuf) {
-          requests.read((DataStreamRequestByteBuf)msg, ctx, proxies::getDataStreamOutput);
+        if (!(msg instanceof DataStreamRequestByteBuf)) {
+          LOG.error("Unexpected message class {}, ignoring ...", msg.getClass().getName());
+          return;
         }
+
+        final DataStreamRequestByteBuf current = (DataStreamRequestByteBuf) msg;
+        currentRef.set(current);
+        requests.read(current, ctx, proxies::getDataStreamOutput);
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
+        DataStreamManagement.sendException(currentRef.get(), throwable, ctx);

Review comment:
       It seems this won't happen in Netty.  Let's test it.
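
A minimal sketch of the reasoning behind this reply, under the assumption that all callbacks of one handler instance for one channel run on the same event-loop thread. It uses no Netty types: `SingleLoopDemo`, its `channelRead` and `exceptionCaught` methods, and the request strings are hypothetical stand-ins, and a single-threaded executor plays the role of the event loop. With that assumption, a failure raised while processing a request is reported before the same thread can start the next read, so `currentRef` still names the failing request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical model of one channel served by one event-loop thread. */
final class SingleLoopDemo {
  private final AtomicReference<String> currentRef = new AtomicReference<>();
  private final List<String> blamed = new ArrayList<>();

  /** Stand-in for channelRead: remember the request, then process it. */
  private void channelRead(String request) {
    currentRef.set(request);
    if (request.startsWith("bad")) {
      throw new IllegalStateException("cannot process " + request);
    }
  }

  /** Stand-in for exceptionCaught: blame whatever currentRef holds right now. */
  private void exceptionCaught(Throwable t) {
    blamed.add(currentRef.get());
  }

  /** Runs every read on a single thread and returns the requests that were blamed. */
  static List<String> run(String... requests) {
    SingleLoopDemo handler = new SingleLoopDemo();
    ExecutorService loop = Executors.newSingleThreadExecutor();
    for (String request : requests) {
      loop.execute(() -> {
        try {
          handler.channelRead(request);
        } catch (RuntimeException e) {
          // Reported before the same thread picks up the next read.
          handler.exceptionCaught(e);
        }
      });
    }
    loop.shutdown();
    try {
      if (!loop.awaitTermination(10, TimeUnit.SECONDS)) {
        throw new IllegalStateException("event loop did not finish");
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return handler.blamed;
  }

  public static void main(String[] args) {
    System.out.println(run("request1", "bad-request2", "request3", "bad-request4"));
  }
}
```

The sketch would not cover work that a read hands off to another thread; a failure raised there could arrive after `currentRef` has moved on, which is the case the question above is worried about.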







[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523706892



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -151,11 +152,23 @@ public void addRaftPeers(Collection<RaftPeer> newPeers) {
 
   private ChannelInboundHandler newChannelInboundHandlerAdapter(){
     return new ChannelInboundHandlerAdapter(){
+      private final AtomicReference<DataStreamRequestByteBuf> currentRef = new AtomicReference<>();
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof DataStreamRequestByteBuf) {
-          requests.read((DataStreamRequestByteBuf)msg, ctx, proxies::getDataStreamOutput);
+        if (!(msg instanceof DataStreamRequestByteBuf)) {
+          LOG.error("Unexpected message class {}, ignoring ...", msg.getClass().getName());
+          return;
         }
+
+        final DataStreamRequestByteBuf current = (DataStreamRequestByteBuf) msg;
+        currentRef.set(current);
+        requests.read(current, ctx, proxies::getDataStreamOutput);
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
+        DataStreamManagement.sendException(currentRef.get(), throwable, ctx);

Review comment:
       @szetszwo Thanks a lot. Got it.







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523704166



##########
File path: ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
##########
@@ -83,6 +85,15 @@ public Builder setDataStreamPacket(DataStreamPacket packet) {
           .setStreamOffset(packet.getStreamOffset());
     }
 
+    public Builder setDataStreamException(DataStreamException exception) {
+      return setSuccess(false)
+          .setBuffer(DataStreamProtoUtils.exception2proto(exception).toByteString().asReadOnlyByteBuffer());
+    }
+
+    public Builder setThrowable(Throwable t) {

Review comment:
       Agree.  Let's wrap DataStreamException in RaftClientReply as your change in #277.  It looks good.







[GitHub] [incubator-ratis] runzhiwang closed pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
runzhiwang closed pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278


   








[GitHub] [incubator-ratis] szetszwo commented on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727379514


   @runzhiwang, my change here probably overlaps with your change in RATIS-1150.  We should use protobuf to serialise the reply body.   Please take a look.  Thanks.





[GitHub] [incubator-ratis] szetszwo commented on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727577997


   Note that this pull request requires the code from #279 .





[GitHub] [incubator-ratis] szetszwo commented on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727390547


   @runzhiwang , just saw that you already had a pull request #277 .  I will wait for your change. 








[GitHub] [incubator-ratis] szetszwo commented on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727698822


   @amaliujia and @runzhiwang, yes, the commit was incorrect.  I have just pushed a new one.





[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523703609



##########
File path: ratis-common/src/main/java/org/apache/ratis/datastream/impl/DataStreamReplyByteBuffer.java
##########
@@ -83,6 +85,15 @@ public Builder setDataStreamPacket(DataStreamPacket packet) {
           .setStreamOffset(packet.getStreamOffset());
     }
 
+    public Builder setDataStreamException(DataStreamException exception) {
+      return setSuccess(false)
+          .setBuffer(DataStreamProtoUtils.exception2proto(exception).toByteString().asReadOnlyByteBuffer());
+    }
+
+    public Builder setThrowable(Throwable t) {

Review comment:
       @szetszwo I'm not sure whether we should wrap DataStreamException in RaftClientReply, and wrap RaftClientReply in DataStreamReplyByteBuffer. We already wrap StateMachineException in RaftClientReply, so if we do not wrap DataStreamException the same way, users may be confused and find the reply hard to understand.
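
The consistency argument in this comment can be illustrated with a small sketch. Every type below is a hypothetical stand-in, not a Ratis class: `Reply` plays the role of a client reply that either succeeded or carries an exception, and `StateMachineFailure` and `DataStreamFailure` stand for the two exception kinds mentioned. The assumption being illustrated is only that when both kinds travel in the same wrapper, the caller needs a single code path to read a failure.

```java
/** Hypothetical stand-ins; these are not the Ratis classes. */
final class ReplyWrappingDemo {

  /** Stand-in for a client reply that either succeeded or carries an exception. */
  static final class Reply {
    private final boolean success;
    private final Exception exception;

    private Reply(boolean success, Exception exception) {
      this.success = success;
      this.exception = exception;
    }

    static Reply ok() {
      return new Reply(true, null);
    }

    static Reply failed(Exception e) {
      return new Reply(false, e);
    }

    boolean isSuccess() {
      return success;
    }

    Exception getException() {
      return exception;
    }
  }

  /** Stand-ins for the two failure kinds named in the comment. */
  static final class StateMachineFailure extends Exception {
    StateMachineFailure(String message) {
      super(message);
    }
  }

  static final class DataStreamFailure extends Exception {
    DataStreamFailure(String message) {
      super(message);
    }
  }

  /** One code path for the caller, whichever kind of failure the reply carries. */
  static String describe(Reply reply) {
    if (reply.isSuccess()) {
      return "ok";
    }
    Exception e = reply.getException();
    return e.getClass().getSimpleName() + ": " + e.getMessage();
  }

  public static void main(String[] args) {
    System.out.println(describe(Reply.ok()));
    System.out.println(describe(Reply.failed(new StateMachineFailure("apply failed"))));
    System.out.println(describe(Reply.failed(new DataStreamFailure("stream closed"))));
  }
}
```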







[GitHub] [incubator-ratis] amaliujia commented on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727731399


   @runzhiwang 
   
   I have filed https://issues.apache.org/jira/browse/RATIS-1159 and saw 1 timeout in 50 local runs. 
   
   Right now I cannot see an obvious reason for the flakiness. If you find this test becomes too flaky, please add an `@Ignore` to it so that CI can pass for now.
   
   The test setup looks right. This might be a real issue that happens on a cluster. I will try to figure out the root cause, but I am not sure I will have news soon.





[GitHub] [incubator-ratis] runzhiwang merged pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
runzhiwang merged pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278


   





[GitHub] [incubator-ratis] amaliujia commented on pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
amaliujia commented on pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#issuecomment-727659560


   @szetszwo  you might have overwritten the change in `NettyServerStreamRpc.java`?











[GitHub] [incubator-ratis] szetszwo closed pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo closed pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278


   








[GitHub] [incubator-ratis] amaliujia commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
amaliujia commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523926434



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
##########
@@ -331,7 +333,24 @@ static void replyDataStreamException(RaftServer server, Throwable cause, RaftCli
       DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
     DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
     RaftClientReply reply = new RaftClientReply(raftClientRequest, dataStreamException, null);
-    ctx.writeAndFlush(newDataStreamReplyByteBuffer(request, reply));
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  void replyDataStreamException(Throwable cause, DataStreamRequestByteBuf request, ChannelHandlerContext ctx) {
+    DataStreamException dataStreamException = new DataStreamException(server.getId(), cause);
+    RaftClientReply reply = new RaftClientReply(ClientId.emptyClientId(), server.getId(), RaftGroupId.emptyGroupId(),
+        -1, false, null, dataStreamException, 0L, null);
+    sendDataStreamException(cause, request, reply, ctx);
+  }
+
+  static void sendDataStreamException(Throwable throwable, DataStreamRequestByteBuf request, RaftClientReply reply,
+      ChannelHandlerContext ctx) {
+    LOG.warn("Failed to process {}",  request, throwable);

Review comment:
       `LOG.warn("Failed to process {} because of {}",  request, throwable);`?







[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #278: RATIS-1153. Implement ChannelInboundHandler.exceptionCaught in NettyServerStreamRpc

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #278:
URL: https://github.com/apache/incubator-ratis/pull/278#discussion_r523706250



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -151,11 +152,23 @@ public void addRaftPeers(Collection<RaftPeer> newPeers) {
 
   private ChannelInboundHandler newChannelInboundHandlerAdapter(){
     return new ChannelInboundHandlerAdapter(){
+      private final AtomicReference<DataStreamRequestByteBuf> currentRef = new AtomicReference<>();
+
       @Override
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
-        if (msg instanceof DataStreamRequestByteBuf) {
-          requests.read((DataStreamRequestByteBuf)msg, ctx, proxies::getDataStreamOutput);
+        if (!(msg instanceof DataStreamRequestByteBuf)) {
+          LOG.error("Unexpected message class {}, ignoring ...", msg.getClass().getName());
+          return;
         }
+
+        final DataStreamRequestByteBuf current = (DataStreamRequestByteBuf) msg;
+        currentRef.set(current);
+        requests.read(current, ctx, proxies::getDataStreamOutput);
+      }
+
+      @Override
+      public void exceptionCaught(ChannelHandlerContext ctx, Throwable throwable) {
+        DataStreamManagement.sendException(currentRef.get(), throwable, ctx);

Review comment:
       As shown in Netty's source code, Netty wraps the call to `channelRead(..)` in a try-catch.  If an exception is thrown, it calls `invokeExceptionCaught(t)`, which in turn calls `exceptionCaught(..)`. https://github.com/netty/netty/blob/944a0205862e32f1020647bbf4ef4eca5587446e/transport/src/main/java/io/netty/channel/AbstractChannelHandlerContext.java#L379
   Since two `channelRead(..)` calls won't overlap, our `exceptionCaught(..)` can safely read the current request from `currentRef`.
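
   The argument above can be illustrated with a small sketch that does not depend on Netty. It only mimics the pattern: `InboundHandler`, `Dispatcher`, `TrackingHandler` and `CurrentRefSketch` are hypothetical names made up for this illustration, and a `String` stands in for `DataStreamRequestByteBuf`. It assumes, as stated in the comment, that reads on one channel are dispatched one at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for an inbound handler; this is NOT Netty's API. */
interface InboundHandler {
  void channelRead(String msg) throws Exception;
  void exceptionCaught(Throwable cause);
}

/** Mimics the try-catch around channelRead(..) described in the comment above. */
final class Dispatcher {
  static void fireChannelRead(InboundHandler handler, String msg) {
    try {
      handler.channelRead(msg);
    } catch (Throwable t) {
      // The failure is routed to the same handler that was reading.
      handler.exceptionCaught(t);
    }
  }
}

/** Remembers the request being read so that the error path can refer to it. */
final class TrackingHandler implements InboundHandler {
  private final AtomicReference<String> currentRef = new AtomicReference<>();
  final List<String> errorReplies = new ArrayList<>();

  @Override
  public void channelRead(String msg) {
    currentRef.set(msg);            // record the request before processing it
    if (msg.startsWith("bad")) {    // simulated processing failure
      throw new IllegalStateException("cannot process");
    }
  }

  @Override
  public void exceptionCaught(Throwable cause) {
    // Reads do not overlap, so currentRef still holds the failing request.
    errorReplies.add(currentRef.get() + " -> " + cause.getMessage());
  }
}

final class CurrentRefSketch {
  static List<String> run() {
    TrackingHandler handler = new TrackingHandler();
    Dispatcher.fireChannelRead(handler, "ok-1");
    Dispatcher.fireChannelRead(handler, "bad-2");
    Dispatcher.fireChannelRead(handler, "ok-3");
    return handler.errorReplies;
  }

  public static void main(String[] args) {
    System.out.println(run());      // prints [bad-2 -> cannot process]
  }
}
```

   In this sketch only the failing request produces an error reply, and the reply names the request that was being read when the exception was thrown. If reads could overlap, `currentRef` might already point at a later request by the time `exceptionCaught(..)` runs, which is why the non-overlap assumption matters.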



