You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by li...@apache.org on 2017/03/04 03:02:13 UTC

incubator-ratis git commit: RATIS-15. Add call ID to identify a client request and its retry. Contributed by Jing Zhao

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 46116d412 -> 6adf89d48


RATIS-15. Add call ID to identify a client request and its retry. Contributed by Jing Zhao


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/6adf89d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/6adf89d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/6adf89d4

Branch: refs/heads/master
Commit: 6adf89d481cc424716b12aa120e89b2a0fe921fb
Parents: 46116d4
Author: Mingliang Liu <li...@apache.org>
Authored: Fri Mar 3 18:58:11 2017 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Fri Mar 3 18:58:11 2017 -0800

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  1 -
 .../ratis/client/impl/ClientProtoUtils.java     | 24 ++++++++++----------
 .../ratis/client/impl/RaftClientImpl.java       | 13 +++++++++--
 .../apache/ratis/protocol/RaftClientReply.java  | 16 ++++++-------
 .../ratis/protocol/RaftClientRequest.java       | 16 ++++++-------
 .../ratis/protocol/SetConfigurationRequest.java |  4 ++--
 .../java/org/apache/ratis/util/ProtoUtils.java  |  4 ++--
 .../ratis/grpc/client/AppendStreamer.java       |  6 ++---
 .../grpc/client/RaftClientProtocolService.java  | 22 +++++++++---------
 .../ratis/grpc/client/RaftOutputStream.java     |  6 +++--
 .../org/apache/ratis/netty/NettyRpcProxy.java   | 14 ++++++------
 .../ratis/netty/server/NettyRpcService.java     |  2 +-
 ratis-proto-shaded/src/main/proto/Raft.proto    |  4 ++--
 .../ratis/server/impl/RaftServerConstants.java  |  4 +---
 .../ratis/server/impl/RaftServerImpl.java       |  2 +-
 .../ratis/server/impl/ServerProtoUtils.java     | 15 ++++++------
 .../ratis/RaftNotLeaderExceptionBaseTest.java   |  6 ++---
 .../impl/RaftReconfigurationBaseTest.java       | 14 ++++++------
 .../statemachine/RaftSnapshotBaseTest.java      |  4 ++--
 19 files changed, 92 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 8fbffd3..bf85386 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -31,7 +31,6 @@ import java.util.Objects;
 /** A client who sends requests to a raft service. */
 public interface RaftClient extends Closeable {
   Logger LOG = LoggerFactory.getLogger(RaftClient.class);
-  long DEFAULT_SEQNUM = 0;
 
   /** @return the id of this client. */
   ClientId getId();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index b0e1d41..218d761 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -26,20 +26,20 @@ import java.util.Arrays;
 
 public class ClientProtoUtils {
   public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
-      byte[] requestorId, byte[] replyId, long seqNum, boolean success) {
+      byte[] requestorId, byte[] replyId, long callId, boolean success) {
     return RaftRpcReplyProto.newBuilder()
         .setRequestorId(ProtoUtils.toByteString(requestorId))
         .setReplyId(ProtoUtils.toByteString(replyId))
-        .setSeqNum(seqNum)
+        .setCallId(callId)
         .setSuccess(success);
   }
 
   public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
-      byte[] requesterId, byte[] replyId, long seqNum) {
+      byte[] requesterId, byte[] replyId, long callId) {
     return RaftRpcRequestProto.newBuilder()
         .setRequestorId(ProtoUtils.toByteString(requesterId))
         .setReplyId(ProtoUtils.toByteString(replyId))
-        .setSeqNum(seqNum);
+        .setCallId(callId);
   }
 
   public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
@@ -48,7 +48,7 @@ public class ClientProtoUtils {
     RaftPeerId serverId = new RaftPeerId(
         p.getRpcRequest().getReplyId());
     return new RaftClientRequest(clientId, serverId,
-        p.getRpcRequest().getSeqNum(),
+        p.getRpcRequest().getCallId(),
         toMessage(p.getMessage()), p.getReadOnly());
   }
 
@@ -56,18 +56,18 @@ public class ClientProtoUtils {
       RaftClientRequest request) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(),
-            request.getServerId().toBytes(), request.getSeqNum()))
+            request.getServerId().toBytes(), request.getCallId()))
         .setMessage(toClientMessageEntryProto(request.getMessage()))
         .setReadOnly(request.isReadOnly())
         .build();
   }
 
   public static RaftClientRequestProto genRaftClientRequestProto(
-      ClientId clientId, RaftPeerId serverId, long seqNum, ByteString content,
+      ClientId clientId, RaftPeerId serverId, long callId, ByteString content,
       boolean readOnly) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(),
-            serverId.toBytes(), seqNum))
+            serverId.toBytes(), callId))
         .setMessage(ClientMessageEntryProto.newBuilder().setContent(content))
         .setReadOnly(readOnly)
         .build();
@@ -78,7 +78,7 @@ public class ClientProtoUtils {
     final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder();
     if (reply != null) {
       b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(),
-          reply.getServerId().toBytes(), reply.getSeqNum(), reply.isSuccess()));
+          reply.getServerId().toBytes(), reply.getCallId(), reply.isSuccess()));
       if (reply.getMessage() != null) {
         b.setMessage(toClientMessageEntryProto(reply.getMessage()));
       }
@@ -110,7 +110,7 @@ public class ClientProtoUtils {
     }
     return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()),
         new RaftPeerId(rp.getReplyId()),
-        rp.getSeqNum(), rp.getSuccess(), toMessage(replyProto.getMessage()), e);
+        rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e);
   }
 
   public static Message toMessage(final ClientMessageEntryProto p) {
@@ -129,7 +129,7 @@ public class ClientProtoUtils {
     return new SetConfigurationRequest(
         new ClientId(m.getRequestorId().toByteArray()),
         new RaftPeerId(m.getReplyId()),
-        p.getRpcRequest().getSeqNum(), peers);
+        p.getRpcRequest().getCallId(), peers);
   }
 
   public static SetConfigurationRequestProto toSetConfigurationRequestProto(
@@ -138,7 +138,7 @@ public class ClientProtoUtils {
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
             request.getClientId().toBytes(),
             request.getServerId().toBytes(),
-            request.getSeqNum()))
+            request.getCallId()))
         .addAllPeers(ProtoUtils.toRaftPeerProtos(
             Arrays.asList(request.getPeersInNewConf())))
         .build();

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 3a6fd58..be22305 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -26,10 +26,17 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Supplier;
 
 /** A client who sends requests to a raft service. */
 final class RaftClientImpl implements RaftClient {
+  private static final AtomicLong callIdCounter = new AtomicLong();
+
+  private static long nextCallId() {
+    return callIdCounter.getAndIncrement() & Long.MAX_VALUE;
+  }
+
   private final ClientId clientId;
   private final RaftClientRpc clientRpc;
   private final Collection<RaftPeer> peers;
@@ -65,15 +72,17 @@ final class RaftClientImpl implements RaftClient {
   }
 
   private RaftClientReply send(Message message, boolean readOnly) throws IOException {
+    final long callId = nextCallId();
     return sendRequestWithRetry(() -> new RaftClientRequest(
-        clientId, leaderId, DEFAULT_SEQNUM, message, readOnly));
+        clientId, leaderId, callId, message, readOnly));
   }
 
   @Override
   public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf)
       throws IOException {
+    final long callId = nextCallId();
     return sendRequestWithRetry(() -> new SetConfigurationRequest(
-        clientId, leaderId, DEFAULT_SEQNUM, peersInNewConf));
+        clientId, leaderId, callId, peersInNewConf));
   }
 
   private RaftClientReply sendRequestWithRetry(

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 459e0f4..4dd2943 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -22,29 +22,29 @@ package org.apache.ratis.protocol;
  */
 public class RaftClientReply extends RaftClientMessage {
   private final boolean success;
-  private final long seqNum;
+  private final long callId;
 
   /** non-null if the server is not leader */
   private final NotLeaderException notLeaderException;
   private final Message message;
 
-  public RaftClientReply(ClientId clientId, RaftPeerId serverId, long seqNum,
+  public RaftClientReply(ClientId clientId, RaftPeerId serverId, long callId,
       boolean success, Message message, NotLeaderException notLeaderException) {
     super(clientId, serverId);
     this.success = success;
-    this.seqNum = seqNum;
+    this.callId = callId;
     this.message = message;
     this.notLeaderException = notLeaderException;
   }
 
   public RaftClientReply(RaftClientRequest request,
       NotLeaderException notLeaderException) {
-    this(request.getClientId(), request.getServerId(), request.getSeqNum(),
+    this(request.getClientId(), request.getServerId(), request.getCallId(),
         false, null, notLeaderException);
   }
 
   public RaftClientReply(RaftClientRequest request, Message message) {
-    this(request.getClientId(), request.getServerId(), request.getSeqNum(),
+    this(request.getClientId(), request.getServerId(), request.getCallId(),
         true, message, null);
   }
 
@@ -53,13 +53,13 @@ public class RaftClientReply extends RaftClientMessage {
     return false;
   }
 
-  public long getSeqNum() {
-    return seqNum;
+  public long getCallId() {
+    return callId;
   }
 
   @Override
   public String toString() {
-    return super.toString() + ", seqNum: " + getSeqNum()
+    return super.toString() + ", callId: " + getCallId()
         + ", success: " + isSuccess();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 9b649c9..898c166 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -21,19 +21,19 @@ package org.apache.ratis.protocol;
  * Request from client to server
  */
 public class RaftClientRequest extends RaftClientMessage {
-  private final long seqNum;
+  private final long callId;
   private final Message message;
   private final boolean readOnly;
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-      long seqNum, Message message) {
-    this(clientId, serverId, seqNum, message, false);
+      long callId, Message message) {
+    this(clientId, serverId, callId, message, false);
   }
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-      long seqNum, Message message, boolean readOnly) {
+      long callId, Message message, boolean readOnly) {
     super(clientId, serverId);
-    this.seqNum = seqNum;
+    this.callId = callId;
     this.message = message;
     this.readOnly = readOnly;
   }
@@ -43,8 +43,8 @@ public class RaftClientRequest extends RaftClientMessage {
     return true;
   }
 
-  public long getSeqNum() {
-    return seqNum;
+  public long getCallId() {
+    return callId;
   }
 
   public Message getMessage() {
@@ -57,7 +57,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
   @Override
   public String toString() {
-    return super.toString() + ", seqNum: " + seqNum + ", "
+    return super.toString() + ", callId: " + callId + ", "
         + (isReadOnly()? "RO": "RW");
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index 6bc34f4..77d545c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -23,8 +23,8 @@ public class SetConfigurationRequest extends RaftClientRequest {
   private final RaftPeer[] peers;
 
   public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
-      long seqNum, RaftPeer[] peers) {
-    super(clientId, serverId, seqNum, null);
+      long callId, RaftPeer[] peers) {
+    super(clientId, serverId, callId, null);
     this.peers = peers;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2613342..ea74d09 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -132,12 +132,12 @@ public class ProtoUtils {
 
   public static String toString(RaftRpcRequestProto proto) {
     return proto.getRequestorId() + "->" + proto.getReplyId()
-        + "#" + proto.getSeqNum();
+        + "#" + proto.getCallId();
   }
 
   public static String toString(RaftRpcReplyProto proto) {
     return proto.getRequestorId() + "<-" + proto.getReplyId()
-        + "#" + proto.getSeqNum() + ":"
+        + "#" + proto.getCallId() + ":"
         + (proto.getSuccess()? "OK": "FAIL");
   }
   public static String toString(RequestVoteReplyProto proto) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 4f96c06..6d7b207 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -272,8 +272,8 @@ public class AppendStreamer implements Closeable {
         RaftClientRequestProto pending = Preconditions.checkNotNull(
             ackQueue.peek());
         if (reply.getRpcReply().getSuccess()) {
-          Preconditions.checkState(pending.getRpcRequest().getSeqNum() ==
-              reply.getRpcReply().getSeqNum());
+          Preconditions.checkState(pending.getRpcRequest().getCallId() ==
+              reply.getRpcReply().getCallId());
           ackQueue.poll();
           LOG.trace("{} received success ack for request {}", this,
               pending.getRpcRequest());
@@ -375,7 +375,7 @@ public class AppendStreamer implements Closeable {
             .setMessage(oldRequest.getMessage())
             .setReadOnly(oldRequest.getReadOnly())
             .setRpcRequest(toRaftRpcRequestProtoBuilder(
-                clientId.toBytes(), newLeader.toBytes(), r.getSeqNum()))
+                clientId.toBytes(), newLeader.toBytes(), r.getCallId()))
             .build();
         dataQueue.offerFirst(newRequest);
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index 99d8778..d550963 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -39,11 +39,11 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
   static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
 
   private static class PendingAppend implements Comparable<PendingAppend> {
-    private final long seqNum;
+    private final long callId;
     private volatile RaftClientReply reply;
 
-    PendingAppend(long seqNum) {
-      this.seqNum = seqNum;
+    PendingAppend(long callId) {
+      this.callId = callId;
     }
 
     boolean isReady() {
@@ -56,12 +56,12 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
 
     @Override
     public int compareTo(PendingAppend p) {
-      return seqNum == p.seqNum ? 0 : (seqNum < p.seqNum ? -1 : 1);
+      return callId == p.callId ? 0 : (callId < p.callId ? -1 : 1);
     }
 
     @Override
     public String toString() {
-      return seqNum + ", reply:" + (reply == null ? "null" : reply.toString());
+      return callId + ", reply:" + (reply == null ? "null" : reply.toString());
     }
   }
   private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE);
@@ -111,7 +111,7 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
     @Override
     public void onNext(RaftClientRequestProto request) {
       try {
-        PendingAppend p = new PendingAppend(request.getRpcRequest().getSeqNum());
+        PendingAppend p = new PendingAppend(request.getRpcRequest().getCallId());
         synchronized (pendingList) {
           pendingList.add(p);
         }
@@ -125,17 +125,17 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
             // exception from the state machine.
             responseObserver.onError(RaftGrpcUtil.wrapException(exception));
           } else {
-            final long replySeq = reply.getSeqNum();
+            final long replySeq = reply.getCallId();
             synchronized (pendingList) {
               Preconditions.checkState(!pendingList.isEmpty(),
-                  "PendingList is empty when handling onNext for seqNum %s",
+                  "PendingList is empty when handling onNext for callId %s",
                   replySeq);
-              final long headSeqNum = pendingList.get(0).seqNum;
-              // we assume the seqNum is consecutive for a stream RPC call
+              final long headSeqNum = pendingList.get(0).callId;
+              // we assume the callId is consecutive for a stream RPC call
               final PendingAppend pendingForReply = pendingList.get(
                   (int) (replySeq - headSeqNum));
               Preconditions.checkState(pendingForReply != null &&
-                      pendingForReply.seqNum == replySeq,
+                      pendingForReply.callId == replySeq,
                   "pending for reply is: %s, the pending list: %s",
                   pendingForReply, pendingList);
               pendingForReply.setReply(reply);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
index 8f0e183..73e56b8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
@@ -23,6 +23,7 @@ import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.ClientId;
@@ -34,7 +35,7 @@ public class RaftOutputStream extends OutputStream {
   /** internal buffer */
   private final byte buf[];
   private int count;
-  private long seqNum = 0;
+  private final AtomicLong seqNum = new AtomicLong();
   private final ClientId clientId;
   private final AppendStreamer streamer;
 
@@ -82,7 +83,8 @@ public class RaftOutputStream extends OutputStream {
 
   private void flushToStreamer() throws IOException {
     if (count > 0) {
-      streamer.write(ProtoUtils.toByteString(buf, 0, count), seqNum++);
+      streamer.write(ProtoUtils.toByteString(buf, 0, count),
+          seqNum.getAndIncrement());
       count = 0;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 9b8553b..5b7efc8 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -69,18 +69,18 @@ public class NettyRpcProxy implements Closeable {
     }
   }
 
-  public static long getSeqNum(RaftNettyServerReplyProto proto) {
+  public static long getCallId(RaftNettyServerReplyProto proto) {
     switch (proto.getRaftNettyServerReplyCase()) {
       case REQUESTVOTEREPLY:
-        return proto.getRequestVoteReply().getServerReply().getSeqNum();
+        return proto.getRequestVoteReply().getServerReply().getCallId();
       case APPENDENTRIESREPLY:
-        return proto.getAppendEntriesReply().getServerReply().getSeqNum();
+        return proto.getAppendEntriesReply().getServerReply().getCallId();
       case INSTALLSNAPSHOTREPLY:
-        return proto.getInstallSnapshotReply().getServerReply().getSeqNum();
+        return proto.getInstallSnapshotReply().getServerReply().getCallId();
       case RAFTCLIENTREPLY:
-        return proto.getRaftClientReply().getRpcReply().getSeqNum();
+        return proto.getRaftClientReply().getRpcReply().getCallId();
       case EXCEPTIONREPLY:
-        return proto.getExceptionReply().getRpcReply().getSeqNum();
+        return proto.getExceptionReply().getRpcReply().getCallId();
       case RAFTNETTYSERVERREPLY_NOT_SET:
         throw new IllegalArgumentException("Reply case not set in proto: "
             + proto.getRaftNettyServerReplyCase());
@@ -104,7 +104,7 @@ public class NettyRpcProxy implements Closeable {
                                     RaftNettyServerReplyProto proto) {
           final CompletableFuture<RaftNettyServerReplyProto> future = pollReply();
           if (future == null) {
-            throw new IllegalStateException("Request #" + getSeqNum(proto)
+            throw new IllegalStateException("Request #" + getCallId(proto)
                 + " not found");
           }
           if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index a1486b6..2d927db 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -218,7 +218,7 @@ public final class NettyRpcService implements RaftServerRpc {
     final RaftRpcReplyProto.Builder rpcReply = RaftRpcReplyProto.newBuilder()
         .setRequestorId(request.getRequestorId())
         .setReplyId(request.getReplyId())
-        .setSeqNum(request.getSeqNum())
+        .setCallId(request.getCallId())
         .setSuccess(false);
     final RaftNettyExceptionReplyProto.Builder ioe = RaftNettyExceptionReplyProto.newBuilder()
         .setRpcReply(rpcReply)

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 14901f6..8c334dd 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -65,13 +65,13 @@ message TermIndexProto {
 message RaftRpcRequestProto {
   bytes requestorId = 1;
   bytes replyId = 2;
-  uint64 seqNum = 3;
+  uint64 callId = 3;
 }
 
 message RaftRpcReplyProto {
   bytes requestorId = 1;
   bytes replyId = 2;
-  uint64 seqNum = 3;
+  uint64 callId = 3;
   bool success = 4;
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index caf9c4d..b4db46e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -17,12 +17,10 @@
  */
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.client.RaftClient;
-
 public interface RaftServerConstants {
   long INVALID_LOG_INDEX = -1;
   byte LOG_TERMINATE_BYTE = 0;
-  long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
+  long DEFAULT_CALLID = 0;
 
   enum StartupOption {
     FORMAT("format"),

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b9f063e..3c7c0e1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -381,7 +381,7 @@ public class RaftServerImpl implements RaftServer {
       final long entryIndex;
       try {
         entryIndex = state.applyLog(entry, request.getClientId(),
-            request.getSeqNum());
+            request.getCallId());
       } catch (IOException e) {
         throw new RaftException(e);
       }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 846fbe2..945be8d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -17,7 +17,7 @@
  */
 package org.apache.ratis.server.impl;
 
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
 
 import java.util.Arrays;
@@ -112,7 +112,7 @@ public class ServerProtoUtils {
       boolean shouldShutdown) {
     final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder();
     b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(
-        requestorId.toBytes(), replyId.toBytes(), DEFAULT_SEQNUM, success))
+        requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID, success))
         .setTerm(term)
         .setShouldShutdown(shouldShutdown);
     return b.build();
@@ -121,8 +121,7 @@ public class ServerProtoUtils {
   public static RequestVoteRequestProto toRequestVoteRequestProto(
       RaftPeerId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) {
     RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils
-        .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(),
-            DEFAULT_SEQNUM);
+        .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID);
     final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
         .setServerRequest(rpb)
         .setCandidateTerm(term);
@@ -136,7 +135,7 @@ public class ServerProtoUtils {
       RaftPeerId requestorId, RaftPeerId replyId, long term, int requestIndex,
       InstallSnapshotResult result) {
     final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
-        replyId.toBytes(), DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS);
+        replyId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS);
     final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
         .newBuilder().setServerReply(rb).setTerm(term).setResult(result)
         .setRequestIndex(requestIndex);
@@ -150,7 +149,7 @@ public class ServerProtoUtils {
     return InstallSnapshotRequestProto.newBuilder()
         .setServerRequest(
             ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
-                replyId.toBytes(), DEFAULT_SEQNUM))
+                replyId.toBytes(), DEFAULT_CALLID))
         .setRequestId(requestId)
         .setRequestIndex(requestIndex)
         // .setRaftConfiguration()  TODO: save and pass RaftConfiguration
@@ -165,7 +164,7 @@ public class ServerProtoUtils {
       RaftPeerId requestorId, RaftPeerId replyId, long term,
       long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
     RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
-        replyId.toBytes(), DEFAULT_SEQNUM, appendResult == SUCCESS);
+        replyId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS);
     final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder();
     b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex)
         .setResult(appendResult);
@@ -180,7 +179,7 @@ public class ServerProtoUtils {
         .newBuilder()
         .setServerRequest(
             ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
-                replyId.toBytes(), DEFAULT_SEQNUM))
+                replyId.toBytes(), DEFAULT_CALLID))
         .setLeaderTerm(leaderTerm)
         .setLeaderCommit(leaderCommit)
         .setInitializing(initializing);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
index a5d1127..54cfa4d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -40,7 +40,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
 
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 
 public abstract class RaftNotLeaderExceptionBaseTest {
   static {
@@ -92,7 +92,7 @@ public abstract class RaftNotLeaderExceptionBaseTest {
     for (int i = 0; reply == null && i < 10; i++) {
       try {
         reply = rpc.sendRequest(
-            new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM,
+            new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID,
                 new SimpleMessage("m2")));
       } catch (IOException ignored) {
         Thread.sleep(1000);
@@ -139,7 +139,7 @@ public abstract class RaftNotLeaderExceptionBaseTest {
     for (int i = 0; reply == null && i < 10; i++) {
       try {
         reply = rpc.sendRequest(
-            new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM,
+            new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID,
                 new SimpleMessage("m1")));
       } catch (IOException ignored) {
         Thread.sleep(1000);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e3db854..90cbef5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import static java.util.Arrays.asList;
 import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay;
 import static org.apache.ratis.MiniRaftCluster.logSyncDelay;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
 import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
 
@@ -93,7 +93,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // trigger setConfiguration
       SetConfigurationRequest request = new SetConfigurationRequest(clientId,
-          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+          cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
       LOG.info("Start changing the configuration: {}", request);
       cluster.getLeader().setConfiguration(request);
 
@@ -121,7 +121,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // trigger setConfiguration
       SetConfigurationRequest request = new SetConfigurationRequest(clientId,
-          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+          cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
       LOG.info("Start changing the configuration: {}", request);
       cluster.getLeader().setConfiguration(request);
 
@@ -159,7 +159,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       // trigger setConfiguration
       SetConfigurationRequest request = new SetConfigurationRequest(clientId,
-          cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+          cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
       LOG.info("Start changing the configuration: {}", request);
       cluster.getLeader().setConfiguration(request);
 
@@ -255,7 +255,7 @@ public abstract class RaftReconfigurationBaseTest {
 
       final RaftClientRpc sender = client.getClientRpc();
       final SetConfigurationRequest request = new SetConfigurationRequest(
-          client.getId(), leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf);
+          client.getId(), leaderId, DEFAULT_CALLID, c1.allPeersInNewConf);
       try {
         sender.sendRequest(request);
         Assert.fail("did not get expected exception");
@@ -472,7 +472,7 @@ public abstract class RaftReconfigurationBaseTest {
           LOG.info("client2 starts to change conf");
           final RaftClientRpc sender2 = client2.getClientRpc();
           sender2.sendRequest(new SetConfigurationRequest(
-              client2.getId(), leaderId, DEFAULT_SEQNUM, peersInRequest2));
+              client2.getId(), leaderId, DEFAULT_CALLID, peersInRequest2));
         } catch (ReconfigurationInProgressException e) {
           caughtException.set(true);
         } catch (Exception e) {
@@ -536,7 +536,7 @@ public abstract class RaftReconfigurationBaseTest {
           LOG.info("client starts to change conf");
           final RaftClientRpc sender = client.getClientRpc();
           RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest(
-              client.getId(), leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf));
+              client.getId(), leaderId, DEFAULT_CALLID, change.allPeersInNewConf));
           if (reply.isNotLeader()) {
             gotNotLeader.set(true);
           }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index cb4412a..217d5ae 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -19,7 +19,7 @@ package org.apache.ratis.statemachine;
 
 import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY;
 import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
 
 import java.io.File;
 import java.util.List;
@@ -200,7 +200,7 @@ public abstract class RaftSnapshotBaseTest {
           new String[]{"s3", "s4"}, true);
       // trigger setConfiguration
       SetConfigurationRequest request = new SetConfigurationRequest(ClientId.createId(),
-          cluster.getLeader().getId(), DEFAULT_SEQNUM, change.allPeersInNewConf);
+          cluster.getLeader().getId(), DEFAULT_CALLID, change.allPeersInNewConf);
       LOG.info("Start changing the configuration: {}", request);
       cluster.getLeader().setConfiguration(request);