You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2022/09/06 08:20:57 UTC

[ratis] branch master updated: RATIS-1696. Support linearizable read-only requests in leader (#735)

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 117b33311 RATIS-1696. Support linearizable read-only requests in leader (#735)
117b33311 is described below

commit 117b333114daf37e3d7a31919e49f8fdc0d9e201
Author: William Song <48...@users.noreply.github.com>
AuthorDate: Tue Sep 6 16:20:51 2022 +0800

    RATIS-1696. Support linearizable read-only requests in leader (#735)
---
 .../ratis/protocol/exceptions/ReadException.java   |  27 +++
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   2 +-
 .../apache/ratis/server/RaftServerConfigKeys.java  |  32 +++
 .../apache/ratis/server/leader/LeaderState.java    |   2 +-
 .../apache/ratis/server/impl/LeaderStateImpl.java  |   7 +-
 .../apache/ratis/server/impl/RaftServerImpl.java   |  48 +++-
 .../org/apache/ratis/server/impl/ReadRequests.java | 109 +++++++--
 .../org/apache/ratis/server/impl/ServerState.java  |   6 +-
 .../ratis/server/impl/StateMachineUpdater.java     |  16 +-
 .../ratis/server/leader/LogAppenderDefault.java    |   2 +-
 .../org/apache/ratis/ReadOnlyRequestTests.java     | 261 +++++++++++++++++++++
 .../ratis/grpc/TestReadOnlyRequestsWithGrpc.java   |  25 ++
 12 files changed, 503 insertions(+), 34 deletions(-)

diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
new file mode 100644
index 000000000..32828418d
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/exceptions/ReadException.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.protocol.exceptions;
+
+/**
+ * Indicates that a read request failed; the cause carries the reason (e.g. a read-index timeout).
+ */
+public class ReadException extends RaftException {
+  public ReadException(Throwable cause) {
+    super(cause);
+  }
+}
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index f83656cb0..5bb035bde 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -382,7 +382,7 @@ public class GrpcLogAppender extends LogAppenderBase {
         default:
           throw new IllegalStateException("Unexpected reply result: " + reply.getResult());
       }
-      getLeaderState().onAppendEntriesReply(getFollower(), reply);
+      getLeaderState().onAppendEntriesReply(GrpcLogAppender.this, reply);
       notifyLogAppender();
     }
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
index e24631378..625de2e1a 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/RaftServerConfigKeys.java
@@ -157,6 +157,38 @@ public interface RaftServerConfigKeys {
     }
   }
 
+  interface Read {
+    String PREFIX = RaftServerConfigKeys.PREFIX + ".read";
+
+    /** Key of the read timeout: how long a read may wait before it fails; the value must be positive. */
+    String TIMEOUT_KEY = PREFIX + ".timeout";
+    TimeDuration TIMEOUT_DEFAULT = TimeDuration.valueOf(10, TimeUnit.SECONDS);
+    static TimeDuration timeout(RaftProperties properties) {
+      return getTimeDuration(properties.getTimeDuration(TIMEOUT_DEFAULT.getUnit()),
+          TIMEOUT_KEY, TIMEOUT_DEFAULT, getDefaultLog(), requirePositive());
+    }
+    static void setTimeout(RaftProperties properties, TimeDuration readOnlyTimeout) {
+      setTimeDuration(properties::setTimeDuration, TIMEOUT_KEY, readOnlyTimeout);
+    }
+
+    enum Option {
+      /** Directly query statemachine. Efficient but may undermine linearizability */
+      DEFAULT,
+      /** Use ReadIndex (see Raft Paper section 6.4). Maintains linearizability */
+      LINEARIZABLE
+    }
+
+    /** Key selecting the read {@link Option}; it must live under {@link #PREFIX} like the timeout key. */
+    String OPTION_KEY = PREFIX + ".option";
+    Option OPTION_DEFAULT = Option.DEFAULT;
+    static Option option(RaftProperties properties) {
+      return get(properties::getEnum, OPTION_KEY, OPTION_DEFAULT, getDefaultLog());
+    }
+    static void setOption(RaftProperties properties, Option option) {
+      set(properties::setEnum, OPTION_KEY, option);
+    }
+  }
+
   interface Write {
     String PREFIX = RaftServerConfigKeys.PREFIX + ".write";
 
diff --git a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
index 32c7d6ce0..e906dd209 100644
--- a/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
+++ b/ratis-server-api/src/main/java/org/apache/ratis/server/leader/LeaderState.java
@@ -64,6 +64,6 @@ public interface LeaderState {
   boolean isFollowerBootstrapping(FollowerInfo follower);
 
   /** Received an {@link AppendEntriesReplyProto} */
-  void onAppendEntriesReply(FollowerInfo follower, AppendEntriesReplyProto reply);
+  void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply);
 
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
index 3cb83b1a8..65c586530 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderStateImpl.java
@@ -1087,7 +1087,7 @@ class LeaderStateImpl implements LeaderState {
 
     final MemoizedSupplier<AppendEntriesListener> supplier = MemoizedSupplier.valueOf(
         () -> new AppendEntriesListener(readIndex));
-    final AppendEntriesListener listener = server.getState().getReadRequests().addAppendEntriesListener(
+    final AppendEntriesListener listener = server.getReadRequests().addAppendEntriesListener(
         readIndex, key -> supplier.get());
 
     // the readIndex is already acknowledged before
@@ -1097,7 +1097,6 @@ class LeaderStateImpl implements LeaderState {
 
     if (supplier.isInitialized()) {
       senders.forEach(sender -> {
-        listener.init(sender);
         try {
           sender.triggerHeartbeat();
         } catch (IOException e) {
@@ -1110,8 +1109,8 @@ class LeaderStateImpl implements LeaderState {
   }
 
   @Override
-  public void onAppendEntriesReply(FollowerInfo follower, RaftProtos.AppendEntriesReplyProto reply) {
-    server.getState().getReadRequests().onAppendEntriesReply(reply, this::hasMajority);
+  public void onAppendEntriesReply(LogAppender appender, RaftProtos.AppendEntriesReplyProto reply) {
+    server.getReadRequests().onAppendEntriesReply(appender, reply, this::hasMajority);
   }
 
   void replyPendingRequest(long logIndex, RaftClientReply reply) {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
index 258e94d9f..edea92ced 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RaftServerImpl.java
@@ -23,6 +23,7 @@ import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.*;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.protocol.exceptions.ReadException;
 import org.apache.ratis.protocol.exceptions.SetConfigurationException;
 import org.apache.ratis.protocol.exceptions.GroupMismatchException;
 import org.apache.ratis.protocol.exceptions.LeaderNotReadyException;
@@ -163,6 +164,7 @@ class RaftServerImpl implements RaftServer.Division,
   private final RoleInfo role;
 
   private final DataStreamMap dataStreamMap;
+  private final RaftServerConfigKeys.Read.Option readOption;
 
   private final MemoizedSupplier<RaftClient> raftClient;
 
@@ -206,6 +208,7 @@ class RaftServerImpl implements RaftServer.Division,
     this.state = new ServerState(id, group, stateMachine, this, option, properties);
     this.retryCache = new RetryCacheImpl(properties);
     this.dataStreamMap = new DataStreamMapImpl(id);
+    this.readOption = RaftServerConfigKeys.Read.option(properties);
 
     this.jmxAdapter = new RaftServerJmxAdapter();
     this.leaderElectionMetrics = LeaderElectionMetrics.getLeaderElectionMetrics(
@@ -845,7 +848,7 @@ class RaftServerImpl implements RaftServer.Division,
       if (type.is(TypeCase.READ)) {
         // TODO: We might not be the leader anymore by the time this completes.
         // See the RAFT paper section 8 (last part)
-        replyFuture = processQueryFuture(stateMachine.query(request.getMessage()), request);
+        replyFuture = readAsync(request);
       } else if (type.is(TypeCase.WATCH)) {
         replyFuture = watchAsync(request);
       } else if (type.is(TypeCase.MESSAGESTREAM)) {
@@ -907,6 +910,45 @@ class RaftServerImpl implements RaftServer.Division,
     return processQueryFuture(stateMachine.queryStale(request.getMessage(), minIndex), request);
   }
 
+  ReadRequests getReadRequests() {
+    return getState().getReadRequests();
+  }
+
+  private CompletableFuture<RaftClientReply> readAsync(RaftClientRequest request) {
+    if (readOption == RaftServerConfigKeys.Read.Option.LINEARIZABLE) {
+      /*
+        Linearizable read using ReadIndex. See Raft paper section 6.4.
+        1. First obtain readIndex from Leader.
+        2. Then waits for statemachine to advance at least as far as readIndex.
+        3. Finally, query the statemachine and return the result.
+       */
+      final LeaderStateImpl leader = role.getLeaderState().orElse(null);
+      // TODO support follower linearizable read
+      if (leader == null) {
+        return JavaUtils.completeExceptionally(generateNotLeaderException());
+      }
+      return leader.getReadIndex()
+          .thenCompose(readIndex -> getReadRequests().waitToAdvance(readIndex))
+          .thenCompose(readIndex -> queryStateMachine(request))
+          .exceptionally(e -> readException2Reply(request, e));
+    } else if (readOption == RaftServerConfigKeys.Read.Option.DEFAULT) {
+      return queryStateMachine(request);
+    } else {
+      throw new IllegalStateException("Unexpected read option: " + readOption);
+    }
+  }
+
+  private RaftClientReply readException2Reply(RaftClientRequest request, Throwable e) {
+    e = JavaUtils.unwrapCompletionException(e);
+    if (e instanceof StateMachineException ) {
+      return newExceptionReply(request, (StateMachineException) e);
+    } else if (e instanceof ReadException) {
+      return newExceptionReply(request, (ReadException) e);
+    } else {
+      throw new CompletionException(e);
+    }
+  }
+
   private CompletableFuture<RaftClientReply> streamAsync(RaftClientRequest request) {
     return role.getLeaderState()
         .map(ls -> ls.streamAsync(request))
@@ -920,6 +962,10 @@ class RaftServerImpl implements RaftServer.Division,
         .orElse(null);
   }
 
+  CompletableFuture<RaftClientReply> queryStateMachine(RaftClientRequest request) {
+    return processQueryFuture(stateMachine.query(request.getMessage()), request);
+  }
+
   CompletableFuture<RaftClientReply> processQueryFuture(
       CompletableFuture<Message> queryFuture, RaftClientRequest request) {
     return queryFuture.thenApply(r -> newReplyBuilder(request).setSuccess().setMessage(r).build())
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
index 76f974a03..bbd2a92ba 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ReadRequests.java
@@ -17,21 +17,31 @@
  */
 package org.apache.ratis.server.impl;
 
-import org.apache.ratis.proto.RaftProtos;
+import org.apache.ratis.conf.RaftProperties;
 import org.apache.ratis.proto.RaftProtos.AppendEntriesReplyProto;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.exceptions.ReadException;
+import org.apache.ratis.protocol.exceptions.TimeoutIOException;
+import org.apache.ratis.server.RaftServerConfigKeys;
 import org.apache.ratis.server.leader.LogAppender;
 import org.apache.ratis.server.raftlog.RaftLog;
 import org.apache.ratis.server.raftlog.RaftLogIndex;
-import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.MemoizedSupplier;
+import org.apache.ratis.util.TimeDuration;
+import org.apache.ratis.util.TimeoutExecutor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Iterator;
 import java.util.Map;
 import java.util.NavigableMap;
+import java.util.Optional;
 import java.util.TreeMap;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.function.Predicate;
 
@@ -90,21 +100,19 @@ class ReadRequests {
       this.commitIndex = commitIndex;
     }
 
-    void init(LogAppender appender) {
-      replies.put(appender.getFollowerId(), new HeartbeatAck(appender));
-    }
-
     CompletableFuture<Long> getFuture() {
       return future;
     }
 
-    boolean receive(AppendEntriesReplyProto proto, Predicate<Predicate<RaftPeerId>> hasMajority) {
+    boolean receive(LogAppender logAppender, AppendEntriesReplyProto proto,
+                    Predicate<Predicate<RaftPeerId>> hasMajority) {
       if (isCompletedNormally()) {
         return true;
       }
-      final RaftProtos.RaftRpcReplyProto rpc = proto.getServerReply();
-      final HeartbeatAck reply = replies.get(RaftPeerId.valueOf(rpc.getReplyId()));
-      if (!reply.receive(proto)) {
+
+      final HeartbeatAck reply = replies.computeIfAbsent(
+          logAppender.getFollowerId(), key -> new HeartbeatAck(logAppender));
+      if (reply.receive(proto)) {
         if (hasMajority.test(id -> replies.get(id).isAcknowledged())) {
           future.complete(commitIndex);
           return true;
@@ -126,31 +134,80 @@ class ReadRequests {
       return sorted.computeIfAbsent(commitIndex, constructor);
     }
 
-    synchronized void onAppendEntriesReply(AppendEntriesReplyProto reply,
+    synchronized void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply,
                                            Predicate<Predicate<RaftPeerId>> hasMajority) {
       final long callId = reply.getServerReply().getCallId();
-      for (;;) {
-        final Map.Entry<Long, AppendEntriesListener> first = sorted.firstEntry();
-        if (first == null || first.getKey() > callId) {
+
+      Iterator<Map.Entry<Long, AppendEntriesListener>> iterator = sorted.entrySet().iterator();
+      while (iterator.hasNext()) {
+        Map.Entry<Long, AppendEntriesListener> entry = iterator.next();
+        if (entry.getKey() > callId) {
           return;
         }
 
-        final AppendEntriesListener listener = first.getValue();
+        final AppendEntriesListener listener = entry.getValue();
         if (listener == null) {
           continue;
         }
 
-        if (listener.receive(reply, hasMajority)) {
-          final AppendEntriesListener removed = sorted.remove(callId);
-          Preconditions.assertSame(listener, removed, "AppendEntriesListener");
+        if (listener.receive(appender, reply, hasMajority)) {
           ackedCommitIndex.updateToMax(listener.commitIndex, s -> LOG.debug("{}: {}", ReadRequests.this, s));
+          iterator.remove();
+        }
+      }
+    }
+  }
+
+  /** Futures of the reads waiting for the applied index to reach their read index, sorted by read index. */
+  static class ReadIndexQueue {
+    private final TimeoutExecutor scheduler = TimeoutExecutor.getInstance();
+    private final NavigableMap<Long, CompletableFuture<Long>> sorted = new ConcurrentSkipListMap<>();
+    private final TimeDuration readTimeout;
+
+    ReadIndexQueue(TimeDuration readTimeout) {
+      this.readTimeout = readTimeout;
+    }
+
+    /** @return the future of readIndex, shared by all callers using the same index; it fails on timeout. */
+    CompletableFuture<Long> add(long readIndex) {
+      final MemoizedSupplier<CompletableFuture<Long>> supplier = MemoizedSupplier.valueOf(CompletableFuture::new);
+      final CompletableFuture<Long> f = sorted.computeIfAbsent(readIndex, i -> supplier.get());
+
+      // schedule the timeout only if this call has created a new future
+      if (supplier.isInitialized()) {
+        scheduler.onTimeout(readTimeout, () -> handleTimeout(readIndex),
+            LOG, () -> "Failed to handle read timeout for index " + readIndex);
+      }
+      return f;
+    }
+
+    private void handleTimeout(long readIndex) {
+      Optional.ofNullable(sorted.remove(readIndex)).ifPresent(f -> f.completeExceptionally(
+          new ReadException(new TimeoutIOException("Read timeout for index " + readIndex))));
+    }
+
+    /** Complete, with appliedIndex, all the futures whose read index is at most appliedIndex. */
+    void complete(Long appliedIndex) {
+      for (;;) {
+        // firstEntry() returns null on an empty map, whereas firstKey() throws if the map was just emptied
+        final Map.Entry<Long, CompletableFuture<Long>> first = sorted.firstEntry();
+        if (first == null || first.getKey() > appliedIndex) {
+          return;
+        }
+        Optional.ofNullable(sorted.remove(first.getKey())).ifPresent(f -> f.complete(appliedIndex));
       }
     }
   }
 
   private final AppendEntriesListeners appendEntriesListeners = new AppendEntriesListeners();
   private final RaftLogIndex ackedCommitIndex = new RaftLogIndex("ackedCommitIndex", RaftLog.INVALID_LOG_INDEX);
+  private final ReadIndexQueue readIndexQueue;
+  private final StateMachine stateMachine;
+
+  ReadRequests(RaftProperties properties, StateMachine stateMachine) {
+    this.readIndexQueue = new ReadIndexQueue(RaftServerConfigKeys.Read.timeout(properties));
+    this.stateMachine = stateMachine;
+  }
 
   AppendEntriesListener addAppendEntriesListener(long commitIndex,
                                                  Function<Long, AppendEntriesListener> constructor) {
@@ -160,7 +217,19 @@ class ReadRequests {
     return appendEntriesListeners.add(commitIndex, constructor);
   }
 
-  void onAppendEntriesReply(AppendEntriesReplyProto reply, Predicate<Predicate<RaftPeerId>> hasMajority) {
-    appendEntriesListeners.onAppendEntriesReply(reply, hasMajority);
+  void onAppendEntriesReply(LogAppender appender, AppendEntriesReplyProto reply,
+                            Predicate<Predicate<RaftPeerId>> hasMajority) {
+    appendEntriesListeners.onAppendEntriesReply(appender, reply, hasMajority); // forward to the listeners
+  }
+
+  /** @return a consumer passing each applied index it receives to {@link ReadIndexQueue#complete(Long)}. */
+  Consumer<Long> getAppliedIndexConsumer() {
+    return appliedIndex -> readIndexQueue.complete(appliedIndex);
+  }
+
+  CompletableFuture<Long> waitToAdvance(long readIndex) {
+    if (stateMachine.getLastAppliedTermIndex().getIndex() >= readIndex) {
+      return CompletableFuture.completedFuture(readIndex);
+    }
+    return readIndexQueue.add(readIndex);
   }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
index d62e8f50e..d90cddf01 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerState.java
@@ -128,10 +128,10 @@ class ServerState {
         .filter(i -> i >= 0)
         .orElse(RaftLog.INVALID_LOG_INDEX);
     this.log = JavaUtils.memoize(() -> initRaftLog(getSnapshotIndexFromStateMachine, prop));
+    this.readRequests = new ReadRequests(prop, stateMachine);
     this.stateMachineUpdater = JavaUtils.memoize(() -> new StateMachineUpdater(
-        stateMachine, server, this, getLog().getSnapshotIndex(), prop));
-
-    this.readRequests = new ReadRequests();
+        stateMachine, server, this, getLog().getSnapshotIndex(), prop,
+        this.readRequests.getAppliedIndexConsumer()));
   }
 
   void initialize(StateMachine stateMachine) throws IOException {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
index bd989a389..9f1a8ca5e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/StateMachineUpdater.java
@@ -88,9 +88,12 @@ class StateMachineUpdater implements Runnable {
 
   private final MemoizedSupplier<StateMachineMetrics> stateMachineMetrics;
 
+  private final Consumer<Long> appliedIndexConsumer;
+
   StateMachineUpdater(StateMachine stateMachine, RaftServerImpl server,
-      ServerState serverState, long lastAppliedIndex, RaftProperties properties) {
+      ServerState serverState, long lastAppliedIndex, RaftProperties properties, Consumer<Long> appliedIndexConsumer) {
     this.name = serverState.getMemberId() + "-" + JavaUtils.getClassSimpleName(getClass());
+    this.appliedIndexConsumer = appliedIndexConsumer;
     this.infoIndexChange = s -> LOG.info("{}: {}", name, s);
     this.debugIndexChange = s -> LOG.debug("{}: {}", name, s);
 
@@ -122,6 +125,7 @@ class StateMachineUpdater implements Runnable {
     //wait for RaftServerImpl and ServerState constructors to complete
     stateMachineMetrics.get();
     updater.start();
+    notifyAppliedIndex(appliedIndex.get());
   }
 
   private void stop() {
@@ -218,6 +222,7 @@ class StateMachineUpdater implements Runnable {
     final long i = snapshot.getIndex();
     snapshotIndex.setUnconditionally(i, infoIndexChange);
     appliedIndex.setUnconditionally(i, infoIndexChange);
+    notifyAppliedIndex(i);
     state = State.RUNNING;
   }
 
@@ -235,11 +240,12 @@ class StateMachineUpdater implements Runnable {
         }
 
         final CompletableFuture<Message> f = server.applyLogToStateMachine(next);
+        final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
+        Preconditions.assertTrue(incremented == nextIndex);
         if (f != null) {
           futures.get().add(f);
+          f.thenAccept(m -> notifyAppliedIndex(incremented));
         }
-        final long incremented = appliedIndex.incrementAndGet(debugIndexChange);
-        Preconditions.assertTrue(incremented == nextIndex);
       } else {
         LOG.debug("{}: logEntry {} is null. There may be snapshot to load. state:{}",
             this, nextIndex, state);
@@ -320,6 +326,10 @@ class StateMachineUpdater implements Runnable {
     return appliedIndex.get();
   }
 
+  private void notifyAppliedIndex(long index) {
+    appliedIndexConsumer.accept(index); // hand the index to the consumer given to the constructor
+  }
+
   long getStateMachineLastAppliedIndex() {
     return Optional.ofNullable(stateMachine.getLastAppliedTermIndex())
         .map(TermIndex::getIndex)
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
index ca654d1de..8f71f91fc 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/leader/LogAppenderDefault.java
@@ -193,7 +193,7 @@ class LogAppenderDefault extends LogAppenderBase {
           break;
         default: throw new IllegalArgumentException("Unable to process result " + reply.getResult());
       }
-      getLeaderState().onAppendEntriesReply(getFollower(), reply);
+      getLeaderState().onAppendEntriesReply(this, reply);
     }
   }
 
diff --git a/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
new file mode 100644
index 000000000..5c7383000
--- /dev/null
+++ b/ratis-server/src/test/java/org/apache/ratis/ReadOnlyRequestTests.java
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+import org.apache.log4j.Level;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.RaftServerConfigKeys;
+import org.apache.ratis.server.impl.MiniRaftCluster;
+import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.util.Log4jUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Tests of read-only requests served with {@link RaftServerConfigKeys.Read.Option#LINEARIZABLE},
+ * run against a {@link CounterStateMachine}.
+ */
+public abstract class ReadOnlyRequestTests<CLUSTER extends MiniRaftCluster>
+  extends BaseTest
+  implements MiniRaftCluster.Factory.Get<CLUSTER> {
+
+  {
+    Log4jUtils.setLogLevel(RaftServer.Division.LOG, Level.DEBUG);
+  }
+
+  static final int NUM_SERVERS = 3;
+
+  // The commands understood by CounterStateMachine.
+  static final String INCREMENT = "INCREMENT";
+  static final String WAIT_AND_INCREMENT = "WAIT_AND_INCREMENT";
+  static final String TIMEOUT_INCREMENT = "TIMEOUT_INCREMENT";
+  static final String QUERY = "QUERY";
+  final Message incrementMessage = new RaftTestUtil.SimpleMessage(INCREMENT);
+  final Message waitAndIncrementMessage = new RaftTestUtil.SimpleMessage(WAIT_AND_INCREMENT);
+  final Message timeoutIncrement = new RaftTestUtil.SimpleMessage(TIMEOUT_INCREMENT);
+  final Message queryMessage = new RaftTestUtil.SimpleMessage(QUERY);
+
+  /** Use the CounterStateMachine and linearizable reads with a read timeout of one second. */
+  @Before
+  public void setup() {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY,
+        CounterStateMachine.class, StateMachine.class);
+
+    RaftServerConfigKeys.Read.setOption(p, RaftServerConfigKeys.Read.Option.LINEARIZABLE);
+    RaftServerConfigKeys.Read.setTimeout(p, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MIN_KEY,
+        TimeDuration.valueOf(150, TimeUnit.MILLISECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.FIRST_ELECTION_TIMEOUT_MAX_KEY,
+        TimeDuration.valueOf(300, TimeUnit.MILLISECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MIN_KEY, TimeDuration.valueOf(3, TimeUnit.SECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.TIMEOUT_MAX_KEY, TimeDuration.valueOf(6, TimeUnit.SECONDS));
+    p.setTimeDuration(RaftServerConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
+        TimeDuration.valueOf(10, TimeUnit.SECONDS));
+
+    p.setTimeDuration(RaftClientConfigKeys.Rpc.REQUEST_TIMEOUT_KEY,
+        TimeDuration.valueOf(10, TimeUnit.SECONDS));
+  }
+
+  @Test
+  public void testLinearizableRead() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadImpl);
+  }
+
+  /** Each read sent after a successful increment must observe that increment. */
+  private void testLinearizableReadImpl(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+        for (int i = 1; i <= 10; i++) {
+          RaftClientReply reply = client.io().send(incrementMessage);
+          Assert.assertTrue(reply.isSuccess());
+          reply = client.io().sendReadOnly(queryMessage);
+          Assert.assertEquals(i, retrieve(reply));
+        }
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testLinearizableReadParallel() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadParallelImpl);
+  }
+
+  /** A linearizable read sent while a slow write is being applied must wait for that write. */
+  private void testLinearizableReadParallelImpl(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (RaftClient client = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+
+        final RaftClientReply reply = client.io().send(incrementMessage);
+        Assert.assertTrue(reply.isSuccess());
+        final Semaphore canRead = new Semaphore(0);
+        // Carries the outcome of the reader thread back to the test thread;
+        // an AssertionError thrown inside the thread would otherwise only kill that thread.
+        final CompletableFuture<Void> readerOutcome = new CompletableFuture<>();
+
+        final Thread reader = new Thread(() -> {
+          try {
+            final RaftClientReply staleValueBefore = client.io()
+                .sendStaleRead(queryMessage, 0, leaderId);
+            Assert.assertEquals(1, retrieve(staleValueBefore));
+
+            canRead.acquire();
+            // we still have to sleep for a while to guarantee that the async write arrives at RaftServer
+            Thread.sleep(100);
+            // send a linearizable read request
+            // linearizable read will wait the statemachine to advance
+            final RaftClientReply linearizableReadValue = client.io()
+                .sendReadOnly(queryMessage);
+            Assert.assertEquals(2, retrieve(linearizableReadValue));
+            readerOutcome.complete(null);
+          } catch (Throwable t) {
+            readerOutcome.completeExceptionally(t);
+          }
+        });
+
+        reader.start();
+        // the statemachine applies this write only after sleeping for 500 ms
+        final CompletableFuture<RaftClientReply> result = client.async().send(waitAndIncrementMessage);
+        canRead.release();
+        reader.join();
+
+        // rethrow whatever failed in the reader thread
+        readerOutcome.get();
+        Assert.assertTrue(result.get().isSuccess());
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testLinearizableReadTimeout() throws Exception {
+    runWithNewCluster(NUM_SERVERS, this::testLinearizableReadTimeoutImpl);
+  }
+
+  /** A linearizable read must fail when the write it waits for takes longer than the read timeout. */
+  private void testLinearizableReadTimeoutImpl(CLUSTER cluster) throws Exception {
+    try {
+      RaftTestUtil.waitForLeader(cluster);
+      final RaftPeerId leaderId = cluster.getLeader().getId();
+
+      try (final RaftClient client = cluster.createClient(leaderId)) {
+
+        final Semaphore canRead = new Semaphore(0);
+        // Carries the outcome of the reader thread back to the test thread.
+        final CompletableFuture<Void> readerOutcome = new CompletableFuture<>();
+
+        final Thread reader = new Thread(() -> {
+          try (RaftClient noRetryClient = cluster.createClient(leaderId, RetryPolicies.noRetry())) {
+            canRead.acquire();
+            // we still have to sleep for a while to guarantee that the async write arrives at RaftServer
+            Thread.sleep(100);
+            try {
+              final RaftClientReply timeoutReply = noRetryClient.io().sendReadOnly(queryMessage);
+              Assert.assertNotNull(timeoutReply.getException());
+            } catch (Exception expected) {
+              // NOTE(review): presumably the client may also report the failed read by throwing;
+              // confirm and narrow this to the expected exception type.
+            }
+            readerOutcome.complete(null);
+          } catch (Throwable t) {
+            readerOutcome.completeExceptionally(t);
+          }
+        });
+
+        reader.start();
+        // the statemachine applies this write only after sleeping for 1500 ms, longer than the read timeout
+        final CompletableFuture<RaftClientReply> result = client.async().send(timeoutIncrement);
+        canRead.release();
+
+        reader.join();
+
+        // rethrow whatever failed in the reader thread, in particular a read that did not fail
+        readerOutcome.get();
+        // let the slow write finish before the client is closed and the cluster is shut down
+        Assert.assertTrue(result.get().isSuccess());
+      }
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  /** @return the counter value carried, as a decimal string, by the message of the given reply. */
+  private int retrieve(RaftClientReply reply) {
+    return Integer.parseInt(reply.getMessage().getContent().toString(StandardCharsets.UTF_8));
+  }
+
+
+  /**
+   * CounterStateMachine support 4 operations
+   * 1. increment
+   * 2. get (any query returns the current counter value)
+   * 3. waitAndIncrement (sleep 500 ms before incrementing)
+   * 4. timeoutIncrement (sleep 1500 ms before incrementing)
+   */
+  static class CounterStateMachine extends BaseStateMachine {
+    private final AtomicLong counter = new AtomicLong(0L);
+
+    @Override
+    public CompletableFuture<Message> query(Message request) {
+      return CompletableFuture.completedFuture(
+          Message.valueOf(String.valueOf(counter.get())));
+    }
+
+    @Override
+    public CompletableFuture<Message> queryStale(Message request, long minIndex) {
+      return query(request);
+    }
+
+    private void sleepQuietly(int millis) {
+      try {
+        Thread.sleep(millis);
+      } catch (InterruptedException e) {
+        LOG.debug("{} be interrupted", Thread.currentThread());
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    private void increment() {
+      counter.incrementAndGet();
+    }
+
+    private void waitAndIncrement() {
+      sleepQuietly(500);
+      increment();
+    }
+
+    private void timeoutIncrement() {
+      sleepQuietly(1500);
+      increment();
+    }
+
+
+    @Override
+    public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
+      final long term = trx.getLogEntry().getTerm();
+      final long index = trx.getLogEntry().getIndex();
+      LOG.debug("apply trx with index={}", index);
+
+      final String command = trx.getLogEntry().getStateMachineLogEntry()
+          .getLogData().toString(StandardCharsets.UTF_8);
+
+      LOG.info("receive command: {}", command);
+      if (command.equals(INCREMENT)) {
+        increment();
+      } else if (command.equals(WAIT_AND_INCREMENT)) {
+        waitAndIncrement();
+      } else {
+        // any other command, including TIMEOUT_INCREMENT, is applied as a timeout increment
+        timeoutIncrement();
+      }
+
+      // Publish the applied index only after the counter has changed: a linearizable read
+      // compares its read index with the last applied index before querying the counter.
+      updateLastAppliedTermIndex(term, index);
+      return CompletableFuture.completedFuture(Message.valueOf("OK"));
+    }
+  }
+}
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
new file mode 100644
index 000000000..b89523c61
--- /dev/null
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestReadOnlyRequestsWithGrpc.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.grpc;
+
+import org.apache.ratis.ReadOnlyRequestTests;
+
+public class TestReadOnlyRequestsWithGrpc // runs the ReadOnlyRequestTests with MiniRaftClusterWithGrpc
+  extends ReadOnlyRequestTests<MiniRaftClusterWithGrpc>
+  implements MiniRaftClusterWithGrpc.FactoryGet {
+}