You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/03/16 16:12:42 UTC
incubator-ratis git commit: RATIS-210. Refactor client request proto.
Contributed by Tsz Wo Nicholas Sze.
Repository: incubator-ratis
Updated Branches:
refs/heads/master 8fd74ede4 -> cbfa28a02
RATIS-210. Refactor client request proto. Contributed by Tsz Wo Nicholas Sze.
Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/cbfa28a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/cbfa28a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/cbfa28a0
Branch: refs/heads/master
Commit: cbfa28a0232d969521f5d905d3d22c3b0a8ab402
Parents: 8fd74ed
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Fri Mar 16 21:42:07 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Fri Mar 16 21:42:07 2018 +0530
----------------------------------------------------------------------
.../ratis/client/impl/ClientProtoUtils.java | 45 +++++-
.../ratis/client/impl/RaftClientImpl.java | 32 ++---
.../ratis/protocol/RaftClientRequest.java | 144 ++++++++++++++-----
.../ratis/protocol/ReinitializeRequest.java | 2 +-
.../ratis/protocol/ServerInformatonRequest.java | 2 +-
.../ratis/protocol/SetConfigurationRequest.java | 2 +-
.../TestRaftStateMachineException.java | 8 +-
ratis-proto-shaded/src/main/proto/Raft.proto | 26 ++--
.../ratis/server/impl/PendingRequests.java | 3 +-
.../ratis/server/impl/RaftServerImpl.java | 6 +-
.../java/org/apache/ratis/MiniRaftCluster.java | 10 +-
.../java/org/apache/ratis/RetryCacheTests.java | 10 +-
12 files changed, 206 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index de28e18..241f704 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -67,7 +67,22 @@ public interface ClientProtoUtils {
request.getSeqNum());
}
+ static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p) {
+ switch (p.getTypeCase()) {
+ case WRITE:
+ return RaftClientRequest.Type.valueOf(p.getWrite());
+ case READ:
+ return RaftClientRequest.Type.valueOf(p.getRead());
+ case STALEREAD:
+ return RaftClientRequest.Type.valueOf(p.getStaleRead());
+ default:
+ throw new IllegalArgumentException("Unexpected request type: " + p.getTypeCase()
+ + " in request proto " + p);
+ }
+ }
+
static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
+ final RaftClientRequest.Type type = toRaftClientRequestType(p);
final RaftRpcRequestProto request = p.getRpcRequest();
return new RaftClientRequest(
ClientId.valueOf(request.getRequestorId()),
@@ -75,17 +90,33 @@ public interface ClientProtoUtils {
ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
request.getCallId(),
request.getSeqNum(),
- p.getType(), toMessage(p.getMessage()), p.getMinIndex());
+ toMessage(p.getMessage()),
+ type);
}
static RaftClientRequestProto toRaftClientRequestProto(
RaftClientRequest request) {
- return RaftClientRequestProto.newBuilder()
+ final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(request))
- .setType(request.getType())
- .setMessage(toClientMessageEntryProtoBuilder(request.getMessage()))
- .setMinIndex(request.getMinIndex())
- .build();
+ .setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
+
+ final RaftClientRequest.Type type = request.getType();
+ switch (type.getTypeCase()) {
+ case WRITE:
+ b.setWrite(type.getWrite());
+ break;
+ case READ:
+ b.setRead(type.getRead());
+ break;
+ case STALEREAD:
+ b.setStaleRead(type.getStaleRead());
+ break;
+ default:
+ throw new IllegalArgumentException("Unexpected request type: " + request.getType()
+ + " in request " + request);
+ }
+
+ return b.build();
}
static RaftClientRequestProto toRaftClientRequestProto(
@@ -94,7 +125,7 @@ public interface ClientProtoUtils {
return RaftClientRequestProto.newBuilder()
.setRpcRequest(toRaftRpcRequestProtoBuilder(
clientId, serverId, groupId, callId, seqNum))
- .setType(RaftClientRequestProto.Type.WRITE)
+ .setWrite(WriteRequestTypeProto.getDefaultInstance())
.setMessage(toClientMessageEntryProtoBuilder(content))
.build();
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 6f3b8e0..ca1f057 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,7 +22,6 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.client.RaftClientRpc;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.util.*;
import java.io.IOException;
@@ -37,9 +36,7 @@ import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Stream;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.TypeCase.STALEREAD;
/** A client who sends requests to a raft service. */
final class RaftClientImpl implements RaftClient {
@@ -123,7 +120,7 @@ final class RaftClientImpl implements RaftClient {
}
private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
- return getSlidingWindow(request.isStaleRead()? request.getServerId(): null);
+ return getSlidingWindow(request.is(STALEREAD)? request.getServerId(): null);
}
private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftPeerId target) {
@@ -133,21 +130,21 @@ final class RaftClientImpl implements RaftClient {
@Override
public CompletableFuture<RaftClientReply> sendAsync(Message message) {
- return sendAsync(WRITE, message, 0L, null);
+ return sendAsync(RaftClientRequest.writeRequestType(), message, null);
}
@Override
public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) {
- return sendAsync(READ, message, 0L, null);
+ return sendAsync(RaftClientRequest.readRequestType(), message, null);
}
@Override
public CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server) {
- return sendAsync(STALE_READ, message, minIndex, server);
+ return sendAsync(RaftClientRequest.staleReadRequestType(minIndex), message, server);
}
private CompletableFuture<RaftClientReply> sendAsync(
- RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server) {
+ RaftClientRequest.Type type, Message message, RaftPeerId server) {
Objects.requireNonNull(message, "message == null");
try {
asyncRequestSemaphore.acquire();
@@ -157,7 +154,7 @@ final class RaftClientImpl implements RaftClient {
}
final long callId = nextCallId();
final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
- seq -> newRaftClientRequest(server, callId, seq, type, message, minIndex));
+ seq -> newRaftClientRequest(server, callId, seq, message, type));
return getSlidingWindow(server).submitNewRequest(constructor, this::sendRequestWithRetryAsync
).getReplyFuture(
).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
@@ -165,35 +162,34 @@ final class RaftClientImpl implements RaftClient {
}
private RaftClientRequest newRaftClientRequest(
- RaftPeerId server, long callId, long seq,
- RaftClientRequestProto.Type type, Message message, long minIndex) {
+ RaftPeerId server, long callId, long seq, Message message, RaftClientRequest.Type type) {
return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
- callId, seq, type, message, minIndex);
+ callId, seq, message, type);
}
@Override
public RaftClientReply send(Message message) throws IOException {
- return send(WRITE, message, 0L, null);
+ return send(RaftClientRequest.writeRequestType(), message, null);
}
@Override
public RaftClientReply sendReadOnly(Message message) throws IOException {
- return send(READ, message, 0L, null);
+ return send(RaftClientRequest.readRequestType(), message, null);
}
@Override
public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
throws IOException {
- return send(STALE_READ, message, minIndex, server);
+ return send(RaftClientRequest.staleReadRequestType(minIndex), message, server);
}
- private RaftClientReply send(RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server)
+ private RaftClientReply send(RaftClientRequest.Type type, Message message, RaftPeerId server)
throws IOException {
Objects.requireNonNull(message, "message == null");
final long callId = nextCallId();
return sendRequestWithRetry(() -> newRaftClientRequest(
- server, callId, 0L, type, message, minIndex));
+ server, callId, 0L, message, type));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index d20b158..072a854 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -18,45 +18,135 @@
package org.apache.ratis.protocol;
import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReadRequestTypeProto;
+import org.apache.ratis.shaded.proto.RaftProtos.StaleReadRequestTypeProto;
+import org.apache.ratis.shaded.proto.RaftProtos.WriteRequestTypeProto;
import org.apache.ratis.util.Preconditions;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+import java.util.Objects;
+
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.TypeCase.*;
/**
* Request from client to server
*/
public class RaftClientRequest extends RaftClientMessage {
- private final long callId;
- private final long seqNum;
+ private static final Type DEFAULT_WRITE = new Type(WriteRequestTypeProto.getDefaultInstance());
+ private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance());
+ private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance());
- private final RaftClientRequestProto.Type type;
- private final Message message;
+ public static Type writeRequestType() {
+ return DEFAULT_WRITE;
+ }
- private final long minIndex;
+ public static Type readRequestType() {
+ return DEFAULT_READ;
+ }
- public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, Message message) {
- this(clientId, serverId, groupId, callId, 0L, WRITE, message, 0L);
+ public static Type staleReadRequestType(long minIndex) {
+ return minIndex == 0L? DEFAULT_STALE_READ
+ : new Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
+ }
+
+ /** The type of a request (oneof write, read, staleRead; see the message RaftClientRequestProto). */
+ public static class Type {
+ public static Type valueOf(WriteRequestTypeProto write) {
+ return DEFAULT_WRITE;
+ }
+
+ public static Type valueOf(ReadRequestTypeProto read) {
+ return DEFAULT_READ;
+ }
+
+ public static Type valueOf(StaleReadRequestTypeProto staleRead) {
+ return staleRead.getMinIndex() == 0? DEFAULT_STALE_READ
+ : new Type(staleRead);
+ }
+
+ /**
+ * The type case of the proto.
+ * Only the corresponding proto (must be non-null) is used.
+ * The other protos are ignored.
+ */
+ private final RaftClientRequestProto.TypeCase typeCase;
+ private final WriteRequestTypeProto write;
+ private final ReadRequestTypeProto read;
+ private final StaleReadRequestTypeProto staleRead;
+
+ private Type(WriteRequestTypeProto write) {
+ this.typeCase = WRITE;
+ this.write = Objects.requireNonNull(write);
+ this.read = null;
+ this.staleRead = null;
+ }
+
+ private Type(ReadRequestTypeProto read) {
+ this.typeCase = READ;
+ this.write = null;
+ this.read = Objects.requireNonNull(read);
+ this.staleRead = null;
+ }
+
+ private Type(StaleReadRequestTypeProto staleRead) {
+ this.typeCase = STALEREAD;
+ this.write = null;
+ this.read = null;
+ this.staleRead = Objects.requireNonNull(staleRead);
+ }
+
+ public RaftClientRequestProto.TypeCase getTypeCase() {
+ return typeCase;
+ }
+
+ public WriteRequestTypeProto getWrite() {
+ Preconditions.assertTrue(typeCase == WRITE);
+ return write;
+ }
+
+ public ReadRequestTypeProto getRead() {
+ Preconditions.assertTrue(typeCase == READ);
+ return read;
+ }
+
+ public StaleReadRequestTypeProto getStaleRead() {
+ Preconditions.assertTrue(typeCase == STALEREAD);
+ return staleRead;
+ }
+
+ @Override
+ public String toString() {
+ switch (typeCase) {
+ case WRITE:
+ return "RW";
+ case READ:
+ return "RO";
+ case STALEREAD:
+ return "StaleRead(" + staleRead.getMinIndex() + ")";
+ default:
+ throw new IllegalStateException("Unexpected request type: " + typeCase);
+ }
+ }
}
+ private final long callId;
+ private final long seqNum;
+
+ private final Message message;
+ private final Type type;
+
public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
- RaftGroupId groupId, long callId, long seqNum, Message message) {
- this(clientId, serverId, groupId, callId, seqNum, WRITE, message, 0L);
+ RaftGroupId groupId, long callId) {
+ this(clientId, serverId, groupId, callId, 0L, null, writeRequestType());
}
public RaftClientRequest(
ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
- long callId, long seqNum, RaftClientRequestProto.Type type, Message message, long minIndex) {
+ long callId, long seqNum, Message message, Type type) {
super(clientId, serverId, groupId);
this.callId = callId;
this.seqNum = seqNum;
- this.type = type;
this.message = message;
- this.minIndex = minIndex;
-
- Preconditions.assertTrue(minIndex >= 0, "minIndex < 0");
+ this.type = type;
}
@Override
@@ -76,27 +166,17 @@ public class RaftClientRequest extends RaftClientMessage {
return message;
}
- public RaftClientRequestProto.Type getType() {
+ public Type getType() {
return type;
}
- public boolean isReadOnly() {
- return getType() != WRITE;
- }
-
- public boolean isStaleRead() {
- return getType() == STALE_READ;
- }
-
- /** @return the minimum required commit index for processing the request. */
- public long getMinIndex() {
- return minIndex;
+ public boolean is(RaftClientRequestProto.TypeCase typeCase) {
+ return getType().getTypeCase() == typeCase;
}
@Override
public String toString() {
return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " "
- + (!isReadOnly()? "RW": isStaleRead()? "StaleRead(" + getMinIndex() + ")": "RO")
- + ", " + getMessage();
+ + type + ", " + getMessage();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
index 3c6d468..b0e69af 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
@@ -22,7 +22,7 @@ public class ReinitializeRequest extends RaftClientRequest {
public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId, RaftGroup group) {
- super(clientId, serverId, groupId, callId, null);
+ super(clientId, serverId, groupId, callId);
this.group = group;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
index c2b7eb5..5e047af 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
@@ -24,6 +24,6 @@ package org.apache.ratis.protocol;
public class ServerInformatonRequest extends RaftClientRequest {
public ServerInformatonRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId) {
- super(clientId, serverId, groupId, callId, null);
+ super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType());
}
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index 83be197..e25da2a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -24,7 +24,7 @@ public class SetConfigurationRequest extends RaftClientRequest {
public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
RaftGroupId groupId, long callId, RaftPeer[] peers) {
- super(clientId, serverId, groupId, callId, null);
+ super(clientId, serverId, groupId, callId);
this.peers = peers;
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index b0e2b7c..a339b68 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -102,8 +102,8 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
final RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
final long seqNum = 111;
- RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
- cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+ RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+ callId, seqNum, new SimpleMessage("message"));
RaftClientReply reply = rpc.sendRequest(r);
Assert.assertFalse(reply.isSuccess());
Assert.assertNotNull(reply.getStateMachineException());
@@ -144,8 +144,8 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
final RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
final long seqNum = 111;
- RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
- cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+ RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+ callId, seqNum, new SimpleMessage("message"));
RaftClientReply reply = rpc.sendRequest(r);
Objects.requireNonNull(reply.getStateMachineException());
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index d5f6d97..0fa845b 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -170,18 +170,26 @@ message ClientMessageEntryProto {
bytes content = 1;
}
+message WriteRequestTypeProto {
+}
+
+message ReadRequestTypeProto {
+}
+
+message StaleReadRequestTypeProto {
+ uint64 minIndex = 1;
+}
+
// normal client request
message RaftClientRequestProto {
- enum Type {
- WRITE = 0;
- READ = 1;
- STALE_READ = 2;
- }
-
RaftRpcRequestProto rpcRequest = 1;
- Type type = 2;
- ClientMessageEntryProto message = 3;
- uint64 minIndex = 4;
+ ClientMessageEntryProto message = 2;
+
+ oneof Type {
+ WriteRequestTypeProto write = 3;
+ ReadRequestTypeProto read = 4;
+ StaleReadRequestTypeProto staleRead = 5;
+ }
}
message NotLeaderExceptionProto {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index c9c66a5..b418658 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -18,6 +18,7 @@
package org.apache.ratis.server.impl;
import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
import org.apache.ratis.statemachine.TransactionContext;
import org.apache.ratis.util.Preconditions;
import org.slf4j.Logger;
@@ -83,7 +84,7 @@ class PendingRequests {
PendingRequest addPendingRequest(long index, RaftClientRequest request,
TransactionContext entry) {
// externally synced for now
- Preconditions.assertTrue(!request.isReadOnly());
+ Preconditions.assertTrue(request.is(RaftClientRequestProto.TypeCase.WRITE));
if (last != null && !(last.getRequest() instanceof SetConfigurationRequest)) {
Preconditions.assertTrue(index == last.getIndex() + 1,
() -> "index = " + index + " != last.getIndex() + 1, last=" + last);
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 441e390..1777bb8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -476,7 +476,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
RaftClientRequest request) throws IOException {
assertLifeCycleState(RUNNING);
LOG.debug("{}: receive client request({})", getId(), request);
- if (request.isStaleRead()) {
+ if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
return staleReadAsync(request);
}
@@ -488,7 +488,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
// let the state machine handle read-only request from client
final StateMachine stateMachine = getStateMachine();
- if (request.isReadOnly()) {
+ if (request.is(RaftClientRequestProto.TypeCase.READ)) {
// TODO: We might not be the leader anymore by the time this completes.
// See the RAFT paper section 8 (last part)
return processQueryFuture(stateMachine.query(request.getMessage()), request);
@@ -518,7 +518,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
}
private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) {
- final long minIndex = request.getMinIndex();
+ final long minIndex = request.getType().getStaleRead().getMinIndex();
final long commitIndex = state.getLog().getLastCommittedIndex();
LOG.debug("{}: minIndex={}, commitIndex={}", getId(), minIndex, commitIndex);
if (commitIndex < minIndex) {
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index f4df193..91a1600 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -497,13 +497,19 @@ public abstract class MiniRaftCluster {
public RaftClientRequest newRaftClientRequest(
ClientId clientId, RaftPeerId leaderId, Message message) {
- return new RaftClientRequest(clientId, leaderId, getGroupId(),
+ return newRaftClientRequest(clientId, leaderId,
DEFAULT_CALLID, DEFAULT_SEQNUM, message);
}
+ public RaftClientRequest newRaftClientRequest(
+ ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
+ return new RaftClientRequest(clientId, leaderId, getGroupId(),
+ callId, seqNum, message, RaftClientRequest.writeRequestType());
+ }
+
public SetConfigurationRequest newSetConfigurationRequest(
ClientId clientId, RaftPeerId leaderId,
- RaftPeer... peers) throws IOException {
+ RaftPeer... peers) {
return new SetConfigurationRequest(clientId, leaderId, getGroupId(),
DEFAULT_CALLID, peers);
}
http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index e02f999..474482b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -77,8 +77,8 @@ public abstract class RetryCacheTests extends BaseTest {
final RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
final long seqNum = 111;
- RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
- cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+ RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+ callId, seqNum, new SimpleMessage("message"));
RaftClientReply reply = rpc.sendRequest(r);
Assert.assertEquals(callId, reply.getCallId());
Assert.assertTrue(reply.isSuccess());
@@ -122,8 +122,8 @@ public abstract class RetryCacheTests extends BaseTest {
RaftClientRpc rpc = client.getClientRpc();
final long callId = 999;
final long seqNum = 111;
- RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
- cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+ RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+ callId, seqNum, new SimpleMessage("message"));
RaftClientReply reply = rpc.sendRequest(r);
Assert.assertEquals(callId, reply.getCallId());
Assert.assertTrue(reply.isSuccess());
@@ -140,7 +140,7 @@ public abstract class RetryCacheTests extends BaseTest {
final RaftPeerId newLeaderId = cluster.getLeader().getId();
Assert.assertNotEquals(leaderId, newLeaderId);
// same clientId and callId in the request
- r = new RaftClientRequest(client.getId(), newLeaderId, cluster.getGroupId(),
+ r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
callId, seqNum, new SimpleMessage("message"));
for (int i = 0; i < 10; i++) {
try {