You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by li...@apache.org on 2017/03/04 03:02:13 UTC
incubator-ratis git commit: RATIS-15. Add call ID to identify a
client request and its retry. Contributed by Jing Zhao
Repository: incubator-ratis
Updated Branches:
refs/heads/master 46116d412 -> 6adf89d48
RATIS-15. Add call ID to identify a client request and its retry. Contributed by Jing Zhao
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/6adf89d4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/6adf89d4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/6adf89d4
Branch: refs/heads/master
Commit: 6adf89d481cc424716b12aa120e89b2a0fe921fb
Parents: 46116d4
Author: Mingliang Liu <li...@apache.org>
Authored: Fri Mar 3 18:58:11 2017 -0800
Committer: Mingliang Liu <li...@apache.org>
Committed: Fri Mar 3 18:58:11 2017 -0800
----------------------------------------------------------------------
.../org/apache/ratis/client/RaftClient.java | 1 -
.../ratis/client/impl/ClientProtoUtils.java | 24 ++++++++++----------
.../ratis/client/impl/RaftClientImpl.java | 13 +++++++++--
.../apache/ratis/protocol/RaftClientReply.java | 16 ++++++-------
.../ratis/protocol/RaftClientRequest.java | 16 ++++++-------
.../ratis/protocol/SetConfigurationRequest.java | 4 ++--
.../java/org/apache/ratis/util/ProtoUtils.java | 4 ++--
.../ratis/grpc/client/AppendStreamer.java | 6 ++---
.../grpc/client/RaftClientProtocolService.java | 22 +++++++++---------
.../ratis/grpc/client/RaftOutputStream.java | 6 +++--
.../org/apache/ratis/netty/NettyRpcProxy.java | 14 ++++++------
.../ratis/netty/server/NettyRpcService.java | 2 +-
ratis-proto-shaded/src/main/proto/Raft.proto | 4 ++--
.../ratis/server/impl/RaftServerConstants.java | 4 +---
.../ratis/server/impl/RaftServerImpl.java | 2 +-
.../ratis/server/impl/ServerProtoUtils.java | 15 ++++++------
.../ratis/RaftNotLeaderExceptionBaseTest.java | 6 ++---
.../impl/RaftReconfigurationBaseTest.java | 14 ++++++------
.../statemachine/RaftSnapshotBaseTest.java | 4 ++--
19 files changed, 92 insertions(+), 85 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 8fbffd3..bf85386 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -31,7 +31,6 @@ import java.util.Objects;
/** A client who sends requests to a raft service. */
public interface RaftClient extends Closeable {
Logger LOG = LoggerFactory.getLogger(RaftClient.class);
- long DEFAULT_SEQNUM = 0;
/** @return the id of this client. */
ClientId getId();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index b0e1d41..218d761 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -26,20 +26,20 @@ import java.util.Arrays;
public class ClientProtoUtils {
public static RaftRpcReplyProto.Builder toRaftRpcReplyProtoBuilder(
- byte[] requestorId, byte[] replyId, long seqNum, boolean success) {
+ byte[] requestorId, byte[] replyId, long callId, boolean success) {
return RaftRpcReplyProto.newBuilder()
.setRequestorId(ProtoUtils.toByteString(requestorId))
.setReplyId(ProtoUtils.toByteString(replyId))
- .setSeqNum(seqNum)
+ .setCallId(callId)
.setSuccess(success);
}
public static RaftRpcRequestProto.Builder toRaftRpcRequestProtoBuilder(
- byte[] requesterId, byte[] replyId, long seqNum) {
+ byte[] requesterId, byte[] replyId, long callId) {
return RaftRpcRequestProto.newBuilder()
.setRequestorId(ProtoUtils.toByteString(requesterId))
.setReplyId(ProtoUtils.toByteString(replyId))
- .setSeqNum(seqNum);
+ .setCallId(callId);
}
public static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
@@ -48,7 +48,7 @@ public class ClientProtoUtils {
RaftPeerId serverId = new RaftPeerId(
p.getRpcRequest().getReplyId());
return new RaftClientRequest(clientId, serverId,
- p.getRpcRequest().getSeqNum(),
+ p.getRpcRequest().getCallId(),
toMessage(p.getMessage()), p.getReadOnly());
}
@@ -56,18 +56,18 @@ public class ClientProtoUtils {
RaftClientRequest request) {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request.getClientId().toBytes(),
- request.getServerId().toBytes(), request.getSeqNum()))
+ request.getServerId().toBytes(), request.getCallId()))
.setMessage(toClientMessageEntryProto(request.getMessage()))
.setReadOnly(request.isReadOnly())
.build();
}
public static RaftClientRequestProto genRaftClientRequestProto(
- ClientId clientId, RaftPeerId serverId, long seqNum, ByteString content,
+ ClientId clientId, RaftPeerId serverId, long callId, ByteString content,
boolean readOnly) {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(clientId.toBytes(),
- serverId.toBytes(), seqNum))
+ serverId.toBytes(), callId))
.setMessage(ClientMessageEntryProto.newBuilder().setContent(content))
.setReadOnly(readOnly)
.build();
@@ -78,7 +78,7 @@ public class ClientProtoUtils {
final RaftClientReplyProto.Builder b = RaftClientReplyProto.newBuilder();
if (reply != null) {
b.setRpcReply(toRaftRpcReplyProtoBuilder(reply.getClientId().toBytes(),
- reply.getServerId().toBytes(), reply.getSeqNum(), reply.isSuccess()));
+ reply.getServerId().toBytes(), reply.getCallId(), reply.isSuccess()));
if (reply.getMessage() != null) {
b.setMessage(toClientMessageEntryProto(reply.getMessage()));
}
@@ -110,7 +110,7 @@ public class ClientProtoUtils {
}
return new RaftClientReply(new ClientId(rp.getRequestorId().toByteArray()),
new RaftPeerId(rp.getReplyId()),
- rp.getSeqNum(), rp.getSuccess(), toMessage(replyProto.getMessage()), e);
+ rp.getCallId(), rp.getSuccess(), toMessage(replyProto.getMessage()), e);
}
public static Message toMessage(final ClientMessageEntryProto p) {
@@ -129,7 +129,7 @@ public class ClientProtoUtils {
return new SetConfigurationRequest(
new ClientId(m.getRequestorId().toByteArray()),
new RaftPeerId(m.getReplyId()),
- p.getRpcRequest().getSeqNum(), peers);
+ p.getRpcRequest().getCallId(), peers);
}
public static SetConfigurationRequestProto toSetConfigurationRequestProto(
@@ -138,7 +138,7 @@ public class ClientProtoUtils {
.setRpcRequest(toRaftRpcRequestProtoBuilder(
request.getClientId().toBytes(),
request.getServerId().toBytes(),
- request.getSeqNum()))
+ request.getCallId()))
.addAllPeers(ProtoUtils.toRaftPeerProtos(
Arrays.asList(request.getPeersInNewConf())))
.build();
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 3a6fd58..be22305 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -26,10 +26,17 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.Arrays;
import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
+ private static final AtomicLong callIdCounter = new AtomicLong();
+
+ private static long nextCallId() {
+ return callIdCounter.getAndIncrement() & Long.MAX_VALUE;
+ }
+
private final ClientId clientId;
private final RaftClientRpc clientRpc;
private final Collection<RaftPeer> peers;
@@ -65,15 +72,17 @@ final class RaftClientImpl implements RaftClient {
}
private RaftClientReply send(Message message, boolean readOnly) throws IOException {
+ final long callId = nextCallId();
return sendRequestWithRetry(() -> new RaftClientRequest(
- clientId, leaderId, DEFAULT_SEQNUM, message, readOnly));
+ clientId, leaderId, callId, message, readOnly));
}
@Override
public RaftClientReply setConfiguration(RaftPeer[] peersInNewConf)
throws IOException {
+ final long callId = nextCallId();
return sendRequestWithRetry(() -> new SetConfigurationRequest(
- clientId, leaderId, DEFAULT_SEQNUM, peersInNewConf));
+ clientId, leaderId, callId, peersInNewConf));
}
private RaftClientReply sendRequestWithRetry(
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 459e0f4..4dd2943 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -22,29 +22,29 @@ package org.apache.ratis.protocol;
*/
public class RaftClientReply extends RaftClientMessage {
private final boolean success;
- private final long seqNum;
+ private final long callId;
/** non-null if the server is not leader */
private final NotLeaderException notLeaderException;
private final Message message;
- public RaftClientReply(ClientId clientId, RaftPeerId serverId, long seqNum,
+ public RaftClientReply(ClientId clientId, RaftPeerId serverId, long callId,
boolean success, Message message, NotLeaderException notLeaderException) {
super(clientId, serverId);
this.success = success;
- this.seqNum = seqNum;
+ this.callId = callId;
this.message = message;
this.notLeaderException = notLeaderException;
}
public RaftClientReply(RaftClientRequest request,
NotLeaderException notLeaderException) {
- this(request.getClientId(), request.getServerId(), request.getSeqNum(),
+ this(request.getClientId(), request.getServerId(), request.getCallId(),
false, null, notLeaderException);
}
public RaftClientReply(RaftClientRequest request, Message message) {
- this(request.getClientId(), request.getServerId(), request.getSeqNum(),
+ this(request.getClientId(), request.getServerId(), request.getCallId(),
true, message, null);
}
@@ -53,13 +53,13 @@ public class RaftClientReply extends RaftClientMessage {
return false;
}
- public long getSeqNum() {
- return seqNum;
+ public long getCallId() {
+ return callId;
}
@Override
public String toString() {
- return super.toString() + ", seqNum: " + getSeqNum()
+ return super.toString() + ", callId: " + getCallId()
+ ", success: " + isSuccess();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index 9b649c9..898c166 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -21,19 +21,19 @@ package org.apache.ratis.protocol;
* Request from client to server
*/
public class RaftClientRequest extends RaftClientMessage {
- private final long seqNum;
+ private final long callId;
private final Message message;
private final boolean readOnly;
public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
- long seqNum, Message message) {
- this(clientId, serverId, seqNum, message, false);
+ long callId, Message message) {
+ this(clientId, serverId, callId, message, false);
}
public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
- long seqNum, Message message, boolean readOnly) {
+ long callId, Message message, boolean readOnly) {
super(clientId, serverId);
- this.seqNum = seqNum;
+ this.callId = callId;
this.message = message;
this.readOnly = readOnly;
}
@@ -43,8 +43,8 @@ public class RaftClientRequest extends RaftClientMessage {
return true;
}
- public long getSeqNum() {
- return seqNum;
+ public long getCallId() {
+ return callId;
}
public Message getMessage() {
@@ -57,7 +57,7 @@ public class RaftClientRequest extends RaftClientMessage {
@Override
public String toString() {
- return super.toString() + ", seqNum: " + seqNum + ", "
+ return super.toString() + ", callId: " + callId + ", "
+ (isReadOnly()? "RO": "RW");
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index 6bc34f4..77d545c 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -23,8 +23,8 @@ public class SetConfigurationRequest extends RaftClientRequest {
private final RaftPeer[] peers;
public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
- long seqNum, RaftPeer[] peers) {
- super(clientId, serverId, seqNum, null);
+ long callId, RaftPeer[] peers) {
+ super(clientId, serverId, callId, null);
this.peers = peers;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
index 2613342..ea74d09 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/ProtoUtils.java
@@ -132,12 +132,12 @@ public class ProtoUtils {
public static String toString(RaftRpcRequestProto proto) {
return proto.getRequestorId() + "->" + proto.getReplyId()
- + "#" + proto.getSeqNum();
+ + "#" + proto.getCallId();
}
public static String toString(RaftRpcReplyProto proto) {
return proto.getRequestorId() + "<-" + proto.getReplyId()
- + "#" + proto.getSeqNum() + ":"
+ + "#" + proto.getCallId() + ":"
+ (proto.getSuccess()? "OK": "FAIL");
}
public static String toString(RequestVoteReplyProto proto) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 4f96c06..6d7b207 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -272,8 +272,8 @@ public class AppendStreamer implements Closeable {
RaftClientRequestProto pending = Preconditions.checkNotNull(
ackQueue.peek());
if (reply.getRpcReply().getSuccess()) {
- Preconditions.checkState(pending.getRpcRequest().getSeqNum() ==
- reply.getRpcReply().getSeqNum());
+ Preconditions.checkState(pending.getRpcRequest().getCallId() ==
+ reply.getRpcReply().getCallId());
ackQueue.poll();
LOG.trace("{} received success ack for request {}", this,
pending.getRpcRequest());
@@ -375,7 +375,7 @@ public class AppendStreamer implements Closeable {
.setMessage(oldRequest.getMessage())
.setReadOnly(oldRequest.getReadOnly())
.setRpcRequest(toRaftRpcRequestProtoBuilder(
- clientId.toBytes(), newLeader.toBytes(), r.getSeqNum()))
+ clientId.toBytes(), newLeader.toBytes(), r.getCallId()))
.build();
dataQueue.offerFirst(newRequest);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index 99d8778..d550963 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -39,11 +39,11 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
private static class PendingAppend implements Comparable<PendingAppend> {
- private final long seqNum;
+ private final long callId;
private volatile RaftClientReply reply;
- PendingAppend(long seqNum) {
- this.seqNum = seqNum;
+ PendingAppend(long callId) {
+ this.callId = callId;
}
boolean isReady() {
@@ -56,12 +56,12 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
@Override
public int compareTo(PendingAppend p) {
- return seqNum == p.seqNum ? 0 : (seqNum < p.seqNum ? -1 : 1);
+ return callId == p.callId ? 0 : (callId < p.callId ? -1 : 1);
}
@Override
public String toString() {
- return seqNum + ", reply:" + (reply == null ? "null" : reply.toString());
+ return callId + ", reply:" + (reply == null ? "null" : reply.toString());
}
}
private static final PendingAppend COMPLETED = new PendingAppend(Long.MAX_VALUE);
@@ -111,7 +111,7 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
@Override
public void onNext(RaftClientRequestProto request) {
try {
- PendingAppend p = new PendingAppend(request.getRpcRequest().getSeqNum());
+ PendingAppend p = new PendingAppend(request.getRpcRequest().getCallId());
synchronized (pendingList) {
pendingList.add(p);
}
@@ -125,17 +125,17 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
// exception from the state machine.
responseObserver.onError(RaftGrpcUtil.wrapException(exception));
} else {
- final long replySeq = reply.getSeqNum();
+ final long replySeq = reply.getCallId();
synchronized (pendingList) {
Preconditions.checkState(!pendingList.isEmpty(),
- "PendingList is empty when handling onNext for seqNum %s",
+ "PendingList is empty when handling onNext for callId %s",
replySeq);
- final long headSeqNum = pendingList.get(0).seqNum;
- // we assume the seqNum is consecutive for a stream RPC call
+ final long headSeqNum = pendingList.get(0).callId;
+ // we assume the callId is consecutive for a stream RPC call
final PendingAppend pendingForReply = pendingList.get(
(int) (replySeq - headSeqNum));
Preconditions.checkState(pendingForReply != null &&
- pendingForReply.seqNum == replySeq,
+ pendingForReply.callId == replySeq,
"pending for reply is: %s, the pending list: %s",
pendingForReply, pendingList);
pendingForReply.setReply(reply);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
index 8f0e183..73e56b8 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftOutputStream.java
@@ -23,6 +23,7 @@ import static org.apache.ratis.grpc.RaftGrpcConfigKeys.RAFT_OUTPUTSTREAM_BUFFER_
import java.io.IOException;
import java.io.OutputStream;
import java.util.Collection;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.ClientId;
@@ -34,7 +35,7 @@ public class RaftOutputStream extends OutputStream {
/** internal buffer */
private final byte buf[];
private int count;
- private long seqNum = 0;
+ private final AtomicLong seqNum = new AtomicLong();
private final ClientId clientId;
private final AppendStreamer streamer;
@@ -82,7 +83,8 @@ public class RaftOutputStream extends OutputStream {
private void flushToStreamer() throws IOException {
if (count > 0) {
- streamer.write(ProtoUtils.toByteString(buf, 0, count), seqNum++);
+ streamer.write(ProtoUtils.toByteString(buf, 0, count),
+ seqNum.getAndIncrement());
count = 0;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
index 9b8553b..5b7efc8 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/NettyRpcProxy.java
@@ -69,18 +69,18 @@ public class NettyRpcProxy implements Closeable {
}
}
- public static long getSeqNum(RaftNettyServerReplyProto proto) {
+ public static long getCallId(RaftNettyServerReplyProto proto) {
switch (proto.getRaftNettyServerReplyCase()) {
case REQUESTVOTEREPLY:
- return proto.getRequestVoteReply().getServerReply().getSeqNum();
+ return proto.getRequestVoteReply().getServerReply().getCallId();
case APPENDENTRIESREPLY:
- return proto.getAppendEntriesReply().getServerReply().getSeqNum();
+ return proto.getAppendEntriesReply().getServerReply().getCallId();
case INSTALLSNAPSHOTREPLY:
- return proto.getInstallSnapshotReply().getServerReply().getSeqNum();
+ return proto.getInstallSnapshotReply().getServerReply().getCallId();
case RAFTCLIENTREPLY:
- return proto.getRaftClientReply().getRpcReply().getSeqNum();
+ return proto.getRaftClientReply().getRpcReply().getCallId();
case EXCEPTIONREPLY:
- return proto.getExceptionReply().getRpcReply().getSeqNum();
+ return proto.getExceptionReply().getRpcReply().getCallId();
case RAFTNETTYSERVERREPLY_NOT_SET:
throw new IllegalArgumentException("Reply case not set in proto: "
+ proto.getRaftNettyServerReplyCase());
@@ -104,7 +104,7 @@ public class NettyRpcProxy implements Closeable {
RaftNettyServerReplyProto proto) {
final CompletableFuture<RaftNettyServerReplyProto> future = pollReply();
if (future == null) {
- throw new IllegalStateException("Request #" + getSeqNum(proto)
+ throw new IllegalStateException("Request #" + getCallId(proto)
+ " not found");
}
if (proto.getRaftNettyServerReplyCase() == EXCEPTIONREPLY) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
----------------------------------------------------------------------
diff --git a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
index a1486b6..2d927db 100644
--- a/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
+++ b/ratis-netty/src/main/java/org/apache/ratis/netty/server/NettyRpcService.java
@@ -218,7 +218,7 @@ public final class NettyRpcService implements RaftServerRpc {
final RaftRpcReplyProto.Builder rpcReply = RaftRpcReplyProto.newBuilder()
.setRequestorId(request.getRequestorId())
.setReplyId(request.getReplyId())
- .setSeqNum(request.getSeqNum())
+ .setCallId(request.getCallId())
.setSuccess(false);
final RaftNettyExceptionReplyProto.Builder ioe = RaftNettyExceptionReplyProto.newBuilder()
.setRpcReply(rpcReply)
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index 14901f6..8c334dd 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -65,13 +65,13 @@ message TermIndexProto {
message RaftRpcRequestProto {
bytes requestorId = 1;
bytes replyId = 2;
- uint64 seqNum = 3;
+ uint64 callId = 3;
}
message RaftRpcReplyProto {
bytes requestorId = 1;
bytes replyId = 2;
- uint64 seqNum = 3;
+ uint64 callId = 3;
bool success = 4;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
index caf9c4d..b4db46e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerConstants.java
@@ -17,12 +17,10 @@
*/
package org.apache.ratis.server.impl;
-import org.apache.ratis.client.RaftClient;
-
public interface RaftServerConstants {
long INVALID_LOG_INDEX = -1;
byte LOG_TERMINATE_BYTE = 0;
- long DEFAULT_SEQNUM = RaftClient.DEFAULT_SEQNUM;
+ long DEFAULT_CALLID = 0;
enum StartupOption {
FORMAT("format"),
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index b9f063e..3c7c0e1 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -381,7 +381,7 @@ public class RaftServerImpl implements RaftServer {
final long entryIndex;
try {
entryIndex = state.applyLog(entry, request.getClientId(),
- request.getSeqNum());
+ request.getCallId());
} catch (IOException e) {
throw new RaftException(e);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 846fbe2..945be8d 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -17,7 +17,7 @@
*/
package org.apache.ratis.server.impl;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.shaded.proto.RaftProtos.AppendEntriesReplyProto.AppendResult.SUCCESS;
import java.util.Arrays;
@@ -112,7 +112,7 @@ public class ServerProtoUtils {
boolean shouldShutdown) {
final RequestVoteReplyProto.Builder b = RequestVoteReplyProto.newBuilder();
b.setServerReply(ClientProtoUtils.toRaftRpcReplyProtoBuilder(
- requestorId.toBytes(), replyId.toBytes(), DEFAULT_SEQNUM, success))
+ requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID, success))
.setTerm(term)
.setShouldShutdown(shouldShutdown);
return b.build();
@@ -121,8 +121,7 @@ public class ServerProtoUtils {
public static RequestVoteRequestProto toRequestVoteRequestProto(
RaftPeerId requestorId, RaftPeerId replyId, long term, TermIndex lastEntry) {
RaftProtos.RaftRpcRequestProto.Builder rpb = ClientProtoUtils
- .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(),
- DEFAULT_SEQNUM);
+ .toRaftRpcRequestProtoBuilder(requestorId.toBytes(), replyId.toBytes(), DEFAULT_CALLID);
final RequestVoteRequestProto.Builder b = RequestVoteRequestProto.newBuilder()
.setServerRequest(rpb)
.setCandidateTerm(term);
@@ -136,7 +135,7 @@ public class ServerProtoUtils {
RaftPeerId requestorId, RaftPeerId replyId, long term, int requestIndex,
InstallSnapshotResult result) {
final RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), DEFAULT_SEQNUM, result == InstallSnapshotResult.SUCCESS);
+ replyId.toBytes(), DEFAULT_CALLID, result == InstallSnapshotResult.SUCCESS);
final InstallSnapshotReplyProto.Builder builder = InstallSnapshotReplyProto
.newBuilder().setServerReply(rb).setTerm(term).setResult(result)
.setRequestIndex(requestIndex);
@@ -150,7 +149,7 @@ public class ServerProtoUtils {
return InstallSnapshotRequestProto.newBuilder()
.setServerRequest(
ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), DEFAULT_SEQNUM))
+ replyId.toBytes(), DEFAULT_CALLID))
.setRequestId(requestId)
.setRequestIndex(requestIndex)
// .setRaftConfiguration() TODO: save and pass RaftConfiguration
@@ -165,7 +164,7 @@ public class ServerProtoUtils {
RaftPeerId requestorId, RaftPeerId replyId, long term,
long nextIndex, AppendEntriesReplyProto.AppendResult appendResult) {
RaftRpcReplyProto.Builder rb = ClientProtoUtils.toRaftRpcReplyProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), DEFAULT_SEQNUM, appendResult == SUCCESS);
+ replyId.toBytes(), DEFAULT_CALLID, appendResult == SUCCESS);
final AppendEntriesReplyProto.Builder b = AppendEntriesReplyProto.newBuilder();
b.setServerReply(rb).setTerm(term).setNextIndex(nextIndex)
.setResult(appendResult);
@@ -180,7 +179,7 @@ public class ServerProtoUtils {
.newBuilder()
.setServerRequest(
ClientProtoUtils.toRaftRpcRequestProtoBuilder(requestorId.toBytes(),
- replyId.toBytes(), DEFAULT_SEQNUM))
+ replyId.toBytes(), DEFAULT_CALLID))
.setLeaderTerm(leaderTerm)
.setLeaderCommit(leaderCommit)
.setInitializing(initializing);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
index a5d1127..54cfa4d 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftNotLeaderExceptionBaseTest.java
@@ -40,7 +40,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
public abstract class RaftNotLeaderExceptionBaseTest {
static {
@@ -92,7 +92,7 @@ public abstract class RaftNotLeaderExceptionBaseTest {
for (int i = 0; reply == null && i < 10; i++) {
try {
reply = rpc.sendRequest(
- new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM,
+ new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID,
new SimpleMessage("m2")));
} catch (IOException ignored) {
Thread.sleep(1000);
@@ -139,7 +139,7 @@ public abstract class RaftNotLeaderExceptionBaseTest {
for (int i = 0; reply == null && i < 10; i++) {
try {
reply = rpc.sendRequest(
- new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_SEQNUM,
+ new RaftClientRequest(ClientId.createId(), leaderId, DEFAULT_CALLID,
new SimpleMessage("m1")));
} catch (IOException ignored) {
Thread.sleep(1000);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
index e3db854..90cbef5 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftReconfigurationBaseTest.java
@@ -48,7 +48,7 @@ import java.util.concurrent.atomic.AtomicReference;
import static java.util.Arrays.asList;
import static org.apache.ratis.MiniRaftCluster.leaderPlaceHolderDelay;
import static org.apache.ratis.MiniRaftCluster.logSyncDelay;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import static org.apache.ratis.server.impl.RaftServerTestUtil.waitAndCheckNewConf;
import static org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto.LogEntryBodyCase.CONFIGURATIONENTRY;
@@ -93,7 +93,7 @@ public abstract class RaftReconfigurationBaseTest {
// trigger setConfiguration
SetConfigurationRequest request = new SetConfigurationRequest(clientId,
- cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+ cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
LOG.info("Start changing the configuration: {}", request);
cluster.getLeader().setConfiguration(request);
@@ -121,7 +121,7 @@ public abstract class RaftReconfigurationBaseTest {
// trigger setConfiguration
SetConfigurationRequest request = new SetConfigurationRequest(clientId,
- cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+ cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
LOG.info("Start changing the configuration: {}", request);
cluster.getLeader().setConfiguration(request);
@@ -159,7 +159,7 @@ public abstract class RaftReconfigurationBaseTest {
// trigger setConfiguration
SetConfigurationRequest request = new SetConfigurationRequest(clientId,
- cluster.getLeader().getId(), DEFAULT_SEQNUM, allPeers);
+ cluster.getLeader().getId(), DEFAULT_CALLID, allPeers);
LOG.info("Start changing the configuration: {}", request);
cluster.getLeader().setConfiguration(request);
@@ -255,7 +255,7 @@ public abstract class RaftReconfigurationBaseTest {
final RaftClientRpc sender = client.getClientRpc();
final SetConfigurationRequest request = new SetConfigurationRequest(
- client.getId(), leaderId, DEFAULT_SEQNUM, c1.allPeersInNewConf);
+ client.getId(), leaderId, DEFAULT_CALLID, c1.allPeersInNewConf);
try {
sender.sendRequest(request);
Assert.fail("did not get expected exception");
@@ -472,7 +472,7 @@ public abstract class RaftReconfigurationBaseTest {
LOG.info("client2 starts to change conf");
final RaftClientRpc sender2 = client2.getClientRpc();
sender2.sendRequest(new SetConfigurationRequest(
- client2.getId(), leaderId, DEFAULT_SEQNUM, peersInRequest2));
+ client2.getId(), leaderId, DEFAULT_CALLID, peersInRequest2));
} catch (ReconfigurationInProgressException e) {
caughtException.set(true);
} catch (Exception e) {
@@ -536,7 +536,7 @@ public abstract class RaftReconfigurationBaseTest {
LOG.info("client starts to change conf");
final RaftClientRpc sender = client.getClientRpc();
RaftClientReply reply = sender.sendRequest(new SetConfigurationRequest(
- client.getId(), leaderId, DEFAULT_SEQNUM, change.allPeersInNewConf));
+ client.getId(), leaderId, DEFAULT_CALLID, change.allPeersInNewConf));
if (reply.isNotLeader()) {
gotNotLeader.set(true);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/6adf89d4/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
index cb4412a..217d5ae 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/RaftSnapshotBaseTest.java
@@ -19,7 +19,7 @@ package org.apache.ratis.statemachine;
import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_AUTO_SNAPSHOT_ENABLED_KEY;
import static org.apache.ratis.server.RaftServerConfigKeys.RAFT_SERVER_SNAPSHOT_TRIGGER_THRESHOLD_KEY;
-import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_SEQNUM;
+import static org.apache.ratis.server.impl.RaftServerConstants.DEFAULT_CALLID;
import java.io.File;
import java.util.List;
@@ -200,7 +200,7 @@ public abstract class RaftSnapshotBaseTest {
new String[]{"s3", "s4"}, true);
// trigger setConfiguration
SetConfigurationRequest request = new SetConfigurationRequest(ClientId.createId(),
- cluster.getLeader().getId(), DEFAULT_SEQNUM, change.allPeersInNewConf);
+ cluster.getLeader().getId(), DEFAULT_CALLID, change.allPeersInNewConf);
LOG.info("Start changing the configuration: {}", request);
cluster.getLeader().setConfiguration(request);