You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/11 13:14:19 UTC

[GitHub] [incubator-ratis] runzhiwang opened a new pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

runzhiwang opened a new pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270


   
   
   ## What changes were proposed in this pull request?
   
   Return exception of submitClientRequestAsync to client
   
   ## What is the link to the Apache JIRA
   
   https://issues.apache.org/jira/browse/RATIS-1143
   
   ## How was this patch tested?
   
   new ut.
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] runzhiwang edited a comment on pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
runzhiwang edited a comment on pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#issuecomment-725771310


   @szetszwo I have updated the patch. Could you help review it again ? Besides,  I think with the commit https://github.com/apache/incubator-ratis/commit/05eaa70128dfa12dcfed459684983267cb079e92, CI is easier to pass,  most of the previous CI failed reason is port conflict, I will continue to find the root cause.
   
   > We should also think about how to send exceptions back to the client when there are the exceptions caught in code.
   
   I will do it in next PR.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#discussion_r521440946



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -390,31 +391,82 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the other peers
           // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(
+      final List<RaftClientReply> replies, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    RaftPeer suggestedLeader = null;
+    for (RaftClientReply reply : replies) {
+      if (reply.getNotLeaderException() != null && reply.getNotLeaderException().getSuggestedLeader() != null) {
+        suggestedLeader = reply.getNotLeaderException().getSuggestedLeader();
+        break;
+      }
+    }
+
+    if (suggestedLeader == null) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      for (RaftClientReply reply : replies) {
+        if (reply.getServerId().equals(suggestedLeader.getId())) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
+          return;
+        }
+      }
+
+      throw new IllegalStateException(this + ": Failed to find suggestedLeader:" + suggestedLeader.getId());
+    }
+  }
+
   private void forwardStartTransaction(
-      final StreamInfo info, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
-    final List<CompletableFuture<Boolean>> results = info.applyToRemotes(
+      final StreamInfo info, final DataStreamRequestByteBuf request,
+      final ChannelHandlerContext ctx, RaftClientReply reply) {
+    final List<CompletableFuture<DataStreamReply>> results = info.applyToRemotes(
         out -> out.startTransaction(request, ctx, executor));
 
     JavaUtils.allOf(results).thenAccept(v -> {
-      if (!results.stream().map(CompletableFuture::join).reduce(false, Boolean::logicalOr)) {
-        sendReplyNotSuccess(request, ctx);
+      for (CompletableFuture<DataStreamReply> result : results) {
+        if (result.join().isSuccess()) {
+          return;
+        }
       }
+
+      List<RaftClientReply> replies = new ArrayList<>();
+      replies.add(reply);
+
+      for (CompletableFuture<DataStreamReply> result : results) {
+        replies.add(getRaftClientReply(result.join()));
+      }

Review comment:
       Move them to sendLeaderFailedReply.

##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -390,31 +391,82 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the other peers
           // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(
+      final List<RaftClientReply> replies, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    RaftPeer suggestedLeader = null;
+    for (RaftClientReply reply : replies) {
+      if (reply.getNotLeaderException() != null && reply.getNotLeaderException().getSuggestedLeader() != null) {
+        suggestedLeader = reply.getNotLeaderException().getSuggestedLeader();
+        break;
+      }
+    }
+
+    if (suggestedLeader == null) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      for (RaftClientReply reply : replies) {
+        if (reply.getServerId().equals(suggestedLeader.getId())) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
+          return;
+        }
+      }
+
+      throw new IllegalStateException(this + ": Failed to find suggestedLeader:" + suggestedLeader.getId());
+    }
+  }

Review comment:
       The suggestedLeader is unreliable since a stale server may suggest an old leader.  We may use getNotLeaderException() to find out the leader reply.
   
   Also, the exceptional results should be removed.
   ```
     private void sendLeaderFailedReply(final List<CompletableFuture<DataStreamReply>> results,
         final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx, RaftClientReply localReply) {
       // get replies from the results, ignored exceptional replies
       final Stream<RaftClientReply> remoteReplies = results.stream()
           .filter(r -> !r.isCompletedExceptionally())
           .map(CompletableFuture::join)
           .map(this::getRaftClientReply);
   
       // choose the leader's reply if there is any.  Otherwise, use the local reply
       final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
           .filter(reply -> reply.getNotLeaderException() != null)
           .findAny().orElse(localReply);
   
       // send reply
       final ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(chosen).toByteString().asReadOnlyByteBuffer();
       sendReplyNotSuccess(request, buffer, ctx);
     }
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] runzhiwang commented on pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#issuecomment-725771310


   @szetszwo I have updated the patch. Could you help review it again ? Besides,  I think with the commit https://github.com/apache/incubator-ratis/commit/05eaa70128dfa12dcfed459684983267cb079e92, CI is easier to pass, I think the previous failed reason is port conflict, I will continue to find the root cause.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo merged pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
szetszwo merged pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270


   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on a change in pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#discussion_r521740010



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -390,31 +391,82 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the other peers
           // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(
+      final List<RaftClientReply> replies, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    RaftPeer suggestedLeader = null;
+    for (RaftClientReply reply : replies) {
+      if (reply.getNotLeaderException() != null && reply.getNotLeaderException().getSuggestedLeader() != null) {
+        suggestedLeader = reply.getNotLeaderException().getSuggestedLeader();
+        break;
+      }
+    }
+
+    if (suggestedLeader == null) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      for (RaftClientReply reply : replies) {
+        if (reply.getServerId().equals(suggestedLeader.getId())) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
+          return;
+        }
+      }
+
+      throw new IllegalStateException(this + ": Failed to find suggestedLeader:" + suggestedLeader.getId());
+    }
+  }

Review comment:
       @szetszwo  Thanks the suggestions. If we choose leader's reply, the condition in `filter `maybe should be `reply -> reply.getNotLeaderException() == null`, rather than `reply -> reply.getNotLeaderException() != null` ?
   
   >     // choose the leader's reply if there is any.  Otherwise, use the local reply
   >     final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
   >         .filter(reply -> reply.getNotLeaderException() != null)
   >         .findAny().orElse(localReply);




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] runzhiwang edited a comment on pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
runzhiwang edited a comment on pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#issuecomment-725771310






----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] runzhiwang commented on pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
runzhiwang commented on pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#issuecomment-725417350


   @szetszwo Could you help review this patch ? Thanks a lot.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [incubator-ratis] szetszwo commented on a change in pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

Posted by GitBox <gi...@apache.org>.
szetszwo commented on a change in pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#discussion_r521742728



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -390,31 +391,82 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the other peers
           // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(
+      final List<RaftClientReply> replies, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    RaftPeer suggestedLeader = null;
+    for (RaftClientReply reply : replies) {
+      if (reply.getNotLeaderException() != null && reply.getNotLeaderException().getSuggestedLeader() != null) {
+        suggestedLeader = reply.getNotLeaderException().getSuggestedLeader();
+        break;
+      }
+    }
+
+    if (suggestedLeader == null) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      for (RaftClientReply reply : replies) {
+        if (reply.getServerId().equals(suggestedLeader.getId())) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
+          return;
+        }
+      }
+
+      throw new IllegalStateException(this + ": Failed to find suggestedLeader:" + suggestedLeader.getId());
+    }
+  }

Review comment:
       You are right.  It should  be `getNotLeaderException() == null`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org