You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/14 19:06:44 UTC
[incubator-ratis] branch master updated: RATIS-459. Async requests may become out-of-order in some rare case.
This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git
The following commit(s) were added to refs/heads/master by this push:
new 03c5fd8 RATIS-459. Async requests may become out-of-order in some rare case.
03c5fd8 is described below
commit 03c5fd82044612d71c021c0c8a7a15999d28f170
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Mar 14 12:05:15 2019 -0700
RATIS-459. Async requests may become out-of-order in some rare case.
---
.../apache/ratis/client/impl/ClientProtoUtils.java | 26 +++++++------
.../apache/ratis/client/impl/RaftClientImpl.java | 45 ++++++++++++++--------
.../apache/ratis/client/impl/UnorderedAsync.java | 18 ++++++++-
.../apache/ratis/protocol/GroupInfoRequest.java | 7 ++--
.../apache/ratis/protocol/GroupListRequest.java | 7 ++--
.../ratis/protocol/GroupManagementRequest.java | 4 +-
.../apache/ratis/protocol/RaftClientRequest.java | 20 +++++-----
.../ratis/protocol/SetConfigurationRequest.java | 4 +-
.../java/org/apache/ratis/util/ProtoUtils.java | 9 +++++
.../java/org/apache/ratis/util/SlidingWindow.java | 20 +++++++---
.../grpc/client/GrpcClientProtocolService.java | 9 ++++-
ratis-proto/src/main/proto/Raft.proto | 7 +++-
.../ratis/server/impl/RaftServerConstants.java | 3 +-
.../apache/ratis/server/impl/ServerProtoUtils.java | 3 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 8 ++--
.../java/org/apache/ratis/RetryCacheTests.java | 8 ++--
.../impl/RaftStateMachineExceptionTests.java | 10 ++---
17 files changed, 127 insertions(+), 81 deletions(-)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4378443..4cbebb0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -45,19 +45,24 @@ public interface ClientProtoUtils {
}
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId, long seqNum) {
+ ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId,
+ SlidingWindowEntry slidingWindowEntry) {
+ if (slidingWindowEntry == null) {
+ slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
+ }
return RaftRpcRequestProto.newBuilder()
.setRequestorId(requesterId)
.setReplyId(replyId)
.setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
.setCallId(callId)
- .setSeqNum(seqNum);
+ .setSlidingWindowEntry(slidingWindowEntry);
}
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId, long seqNum) {
+ ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId,
+ SlidingWindowEntry slidingWindowEntry) {
return toRaftRpcRequestProtoBuilder(
- requesterId.toByteString(), replyId.toByteString(), groupId, callId, seqNum);
+ requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry);
}
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
@@ -67,7 +72,7 @@ public interface ClientProtoUtils {
request.getServerId(),
request.getRaftGroupId(),
request.getCallId(),
- request.getSeqNum());
+ request.getSlidingWindowEntry());
}
static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p) {
@@ -94,9 +99,9 @@ public interface ClientProtoUtils {
RaftPeerId.valueOf(request.getReplyId()),
ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
request.getCallId(),
- request.getSeqNum(),
toMessage(p.getMessage()),
- type);
+ type,
+ request.getSlidingWindowEntry());
}
static RaftClientRequestProto toRaftClientRequestProto(
@@ -134,7 +139,7 @@ public interface ClientProtoUtils {
long seqNum, ByteString content) {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(
- clientId, serverId, groupId, callId, seqNum))
+ clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false)))
.setWrite(WriteRequestTypeProto.getDefaultInstance())
.setMessage(toClientMessageEntryProtoBuilder(content))
.build();
@@ -401,7 +406,7 @@ public interface ClientProtoUtils {
static String toString(RaftClientRequestProto proto) {
final RaftRpcRequestProto rpc = proto.getRpcRequest();
return ClientId.valueOf(rpc.getRequestorId()) + "->" + rpc.getReplyId().toStringUtf8()
- + "#" + rpc.getCallId() + "-" + rpc.getSeqNum();
+ + "#" + rpc.getCallId() + "-" + ProtoUtils.toString(rpc.getSlidingWindowEntry());
}
static String toString(RaftClientReplyProto proto) {
@@ -409,5 +414,4 @@ public interface ClientProtoUtils {
return ClientId.valueOf(rpc.getRequestorId()) + "<-" + rpc.getReplyId().toStringUtf8()
+ "#" + rpc.getCallId();
}
-
}
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index b49fbbf..c489dff 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.*;
import org.apache.ratis.retry.RetryPolicy;
import org.apache.ratis.util.*;
@@ -49,18 +50,15 @@ final class RaftClientImpl implements RaftClient {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
}
- static class PendingClientRequest {
- private final Supplier<RaftClientRequest> requestConstructor;
+ abstract static class PendingClientRequest {
private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
private final AtomicInteger attemptCount = new AtomicInteger();
- PendingClientRequest(Supplier<RaftClientRequest> requestConstructor) {
- this.requestConstructor = requestConstructor;
- }
+ abstract RaftClientRequest newRequestImpl();
- RaftClientRequest newRequest() {
+ final RaftClientRequest newRequest() {
attemptCount.incrementAndGet();
- return requestConstructor.get();
+ return newRequestImpl();
}
CompletableFuture<RaftClientReply> getReplyFuture() {
@@ -72,12 +70,25 @@ final class RaftClientImpl implements RaftClient {
}
}
- static class PendingAsyncRequest extends PendingClientRequest implements SlidingWindow.Request<RaftClientReply> {
+ static class PendingAsyncRequest extends PendingClientRequest
+ implements SlidingWindow.ClientSideRequest<RaftClientReply> {
+ private final Function<SlidingWindowEntry, RaftClientRequest> requestConstructor;
private final long seqNum;
+ private volatile boolean isFirst = false;
- PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
- super(() -> requestConstructor.apply(seqNum));
+ PendingAsyncRequest(long seqNum, Function<SlidingWindowEntry, RaftClientRequest> requestConstructor) {
this.seqNum = seqNum;
+ this.requestConstructor = requestConstructor;
+ }
+
+ @Override
+ RaftClientRequest newRequestImpl() {
+ return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst));
+ }
+
+ @Override
+ public void setFirstRequest() {
+ isFirst = true;
}
@Override
@@ -192,7 +203,7 @@ final class RaftClientImpl implements RaftClient {
final long callId = nextCallId();
final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
- seq -> newRaftClientRequest(server, callId, seq, message, type));
+ slidingWindowEntry -> newRaftClientRequest(server, callId, message, type, slidingWindowEntry));
return getSlidingWindow(server).submitNewRequest(constructor, this::sendRequestWithRetryAsync
).getReplyFuture(
).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
@@ -200,9 +211,10 @@ final class RaftClientImpl implements RaftClient {
}
RaftClientRequest newRaftClientRequest(
- RaftPeerId server, long callId, long seq, Message message, RaftClientRequest.Type type) {
+ RaftPeerId server, long callId, Message message, RaftClientRequest.Type type,
+ SlidingWindowEntry slidingWindowEntry) {
return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
- callId, seq, message, type);
+ callId, message, type, slidingWindowEntry);
}
@Override
@@ -233,8 +245,7 @@ final class RaftClientImpl implements RaftClient {
}
final long callId = nextCallId();
- return sendRequestWithRetry(() -> newRaftClientRequest(
- server, callId, 0L, message, type));
+ return sendRequestWithRetry(() -> newRaftClientRequest(server, callId, message, type, null));
}
@Override
@@ -342,7 +353,7 @@ final class RaftClientImpl implements RaftClient {
reply = handleNotLeaderException(request, reply, true);
if (reply != null) {
getSlidingWindow(request).receiveReply(
- request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
+ request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetryAsync);
} else if (!retryPolicy.shouldRetry(attemptCount)) {
handleAsyncRetryFailure(request, attemptCount);
}
@@ -374,7 +385,7 @@ final class RaftClientImpl implements RaftClient {
private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
final RaftRetryFailureException rfe = newRaftRetryFailureException(request, attemptCount, retryPolicy);
- getSlidingWindow(request).fail(request.getSeqNum(), rfe);
+ getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), rfe);
}
private RaftClientReply sendRequest(RaftClientRequest request)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index d4ab14d..73e485c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -31,15 +31,29 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
+import java.util.function.Supplier;
/** Send unordered asynchronous requests to a raft service. */
public interface UnorderedAsync {
Logger LOG = LoggerFactory.getLogger(UnorderedAsync.class);
+ class PendingUnorderedRequest extends PendingClientRequest {
+ private final Supplier<RaftClientRequest> requestConstructor;
+
+ PendingUnorderedRequest(Supplier<RaftClientRequest> requestConstructor) {
+ this.requestConstructor = requestConstructor;
+ }
+
+ @Override
+ RaftClientRequest newRequestImpl() {
+ return requestConstructor.get();
+ }
+ }
+
static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, RaftClientImpl client) {
final long callId = RaftClientImpl.nextCallId();
- final PendingClientRequest pending = new PendingClientRequest(
- () -> client.newRaftClientRequest(null, callId, -1L, null, type));
+ final PendingClientRequest pending = new PendingUnorderedRequest(
+ () -> client.newRaftClientRequest(null, callId, null, type, null));
sendRequestWithRetry(pending, client);
return pending.getReplyFuture()
.thenApply(reply -> RaftClientImpl.handleStateMachineException(reply, CompletionException::new));
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
index c9a4469..567c2be 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,8 +22,7 @@ package org.apache.ratis.protocol;
* the server itself.
*/
public class GroupInfoRequest extends RaftClientRequest {
- public GroupInfoRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId) {
- super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType());
+ public GroupInfoRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
+ super(clientId, serverId, groupId, callId, readRequestType());
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
index d661f52..af38b6a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -22,8 +22,7 @@ package org.apache.ratis.protocol;
* the server itself.
*/
public class GroupListRequest extends RaftClientRequest {
- public GroupListRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId) {
- super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType());
+ public GroupListRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
+ super(clientId, serverId, groupId, callId, readRequestType());
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index cbaae3b..9972fdb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -80,7 +80,7 @@ public class GroupManagementRequest extends RaftClientRequest {
private final Op op;
private GroupManagementRequest(ClientId clientId, RaftPeerId serverId, long callId, Op op) {
- super(clientId, serverId, op.getGroupId(), callId);
+ super(clientId, serverId, op.getGroupId(), callId, writeRequestType());
this.op = op;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 08397c3..a007e83 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -19,6 +19,7 @@ package org.apache.ratis.protocol;
import org.apache.ratis.proto.RaftProtos.*;
import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
import java.util.Objects;
@@ -152,24 +153,23 @@ public class RaftClientRequest extends RaftClientMessage {
}
private final long callId;
- private final long seqNum;
-
private final Message message;
private final Type type;
- public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId) {
- this(clientId, serverId, groupId, callId, 0L, null, WRITE_DEFAULT);
+ private final SlidingWindowEntry slidingWindowEntry;
+
+ RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
+ this(clientId, serverId, groupId, callId, null, type, null);
}
public RaftClientRequest(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
- long callId, long seqNum, Message message, Type type) {
+ long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry) {
super(clientId, serverId, groupId);
this.callId = callId;
- this.seqNum = seqNum;
this.message = message;
this.type = type;
+ this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
}
@Override
@@ -181,8 +181,8 @@ public class RaftClientRequest extends RaftClientMessage {
return callId;
}
- public long getSeqNum() {
- return seqNum;
+ public SlidingWindowEntry getSlidingWindowEntry() {
+ return slidingWindowEntry;
}
public Message getMessage() {
@@ -199,7 +199,7 @@ public class RaftClientRequest extends RaftClientMessage {
@Override
public String toString() {
- return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " "
+ return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry)
+ type + ", " + getMessage();
}
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index e25da2a..1e45046 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,7 +24,7 @@ public class SetConfigurationRequest extends RaftClientRequest {
public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId, RaftPeer[] peers) {
- super(clientId, serverId, groupId, callId);
+ super(clientId, serverId, groupId, callId, writeRequestType());
this.peers = peers;
}
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index b1bfdbe..08a4562 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -24,6 +24,7 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.protocol.RaftPeer;
@@ -137,6 +138,14 @@ public interface ProtoUtils {
return protos.stream().map(ProtoUtils::toString).collect(Collectors.toList()).toString();
}
+ static SlidingWindowEntry toSlidingWindowEntry(long seqNum, boolean isFirst) {
+ return SlidingWindowEntry.newBuilder().setSeqNum(seqNum).setIsFirst(isFirst).build();
+ }
+
+ static String toString(SlidingWindowEntry proto) {
+ return proto.getSeqNum() + (proto.getIsFirst()? "*": "");
+ }
+
static IOException toIOException(ServiceException se) {
final Throwable t = se.getCause();
if (t == null) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index a616f07..d0a1a52 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -43,9 +43,16 @@ public interface SlidingWindow {
void setReply(REPLY reply);
boolean hasReply();
+ }
- default void fail(Exception e) {
- }
+ interface ClientSideRequest<REPLY> extends Request<REPLY> {
+ void setFirstRequest();
+
+ void fail(Exception e);
+ }
+
+ interface ServerSideRequest<REPLY> extends Request<REPLY> {
+ boolean isFirstRequest();
}
/** A seqNum-to-request map, sorted by seqNum. */
@@ -161,14 +168,14 @@ public interface SlidingWindow {
* Depend on the replies/exceptions, the client may retry the requests
* to the same or a different server.
*/
- class Client<REQUEST extends Request<REPLY>, REPLY> {
+ class Client<REQUEST extends ClientSideRequest<REPLY>, REPLY> {
/** The requests in the sliding window. */
private final RequestMap<REQUEST, REPLY> requests;
/** Delayed requests. */
private final SortedMap<Long, Long> delayedRequests = new TreeMap<>();
/** The seqNum for the next new request. */
- private long nextSeqNum = 0;
+ private long nextSeqNum = 1;
/** The seqNum of the first request. */
private long firstSeqNum = -1;
/** Is the first request replied? */
@@ -240,6 +247,7 @@ public interface SlidingWindow {
// first request is not yet submitted and this is the first request, submit it.
LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this);
firstSeqNum = seqNum;
+ request.setFirstRequest();
sendMethod.accept(request);
return true;
}
@@ -351,7 +359,7 @@ public interface SlidingWindow {
* (3) receive replies from the processing unit;
* (4) send replies to the client.
*/
- class Server<REQUEST extends Request<REPLY>, REPLY> implements Closeable {
+ class Server<REQUEST extends ServerSideRequest<REPLY>, REPLY> implements Closeable {
/** The requests in the sliding window. */
private final RequestMap<REQUEST, REPLY> requests;
/** The end of requests */
@@ -373,7 +381,7 @@ public interface SlidingWindow {
/** A request (or a retry) arrives (may be out-of-order except for the first request). */
public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
final long seqNum = request.getSeqNum();
- if (nextToProcess == -1) {
+ if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
nextToProcess = seqNum;
LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this);
} else {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index db082a2..2883795 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);
- private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
+ private static class PendingAppend implements SlidingWindow.ServerSideRequest<RaftClientReply> {
private final RaftClientRequest request;
private volatile RaftClientReply reply;
@@ -72,7 +72,12 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
@Override
public long getSeqNum() {
- return request != null? request.getSeqNum(): Long.MAX_VALUE;
+ return request != null? request.getSlidingWindowEntry().getSeqNum(): Long.MAX_VALUE;
+ }
+
+ @Override
+ public boolean isFirstRequest() {
+ return request != null && request.getSlidingWindowEntry().getIsFirst();
}
@Override
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 103c478..b081352 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -94,7 +94,12 @@ message RaftRpcRequestProto {
RaftGroupIdProto raftGroupId = 3;
uint64 callId = 4;
- uint64 seqNum = 15;
+ SlidingWindowEntry slidingWindowEntry = 15;
+}
+
+message SlidingWindowEntry {
+ uint64 seqNum = 1; // 0 for non-sliding-window requests; >= 1 for sliding-window requests
+ bool isFirst = 2; // Is this the first request of the sliding window?
}
message RaftRpcReplyProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index a2b7057..f70f98a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -20,7 +20,6 @@ package org.apache.ratis.server.impl;
public interface RaftServerConstants {
long INVALID_LOG_INDEX = -1;
long DEFAULT_CALLID = 0;
- long DEFAULT_SEQNUM = 0L;
enum StartupOption {
FORMAT("format"),
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 29dee9b..62c21e9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -37,7 +37,6 @@ import java.util.Optional;
import java.util.stream.Collectors;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
/** Server proto utilities for internal use. */
public interface ServerProtoUtils {
@@ -261,7 +260,7 @@ public interface ServerProtoUtils {
static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) {
return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
- requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, DEFAULT_SEQNUM);
+ requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, null);
}
static RequestVoteRequestProto toRequestVoteRequestProto(
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 54f93c7..eef9d1b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -79,7 +79,6 @@ import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
public abstract class MiniRaftCluster implements Closeable {
public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
@@ -679,14 +678,13 @@ public abstract class MiniRaftCluster implements Closeable {
public RaftClientRequest newRaftClientRequest(
ClientId clientId, RaftPeerId leaderId, Message message) {
- return newRaftClientRequest(clientId, leaderId,
- DEFAULT_CALLID, DEFAULT_SEQNUM, message);
+ return newRaftClientRequest(clientId, leaderId, DEFAULT_CALLID, message);
}
public RaftClientRequest newRaftClientRequest(
- ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
+ ClientId clientId, RaftPeerId leaderId, long callId, Message message) {
return new RaftClientRequest(clientId, leaderId, getGroupId(),
- callId, seqNum, message, RaftClientRequest.writeRequestType());
+ callId, message, RaftClientRequest.writeRequestType(), null);
}
public SetConfigurationRequest newSetConfigurationRequest(
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 2aa1d85..60f62cd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -68,9 +68,8 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
final RaftClient client = cluster.createClient(leaderId);
final RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
- final long seqNum = 111;
RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
- callId, seqNum, new SimpleMessage("message"));
+ callId, new SimpleMessage("message"));
assertReply(rpc.sendRequest(r), client, callId);
// retry with the same callId
@@ -130,9 +129,8 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
final RaftClient client = cluster.createClient(leaderId);
RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
- final long seqNum = 111;
RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
- callId, seqNum, new SimpleMessage("message"));
+ callId, new SimpleMessage("message"));
assertReply(rpc.sendRequest(r), client, callId);
long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
@@ -151,7 +149,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
Assert.assertNotEquals(leaderId, newLeaderId);
// same clientId and callId in the request
r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
- callId, seqNum, new SimpleMessage("message"));
+ callId, new SimpleMessage("message"));
rpc.addServers(Arrays.asList(change.newPeers));
for (int i = 0; i < 10; i++) {
try {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index cf3a490..8fd60b7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -1,4 +1,4 @@
-/**
+/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -107,9 +107,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
final RaftClient client = cluster.createClient(leaderId);
final RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
- final long seqNum = 111;
final SimpleMessage message = new SimpleMessage("message");
- final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, message);
+ final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, message);
RaftClientReply reply = rpc.sendRequest(r);
Assert.assertFalse(reply.isSuccess());
Assert.assertNotNull(reply.getStateMachineException());
@@ -156,9 +155,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
final RaftClient client = cluster.createClient(leaderId);
final RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
- final long seqNum = 111;
RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
- callId, seqNum, new RaftTestUtil.SimpleMessage("message"));
+ callId, new RaftTestUtil.SimpleMessage("message"));
RaftClientReply reply = rpc.sendRequest(r);
Objects.requireNonNull(reply.getStateMachineException());
@@ -174,7 +172,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
leaderId = leader.getId();
// retry
r = cluster.newRaftClientRequest(client.getId(), leaderId,
- callId, seqNum, new RaftTestUtil.SimpleMessage("message"));
+ callId, new RaftTestUtil.SimpleMessage("message"));
reply = rpc.sendRequest(r);
Objects.requireNonNull(reply.getStateMachineException());