You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by ms...@apache.org on 2018/02/18 14:16:04 UTC

incubator-ratis git commit: RATIS-207. Implement stale read. Contributed by Tsz Wo Nicholas Sze.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master 4104860b9 -> 8fd74ede4


RATIS-207. Implement stale read. Contributed by Tsz Wo Nicholas Sze.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/8fd74ede
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/8fd74ede
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/8fd74ede

Branch: refs/heads/master
Commit: 8fd74ede411196a2b762b6ed741eccb30248e81c
Parents: 4104860
Author: Mukul Kumar Singh <ms...@apache.org>
Authored: Sun Feb 18 19:45:14 2018 +0530
Committer: Mukul Kumar Singh <ms...@apache.org>
Committed: Sun Feb 18 19:45:14 2018 +0530

----------------------------------------------------------------------
 .../org/apache/ratis/client/RaftClient.java     |  6 ++
 .../ratis/client/impl/ClientProtoUtils.java     |  9 +-
 .../ratis/client/impl/RaftClientImpl.java       | 70 ++++++++++-----
 .../ratis/protocol/RaftClientRequest.java       | 43 ++++++++--
 .../ratis/protocol/StaleReadException.java      | 27 ++++++
 .../ratis/protocol/StateMachineException.java   |  4 +
 .../test/java/org/apache/ratis/BaseTest.java    | 55 ++++++++++--
 .../ratis/grpc/client/AppendStreamer.java       |  2 +-
 ratis-proto-shaded/src/main/proto/Raft.proto    | 11 ++-
 .../ratis/server/impl/RaftServerImpl.java       | 33 +++++++-
 .../apache/ratis/statemachine/StateMachine.java | 13 +++
 .../statemachine/impl/BaseStateMachine.java     | 25 +++++-
 .../java/org/apache/ratis/RaftAsyncTests.java   | 89 +++++++++++++++++++-
 .../org/apache/ratis/RaftExceptionBaseTest.java | 13 ++-
 .../SimpleStateMachine4Testing.java             |  6 +-
 15 files changed, 355 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
index 89fb8f4..84fec9e 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClient.java
@@ -51,6 +51,9 @@ public interface RaftClient extends Closeable {
   /** Async call to send the given readonly message to the raft service. */
   CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message);
 
+  /** Async call to send the given stale-read message to the given server (not the raft service). */
+  CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server);
+
   /**
    * Send the given message to the raft service.
    * The message may change the state of the service.
@@ -61,6 +64,9 @@ public interface RaftClient extends Closeable {
   /** Send the given readonly message to the raft service. */
   RaftClientReply sendReadOnly(Message message) throws IOException;
 
+  /** Send the given stale-read message to the given server (not the raft service). */
+  RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server) throws IOException;
+
   /** Send set configuration request to the raft service. */
   RaftClientReply setConfiguration(RaftPeer[] serversInNewConf) throws IOException;
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 9148633..de28e18 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -75,26 +75,27 @@ public interface ClientProtoUtils {
         ProtoUtils.toRaftGroupId(request.getRaftGroupId()),
         request.getCallId(),
         request.getSeqNum(),
-        toMessage(p.getMessage()), p.getReadOnly());
+        p.getType(), toMessage(p.getMessage()), p.getMinIndex());
   }
 
   static RaftClientRequestProto toRaftClientRequestProto(
       RaftClientRequest request) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(request))
+        .setType(request.getType())
         .setMessage(toClientMessageEntryProtoBuilder(request.getMessage()))
-        .setReadOnly(request.isReadOnly())
+        .setMinIndex(request.getMinIndex())
         .build();
   }
 
   static RaftClientRequestProto toRaftClientRequestProto(
       ClientId clientId, RaftPeerId serverId, RaftGroupId groupId, long callId,
-      long seqNum, ByteString content, boolean readOnly) {
+      long seqNum, ByteString content) {
     return RaftClientRequestProto.newBuilder()
         .setRpcRequest(toRaftRpcRequestProtoBuilder(
             clientId, serverId, groupId, callId, seqNum))
+        .setType(RaftClientRequestProto.Type.WRITE)
         .setMessage(toClientMessageEntryProtoBuilder(content))
-        .setReadOnly(readOnly)
         .build();
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index 6ee415d..6f3b8e0 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -22,6 +22,7 @@ import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.util.*;
 
 import java.io.IOException;
@@ -36,6 +37,10 @@ import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+
 /** A client who sends requests to a raft service. */
 final class RaftClientImpl implements RaftClient {
   private static final AtomicLong callIdCounter = new AtomicLong();
@@ -91,7 +96,9 @@ final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
-  private final SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> slidingWindow;
+  /** Map: id -> {@link SlidingWindow}, in order to support async calls to the RAFT service or individual servers. */
+  private final ConcurrentMap<String, SlidingWindow.Client<PendingAsyncRequest, RaftClientReply>>
+      slidingWindows = new ConcurrentHashMap<>();
   private final ScheduledExecutorService scheduler;
   private final Semaphore asyncRequestSemaphore;
 
@@ -107,7 +114,6 @@ final class RaftClientImpl implements RaftClient {
 
     asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
     scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
-    slidingWindow = new SlidingWindow.Client<>(getId());
     clientRpc.addServers(peers);
   }
 
@@ -116,18 +122,32 @@ final class RaftClientImpl implements RaftClient {
     return clientId;
   }
 
+  private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
+    return getSlidingWindow(request.isStaleRead()? request.getServerId(): null);
+  }
+
+  private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftPeerId target) {
+    final String id = target != null? target.toString(): "RAFT";
+    return slidingWindows.computeIfAbsent(id, key -> new SlidingWindow.Client<>(getId() + "->" + key));
+  }
+
   @Override
   public CompletableFuture<RaftClientReply> sendAsync(Message message) {
-    return sendAsync(message, false);
+    return sendAsync(WRITE, message, 0L, null);
   }
 
   @Override
   public CompletableFuture<RaftClientReply> sendReadOnlyAsync(Message message) {
-    return sendAsync(message, true);
+    return sendAsync(READ, message, 0L, null);
+  }
+
+  @Override
+  public CompletableFuture<RaftClientReply> sendStaleReadAsync(Message message, long minIndex, RaftPeerId server) {
+    return sendAsync(STALE_READ, message, minIndex, server);
   }
 
-  private CompletableFuture<RaftClientReply> sendAsync(Message message,
-      boolean readOnly) {
+  private CompletableFuture<RaftClientReply> sendAsync(
+      RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server) {
     Objects.requireNonNull(message, "message == null");
     try {
       asyncRequestSemaphore.acquire();
@@ -137,29 +157,43 @@ final class RaftClientImpl implements RaftClient {
     }
     final long callId = nextCallId();
     final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
-        seq -> new RaftClientRequest(clientId, leaderId, groupId, callId, seq, message, readOnly));
-    return slidingWindow.submitNewRequest(constructor, this::sendRequestWithRetryAsync
+        seq -> newRaftClientRequest(server, callId, seq, type, message, minIndex));
+    return getSlidingWindow(server).submitNewRequest(constructor, this::sendRequestWithRetryAsync
     ).getReplyFuture(
     ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
     ).whenComplete((r, e) -> asyncRequestSemaphore.release());
   }
 
+  private RaftClientRequest newRaftClientRequest(
+      RaftPeerId server, long callId, long seq,
+      RaftClientRequestProto.Type type, Message message, long minIndex) {
+    return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
+        callId, seq, type, message, minIndex);
+  }
+
   @Override
   public RaftClientReply send(Message message) throws IOException {
-    return send(message, false);
+    return send(WRITE, message, 0L, null);
   }
 
   @Override
   public RaftClientReply sendReadOnly(Message message) throws IOException {
-    return send(message, true);
+    return send(READ, message, 0L, null);
+  }
+
+  @Override
+  public RaftClientReply sendStaleRead(Message message, long minIndex, RaftPeerId server)
+      throws IOException {
+    return send(STALE_READ, message, minIndex, server);
   }
 
-  private RaftClientReply send(Message message, boolean readOnly) throws IOException {
+  private RaftClientReply send(RaftClientRequestProto.Type type, Message message, long minIndex, RaftPeerId server)
+      throws IOException {
     Objects.requireNonNull(message, "message == null");
 
     final long callId = nextCallId();
-    return sendRequestWithRetry(() -> new RaftClientRequest(
-        clientId, leaderId, groupId, callId, 0L, message, readOnly));
+    return sendRequestWithRetry(() -> newRaftClientRequest(
+        server, callId, 0L, type, message, minIndex));
   }
 
   @Override
@@ -207,7 +241,7 @@ final class RaftClientImpl implements RaftClient {
     return sendRequestAsync(request).thenCompose(reply -> {
       if (reply == null) {
         final TimeUnit unit = retryInterval.getUnit();
-        scheduler.schedule(() -> slidingWindow.retry(pending, this::sendRequestWithRetryAsync),
+        scheduler.schedule(() -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetryAsync),
             retryInterval.toLong(unit), unit);
       } else {
         f.complete(reply);
@@ -244,15 +278,13 @@ final class RaftClientImpl implements RaftClient {
       LOG.debug("{}: receive* {}", clientId, reply);
       reply = handleNotLeaderException(request, reply);
       if (reply != null) {
-        slidingWindow.receiveReply(
+        getSlidingWindow(request).receiveReply(
             request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
       }
       return reply;
     }).exceptionally(e -> {
       LOG.debug("{}: Failed {} with {}", clientId, request, e);
-      if (e instanceof CompletionException) {
-        e = e.getCause();
-      }
+      e = JavaUtils.unwrapCompletionException(e);
       if (e instanceof GroupMismatchException) {
         throw new CompletionException(e);
       } else if (e instanceof IOException) {
@@ -328,7 +360,7 @@ final class RaftClientImpl implements RaftClient {
       LOG.trace("Stack trace", new Throwable("TRACE"));
     }
 
-    slidingWindow.resetFirstSeqNum();
+    getSlidingWindow(request).resetFirstSeqNum();
     if (ioe instanceof LeaderNotReadyException) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index c924ef8..d20b158 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -17,6 +17,13 @@
  */
 package org.apache.ratis.protocol;
 
+import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
+import org.apache.ratis.util.Preconditions;
+
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.STALE_READ;
+import static org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto.Type.WRITE;
+
 /**
  * Request from client to server
  */
@@ -24,26 +31,32 @@ public class RaftClientRequest extends RaftClientMessage {
   private final long callId;
   private final long seqNum;
 
+  private final RaftClientRequestProto.Type type;
   private final Message message;
-  private final boolean readOnly;
+
+  private final long minIndex;
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
       RaftGroupId groupId, long callId, Message message) {
-    this(clientId, serverId, groupId, callId, 0L, message, false);
+    this(clientId, serverId, groupId, callId, 0L, WRITE, message, 0L);
   }
 
   public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
        RaftGroupId groupId, long callId, long seqNum, Message message) {
-    this(clientId, serverId, groupId, callId, seqNum, message, false);
+    this(clientId, serverId, groupId, callId, seqNum, WRITE, message, 0L);
   }
 
-  public RaftClientRequest(ClientId clientId, RaftPeerId serverId,
-       RaftGroupId groupId, long callId, long seqNum, Message message, boolean readOnly) {
+  public RaftClientRequest(
+      ClientId clientId, RaftPeerId serverId, RaftGroupId groupId,
+      long callId, long seqNum, RaftClientRequestProto.Type type, Message message, long minIndex) {
     super(clientId, serverId, groupId);
     this.callId = callId;
     this.seqNum = seqNum;
+    this.type = type;
     this.message = message;
-    this.readOnly = readOnly;
+    this.minIndex = minIndex;
+
+    Preconditions.assertTrue(minIndex >= 0, "minIndex < 0");
   }
 
   @Override
@@ -63,13 +76,27 @@ public class RaftClientRequest extends RaftClientMessage {
     return message;
   }
 
+  public RaftClientRequestProto.Type getType() {
+    return type;
+  }
+
   public boolean isReadOnly() {
-    return readOnly;
+    return getType() != WRITE;
+  }
+
+  public boolean isStaleRead() {
+    return getType() == STALE_READ;
+  }
+
+  /** @return the minimum required commit index for processing the request. */
+  public long getMinIndex() {
+    return minIndex;
   }
 
   @Override
   public String toString() {
     return super.toString() + ", cid=" + callId + ", seq=" + seqNum + " "
-        + (isReadOnly()? "RO": "RW") + ", " + getMessage();
+        + (!isReadOnly()? "RW": isStaleRead()? "StaleRead(" + getMinIndex() + ")": "RO")
+        + ", " + getMessage();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java
new file mode 100644
index 0000000..0240479
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StaleReadException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol;
+
+/**
+ * This exception indicates the failure of a stale-read.
+ */
+public class StaleReadException extends RaftException {
+  public StaleReadException(String message) {
+    super(message);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
index 49a64ef..56f1160 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/StateMachineException.java
@@ -25,4 +25,8 @@ public class StateMachineException extends RaftException {
   public StateMachineException(String msg) {
     super(msg);
   }
+
+  public StateMachineException(String message, Throwable cause) {
+    super(message, cause);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
index 54aad77..e487841 100644
--- a/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
+++ b/ratis-common/src/test/java/org/apache/ratis/BaseTest.java
@@ -19,10 +19,7 @@ package org.apache.ratis;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.conf.ConfUtils;
-import org.apache.ratis.util.CheckedRunnable;
-import org.apache.ratis.util.FileUtils;
-import org.apache.ratis.util.JavaUtils;
-import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.*;
 import org.junit.Assert;
 import org.junit.Rule;
 import org.junit.rules.TestName;
@@ -32,6 +29,8 @@ import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.function.Supplier;
 
@@ -78,20 +77,58 @@ public abstract class BaseTest {
     return new File(getClassTestDir(), testName.getMethodName());
   }
 
+  public static void assertThrowable(
+      String description, Throwable t,
+      Class<? extends Throwable> exceptedThrowableClass, Logger log,
+      Class<? extends Throwable>... exceptedCauseClasses) {
+    if (log != null) {
+      log.info("The test \"" + description + "\" throws " + t.getClass().getSimpleName(), t);
+    }
+    Assert.assertEquals(exceptedThrowableClass, t.getClass());
+
+    for (Class<? extends Throwable> expectedCause : exceptedCauseClasses) {
+      final Throwable previous = t;
+      t = Objects.requireNonNull(previous.getCause(),
+          () -> "previous.getCause() == null for previous=" + previous);
+      Assert.assertEquals(expectedCause, t.getClass());
+    }
+  }
+
   public static void testFailureCase(
       String description, CheckedRunnable<?> testCode,
-      Class<? extends Throwable> exceptedThrowableClass, Logger log) {
+      Class<? extends Throwable> exceptedThrowableClass, Logger log,
+      Class<? extends Throwable>... exceptedCauseClasses) {
     try {
       testCode.run();
       Assert.fail("The test \"" + description + "\" does not throw anything.");
     } catch (Throwable t) {
-      log.info("The test \"" + description + "\" throws " + t.getClass().getSimpleName(), t);
-      Assert.assertEquals(exceptedThrowableClass, t.getClass());
+      assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
     }
   }
 
   public void testFailureCase(
-      String description, CheckedRunnable<?> testCode, Class<? extends Throwable> exceptedThrowableClass) {
-    testFailureCase(description, testCode, exceptedThrowableClass, LOG);
+      String description, CheckedRunnable<?> testCode,
+      Class<? extends Throwable> exceptedThrowableClass,
+      Class<? extends Throwable>... exceptedCauseClasses) {
+    testFailureCase(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
+  }
+
+  public static void testFailureCaseAsync(
+      String description, Supplier<CompletableFuture<?>> testCode,
+      Class<? extends Throwable> exceptedThrowableClass, Logger log,
+      Class<? extends Throwable>... exceptedCauseClasses) {
+    try {
+      testCode.get().join();
+      Assert.fail("The test \"" + description + "\" does not throw anything.");
+    } catch (Throwable t) {
+      t = JavaUtils.unwrapCompletionException(t);
+      assertThrowable(description, t, exceptedThrowableClass, log, exceptedCauseClasses);
+    }
+  }
+
+  public void testFailureCaseAsync(
+      String description, Supplier<CompletableFuture<?>> testCode, Class<? extends Throwable> exceptedThrowableClass,
+      Class<? extends Throwable>... exceptedCauseClasses) {
+    testFailureCaseAsync(description, testCode, exceptedThrowableClass, LOG, exceptedCauseClasses);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
index 08e8376..ff3ed28 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/AppendStreamer.java
@@ -157,7 +157,7 @@ public class AppendStreamer implements Closeable {
     if (isRunning()) {
       // wrap the current buffer into a RaftClientRequestProto
       final RaftClientRequestProto request = ClientProtoUtils.toRaftClientRequestProto(
-          clientId, leaderId, groupId, seqNum, seqNum, content, false);
+          clientId, leaderId, groupId, seqNum, seqNum, content);
       if (request.getSerializedSize() > maxMessageSize.getSizeInt()) {
         throw new IOException("msg size:" + request.getSerializedSize() +
             " exceeds maximum:" + maxMessageSize.getSizeInt());

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-proto-shaded/src/main/proto/Raft.proto
----------------------------------------------------------------------
diff --git a/ratis-proto-shaded/src/main/proto/Raft.proto b/ratis-proto-shaded/src/main/proto/Raft.proto
index fa823ab..d5f6d97 100644
--- a/ratis-proto-shaded/src/main/proto/Raft.proto
+++ b/ratis-proto-shaded/src/main/proto/Raft.proto
@@ -172,9 +172,16 @@ message ClientMessageEntryProto {
 
 // normal client request
 message RaftClientRequestProto {
+  enum Type {
+    WRITE = 0;
+    READ = 1;
+    STALE_READ = 2;
+  }
+
   RaftRpcRequestProto rpcRequest = 1;
-  ClientMessageEntryProto message = 2;
-  bool readOnly = 3;
+  Type type = 2;
+  ClientMessageEntryProto message = 3;
+  uint64 minIndex = 4;
 }
 
 message NotLeaderExceptionProto {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 37cd16f..441e390 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -474,7 +474,12 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
   @Override
   public CompletableFuture<RaftClientReply> submitClientRequestAsync(
       RaftClientRequest request) throws IOException {
+    assertLifeCycleState(RUNNING);
     LOG.debug("{}: receive client request({})", getId(), request);
+    if (request.isStaleRead()) {
+      return staleReadAsync(request);
+    }
+
     // first check the server's leader state
     CompletableFuture<RaftClientReply> reply = checkLeaderState(request, null);
     if (reply != null) {
@@ -486,8 +491,7 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     if (request.isReadOnly()) {
       // TODO: We might not be the leader anymore by the time this completes.
       // See the RAFT paper section 8 (last part)
-      return stateMachine.query(request.getMessage())
-          .thenApply(r -> new RaftClientReply(request, r, getCommitInfos()));
+      return processQueryFuture(stateMachine.query(request.getMessage()), request);
     }
 
     // query the retry cache
@@ -513,6 +517,31 @@ public class RaftServerImpl implements RaftServerProtocol, RaftServerAsynchronou
     return appendTransaction(request, context, cacheEntry);
   }
 
+  private CompletableFuture<RaftClientReply> staleReadAsync(RaftClientRequest request) {
+    final long minIndex = request.getMinIndex();
+    final long commitIndex = state.getLog().getLastCommittedIndex();
+    LOG.debug("{}: minIndex={}, commitIndex={}", getId(), minIndex, commitIndex);
+    if (commitIndex < minIndex) {
+      final StaleReadException e = new StaleReadException(
+          "Unable to serve stale-read due to server commit index = " + commitIndex + " < min = " + minIndex);
+      return CompletableFuture.completedFuture(
+          new RaftClientReply(request, new StateMachineException(getId(), e), getCommitInfos()));
+    }
+    return processQueryFuture(getStateMachine().queryStale(request.getMessage(), minIndex), request);
+  }
+
+  CompletableFuture<RaftClientReply> processQueryFuture(
+      CompletableFuture<Message> queryFuture, RaftClientRequest request) {
+    return queryFuture.thenApply(r -> new RaftClientReply(request, r, getCommitInfos()))
+        .exceptionally(e -> {
+          e = JavaUtils.unwrapCompletionException(e);
+          if (e instanceof StateMachineException) {
+            return new RaftClientReply(request, (StateMachineException)e, getCommitInfos());
+          }
+          throw new CompletionException(e);
+        });
+  }
+
   @Override
   public RaftClientReply submitClientRequest(RaftClientRequest request)
       throws IOException {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
index e2aaf29..7d35796 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/StateMachine.java
@@ -119,6 +119,19 @@ public interface StateMachine extends Closeable {
   CompletableFuture<Message> query(Message request);
 
   /**
+   * Query the state machine, provided {@code minIndex <= commit index}.
+   * The request must be read-only.
+   * Since the commit index of this server may lag behind the Raft service,
+   * the returned result may possibly be stale.
+   *
+   * When {@code minIndex} is greater than the index of {@link #getLastAppliedTermIndex()},
+   * the state machine may choose to either
+   * (1) return exceptionally, or
+   * (2) wait until the index of {@link #getLastAppliedTermIndex()} reaches {@code minIndex} before running the query.
+   */
+  CompletableFuture<Message> queryStale(Message request, long minIndex);
+
+  /**
    * Validate/pre-process the incoming update request in the state machine.
    * @return the content to be written to the log entry. Null means the request
    * should be rejected.

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
index 201eff7..b87143f 100644
--- a/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
+++ b/ratis-server/src/main/java/org/apache/ratis/statemachine/impl/BaseStateMachine.java
@@ -36,6 +36,8 @@ import org.apache.ratis.util.Preconditions;
 
 import java.io.IOException;
 import java.util.Collection;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.atomic.AtomicReference;
 
@@ -51,6 +53,8 @@ public class BaseStateMachine implements StateMachine {
 
   private final AtomicReference<TermIndex> lastAppliedTermIndex = new AtomicReference<>();
 
+  private final SortedMap<Long, CompletableFuture<Void>> transactionFutures = new TreeMap<>();
+
   public RaftPeerId getId() {
     return id;
   }
@@ -131,6 +135,12 @@ public class BaseStateMachine implements StateMachine {
       }
       return true;
     }
+
+    synchronized (transactionFutures) {
+      for(long i; !transactionFutures.isEmpty() && (i = transactionFutures.firstKey()) <= index; ) {
+        transactionFutures.remove(i).complete(null);
+      }
+    }
     return false;
   }
 
@@ -158,8 +168,21 @@ public class BaseStateMachine implements StateMachine {
   }
 
   @Override
+  public CompletableFuture<Message> queryStale(Message request, long minIndex) {
+    if (getLastAppliedTermIndex().getIndex() < minIndex) {
+      synchronized (transactionFutures) {
+        if (getLastAppliedTermIndex().getIndex() < minIndex) {
+          return transactionFutures.computeIfAbsent(minIndex, key -> new CompletableFuture<>())
+              .thenCompose(v -> query(request));
+        }
+      }
+    }
+    return query(request);
+  }
+
+  @Override
   public CompletableFuture<Message> query(Message request) {
-    return null;
+    return CompletableFuture.completedFuture(null);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index f72ea5a..a1835c0 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -22,15 +22,24 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.server.impl.RaftServerImpl;
 import org.apache.ratis.server.impl.RaftServerProxy;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.shaded.com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.ratis.shaded.proto.RaftProtos.CommitInfoProto;
+import org.apache.ratis.shaded.proto.RaftProtos.LogEntryProto;
 import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.LogUtils;
 import org.junit.*;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.List;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -48,7 +57,7 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   public static final int NUM_SERVERS = 3;
 
   @Before
-  public void setup() throws IOException {
+  public void setup() {
     properties = new RaftProperties();
     properties.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
         SimpleStateMachine4Testing.class, StateMachine.class);
@@ -158,4 +167,80 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
     RaftBasicTests.testWithLoad(10, 500, true, cluster, LOG);
     cluster.shutdown();
   }
+
+  @Test
+  public void testStaleReadAsync() throws Exception {
+    final int numMesssages = 10;
+    final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
+
+    try (RaftClient client = cluster.createClient()) {
+      cluster.start();
+      RaftTestUtil.waitForLeader(cluster);
+
+      // submit some messages
+      final List<CompletableFuture<RaftClientReply>> futures = new ArrayList<>();
+      for (int i = 0; i < numMesssages; i++) {
+        final String s = "m" + i;
+        LOG.info("sendAsync " + s);
+        futures.add(client.sendAsync(new RaftTestUtil.SimpleMessage(s)));
+      }
+      Assert.assertEquals(numMesssages, futures.size());
+      RaftClientReply lastWriteReply = null;
+      for (CompletableFuture<RaftClientReply> f : futures) {
+        lastWriteReply = f.join();
+        Assert.assertTrue(lastWriteReply.isSuccess());
+      }
+      futures.clear();
+
+      // Use a follower with the max commit index
+      final RaftPeerId leader = lastWriteReply.getServerId();
+      LOG.info("leader = " + leader);
+      final Collection<CommitInfoProto> commitInfos = lastWriteReply.getCommitInfos();
+      LOG.info("commitInfos = " + commitInfos);
+      final CommitInfoProto followerCommitInfo = commitInfos.stream()
+          .filter(info -> !RaftPeerId.valueOf(info.getServer().getId()).equals(leader))
+          .max(Comparator.comparing(CommitInfoProto::getCommitIndex)).get();
+      final RaftPeerId follower = RaftPeerId.valueOf(followerCommitInfo.getServer().getId());
+      LOG.info("max follower = " + follower);
+
+      // test a failure case
+      testFailureCaseAsync("sendStaleReadAsync(..) with a larger commit index",
+          () -> client.sendStaleReadAsync(
+              new RaftTestUtil.SimpleMessage("" + (numMesssages + 1)),
+              followerCommitInfo.getCommitIndex(), follower),
+          StateMachineException.class, IndexOutOfBoundsException.class);
+
+      // test sendStaleReadAsync
+      for (int i = 1; i < followerCommitInfo.getCommitIndex(); i++) {
+        final int query = i;
+        LOG.info("sendStaleReadAsync, query=" + query);
+        final Message message = new RaftTestUtil.SimpleMessage("" + query);
+        final CompletableFuture<RaftClientReply> readFuture = client.sendReadOnlyAsync(message);
+        final CompletableFuture<RaftClientReply> staleReadFuture = client.sendStaleReadAsync(
+            message, followerCommitInfo.getCommitIndex(), follower);
+
+        futures.add(readFuture.thenApply(r -> getMessageContent(r))
+            .thenCombine(staleReadFuture.thenApply(r -> getMessageContent(r)), (expected, computed) -> {
+              try {
+                LOG.info("query " + query + " returns "
+                    + LogEntryProto.parseFrom(expected).getSmLogEntry().getData().toStringUtf8());
+              } catch (InvalidProtocolBufferException e) {
+                throw new CompletionException(e);
+              }
+
+              Assert.assertEquals("log entry mismatch for query=" + query, expected, computed);
+              return null;
+            })
+        );
+      }
+      JavaUtils.allOf(futures).join();
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  static ByteString getMessageContent(RaftClientReply reply) {
+    Assert.assertTrue(reply.isSuccess());
+    return reply.getMessage().getContent();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
index f90012e..4562cb8 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftExceptionBaseTest.java
@@ -45,7 +45,7 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
     LogUtils.setLogLevel(RaftClient.LOG, Level.DEBUG);
   }
 
-  public static final int NUM_PEERS = 5;
+  public static final int NUM_PEERS = 3;
 
   private CLUSTER cluster;
 
@@ -191,4 +191,15 @@ public abstract class RaftExceptionBaseTest<CLUSTER extends MiniRaftCluster>
           GroupMismatchException.class);
     }
   }
+
+  @Test
+  public void testStaleReadException() throws Exception {
+    RaftTestUtil.waitForLeader(cluster);
+    try (RaftClient client = cluster.createClient()) {
+      final RaftPeerId follower = cluster.getFollowers().iterator().next().getId();
+      testFailureCase("sendStaleRead(..) with a large commit index",
+          () -> client.sendStaleRead(Message.EMPTY, 1_000_000_000L, follower),
+          StateMachineException.class, StaleReadException.class);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/8fd74ede/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
index 261b9ef..4bf75e1 100644
--- a/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
+++ b/ratis-server/src/test/java/org/apache/ratis/statemachine/SimpleStateMachine4Testing.java
@@ -23,6 +23,7 @@ import org.apache.ratis.io.MD5Hash;
 import org.apache.ratis.protocol.Message;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.StateMachineException;
 import org.apache.ratis.server.RaftServer;
 import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.impl.RaftServerConstants;
@@ -227,11 +228,12 @@ public class SimpleStateMachine4Testing extends BaseStateMachine {
       final long index = bytes.isEmpty()? getLastAppliedTermIndex().getIndex()
           : Long.parseLong(bytes.toStringUtf8());
       LOG.info("query log index " + index);
-      final LogEntryProto entry = list.get(Math.toIntExact(index));
+      final LogEntryProto entry = list.get(Math.toIntExact(index - 1));
       return CompletableFuture.completedFuture(Message.valueOf(entry.toByteString()));
     } catch (Exception e) {
       LOG.warn("Failed request " + request, e);
-      return JavaUtils.completeExceptionally(e);
+      return JavaUtils.completeExceptionally(new StateMachineException(
+          "Failed request " + request, e));
     }
   }