You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2020/12/31 10:41:11 UTC
[incubator-ratis] branch master updated: RATIS-1277. Fix FileStore
write failed because out of order (#389)
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 074a495 RATIS-1277. Fix FileStore write failed because out of order (#389)
074a495 is described below
commit 074a495eda03392ad7a02d0ca799f96843fae5f4
Author: runzhiwang <51...@users.noreply.github.com>
AuthorDate: Thu Dec 31 18:41:05 2020 +0800
RATIS-1277. Fix FileStore write failed because out of order (#389)
---
.../apache/ratis/netty/client/NettyClientStreamRpc.java | 10 +++++++---
.../apache/ratis/netty/server/DataStreamManagement.java | 15 +++++++++++++--
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
index 0c369f5..551810f 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/client/NettyClientStreamRpc.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.datastream.impl.DataStreamRequestByteBuffer;
import org.apache.ratis.datastream.impl.DataStreamRequestFilePositionCount;
import org.apache.ratis.netty.NettyDataStreamUtils;
+import org.apache.ratis.protocol.ClientInvocationId;
import org.apache.ratis.protocol.DataStreamReply;
import org.apache.ratis.protocol.DataStreamRequest;
import org.apache.ratis.protocol.RaftPeer;
@@ -54,7 +55,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
private final String name;
private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final Supplier<Channel> channel;
- private final ConcurrentMap<Long, Queue<CompletableFuture<DataStreamReply>>> replies = new ConcurrentHashMap<>();
+ private final ConcurrentMap<ClientInvocationId, Queue<CompletableFuture<DataStreamReply>>> replies =
+ new ConcurrentHashMap<>();
public NettyClientStreamRpc(RaftPeer server, RaftProperties properties){
this.name = JavaUtils.getClassSimpleName(getClass()) + "->" + server;
@@ -82,7 +84,8 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
}
final DataStreamReply reply = (DataStreamReply) msg;
LOG.debug("{}: read {}", this, reply);
- Optional.ofNullable(replies.get(reply.getStreamId()))
+ ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(reply.getClientId(), reply.getStreamId());
+ Optional.ofNullable(replies.get(clientInvocationId))
.map(Queue::poll)
.ifPresent(f -> f.complete(reply));
}
@@ -136,8 +139,9 @@ public class NettyClientStreamRpc implements DataStreamClientRpc {
@Override
public CompletableFuture<DataStreamReply> streamAsync(DataStreamRequest request) {
final CompletableFuture<DataStreamReply> f = new CompletableFuture<>();
+ ClientInvocationId clientInvocationId = ClientInvocationId.valueOf(request.getClientId(), request.getStreamId());
final Queue<CompletableFuture<DataStreamReply>> q = replies.computeIfAbsent(
- request.getStreamId(), key -> new ConcurrentLinkedQueue<>());
+ clientInvocationId, key -> new ConcurrentLinkedQueue<>());
if (!q.offer(f)) {
f.completeExceptionally(new IllegalStateException(this + ": Failed to offer a future for " + request));
return f;
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
index 43a6096..d30166a 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/DataStreamManagement.java
@@ -48,6 +48,7 @@ import org.apache.ratis.thirdparty.io.netty.buffer.ByteBuf;
import org.apache.ratis.thirdparty.io.netty.channel.ChannelHandlerContext;
import org.apache.ratis.util.JavaUtils;
import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.Preconditions;
import org.apache.ratis.util.function.CheckedBiFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -284,7 +285,7 @@ public class DataStreamManagement {
static void sendReply(List<CompletableFuture<DataStreamReply>> remoteWrites,
DataStreamRequestByteBuf request, long bytesWritten, ChannelHandlerContext ctx) {
- final boolean success = checkSuccessRemoteWrite(remoteWrites, bytesWritten);
+ final boolean success = checkSuccessRemoteWrite(remoteWrites, bytesWritten, request);
final DataStreamReplyByteBuffer.Builder builder = DataStreamReplyByteBuffer.newBuilder()
.setDataStreamPacket(request)
.setSuccess(success);
@@ -398,9 +399,19 @@ public class DataStreamManagement {
});
}
- static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten) {
+ static void assertReplyCorrespondingToRequest(
+ final DataStreamRequestByteBuf request, final DataStreamReply reply) {
+ Preconditions.assertTrue(request.getClientId().equals(reply.getClientId()));
+ Preconditions.assertTrue(request.getType() == reply.getType());
+ Preconditions.assertTrue(request.getStreamId() == reply.getStreamId());
+ Preconditions.assertTrue(request.getStreamOffset() == reply.getStreamOffset());
+ }
+
+ static boolean checkSuccessRemoteWrite(List<CompletableFuture<DataStreamReply>> replyFutures, long bytesWritten,
+ final DataStreamRequestByteBuf request) {
for (CompletableFuture<DataStreamReply> replyFuture : replyFutures) {
final DataStreamReply reply = replyFuture.join();
+ assertReplyCorrespondingToRequest(request, reply);
if (!reply.isSuccess() || reply.getBytesWritten() != bytesWritten) {
return false;
}