You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/14 19:06:44 UTC

[incubator-ratis] branch master updated: RATIS-459. Async requests may become out-of-order in some rare case.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 03c5fd8  RATIS-459. Async requests may become out-of-order in some rare case.
03c5fd8 is described below

commit 03c5fd82044612d71c021c0c8a7a15999d28f170
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Thu Mar 14 12:05:15 2019 -0700

    RATIS-459. Async requests may become out-of-order in some rare case.
---
 .../apache/ratis/client/impl/ClientProtoUtils.java | 26 +++++++------
 .../apache/ratis/client/impl/RaftClientImpl.java   | 45 ++++++++++++++--------
 .../apache/ratis/client/impl/UnorderedAsync.java   | 18 ++++++++-
 .../apache/ratis/protocol/GroupInfoRequest.java    |  7 ++--
 .../apache/ratis/protocol/GroupListRequest.java    |  7 ++--
 .../ratis/protocol/GroupManagementRequest.java     |  4 +-
 .../apache/ratis/protocol/RaftClientRequest.java   | 20 +++++-----
 .../ratis/protocol/SetConfigurationRequest.java    |  4 +-
 .../java/org/apache/ratis/util/ProtoUtils.java     |  9 +++++
 .../java/org/apache/ratis/util/SlidingWindow.java  | 20 +++++++---
 .../grpc/client/GrpcClientProtocolService.java     |  9 ++++-
 ratis-proto/src/main/proto/Raft.proto              |  7 +++-
 .../ratis/server/impl/RaftServerConstants.java     |  3 +-
 .../apache/ratis/server/impl/ServerProtoUtils.java |  3 +-
 .../java/org/apache/ratis/MiniRaftCluster.java     |  8 ++--
 .../java/org/apache/ratis/RetryCacheTests.java     |  8 ++--
 .../impl/RaftStateMachineExceptionTests.java       | 10 ++---
 17 files changed, 127 insertions(+), 81 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 4378443..4cbebb0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -45,19 +45,24 @@ public interface ClientProtoUtils {
   }
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId, long seqNum) {
+      ByteString requesterId, ByteString replyId, RaftGroupId groupId, long callId,
+      SlidingWindowEntry slidingWindowEntry) {
+    if (slidingWindowEntry == null) {
+      slidingWindowEntry = SlidingWindowEntry.getDefaultInstance();
+    }
     return RaftRpcRequestProto.newBuilder()
         .setRequestorId(requesterId)
         .setReplyId(replyId)
         .setRaftGroupId(ProtoUtils.toRaftGroupIdProtoBuilder(groupId))
         .setCallId(callId)
-        .setSeqNum(seqNum);
+        .setSlidingWindowEntry(slidingWindowEntry);
   }
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId, long seqNum) {
+      ClientId requesterId, RaftPeerId replyId, RaftGroupId groupId, long callId,
+      SlidingWindowEntry slidingWindowEntry) {
     return toRaftRpcRequestProtoBuilder(
-        requesterId.toByteString(), replyId.toByteString(), groupId, callId, seqNum);
+        requesterId.toByteString(), replyId.toByteString(), groupId, callId, slidingWindowEntry);
   }
 
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
@@ -67,7 +72,7 @@ public interface ClientProtoUtils {
         request.getServerId(),
         request.getRaftGroupId(),
         request.getCallId(),
-        request.getSeqNum());
+        request.getSlidingWindowEntry());
   }
 
   static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p) {
@@ -94,9 +99,9 @@ public interface ClientProtoUtils {
         RaftPeerId.valueOf(request.getReplyId()),
         ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
         request.getCallId(),
-        request.getSeqNum(),
         toMessage(p.getMessage()),
-        type);
+        type,
+        request.getSlidingWindowEntry());
   }
 
   static RaftClientRequestProto toRaftClientRequestProto(
@@ -134,7 +139,7 @@ public interface ClientProtoUtils {
       long seqNum, ByteString content) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
-            clientId, serverId, groupId, callId, seqNum))
+            clientId, serverId, groupId, callId, ProtoUtils.toSlidingWindowEntry(seqNum, false)))
         .setWrite(WriteRequestTypeProto.getDefaultInstance())
         .setMessage(toClientMessageEntryProtoBuilder(content))
         .build();
@@ -401,7 +406,7 @@ public interface ClientProtoUtils {
   static String toString(RaftClientRequestProto proto) {
     final RaftRpcRequestProto rpc = proto.getRpcRequest();
     return ClientId.valueOf(rpc.getRequestorId()) + "->" + rpc.getReplyId().toStringUtf8()
-        + "#" + rpc.getCallId() + "-" + rpc.getSeqNum();
+        + "#" + rpc.getCallId() + "-" + ProtoUtils.toString(rpc.getSlidingWindowEntry());
   }
 
   static String toString(RaftClientReplyProto proto) {
@@ -409,5 +414,4 @@ public interface ClientProtoUtils {
     return ClientId.valueOf(rpc.getRequestorId()) + "<-" + rpc.getReplyId().toStringUtf8()
         + "#" + rpc.getCallId();
   }
-
 }
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index b49fbbf..c489dff 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.ReplicationLevel;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.protocol.*;
 import org.apache.ratis.retry.RetryPolicy;
 import org.apache.ratis.util.*;
@@ -49,18 +50,15 @@ final class RaftClientImpl implements RaftClient {
     return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
   }
 
-  static class PendingClientRequest {
-    private final Supplier<RaftClientRequest> requestConstructor;
+  abstract static class PendingClientRequest {
     private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
     private final AtomicInteger attemptCount = new AtomicInteger();
 
-    PendingClientRequest(Supplier<RaftClientRequest> requestConstructor) {
-      this.requestConstructor = requestConstructor;
-    }
+    abstract RaftClientRequest newRequestImpl();
 
-    RaftClientRequest newRequest() {
+    final RaftClientRequest newRequest() {
       attemptCount.incrementAndGet();
-      return requestConstructor.get();
+      return newRequestImpl();
     }
 
     CompletableFuture<RaftClientReply> getReplyFuture() {
@@ -72,12 +70,25 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
-  static class PendingAsyncRequest extends PendingClientRequest implements SlidingWindow.Request<RaftClientReply> {
+  static class PendingAsyncRequest extends PendingClientRequest
+      implements SlidingWindow.ClientSideRequest<RaftClientReply> {
+    private final Function<SlidingWindowEntry, RaftClientRequest> requestConstructor;
     private final long seqNum;
+    private volatile boolean isFirst = false;
 
-    PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
-      super(() -> requestConstructor.apply(seqNum));
+    PendingAsyncRequest(long seqNum, Function<SlidingWindowEntry, RaftClientRequest> requestConstructor) {
       this.seqNum = seqNum;
+      this.requestConstructor = requestConstructor;
+    }
+
+    @Override
+    RaftClientRequest newRequestImpl() {
+      return requestConstructor.apply(ProtoUtils.toSlidingWindowEntry(seqNum, isFirst));
+    }
+
+    @Override
+    public void setFirstRequest() {
+      isFirst = true;
     }
 
     @Override
@@ -192,7 +203,7 @@ final class RaftClientImpl implements RaftClient {
 
     final long callId = nextCallId();
     final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
-        seq -> newRaftClientRequest(server, callId, seq, message, type));
+        slidingWindowEntry -> newRaftClientRequest(server, callId, message, type, slidingWindowEntry));
     return getSlidingWindow(server).submitNewRequest(constructor, this::sendRequestWithRetryAsync
     ).getReplyFuture(
     ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
@@ -200,9 +211,10 @@ final class RaftClientImpl implements RaftClient {
   }
 
   RaftClientRequest newRaftClientRequest(
-      RaftPeerId server, long callId, long seq, Message message, RaftClientRequest.Type type) {
+      RaftPeerId server, long callId, Message message, RaftClientRequest.Type type,
+      SlidingWindowEntry slidingWindowEntry) {
     return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
-        callId, seq, message, type);
+        callId, message, type, slidingWindowEntry);
   }
 
   @Override
@@ -233,8 +245,7 @@ final class RaftClientImpl implements RaftClient {
     }
 
     final long callId = nextCallId();
-    return sendRequestWithRetry(() -> newRaftClientRequest(
-        server, callId, 0L, message, type));
+    return sendRequestWithRetry(() -> newRaftClientRequest(server, callId, message, type, null));
   }
 
   @Override
@@ -342,7 +353,7 @@ final class RaftClientImpl implements RaftClient {
       reply = handleNotLeaderException(request, reply, true);
       if (reply != null) {
         getSlidingWindow(request).receiveReply(
-            request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
+            request.getSlidingWindowEntry().getSeqNum(), reply, this::sendRequestWithRetryAsync);
       } else if (!retryPolicy.shouldRetry(attemptCount)) {
         handleAsyncRetryFailure(request, attemptCount);
       }
@@ -374,7 +385,7 @@ final class RaftClientImpl implements RaftClient {
 
   private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
     final RaftRetryFailureException rfe = newRaftRetryFailureException(request, attemptCount, retryPolicy);
-    getSlidingWindow(request).fail(request.getSeqNum(), rfe);
+    getSlidingWindow(request).fail(request.getSlidingWindowEntry().getSeqNum(), rfe);
   }
 
   private RaftClientReply sendRequest(RaftClientRequest request)
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
index d4ab14d..73e485c 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -31,15 +31,29 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.function.Supplier;
 
 /** Send unordered asynchronous requests to a raft service. */
 public interface UnorderedAsync {
   Logger LOG = LoggerFactory.getLogger(UnorderedAsync.class);
 
+  class PendingUnorderedRequest extends PendingClientRequest {
+    private final Supplier<RaftClientRequest> requestConstructor;
+
+    PendingUnorderedRequest(Supplier<RaftClientRequest> requestConstructor) {
+      this.requestConstructor = requestConstructor;
+    }
+
+    @Override
+    RaftClientRequest newRequestImpl() {
+      return requestConstructor.get();
+    }
+  }
+
   static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, RaftClientImpl client) {
     final long callId = RaftClientImpl.nextCallId();
-    final PendingClientRequest pending = new PendingClientRequest(
-        () -> client.newRaftClientRequest(null, callId, -1L, null, type));
+    final PendingClientRequest pending = new PendingUnorderedRequest(
+        () -> client.newRaftClientRequest(null, callId, null, type, null));
     sendRequestWithRetry(pending, client);
     return pending.getReplyFuture()
         .thenApply(reply -> RaftClientImpl.handleStateMachineException(reply, CompletionException::new));
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
index c9a4469..567c2be 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupInfoRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,8 +22,7 @@ package org.apache.ratis.protocol;
  * the server itself.
  */
 public class GroupInfoRequest extends RaftClientRequest {
-  public GroupInfoRequest(ClientId clientId, RaftPeerId serverId,
-                                  RaftGroupId groupId, long callId) {
-    super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType());
+  public GroupInfoRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
+    super(clientId, serverId, groupId, callId, readRequestType());
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
index d661f52..af38b6a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupListRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -22,8 +22,7 @@ package org.apache.ratis.protocol;
  * the server itself.
  */
 public class GroupListRequest extends RaftClientRequest {
-  public GroupListRequest(ClientId clientId, RaftPeerId serverId,
-        RaftGroupId groupId, long callId) {
-    super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType());
+  public GroupListRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId) {
+    super(clientId, serverId, groupId, callId, readRequestType());
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
index cbaae3b..9972fdb 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/GroupManagementRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -80,7 +80,7 @@ public class GroupManagementRequest extends RaftClientRequest {
   private final Op op;
 
   private GroupManagementRequest(ClientId clientId, RaftPeerId serverId, long callId, Op op) {
-    super(clientId, serverId, op.getGroupId(), callId);
+    super(clientId, serverId, op.getGroupId(), callId, writeRequestType());
     this.op = op;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 08397c3..a007e83 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -19,6 +19,7 @@ package org.apache.ratis.protocol;
 
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.ProtoUtils;
 
 import java.util.Objects;
 
@@ -152,24 +153,23 @@ public class RaftClientRequest extends RaftClientMessage {
   }
 
   private final long callId;
-  private final long seqNum;
-
   private final Message message;
   private final Type type;
 
-  public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-      RaftGroupId groupId, long callId) {
-    this(clientId, serverId, groupId, callId, 0L, null, WRITE_DEFAULT);
+  private final SlidingWindowEntry slidingWindowEntry;
+
+  RaftClientRequest(ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId, Type type) {
+    this(clientId, serverId, groupId, callId, null, type, null);
   }
 
   public RaftClientRequest(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
-      long callId, long seqNum, Message message, Type type) {
+      long callId, Message message, Type type, SlidingWindowEntry slidingWindowEntry) {
     super(clientId, serverId, groupId);
     this.callId = callId;
-    this.seqNum = seqNum;
     this.message = message;
     this.type = type;
+    this.slidingWindowEntry = slidingWindowEntry != null? slidingWindowEntry: SlidingWindowEntry.getDefaultInstance();
   }
 
   @Override
@@ -181,8 +181,8 @@ public class RaftClientRequest extends RaftClientMessage {
     return callId;
   }
 
-  public long getSeqNum() {
-    return seqNum;
+  public SlidingWindowEntry getSlidingWindowEntry() {
+    return slidingWindowEntry;
   }
 
   public Message getMessage() {
@@ -199,7 +199,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
   @Override
   public String toString() {
-    return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " "
+    return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry)
         + type + ", " + getMessage();
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index e25da2a..1e45046 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,7 +24,7 @@ public class SetConfigurationRequest extends RaftClientRequest {
 
   public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId, RaftPeer[] peers) {
-    super(clientId, serverId, groupId, callId);
+    super(clientId, serverId, groupId, callId, writeRequestType());
     this.peers = peers;
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index b1bfdbe..08a4562 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -24,6 +24,7 @@ import org.apache.ratis.proto.RaftProtos.RaftPeerProto;
 import org.apache.ratis.proto.RaftProtos.RaftRpcReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftRpcRequestProto;
 import org.apache.ratis.proto.RaftProtos.RequestVoteReplyProto;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
 import org.apache.ratis.protocol.RaftGroup;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
@@ -137,6 +138,14 @@ public interface ProtoUtils {
     return protos.stream().map(ProtoUtils::toString).collect(Collectors.toList()).toString();
   }
 
+  static SlidingWindowEntry toSlidingWindowEntry(long seqNum, boolean isFirst) {
+    return SlidingWindowEntry.newBuilder().setSeqNum(seqNum).setIsFirst(isFirst).build();
+  }
+
+  static String toString(SlidingWindowEntry proto) {
+    return proto.getSeqNum() + (proto.getIsFirst()? "*": "");
+  }
+
   static IOException toIOException(ServiceException se) {
     final Throwable t = se.getCause();
     if (t == null) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
index a616f07..d0a1a52 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -43,9 +43,16 @@ public interface SlidingWindow {
     void setReply(REPLY reply);
 
     boolean hasReply();
+  }
 
-    default void fail(Exception e) {
-    }
+  interface ClientSideRequest<REPLY> extends Request<REPLY> {
+    void setFirstRequest();
+
+    void fail(Exception e);
+  }
+
+  interface ServerSideRequest<REPLY> extends Request<REPLY> {
+    boolean isFirstRequest();
   }
 
   /** A seqNum-to-request map, sorted by seqNum. */
@@ -161,14 +168,14 @@ public interface SlidingWindow {
    * Depend on the replies/exceptions, the client may retry the requests
    * to the same or a different server.
    */
-  class Client<REQUEST extends Request<REPLY>, REPLY> {
+  class Client<REQUEST extends ClientSideRequest<REPLY>, REPLY> {
     /** The requests in the sliding window. */
     private final RequestMap<REQUEST, REPLY> requests;
     /** Delayed requests. */
     private final SortedMap<Long, Long> delayedRequests = new TreeMap<>();
 
     /** The seqNum for the next new request. */
-    private long nextSeqNum = 0;
+    private long nextSeqNum = 1;
     /** The seqNum of the first request. */
     private long firstSeqNum = -1;
     /** Is the first request replied? */
@@ -240,6 +247,7 @@ public interface SlidingWindow {
         // first request is not yet submitted and this is the first request, submit it.
         LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this);
         firstSeqNum = seqNum;
+        request.setFirstRequest();
         sendMethod.accept(request);
         return true;
       }
@@ -351,7 +359,7 @@ public interface SlidingWindow {
    * (3) receive replies from the processing unit;
    * (4) send replies to the client.
    */
-  class Server<REQUEST extends Request<REPLY>, REPLY> implements Closeable {
+  class Server<REQUEST extends ServerSideRequest<REPLY>, REPLY> implements Closeable {
     /** The requests in the sliding window. */
     private final RequestMap<REQUEST, REPLY> requests;
     /** The end of requests */
@@ -373,7 +381,7 @@ public interface SlidingWindow {
     /** A request (or a retry) arrives (may be out-of-order except for the first request). */
     public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
       final long seqNum = request.getSeqNum();
-      if (nextToProcess == -1) {
+      if (nextToProcess == -1 && (request.isFirstRequest() || seqNum == 0)) {
         nextToProcess = seqNum;
         LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this);
       } else {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index db082a2..2883795 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -44,7 +44,7 @@ import java.util.function.Supplier;
 public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);
 
-  private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
+  private static class PendingAppend implements SlidingWindow.ServerSideRequest<RaftClientReply> {
     private final RaftClientRequest request;
     private volatile RaftClientReply reply;
 
@@ -72,7 +72,12 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
 
     @Override
     public long getSeqNum() {
-      return request != null? request.getSeqNum(): Long.MAX_VALUE;
+      return request != null? request.getSlidingWindowEntry().getSeqNum(): Long.MAX_VALUE;
+    }
+
+    @Override
+    public boolean isFirstRequest() {
+      return request != null && request.getSlidingWindowEntry().getIsFirst();
     }
 
     @Override
diff --git a/ratis-proto/src/main/proto/Raft.proto b/ratis-proto/src/main/proto/Raft.proto
index 103c478..b081352 100644
--- a/ratis-proto/src/main/proto/Raft.proto
+++ b/ratis-proto/src/main/proto/Raft.proto
@@ -94,7 +94,12 @@ message RaftRpcRequestProto {
   RaftGroupIdProto raftGroupId = 3;
   uint64 callId = 4;
 
-  uint64 seqNum = 15;
+  SlidingWindowEntry slidingWindowEntry = 15;
+}
+
+message SlidingWindowEntry {
+  uint64 seqNum = 1; // 0 for non-sliding-window requests; >= 1 for sliding-window requests
+  bool isFirst = 2;  // Is this the first request of the sliding window?
 }
 
 message RaftRpcReplyProto {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index a2b7057..f70f98a 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -20,7 +20,6 @@ package org.apache.ratis.server.impl;
 public interface RaftServerConstants {
   long INVALID_LOG_INDEX = -1;
   long DEFAULT_CALLID = 0;
-  long DEFAULT_SEQNUM = 0L;
 
   enum StartupOption {
     FORMAT("format"),
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 29dee9b..62c21e9 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -37,7 +37,6 @@ import java.util.Optional;
 import java.util.stream.Collectors;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 
 /** Server proto utilities for internal use. */
 public interface ServerProtoUtils {
@@ -261,7 +260,7 @@ public interface ServerProtoUtils {
   static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
       RaftPeerId requestorId, RaftPeerId replyId, RaftGroupId groupId) {
     return ClientProtoUtils.toRaftRpcRequestProtoBuilder(
-        requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, DEFAULT_SEQNUM);
+        requestorId.toByteString(), replyId.toByteString(), groupId, DEFAULT_CALLID, null);
   }
 
   static RequestVoteRequestProto toRequestVoteRequestProto(
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index 54f93c7..eef9d1b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -79,7 +79,6 @@ import java.util.stream.Stream;
 import java.util.stream.StreamSupport;
 
 import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
 
 public abstract class MiniRaftCluster implements Closeable {
   public static final Logger LOG = LoggerFactory.getLogger(MiniRaftCluster.class);
@@ -679,14 +678,13 @@ public abstract class MiniRaftCluster implements Closeable {
 
   public RaftClientRequest newRaftClientRequest(
       ClientId clientId, RaftPeerId leaderId, Message message) {
-    return newRaftClientRequest(clientId, leaderId,
-        DEFAULT_CALLID, DEFAULT_SEQNUM, message);
+    return newRaftClientRequest(clientId, leaderId, DEFAULT_CALLID, message);
   }
 
   public RaftClientRequest newRaftClientRequest(
-      ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
+      ClientId clientId, RaftPeerId leaderId, long callId, Message message) {
     return new RaftClientRequest(clientId, leaderId, getGroupId(),
-        callId, seqNum, message, RaftClientRequest.writeRequestType());
+        callId, message, RaftClientRequest.writeRequestType(), null);
   }
 
   public SetConfigurationRequest newSetConfigurationRequest(
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index 2aa1d85..60f62cd 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -68,9 +68,8 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
     final RaftClient client = cluster.createClient(leaderId);
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
-    final long seqNum = 111;
     RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
-        callId, seqNum, new SimpleMessage("message"));
+        callId, new SimpleMessage("message"));
     assertReply(rpc.sendRequest(r), client, callId);
 
     // retry with the same callId
@@ -130,9 +129,8 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
     final RaftClient client = cluster.createClient(leaderId);
     RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
-    final long seqNum = 111;
     RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
-        callId, seqNum, new SimpleMessage("message"));
+        callId, new SimpleMessage("message"));
     assertReply(rpc.sendRequest(r), client, callId);
     long oldLastApplied = cluster.getLeader().getState().getLastAppliedIndex();
 
@@ -151,7 +149,7 @@ public abstract class RetryCacheTests<CLUSTER extends MiniRaftCluster>
     Assert.assertNotEquals(leaderId, newLeaderId);
     // same clientId and callId in the request
     r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
-        callId, seqNum, new SimpleMessage("message"));
+        callId, new SimpleMessage("message"));
     rpc.addServers(Arrays.asList(change.newPeers));
     for (int i = 0; i < 10; i++) {
       try {
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
index cf3a490..8fd60b7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftStateMachineExceptionTests.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -107,9 +107,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
     final RaftClient client = cluster.createClient(leaderId);
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
-    final long seqNum = 111;
     final SimpleMessage message = new SimpleMessage("message");
-    final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, seqNum, message);
+    final RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId, callId, message);
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertFalse(reply.isSuccess());
     Assert.assertNotNull(reply.getStateMachineException());
@@ -156,9 +155,8 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
     final RaftClient client = cluster.createClient(leaderId);
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
-    final long seqNum = 111;
     RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
-        callId, seqNum, new RaftTestUtil.SimpleMessage("message"));
+        callId, new RaftTestUtil.SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Objects.requireNonNull(reply.getStateMachineException());
 
@@ -174,7 +172,7 @@ public abstract class RaftStateMachineExceptionTests<CLUSTER extends MiniRaftClu
     leaderId = leader.getId();
     // retry
     r = cluster.newRaftClientRequest(client.getId(), leaderId,
-        callId, seqNum, new RaftTestUtil.SimpleMessage("message"));
+        callId, new RaftTestUtil.SimpleMessage("message"));
     reply = rpc.sendRequest(r);
     Objects.requireNonNull(reply.getStateMachineException());