You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/03/16 16:12:42 UTC

incubator-ratis git commit: RATIS-210. Refactor client request proto. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 8fd74ede4 -> cbfa28a02


RATIS-210. Refactor client request proto. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/cbfa28a0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/cbfa28a0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/cbfa28a0

Branch: refs/heads/master
Commit: cbfa28a0232d969521f5d905d3d22c3b0a8ab402
Parents: 8fd74ed
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Fri Mar 16 21:42:07 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Fri Mar 16 21:42:07 2018 +0530

----------------------------------------------------------------------
 .../ratis/client/impl/ClientProtoUtils.java     |  45 +++++-
 .../ratis/client/impl/RaftClientImpl.java       |  32 ++---
 .../ratis/protocol/RaftClientRequest.java       | 144 ++++++++++++++-----
 .../ratis/protocol/ReinitializeRequest.java     |   2 +-
 .../ratis/protocol/ServerInformatonRequest.java |   2 +-
 .../ratis/protocol/SetConfigurationRequest.java |   2 +-
 .../TestRaftStateMachineException.java          |   8 +-
 ratis-proto-shaded/src/main/proto/Raft.proto    |  26 ++--
 .../ratis/server/impl/PendingRequests.java      |   3 +-
 .../ratis/server/impl/RaftServerImpl.java       |   6 +-
 .../java/org/apache/ratis/MiniRaftCluster.java  |  10 +-
 .../java/org/apache/ratis/RetryCacheTests.java  |  10 +-
 12 files changed, 206 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index de28e18..241f704 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -67,7 +67,22 @@ public interface ClientProtoUtils {
         request.getSeqNum());
   }
 
+  static RaftClientRequest.Type toRaftClientRequestType(RaftClientRequestProto p) {
+    switch (p.getTypeCase()) {
+      case WRITE:
+        return RaftClientRequest.Type.valueOf(p.getWrite());
+      case READ:
+        return RaftClientRequest.Type.valueOf(p.getRead());
+      case STALEREAD:
+        return RaftClientRequest.Type.valueOf(p.getStaleRead());
+      default:
+        throw new IllegalArgumentException("Unexpected request type: " + p.getTypeCase()
+            + " in request proto " + p);
+    }
+  }
+
   static RaftClientRequest toRaftClientRequest(RaftClientRequestProto p) {
+    final RaftClientRequest.Type type = toRaftClientRequestType(p);
     final RaftRpcRequestProto request = p.getRpcRequest();
     return new RaftClientRequest(
         ClientId.valueOf(request.getRequestorId()),
@@ -75,17 +90,33 @@ public interface ClientProtoUtils {
         ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
         request.getCallId(),
         request.getSeqNum(),
-        p.getType(), toMessage(p.getMessage()), p.getMinIndex());
+        toMessage(p.getMessage()),
+        type);
   }
 
   static RaftClientRequestProto toRaftClientRequestProto(
       RaftClientRequest request) {
-    return RaftClientRequestProto.newBuilder()
+    final RaftClientRequestProto.Builder b = RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
-        .setType(request.getType())
-        .setMessage(toClientMessageEntryProtoBuilder(request.getMessage()))
-        .setMinIndex(request.getMinIndex())
-        .build();
+        .setMessage(toClientMessageEntryProtoBuilder(request.getMessage()));
+
+    final RaftClientRequest.Type type = request.getType();
+    switch (type.getTypeCase()) {
+      case WRITE:
+        b.setWrite(type.getWrite());
+        break;
+      case READ:
+        b.setRead(type.getRead());
+        break;
+      case STALEREAD:
+        b.setStaleRead(type.getStaleRead());
+        break;
+      default:
+        throw new IllegalArgumentException("Unexpected request type: " + request.getType()
+            + " in request " + request);
+    }
+
+    return b.build();
   }
 
   static RaftClientRequestProto toRaftClientRequestProto(
@@ -94,7 +125,7 @@ public interface ClientProtoUtils {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
             clientId, serverId, groupId, callId, seqNum))
-        .setType(RaftClientRequestProto.Type.WRITE)
+        .setWrite(WriteRequestTypeProto.getDefaultInstance())
         .setMessage(toClientMessageEntryProtoBuilder(content))
         .build();
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 6f3b8e0..ca1f057 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,7 +22,6 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
-import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.util.*;
 
 import java.io.IOException;
@@ -37,9 +36,7 @@ import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.TypeCase.STALEREAD;
 
 /** A client who sends requests to a raft service. */
 final class RaftClientImpl implements RaftClient {
@@ -123,7 +120,7 @@ final class RaftClientImpl implements RaftClient {
   }
 
   private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
-    return getSlidingWindow(request.isStaleRead()? request.getServerId(): null);
+    return getSlidingWindow(request.is(STALEREAD)? request.getServerId(): null);
   }
 
   private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftPeerId target) {
@@ -133,21 +130,21 @@ final class RaftClientImpl implements RaftClient {
 
   @Override
   public CompletableFuture<RaftClientReply> sendAsync(Message message) {
-    return sendAsync(WRITE, message, 0L, null);
+    return sendAsync(RaftClientRequest.writeRequestType(), message, null);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) {
-    return sendAsync(READ, message, 0L, null);
+    return sendAsync(RaftClientRequest.readRequestType(), message, null);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server) {
-    return sendAsync(STALE_READ, message, minIndex, server);
+    return sendAsync(RaftClientRequest.staleReadRequestType(minIndex), message, server);
   }
 
   private CompletableFuture<RaftClientReply> sendAsync(
-      RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server) {
+      RaftClientRequest.Type type, Message message, RaftPeerId server) {
     Objects.requireNonNull(message, "message == null");
     try {
       asyncRequestSemaphore.acquire();
@@ -157,7 +154,7 @@ final class RaftClientImpl implements RaftClient {
     }
     final long callId = nextCallId();
     final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
-        seq -> newRaftClientRequest(server, callId, seq, type, message, minIndex));
+        seq -> newRaftClientRequest(server, callId, seq, message, type));
     return getSlidingWindow(server).submitNewRequest(constructor, this::sendRequestWithRetryAsync
     ).getReplyFuture(
     ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
@@ -165,35 +162,34 @@ final class RaftClientImpl implements RaftClient {
   }
 
   private RaftClientRequest newRaftClientRequest(
-      RaftPeerId server, long callId, long seq,
-      RaftClientRequestProto.Type type, Message message, long minIndex) {
+      RaftPeerId server, long callId, long seq, Message message, RaftClientRequest.Type type) {
     return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
-        callId, seq, type, message, minIndex);
+        callId, seq, message, type);
   }
 
   @Override
   public RaftClientReply send(Message message) throws IOException {
-    return send(WRITE, message, 0L, null);
+    return send(RaftClientRequest.writeRequestType(), message, null);
   }
 
   @Override
   public RaftClientReply sendReadOnly(Message message) throws IOException {
-    return send(READ, message, 0L, null);
+    return send(RaftClientRequest.readRequestType(), message, null);
   }
 
   @Override
   public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
       throws IOException {
-    return send(STALE_READ, message, minIndex, server);
+    return send(RaftClientRequest.staleReadRequestType(minIndex), message, server);
   }
 
-  private RaftClientReply send(RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server)
+  private RaftClientReply send(RaftClientRequest.Type type, Message message, RaftPeerId server)
       throws IOException {
     Objects.requireNonNull(message, "message == null");
 
     final long callId = nextCallId();
     return sendRequestWithRetry(() -> newRaftClientRequest(
-        server, callId, 0L, type, message, minIndex));
+        server, callId, 0L, message, type));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index d20b158..072a854 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -18,45 +18,135 @@
 package org.apache.ratis.protocol;
 
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.shaded.proto.RaftProtos.ReadRequestTypeProto;
+import org.apache.ratis.shaded.proto.RaftProtos.StaleReadRequestTypeProto;
+import org.apache.ratis.shaded.proto.RaftProtos.WriteRequestTypeProto;
 import org.apache.ratis.util.Preconditions;
 
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
-import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+import java.util.Objects;
+
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.TypeCase.*;
 
 /**
  * Request from client to server
  */
 public class RaftClientRequest extends RaftClientMessage {
-  private final long callId;
-  private final long seqNum;
+  private static final Type DEFAULT_WRITE = new Type(WriteRequestTypeProto.getDefaultInstance());
+  private static final Type DEFAULT_READ = new Type(ReadRequestTypeProto.getDefaultInstance());
+  private static final Type DEFAULT_STALE_READ = new Type(StaleReadRequestTypeProto.getDefaultInstance());
 
-  private final RaftClientRequestProto.Type type;
-  private final Message message;
+  public static Type writeRequestType() {
+    return DEFAULT_WRITE;
+  }
 
-  private final long minIndex;
+  public static Type readRequestType() {
+    return DEFAULT_READ;
+  }
 
-  public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-      RaftGroupId groupId, long callId, Message message) {
-    this(clientId, serverId, groupId, callId, 0L, WRITE, message, 0L);
+  public static Type staleReadRequestType(long minIndex) {
+    return minIndex == 0L? DEFAULT_STALE_READ
+        : new Type(StaleReadRequestTypeProto.newBuilder().setMinIndex(minIndex).build());
+  }
+
+  /** The type of a request (oneof write, read, staleRead; see the message RaftClientRequestProto). */
+  public static class Type {
+    public static Type valueOf(WriteRequestTypeProto write) {
+      return DEFAULT_WRITE;
+    }
+
+    public static Type valueOf(ReadRequestTypeProto read) {
+      return DEFAULT_READ;
+    }
+
+    public static Type valueOf(StaleReadRequestTypeProto staleRead) {
+      return staleRead.getMinIndex() == 0? DEFAULT_STALE_READ
+          : new Type(staleRead);
+    }
+
+    /**
+     * The type case of the proto.
+     * Only the corresponding proto (must be non-null) is used.
+     * The other protos are ignored.
+     */
+    private final RaftClientRequestProto.TypeCase typeCase;
+    private final WriteRequestTypeProto write;
+    private final ReadRequestTypeProto read;
+    private final StaleReadRequestTypeProto staleRead;
+
+    private Type(WriteRequestTypeProto write) {
+      this.typeCase = WRITE;
+      this.write = Objects.requireNonNull(write);
+      this.read = null;
+      this.staleRead = null;
+    }
+
+    private Type(ReadRequestTypeProto read) {
+      this.typeCase = READ;
+      this.write = null;
+      this.read = Objects.requireNonNull(read);
+      this.staleRead = null;
+    }
+
+    private Type(StaleReadRequestTypeProto staleRead) {
+      this.typeCase = STALEREAD;
+      this.write = null;
+      this.read = null;
+      this.staleRead = Objects.requireNonNull(staleRead);
+    }
+
+    public RaftClientRequestProto.TypeCase getTypeCase() {
+      return typeCase;
+    }
+
+    public WriteRequestTypeProto getWrite() {
+      Preconditions.assertTrue(typeCase == WRITE);
+      return write;
+    }
+
+    public ReadRequestTypeProto getRead() {
+      Preconditions.assertTrue(typeCase == READ);
+      return read;
+    }
+
+    public StaleReadRequestTypeProto getStaleRead() {
+      Preconditions.assertTrue(typeCase == STALEREAD);
+      return staleRead;
+    }
+
+    @Override
+    public String toString() {
+      switch (typeCase) {
+        case WRITE:
+          return "RW";
+        case READ:
+          return "RO";
+        case STALEREAD:
+          return "StaleRead(" + staleRead.getMinIndex() + ")";
+        default:
+          throw new IllegalStateException("Unexpected request type: " + typeCase);
+      }
+    }
   }
 
+  private final long callId;
+  private final long seqNum;
+
+  private final Message message;
+  private final Type type;
+
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-       RaftGroupId groupId, long callId, long seqNum, Message message) {
-    this(clientId, serverId, groupId, callId, seqNum, WRITE, message, 0L);
+      RaftGroupId groupId, long callId) {
+    this(clientId, serverId, groupId, callId, 0L, null, writeRequestType());
   }
 
   public RaftClientRequest(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
-      long callId, long seqNum, RaftClientRequestProto.Type type, Message message, long minIndex) {
+      long callId, long seqNum, Message message, Type type) {
     super(clientId, serverId, groupId);
     this.callId = callId;
     this.seqNum = seqNum;
-    this.type = type;
     this.message = message;
-    this.minIndex = minIndex;
-
-    Preconditions.assertTrue(minIndex >= 0, "minIndex < 0");
+    this.type = type;
   }
 
   @Override
@@ -76,27 +166,17 @@ public class RaftClientRequest extends RaftClientMessage {
     return message;
   }
 
-  public RaftClientRequestProto.Type getType() {
+  public Type getType() {
     return type;
   }
 
-  public boolean isReadOnly() {
-    return getType() != WRITE;
-  }
-
-  public boolean isStaleRead() {
-    return getType() == STALE_READ;
-  }
-
-  /** @return the minimum required commit index for processing the request. */
-  public long getMinIndex() {
-    return minIndex;
+  public boolean is(RaftClientRequestProto.TypeCase typeCase) {
+    return getType().getTypeCase() == typeCase;
   }
 
   @Override
   public String toString() {
     return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " "
-        + (!isReadOnly()? "RW": isStaleRead()? "StaleRead(" + getMinIndex() + ")": "RO")
-        + ", " + getMessage();
+        + type + ", " + getMessage();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
index 3c6d468..b0e69af 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ReinitializeRequest.java
@@ -22,7 +22,7 @@ public class ReinitializeRequest extends RaftClientRequest {
 
   public ReinitializeRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId, RaftGroup group) {
-    super(clientId, serverId, groupId, callId, null);
+    super(clientId, serverId, groupId, callId);
     this.group = group;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
index c2b7eb5..5e047af 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/ServerInformatonRequest.java
@@ -24,6 +24,6 @@ package org.apache.ratis.protocol;
 public class ServerInformatonRequest extends RaftClientRequest {
   public ServerInformatonRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId) {
-    super(clientId, serverId, groupId, callId, null);
+    super(clientId, serverId, groupId, callId, 0L, null, RaftClientRequest.readRequestType());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
index 83be197..e25da2a 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/SetConfigurationRequest.java
@@ -24,7 +24,7 @@ public class SetConfigurationRequest extends RaftClientRequest {
 
   public SetConfigurationRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId, RaftPeer[] peers) {
-    super(clientId, serverId, groupId, callId, null);
+    super(clientId, serverId, groupId, callId);
     this.peers = peers;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
index b0e2b7c..a339b68 100644
--- a/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
+++ b/ratis-examples/src/test/java/org/apache/ratis/statemachine/TestRaftStateMachineException.java
@@ -102,8 +102,8 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     final long seqNum = 111;
-    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+        callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertFalse(reply.isSuccess());
     Assert.assertNotNull(reply.getStateMachineException());
@@ -144,8 +144,8 @@ public class TestRaftStateMachineException extends ParameterizedBaseTest {
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     final long seqNum = 111;
-    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+        callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Objects.requireNonNull(reply.getStateMachineException());
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index d5f6d97..0fa845b 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -170,18 +170,26 @@ message ClientMessageEntryProto {
   bytes content = 1;
 }
 
+message WriteRequestTypeProto {
+}
+
+message ReadRequestTypeProto {
+}
+
+message StaleReadRequestTypeProto {
+  uint64 minIndex = 1;
+}
+
 // normal client request
 message RaftClientRequestProto {
-  enum Type {
-    WRITE = 0;
-    READ = 1;
-    STALE_READ = 2;
-  }
-
   RaftRpcRequestProto rpcRequest = 1;
-  Type type = 2;
-  ClientMessageEntryProto message = 3;
-  uint64 minIndex = 4;
+  ClientMessageEntryProto message = 2;
+
+  oneof Type {
+    WriteRequestTypeProto write = 3;
+    ReadRequestTypeProto read = 4;
+    StaleReadRequestTypeProto staleRead = 5;
+  }
 }
 
 message NotLeaderExceptionProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
index c9c66a5..b418658 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/PendingRequests.java
@@ -18,6 +18,7 @@
 package org.apache.ratis.server.impl;
 
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.statemachine.TransactionContext;
 import org.apache.ratis.util.Preconditions;
 import org.slf4j.Logger;
@@ -83,7 +84,7 @@ class PendingRequests {
   PendingRequest addPendingRequest(long index, RaftClientRequest request,
       TransactionContext entry) {
     // externally synced for now
-    Preconditions.assertTrue(!request.isReadOnly());
+    Preconditions.assertTrue(request.is(RaftClientRequestProto.TypeCase.WRITE));
     if (last != null && !(last.getRequest() instanceof SetConfigurationRequest)) {
       Preconditions.assertTrue(index == last.getIndex() + 1,
           () -> "index = " + index + " != last.getIndex() + 1, last=" + last);

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 441e390..1777bb8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -476,7 +476,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
       RaftClientRequest request) throws IOException {
     assertLifeCycleState(RUNNING);
     LOG.debug("{}: receive client request({})", getId(), request);
-    if (request.isStaleRead()) {
+    if (request.is(RaftClientRequestProto.TypeCase.STALEREAD)) {
       return staleReadAsync(request);
     }
 
@@ -488,7 +488,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
 
     // let the state machine handle read-only request from client
     final StateMachine stateMachine = getStateMachine();
-    if (request.isReadOnly()) {
+    if (request.is(RaftClientRequestProto.TypeCase.READ)) {
       // TODO: We might not be the leader anymore by the time this completes.
       // See the RAFT paper section 8 (last part)
       return processQueryFuture(stateMachine.query(request.getMessage()), request);
@@ -518,7 +518,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   }
 
   private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) {
-    final long minIndex = request.getMinIndex();
+    final long minIndex = request.getType().getStaleRead().getMinIndex();
     final long commitIndex = state.getLog().getLastCommittedIndex();
     LOG.debug("{}: minIndex={}, commitIndex={}", getId(), minIndex, commitIndex);
     if (commitIndex < minIndex) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
index f4df193..91a1600 100644
--- a/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
+++ b/ratis-server/src/test/java/org/apache/ratis/MiniRaftCluster.java
@@ -497,13 +497,19 @@ public abstract class MiniRaftCluster {
 
   public RaftClientRequest newRaftClientRequest(
       ClientId clientId, RaftPeerId leaderId, Message message) {
-    return new RaftClientRequest(clientId, leaderId, getGroupId(),
+    return newRaftClientRequest(clientId, leaderId,
         DEFAULT_CALLID, DEFAULT_SEQNUM, message);
   }
 
+  public RaftClientRequest newRaftClientRequest(
+      ClientId clientId, RaftPeerId leaderId, long callId, long seqNum, Message message) {
+    return new RaftClientRequest(clientId, leaderId, getGroupId(),
+        callId, seqNum, message, RaftClientRequest.writeRequestType());
+  }
+
   public SetConfigurationRequest newSetConfigurationRequest(
       ClientId clientId, RaftPeerId leaderId,
-      RaftPeer... peers) throws IOException {
+      RaftPeer... peers) {
     return new SetConfigurationRequest(clientId, leaderId, getGroupId(),
         DEFAULT_CALLID, peers);
   }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/cbfa28a0/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
index e02f999..474482b 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RetryCacheTests.java
@@ -77,8 +77,8 @@ public abstract class RetryCacheTests extends BaseTest {
     final RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     final long seqNum = 111;
-    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+        callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertEquals(callId, reply.getCallId());
     Assert.assertTrue(reply.isSuccess());
@@ -122,8 +122,8 @@ public abstract class RetryCacheTests extends BaseTest {
     RaftClientRpc rpc = client.getClientRpc();
     final long callId = 999;
     final long seqNum = 111;
-    RaftClientRequest r = new RaftClientRequest(client.getId(), leaderId,
-        cluster.getGroupId(), callId, seqNum, new SimpleMessage("message"));
+    RaftClientRequest r = cluster.newRaftClientRequest(client.getId(), leaderId,
+        callId, seqNum, new SimpleMessage("message"));
     RaftClientReply reply = rpc.sendRequest(r);
     Assert.assertEquals(callId, reply.getCallId());
     Assert.assertTrue(reply.isSuccess());
@@ -140,7 +140,7 @@ public abstract class RetryCacheTests extends BaseTest {
     final RaftPeerId newLeaderId = cluster.getLeader().getId();
     Assert.assertNotEquals(leaderId, newLeaderId);
     // same clientId and callId in the request
-    r = new RaftClientRequest(client.getId(), newLeaderId, cluster.getGroupId(),
+    r = cluster.newRaftClientRequest(client.getId(), newLeaderId,
         callId, seqNum, new SimpleMessage("message"));
     for (int i = 0; i < 10; i++) {
       try {