Posted to dev@ratis.apache.org by GitBox <gi...@apache.org> on 2020/11/12 00:43:26 UTC

[GitHub] [incubator-ratis] runzhiwang commented on a change in pull request #270: RATIS-1143. Return exception of submitClientRequestAsync to client

runzhiwang commented on a change in pull request #270:
URL: https://github.com/apache/incubator-ratis/pull/270#discussion_r521740010



##########
File path: ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyServerStreamRpc.java
##########
@@ -390,31 +391,82 @@ public void channelRead(ChannelHandlerContext ctx, Object msg) {
         } else if (request.getType() == Type.STREAM_CLOSE) {
           // if this server is not the leader, forward start transition to the other peers
           // there maybe other unexpected reason cause failure except not leader, forwardStartTransaction anyway
-          forwardStartTransaction(info, request, ctx);
+          forwardStartTransaction(info, request, ctx, reply);
         } else if (request.getType() == Type.START_TRANSACTION){
-          sendReplyNotSuccess(request, ctx);
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
         } else {
           throw new IllegalStateException(this + ": Unexpected type " + request.getType() + ", request=" + request);
         }
       }, executor);
     } catch (IOException e) {
-      sendReplyNotSuccess(request, ctx);
+      sendReplyNotSuccess(request, null, ctx);
       return CompletableFuture.completedFuture(null);
     }
   }
 
+  private void sendLeaderFailedReply(
+      final List<RaftClientReply> replies, final DataStreamRequestByteBuf request, final ChannelHandlerContext ctx) {
+    RaftPeer suggestedLeader = null;
+    for (RaftClientReply reply : replies) {
+      if (reply.getNotLeaderException() != null && reply.getNotLeaderException().getSuggestedLeader() != null) {
+        suggestedLeader = reply.getNotLeaderException().getSuggestedLeader();
+        break;
+      }
+    }
+
+    if (suggestedLeader == null) {
+      sendReplyNotSuccess(request, null, ctx);
+    } else {
+      for (RaftClientReply reply : replies) {
+        if (reply.getServerId().equals(suggestedLeader.getId())) {
+          ByteBuffer buffer = ClientProtoUtils.toRaftClientReplyProto(reply).toByteString().asReadOnlyByteBuffer();
+          sendReplyNotSuccess(request, buffer, ctx);
+          return;
+        }
+      }
+
+      throw new IllegalStateException(this + ": Failed to find suggestedLeader:" + suggestedLeader.getId());
+    }
+  }

Review comment:
       @szetszwo Thanks for the suggestion. If the goal is to choose the leader's reply, should the condition in `filter` be `reply -> reply.getNotLeaderException() == null` rather than `reply -> reply.getNotLeaderException() != null`?
   
   >     // choose the leader's reply if there is any.  Otherwise, use the local reply
   >     final RaftClientReply chosen = Stream.concat(Stream.of(localReply), remoteReplies)
   >         .filter(reply -> reply.getNotLeaderException() != null)
   >         .findAny().orElse(localReply);
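
To make the question concrete, here is a minimal, self-contained sketch of the two filter conditions. It does not use the real Ratis classes: `Reply`, its `notLeader` flag (standing in for `getNotLeaderException() != null`), and both `choose...` methods are hypothetical names introduced only for illustration. It also uses `findFirst()` instead of `findAny()` so the result is deterministic.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Stream;

public class ChooseReplySketch {
  /** Hypothetical stand-in for RaftClientReply; only what this sketch needs. */
  static final class Reply {
    final String serverId;
    /** Assumption: true models getNotLeaderException() != null. */
    final boolean notLeader;

    Reply(String serverId, boolean notLeader) {
      this.serverId = serverId;
      this.notLeader = notLeader;
    }
  }

  /** Condition as quoted: keeps replies that carry a not-leader exception. */
  static Reply chooseWithNotNullFilter(Reply localReply, List<Reply> remoteReplies) {
    return Stream.concat(Stream.of(localReply), remoteReplies.stream())
        .filter(reply -> reply.notLeader)
        .findFirst().orElse(localReply);
  }

  /** Condition as proposed: keeps replies without a not-leader exception. */
  static Reply chooseWithNullFilter(Reply localReply, List<Reply> remoteReplies) {
    return Stream.concat(Stream.of(localReply), remoteReplies.stream())
        .filter(reply -> !reply.notLeader)
        .findFirst().orElse(localReply);
  }

  public static void main(String[] args) {
    // The local server s0 and remote s1 are followers; remote s2 is the leader.
    Reply localReply = new Reply("s0", true);
    List<Reply> remoteReplies = Arrays.asList(new Reply("s1", true), new Reply("s2", false));

    System.out.println(chooseWithNotNullFilter(localReply, remoteReplies).serverId); // prints "s0"
    System.out.println(chooseWithNullFilter(localReply, remoteReplies).serverId);    // prints "s2"
  }
}
```

Under these assumptions, the `!= null` condition selects a follower's reply (here the local one), while the `== null` condition selects the reply from the server that did not report a not-leader exception.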



