You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/31 10:41:11 UTC

[incubator-ratis] branch master updated: RATIS-1277. Fix FileStore write failed because out of order (#389)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 074a495  RATIS-1277. Fix FileStore write failed because out of order (#389)
074a495 is described below

commit 074a495eda03392ad7a02d0ca799f96843fae5f4
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Dec 31 18:41:05 2020 +0800

    RATIS-1277. Fix FileStore write failed because out of order (#389)
---
 .../apache/ratis/netty/client/NettyClientStreamRpc.java   | 10 +++++++---
 .../apache/ratis/netty/server/DataStreamManagement.java   | 15 +++++++++++++--
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 0c369f5..551810f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
 import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
 import org.apache.ratis.netty.NettyDataStreamUtils;
+import org.apache.ratis.protocol.ClientInvocationId;
 import org.apache.ratis.protocol.DataStreamReply;
 import org.apache.ratis.protocol.DataStreamRequest;
 import org.apache.ratis.protocol.RaftPeer;
@@ -54,7 +55,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   private final String name;
   private final EventLoopGroup workerGroup = new NioEventLoopGroup();
   private final Supplier<Channel> channel;
-  private final ConcurrentMap<Long, Queue<CompletableFuture<DataStreamReply>>> replies = new ConcurrentHashMap<>();
+  private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies =
+      new ConcurrentHashMap<>();
 
   public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
     this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
@@ -82,7 +84,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
         }
         final DataStreamReply reply = (DataStreamReply) msg;
         LOG.debug("{}: read {}", this, reply);
-        Optional.ofNullable(replies.get(reply.getStreamId()))
+        ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
+        Optional.ofNullable(replies.get(clientInvocationId))
             .map(Queue::poll)
             .ifPresent(f -> f.complete(reply));
       }
@@ -136,8 +139,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
   @Override
   public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
     final CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
+    ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
     final Queue<CompletableFuture<DataStreamReply>> q = replies.computeIfAbsent(
-        request.getStreamId(), key -> new ConcurrentLinkedQueue<>());
+       clientInvocationId, key -> new ConcurrentLinkedQueue<>());
     if (!q.offer(f)) {
       f.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + request));
       return f;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 43a6096..d30166a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -48,6 +48,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
 import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.function.CheckedBiFunction;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -284,7 +285,7 @@ public class DataStreamManagement {
 
   static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
       DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
-    final boolean success = checkSuccessRemoteWrite(remoteWrites, bytesWritten);
+    final boolean success = checkSuccessRemoteWrite(remoteWrites, bytesWritten, request);
     final DataStreamReplyByteBuffer.Builder builder = DataStreamReplyByteBuffer.newBuilder()
         .setDataStreamPacket(request)
         .setSuccess(success);
@@ -398,9 +399,19 @@ public class DataStreamManagement {
     });
   }
 
-  static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
+  static void assertReplyCorrespondingToRequest(
+      final DataStreamRequestByteBuf request, final DataStreamReply reply) {
+    Preconditions.assertTrue(request.getClientId().equals(reply.getClientId()));
+    Preconditions.assertTrue(request.getType() == reply.getType());
+    Preconditions.assertTrue(request.getStreamId() == reply.getStreamId());
+    Preconditions.assertTrue(request.getStreamOffset() == reply.getStreamOffset());
+  }
+
+  static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten,
+      final DataStreamRequestByteBuf request) {
     for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
       final DataStreamReply reply = replyFuture.join();
+      assertReplyCorrespondingToRequest(request, reply);
       if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
         return false;
       }