You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/11 22:33:44 UTC

[incubator-ratis] branch master updated: RATIS-345. Watch requests should bypass sliding window.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 5680cf5  RATIS-345. Watch requests should bypass sliding window.
5680cf5 is described below

commit 5680cf5fbb03362c65f0514adf104a5e29ea9b57
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Mon Mar 11 12:16:22 2019 -0700

    RATIS-345. Watch requests should bypass sliding window.
---
 dev-support/run-test-repeatedly.sh                 |   4 +-
 .../org/apache/ratis/client/RaftClientRpc.java     |   7 +
 .../apache/ratis/client/impl/RaftClientImpl.java   | 107 +++++++-----
 .../apache/ratis/client/impl/UnorderedAsync.java   | 104 ++++++++++++
 .../org/apache/ratis/protocol/RaftClientReply.java |   5 -
 .../apache/ratis/protocol/TimeoutIOException.java  |   4 +
 .../org/apache/ratis/util/CollectionUtils.java     |   4 +-
 .../main/java/org/apache/ratis/util/JavaUtils.java |   9 +-
 .../java/org/apache/ratis/util/PeerProxyMap.java   |   8 +-
 .../main/java/org/apache/ratis/grpc/GrpcUtil.java  |  18 +-
 .../grpc/client/GrpcClientProtocolClient.java      | 122 ++++++++++----
 .../grpc/client/GrpcClientProtocolService.java     | 184 ++++++++++++++++-----
 .../apache/ratis/grpc/client/GrpcClientRpc.java    |  36 +++-
 .../apache/ratis/grpc/server/GrpcLogAppender.java  |   8 +-
 ratis-proto/src/main/proto/Grpc.proto              |   4 +
 .../org/apache/ratis/server/impl/LeaderState.java  |   2 +
 .../apache/ratis/server/impl/ServerProtoUtils.java |  18 +-
 .../org/apache/ratis/server/storage/RaftLog.java   |   6 +-
 .../ratis/server/storage/SegmentedRaftLog.java     |  12 ++
 .../java/org/apache/ratis/WatchRequestTests.java   |  60 +++----
 .../ratis/grpc/TestWatchRequestWithGrpc.java       |  14 +-
 21 files changed, 567 insertions(+), 169 deletions(-)

diff --git a/dev-support/run-test-repeatedly.sh b/dev-support/run-test-repeatedly.sh
index 2a666a9..dd3b874 100755
--- a/dev-support/run-test-repeatedly.sh
+++ b/dev-support/run-test-repeatedly.sh
@@ -27,6 +27,8 @@ TEST_NAME=`echo ${TEST_PATTERN} | cut -d# -f 1`
 MVN="mvn"
 set -ex
 
+${MVN} clean
+
 for i in `seq 1 99`;
 do
   OUTDIR=${TEST_NAME}.${i}
@@ -35,7 +37,7 @@ do
   echo
   echo Running ${OUTDIR}
   echo
-  time ${MVN} test -DskipShade -Dtest=${TEST_PATTERN} 2>&1 | tee ${OUTF}
+  time ${MVN} test -Dtest=${TEST_PATTERN} 2>&1 | tee ${OUTF}
 
   find */target/surefire-reports/ -name \*${TEST_NAME}\* | xargs -I{} cp {} ${OUTDIR}
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
index 8fb0987..abdfd41 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientRpc.java
@@ -21,6 +21,7 @@ import org.apache.ratis.protocol.RaftClientReply;
 import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeer;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.util.JavaUtils;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -33,6 +34,12 @@ public interface RaftClientRpc extends Closeable {
     throw new UnsupportedOperationException(getClass() + " does not support this method.");
   }
 
+  /** Async call to send a request. */
+  default CompletableFuture<RaftClientReply> sendRequestAsyncUnordered(RaftClientRequest request) {
+    throw new UnsupportedOperationException(getClass() + " does not support "
+        + JavaUtils.getCurrentStackTraceElement().getMethodName());
+  }
+
   /** Send a request. */
   RaftClientReply sendRequest(RaftClientRequest request) throws IOException;
 
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index c14417d..b49fbbf 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -31,6 +31,7 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.util.*;
 import java.util.concurrent.*;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
 import java.util.function.LongFunction;
@@ -44,24 +45,39 @@ import static org.apache.ratis.proto.RaftProtos.RaftClientRequestProto.TypeCase.
 final class RaftClientImpl implements RaftClient {
   private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
 
-  private static long nextCallId() {
+  static long nextCallId() {
     return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
   }
 
-  static class PendingAsyncRequest implements SlidingWindow.Request<RaftClientReply> {
-    private final long seqNum;
-    private final LongFunction<RaftClientRequest> requestConstructor;
+  static class PendingClientRequest {
+    private final Supplier<RaftClientRequest> requestConstructor;
     private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
-    private volatile int attemptCount;
+    private final AtomicInteger attemptCount = new AtomicInteger();
 
-    PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
-      this.seqNum = seqNum;
+    PendingClientRequest(Supplier<RaftClientRequest> requestConstructor) {
       this.requestConstructor = requestConstructor;
     }
 
     RaftClientRequest newRequest() {
-      attemptCount++;
-      return requestConstructor.apply(seqNum);
+      attemptCount.incrementAndGet();
+      return requestConstructor.get();
+    }
+
+    CompletableFuture<RaftClientReply> getReplyFuture() {
+      return replyFuture;
+    }
+
+    int getAttemptCount() {
+      return attemptCount.get();
+    }
+  }
+
+  static class PendingAsyncRequest extends PendingClientRequest implements SlidingWindow.Request<RaftClientReply> {
+    private final long seqNum;
+
+    PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
+      super(() -> requestConstructor.apply(seqNum));
+      this.seqNum = seqNum;
     }
 
     @Override
@@ -71,25 +87,17 @@ final class RaftClientImpl implements RaftClient {
 
     @Override
     public boolean hasReply() {
-      return replyFuture.isDone();
+      return getReplyFuture().isDone();
     }
 
     @Override
     public void setReply(RaftClientReply reply) {
-      replyFuture.complete(reply);
+      getReplyFuture().complete(reply);
     }
 
     @Override
     public void fail(Exception e) {
-      replyFuture.completeExceptionally(e);
-    }
-
-    CompletableFuture<RaftClientReply> getReplyFuture() {
-      return replyFuture;
-    }
-
-    int getAttemptCount() {
-      return attemptCount;
+      getReplyFuture().completeExceptionally(e);
     }
 
     @Override
@@ -133,6 +141,14 @@ final class RaftClientImpl implements RaftClient {
     return clientId;
   }
 
+  RetryPolicy getRetryPolicy() {
+    return retryPolicy;
+  }
+
+  TimeoutScheduler getScheduler() {
+    return scheduler;
+  }
+
   private SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> getSlidingWindow(RaftClientRequest request) {
     return getSlidingWindow(request.is(STALEREAD)? request.getServerId(): null);
   }
@@ -159,7 +175,7 @@ final class RaftClientImpl implements RaftClient {
 
   @Override
   public CompletableFuture<RaftClientReply> sendWatchAsync(long index, ReplicationLevel replication) {
-    return sendAsync(RaftClientRequest.watchRequestType(index, replication), null, null);
+    return UnorderedAsync.send(RaftClientRequest.watchRequestType(index, replication), this);
   }
 
   private CompletableFuture<RaftClientReply> sendAsync(
@@ -183,7 +199,7 @@ final class RaftClientImpl implements RaftClient {
     ).whenComplete((r, e) -> asyncRequestSemaphore.release());
   }
 
-  private RaftClientRequest newRaftClientRequest(
+  RaftClientRequest newRaftClientRequest(
       RaftPeerId server, long callId, long seq, Message message, RaftClientRequest.Type type) {
     return new RaftClientRequest(clientId, server != null? server: leaderId, groupId,
         callId, seq, message, type);
@@ -288,10 +304,10 @@ final class RaftClientImpl implements RaftClient {
         return;
       }
       if (reply == null) {
-        LOG.debug("schedule attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request);
+        LOG.debug("schedule* attempt #{} with policy {} for {}", pending.getAttemptCount(), retryPolicy, request);
         scheduler.onTimeout(retryPolicy.getSleepTime(),
             () -> getSlidingWindow(request).retry(pending, this::sendRequestWithRetryAsync),
-            LOG, () -> "Failed to retry " + request);
+            LOG, () -> "Failed* to retry " + request);
       } else {
         f.complete(reply);
       }
@@ -323,7 +339,7 @@ final class RaftClientImpl implements RaftClient {
     LOG.debug("{}: send* {}", clientId, request);
     return clientRpc.sendRequestAsync(request).thenApply(reply -> {
       LOG.debug("{}: receive* {}", clientId, reply);
-      reply = handleNotLeaderException(request, reply);
+      reply = handleNotLeaderException(request, reply, true);
       if (reply != null) {
         getSlidingWindow(request).receiveReply(
             request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
@@ -333,16 +349,16 @@ final class RaftClientImpl implements RaftClient {
       return reply;
     }).exceptionally(e -> {
       if (LOG.isTraceEnabled()) {
-        LOG.trace(clientId + ": Failed " + request, e);
+        LOG.trace(clientId + ": Failed* " + request, e);
       } else {
-        LOG.debug("{}: Failed {} with {}", clientId, request, e);
+        LOG.debug("{}: Failed* {} with {}", clientId, request, e);
       }
       e = JavaUtils.unwrapCompletionException(e);
       if (e instanceof IOException && !(e instanceof GroupMismatchException)) {
         if (!retryPolicy.shouldRetry(attemptCount)) {
           handleAsyncRetryFailure(request, attemptCount);
         } else {
-          handleIOException(request, (IOException) e, null);
+          handleIOException(request, (IOException) e, null, true);
         }
         return null;
       }
@@ -350,9 +366,14 @@ final class RaftClientImpl implements RaftClient {
     });
   }
 
-  private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
-    final RaftRetryFailureException rfe = new RaftRetryFailureException(
+  static RaftRetryFailureException newRaftRetryFailureException(
+      RaftClientRequest request, int attemptCount, RetryPolicy retryPolicy) {
+    return new RaftRetryFailureException(
         "Failed " + request + " for " + (attemptCount-1) + " attempts with " + retryPolicy);
+  }
+
+  private void handleAsyncRetryFailure(RaftClientRequest request, int attemptCount) {
+    final RaftRetryFailureException rfe = newRaftRetryFailureException(request, attemptCount, retryPolicy);
     getSlidingWindow(request).fail(request.getSeqNum(), rfe);
   }
 
@@ -365,10 +386,10 @@ final class RaftClientImpl implements RaftClient {
     } catch (GroupMismatchException gme) {
       throw gme;
     } catch (IOException ioe) {
-      handleIOException(request, ioe, null);
+      handleIOException(request, ioe, null, false);
     }
     LOG.debug("{}: receive {}", clientId, reply);
-    reply = handleNotLeaderException(request, reply);
+    reply = handleNotLeaderException(request, reply, false);
     reply = handleStateMachineException(reply, Function.identity());
     return reply;
   }
@@ -388,7 +409,8 @@ final class RaftClientImpl implements RaftClient {
    * @return null if the reply is null or it has {@link NotLeaderException};
    *         otherwise return the same reply.
    */
-  private RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply) {
+  RaftClientReply handleNotLeaderException(RaftClientRequest request, RaftClientReply reply,
+      boolean resetSlidingWindow) {
     if (reply == null) {
       return null;
     }
@@ -396,10 +418,15 @@ final class RaftClientImpl implements RaftClient {
     if (nle == null) {
       return reply;
     }
+    return handleNotLeaderException(request, nle, resetSlidingWindow);
+  }
+
+  RaftClientReply handleNotLeaderException(RaftClientRequest request, NotLeaderException nle,
+      boolean resetSlidingWindow) {
     refreshPeers(Arrays.asList(nle.getPeers()));
     final RaftPeerId newLeader = nle.getSuggestedLeader() == null ? null
         : nle.getSuggestedLeader().getId();
-    handleIOException(request, nle, newLeader);
+    handleIOException(request, nle, newLeader, resetSlidingWindow);
     return null;
   }
 
@@ -412,25 +439,29 @@ final class RaftClientImpl implements RaftClient {
     }
   }
 
-  private void handleIOException(RaftClientRequest request, IOException ioe,
-      RaftPeerId newLeader) {
+  void handleIOException(RaftClientRequest request, IOException ioe,
+      RaftPeerId newLeader, boolean resetSlidingWindow) {
     LOG.debug("{}: suggested new leader: {}. Failed {} with {}",
         clientId, newLeader, request, ioe);
     if (LOG.isTraceEnabled()) {
       LOG.trace("Stack trace", new Throwable("TRACE"));
     }
 
-    getSlidingWindow(request).resetFirstSeqNum();
+    if (resetSlidingWindow) {
+      getSlidingWindow(request).resetFirstSeqNum();
+    }
     if (ioe instanceof LeaderNotReadyException) {
       return;
     }
 
     final RaftPeerId oldLeader = request.getServerId();
-    final boolean stillLeader = oldLeader.equals(leaderId);
+    final RaftPeerId curLeader = leaderId; // the client's current leader, not the request's own target
+    final boolean stillLeader = oldLeader.equals(curLeader);
     if (newLeader == null && stillLeader) {
       newLeader = CollectionUtils.random(oldLeader,
           CollectionUtils.as(peers, RaftPeer::getId));
     }
+    LOG.debug("{}: oldLeader={}, curLeader={}, newLeader={}", clientId, oldLeader, curLeader, newLeader);
 
     final boolean changeLeader = newLeader != null && stillLeader;
     if (changeLeader) {
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
new file mode 100644
index 0000000..d4ab14d
--- /dev/null
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/UnorderedAsync.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.client.impl;
+
+import org.apache.ratis.client.impl.RaftClientImpl.PendingClientRequest;
+import org.apache.ratis.protocol.ClientId;
+import org.apache.ratis.protocol.GroupMismatchException;
+import org.apache.ratis.protocol.NotLeaderException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.util.JavaUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+
+/** Send unordered asynchronous requests to a raft service. */
+public interface UnorderedAsync {
+  Logger LOG = LoggerFactory.getLogger(UnorderedAsync.class);
+
+  static CompletableFuture<RaftClientReply> send(RaftClientRequest.Type type, RaftClientImpl client) {
+    final long callId = RaftClientImpl.nextCallId();
+    final PendingClientRequest pending = new PendingClientRequest(
+        () -> client.newRaftClientRequest(null, callId, -1L, null, type));
+    sendRequestWithRetry(pending, client);
+    return pending.getReplyFuture()
+        .thenApply(reply -> RaftClientImpl.handleStateMachineException(reply, CompletionException::new));
+  }
+
+  /**
+   * Make one attempt to send the pending request. On a retriable failure, reschedule this method
+   * after the retry policy's sleep time; otherwise complete the pending reply future.
+   */
+  static void sendRequestWithRetry(PendingClientRequest pending, RaftClientImpl client) {
+    final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
+    if (f.isDone()) {
+      return;
+    }
+    final RaftClientRequest request = pending.newRequest();
+    final int attemptCount = pending.getAttemptCount();
+    final ClientId clientId = client.getId();
+    LOG.debug("{}: attempt #{} send~ {}", clientId, attemptCount, request);
+    client.getClientRpc().sendRequestAsyncUnordered(request).whenCompleteAsync((reply, e) -> {
+      try {
+        LOG.debug("{}: attempt #{} receive~ {}", clientId, attemptCount, reply);
+        // null when there is no reply or the reply carries a NotLeaderException (peers are refreshed)
+        reply = client.handleNotLeaderException(request, reply, false);
+        if (reply != null) {
+          f.complete(reply);
+          return;
+        }
+        final RetryPolicy retryPolicy = client.getRetryPolicy();
+        if (!retryPolicy.shouldRetry(attemptCount)) {
+          f.completeExceptionally(RaftClientImpl.newRaftRetryFailureException(request, attemptCount, retryPolicy));
+          return;
+        }
+        if (e != null) {
+          if (LOG.isTraceEnabled()) {
+            LOG.trace(clientId + ": attempt #" + attemptCount + " failed~ " + request, e);
+          } else {
+            LOG.debug("{}: attempt #{} failed~ {} with {}", clientId, attemptCount, request, e);
+          }
+          e = JavaUtils.unwrapCompletionException(e);
+          if (e instanceof GroupMismatchException) {
+            f.completeExceptionally(e); // not retriable, as in the ordered async path
+            return;
+          } else if (e instanceof NotLeaderException) {
+            client.handleNotLeaderException(request, (NotLeaderException) e, false);
+          } else if (e instanceof IOException) {
+            client.handleIOException(request, (IOException) e, null, false);
+          } else if (!client.getClientRpc().handleException(request.getServerId(), e, false)) {
+            f.completeExceptionally(e); // a non-IO exception the rpc layer cannot handle
+            return;
+          }
+        }
+
+        LOG.debug("schedule retry for attempt #{}, policy={}, request={}", attemptCount, retryPolicy, request);
+        client.getScheduler().onTimeout(retryPolicy.getSleepTime(), () -> sendRequestWithRetry(pending, client),
+            LOG, () -> clientId + ": Failed~ to retry " + request);
+      } catch (Throwable t) {
+        LOG.error(clientId + ": Failed~ " + request, t);
+        f.completeExceptionally(t);
+      }
+    });
+  }
+}
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
index 7a9574f..826eeee 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientReply.java
@@ -92,11 +92,6 @@ public class RaftClientReply extends RaftClientMessage {
         request.getCallId(), false, request.getMessage(), nre, nre.getLogIndex(), commitInfos);
   }
 
-  public RaftClientReply(RaftClientReply reply, NotReplicatedException nre) {
-    this(reply.getClientId(), reply.getServerId(), reply.getRaftGroupId(),
-        reply.getCallId(), false, reply.getMessage(), nre, reply.getLogIndex(), reply.getCommitInfos());
-  }
-
   /**
    * Get the commit information for the entire group.
    * The commit information may be unavailable for exception reply.
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
index 6effb30..bfdff88 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/TimeoutIOException.java
@@ -25,6 +25,10 @@ import java.io.IOException;
 public class TimeoutIOException extends IOException {
   static final long serialVersionUID = 1L;
 
+  public TimeoutIOException(String message) {
+    super(message);
+  }
+
   public TimeoutIOException(String message, Throwable throwable) {
     super(message, throwable);
   }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index cb49847..a215d3d 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -62,7 +62,6 @@ public interface CollectionUtils {
   static <T> T random(final T given, Iterable<T> iteration) {
     Objects.requireNonNull(given, "given == null");
     Objects.requireNonNull(iteration, "iteration == null");
-    Preconditions.assertTrue(iteration.iterator().hasNext(), "iteration is empty");
 
     final List<T> list = StreamSupport.stream(iteration.spliterator(), false)
         .filter(e -> !given.equals(e))
@@ -92,10 +91,11 @@ public interface CollectionUtils {
     return as(Arrays.asList(array), converter);
   }
 
-  static <K, V> void putNew(K key, V value, Map<K, V> map, Supplier<String> name) {
+  static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<String> name) {
     final V returned = map.put(key, value);
     Preconditions.assertTrue(returned == null,
         () -> "Entry already exists for key " + key + " in map " + name.get());
+    return value;
   }
 
   static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> map, Supplier<String> name) {
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
index 916d27a..d004c3f 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/JavaUtils.java
@@ -27,8 +27,8 @@ import java.lang.management.ThreadInfo;
 import java.lang.management.ThreadMXBean;
 import java.text.DateFormat;
 import java.text.SimpleDateFormat;
+import java.util.Collection;
 import java.util.Date;
-import java.util.List;
 import java.util.Objects;
 import java.util.Timer;
 import java.util.TimerTask;
@@ -74,6 +74,11 @@ public interface JavaUtils {
     return trace[3];
   }
 
+  static StackTraceElement getCurrentStackTraceElement() {
+    final StackTraceElement[] trace = Thread.currentThread().getStackTrace();
+    return trace[2];
+  }
+
   static <T extends Throwable> void runAsUnchecked(CheckedRunnable<T> runnable) {
     runAsUnchecked(runnable, RuntimeException::new);
   }
@@ -238,7 +243,7 @@ public interface JavaUtils {
     return t instanceof CompletionException && t.getCause() != null? t.getCause(): t;
   }
 
-  static <T> CompletableFuture<Void> allOf(List<CompletableFuture<T>> futures) {
+  static <T> CompletableFuture<Void> allOf(Collection<CompletableFuture<T>> futures) {
     return CompletableFuture.allOf(futures.toArray(EMPTY_COMPLETABLE_FUTURE_ARRAY));
   }
 
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
index 813fac7..b2720ff 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/PeerProxyMap.java
@@ -121,9 +121,11 @@ public class PeerProxyMap<PROXY extends Closeable> implements Closeable {
     LOG.debug("{}: reset proxy for {}", name, id );
     synchronized (resetLock) {
       final PeerAndProxy pp = peers.remove(id);
-      final RaftPeer peer = pp.getPeer();
-      pp.close();
-      computeIfAbsent(peer);
+      if (pp != null) {
+        final RaftPeer peer = pp.getPeer();
+        pp.close();
+        computeIfAbsent(peer);
+      }
     }
   }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
index 482a8de..ce524db 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/GrpcUtil.java
@@ -38,6 +38,8 @@ import java.util.function.Supplier;
 public interface GrpcUtil {
   Metadata.Key<String> EXCEPTION_TYPE_KEY =
       Metadata.Key.of("exception-type", Metadata.ASCII_STRING_MARSHALLER);
+  Metadata.Key<byte[]> EXCEPTION_OBJECT_KEY =
+      Metadata.Key.of("exception-object-bin", Metadata.BINARY_BYTE_MARSHALLER);
   Metadata.Key<String> CALL_ID =
       Metadata.Key.of("call-id", Metadata.ASCII_STRING_MARSHALLER);
 
@@ -50,6 +52,7 @@ public interface GrpcUtil {
 
     Metadata trailers = new Metadata();
     trailers.put(EXCEPTION_TYPE_KEY, t.getClass().getCanonicalName());
+    trailers.put(EXCEPTION_OBJECT_KEY, IOUtils.object2Bytes(t));
     if (callId > 0) {
       trailers.put(CALL_ID, String.valueOf(callId));
     }
@@ -74,8 +77,21 @@ public interface GrpcUtil {
 
   static IOException tryUnwrapException(StatusRuntimeException se) {
     final Metadata trailers = se.getTrailers();
+    if (trailers == null) {
+      return null;
+    }
+
+    final byte[] bytes = trailers.get(EXCEPTION_OBJECT_KEY); // serialized exception, set by the sender via object2Bytes
+    if (bytes != null) {
+      try {
+        return IOUtils.bytes2Object(bytes, IOException.class); // NOTE(review): if this is Java native deserialization, these bytes come from the remote peer -- confirm peers are trusted or add an ObjectInputFilter
+      } catch (Exception e) {
+        se.addSuppressed(e); // fall back to rebuilding the exception from EXCEPTION_TYPE_KEY below
+      }
+    }
+
     final Status status = se.getStatus();
-    if (trailers != null && status != null) {
+    if (status != null) {
       final String className = trailers.get(EXCEPTION_TYPE_KEY);
       if (className != null) {
         try {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 8e6503d..cfd4ce1 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -31,6 +31,7 @@ import org.apache.ratis.proto.RaftProtos.GroupManagementRequestProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
+import org.apache.ratis.protocol.TimeoutIOException;
 import org.apache.ratis.thirdparty.io.grpc.netty.GrpcSslContexts;
 import org.apache.ratis.thirdparty.io.netty.handler.ssl.SslContextBuilder;
 import org.apache.ratis.proto.grpc.AdminProtocolServiceGrpc;
@@ -66,6 +67,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
+import java.util.function.Function;
 import java.util.function.Supplier;
 
 public class GrpcClientProtocolClient implements Closeable {
@@ -76,7 +78,7 @@ public class GrpcClientProtocolClient implements Closeable {
   private final ManagedChannel channel;
 
   private final TimeDuration requestTimeoutDuration;
-  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(1);
+  private final TimeoutScheduler scheduler = TimeoutScheduler.newInstance(3);
 
   private final RaftClientProtocolServiceBlockingStub blockingStub;
   private final RaftClientProtocolServiceStub asyncStub;
@@ -84,9 +86,9 @@ public class GrpcClientProtocolClient implements Closeable {
 
   private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
 
-  public GrpcClientProtocolClient(ClientId id, RaftPeer target,
-                                  RaftProperties properties,
-                                  GrpcTlsConfig tlsConf) {
+  private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
+
+  GrpcClientProtocolClient(ClientId id, RaftPeer target, RaftProperties properties, GrpcTlsConfig tlsConf) {
     this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
     this.target = target;
     final SizeInBytes flowControlWindow = GrpcConfigKeys.flowControlWindow(properties, LOG::debug);
@@ -126,10 +128,8 @@ public class GrpcClientProtocolClient implements Closeable {
 
   @Override
   public void close() {
-    final AsyncStreamObservers observers = appendStreamObservers.get();
-    if (observers != null) {
-      observers.close();
-    }
+    Optional.ofNullable(appendStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
+    Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     channel.shutdownNow();
   }
 
@@ -181,22 +181,79 @@ public class GrpcClientProtocolClient implements Closeable {
   }
 
   AsyncStreamObservers getAppendStreamObservers() {
-    return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
+    return appendStreamObservers.updateAndGet(
+        a -> a != null? a : new AsyncStreamObservers(appendStreamObservers, this::append));
+  }
+
+  AsyncStreamObservers getUnorderedAsyncStreamObservers() {
+    return unorderedStreamObservers.updateAndGet(
+        a -> a != null? a : new AsyncStreamObservers(unorderedStreamObservers, asyncStub::unordered));
   }
 
   public RaftPeer getTarget() {
     return target;
   }
 
-  class AsyncStreamObservers implements Closeable {
+  class ReplyMap {
+    private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> map
+        = new AtomicReference<>(new ConcurrentHashMap<>());
+
+    // synchronized to avoid putNew after getAndSetNull
+    synchronized CompletableFuture<RaftClientReply> putNew(long callId) {
+      return Optional.ofNullable(map.get())
+          .map(m -> CollectionUtils.putNew(callId, new CompletableFuture<>(), m, this::toString))
+          .orElse(null);
+    }
+
+    Optional<CompletableFuture<RaftClientReply>> remove(long callId) {
+      return Optional.ofNullable(map.get()).map(m -> m.remove(callId));
+    }
+
+    // synchronized to avoid putNew after getAndSetNull
+    synchronized Map<Long, CompletableFuture<RaftClientReply>> getAndSetNull() {
+      return map.getAndSet(null);
+    }
+
+    @Override
+    public String toString() {
+      return getName() + ":" + getClass().getSimpleName();
+    }
+  }
+
+  static class RequestStreamer {
+    private final AtomicReference<StreamObserver<RaftClientRequestProto>> streamObserver;
+
+    RequestStreamer(StreamObserver<RaftClientRequestProto> streamObserver) {
+      this.streamObserver = new AtomicReference<>(streamObserver);
+    }
+
+    synchronized boolean onNext(RaftClientRequestProto request) {
+      final StreamObserver<RaftClientRequestProto> s = streamObserver.get();
+      if (s != null) {
+        s.onNext(request);
+        return true;
+      }
+      return false;
+    }
+
+    synchronized void onCompleted() {
+      final StreamObserver<RaftClientRequestProto> s = streamObserver.getAndSet(null);
+      if (s != null) {
+        s.onCompleted();
+      }
+    }
+  }
+
+  class AsyncStreamObservers {
     /** Request map: callId -> future */
-    private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+    private final ReplyMap replies = new ReplyMap();
     private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
       @Override
       public void onNext(RaftClientReplyProto proto) {
         final long callId = proto.getRpcReply().getCallId();
         try {
           final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
+          LOG.debug("{}: receive {}", getName(), reply);
           final NotLeaderException nle = reply.getNotLeaderException();
           if (nle != null) {
             completeReplyExceptionally(nle, NotLeaderException.class.getName());
@@ -219,46 +276,53 @@ public class GrpcClientProtocolClient implements Closeable {
         completeReplyExceptionally(null, "completed");
       }
     };
-    private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
+    private final RequestStreamer requestStreamer;
+    private final AtomicReference<AsyncStreamObservers> ref; // cleared via compareAndSet(this, null) on completion
+    /** @param f opens the request stream whose replies are delivered to {@link #replyStreamObserver}. */
+    AsyncStreamObservers(AtomicReference<AsyncStreamObservers> ref,
+        Function<StreamObserver<RaftClientReplyProto>, StreamObserver<RaftClientRequestProto>> f) {
+      this.requestStreamer = new RequestStreamer(f.apply(replyStreamObserver));
+      this.ref = ref;
+    }
 
     CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
-      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
-      if (map == null) {
+      final CompletableFuture<RaftClientReply> f = replies.putNew(request.getCallId()); // null once closed
+      if (f == null) {
         return JavaUtils.completeExceptionally(new AlreadyClosedException(getName() + " is closed."));
       }
-      final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
-      CollectionUtils.putNew(request.getCallId(), f, map,
-          () -> getName() + ":" + getClass().getSimpleName());
       try {
-        requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
-        scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
-            () -> "Timeout check failed for client request: " + request);
+        if (!requestStreamer.onNext(ClientProtoUtils.toRaftClientRequestProto(request))) {
+          throw new AlreadyClosedException(getName() + ": the stream is closed.");
+        }
       } catch(Throwable t) {
         handleReplyFuture(request.getCallId(), future -> future.completeExceptionally(t));
+        return f; // send failed: no timeout check needed
       }
+      // The request was written to the stream: fail it if no reply arrives within requestTimeoutDuration.
+      LOG.debug("{}: schedule {} timeout check for {}", getName(), requestTimeoutDuration, request);
+      scheduler.onTimeout(requestTimeoutDuration, () -> timeoutCheck(request), LOG,
+          () -> "Timeout check failed for client request: " + request);
       return f;
     }
 
     private void timeoutCheck(RaftClientRequest request) {
       handleReplyFuture(request.getCallId(), f -> f.completeExceptionally(
-          new IOException("Request timeout " + requestTimeoutDuration + ": " + request)));
+          new TimeoutIOException("Request timeout " + requestTimeoutDuration + ": " + request))); // no-op if removed
     }

     private void handleReplyFuture(long callId, Consumer<CompletableFuture<RaftClientReply>> handler) {
-      Optional.ofNullable(replies.get())
-          .map(replyMap -> replyMap.remove(callId))
-          .ifPresent(handler);
+      replies.remove(callId).ifPresent(handler); // remove before handling so this path handles a future only once
     }

-    @Override
-    public void close() {
-      requestStreamObserver.onCompleted();
+    private void close() {
+      requestStreamer.onCompleted(); // stop the request stream before detaching the reply map
       completeReplyExceptionally(null, "close");
     }
 
     private void completeReplyExceptionally(Throwable t, String event) {
-      appendStreamObservers.compareAndSet(this, null);
-      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
+      ref.compareAndSet(this, null);
+
+      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSetNull();
       if (map == null) {
         return;
       }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index a099a0a..db082a2 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -32,9 +32,13 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
@@ -104,65 +108,169 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
     return new AppendRequestStreamObserver(responseObserver);
   }
 
+  @Override
+  public StreamObserver<RaftClientRequestProto> unordered(StreamObserver<RaftClientReplyProto> responseObserver) {
+    return new UnorderedRequestStreamObserver(responseObserver); // no sliding window: replies sent as they complete
+  }
+
   private final AtomicInteger streamCount = new AtomicInteger();
 
-  private class AppendRequestStreamObserver implements
-      StreamObserver<RaftClientRequestProto> {
-    private final String name = getId() + "-" +  streamCount.getAndIncrement();
+  private abstract class RequestStreamObserver implements StreamObserver<RaftClientRequestProto> {
+    private final String name = getId() + "-" + getClass().getSimpleName() + streamCount.getAndIncrement();
     private final StreamObserver<RaftClientReplyProto> responseObserver;
-    private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
-        = new SlidingWindow.Server<>(name, COMPLETED);
-    private final AtomicBoolean isClosed;
+    private final AtomicBoolean isClosed = new AtomicBoolean();

-    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
-      LOG.debug("new AppendRequestStreamObserver {}", name);
-      this.responseObserver = ro;
-      this.isClosed = new AtomicBoolean(false);
+    RequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+      LOG.debug("new {}", name);
+      this.responseObserver = responseObserver;
     }

-    void processClientRequestAsync(PendingAppend pending) {
+    String getName() {
+      return name;
+    }
+    // The response* methods are synchronized since reply handlers run asynchronously (thenAcceptAsync).
+    synchronized void responseNext(RaftClientReplyProto reply) {
+      responseObserver.onNext(reply);
+    }
+
+    synchronized void responseCompleted() {
+      responseObserver.onCompleted();
+    }
+
+    synchronized void responseError(Throwable t) {
+      responseObserver.onError(t);
+    }
+
+    /** @return true iff this call moved the stream from open to closed. */
+    boolean setClose() {
+      return isClosed.compareAndSet(false, true);
+    }
+    /** Submits the request; replyHandler runs asynchronously on success, failures go to responseError. */
+    CompletableFuture<Void> processClientRequest(RaftClientRequest request, Consumer<RaftClientReply> replyHandler) {
       try {
-        protocol.submitClientRequestAsync(pending.getRequest()
-        ).thenAcceptAsync(reply -> slidingWindow.receiveReply(
-            pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
+        return protocol.submitClientRequestAsync(request
+        ).thenAcceptAsync(replyHandler
         ).exceptionally(exception -> {
           // TODO: the exception may be from either raft or state machine.
           // Currently we skip all the following responses when getting an
           // exception from the state machine.
-          responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
+          responseError(exception, () -> "processClientRequest for " + request);
           return null;
         });
       } catch (IOException e) {
-        throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
+        throw new CompletionException("Failed processClientRequest for " + request + " in " + name, e);
       }
     }

+    abstract void processClientRequest(RaftClientRequest request); // subclass-specific dispatch
+    /** Converts each incoming proto and hands it to the subclass; any failure closes the stream with an error. */
     @Override
     public void onNext(RaftClientRequestProto request) {
       try {
         final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
-        final PendingAppend p = new PendingAppend(r);
-        slidingWindow.receivedRequest(p, this::processClientRequestAsync);
+        processClientRequest(r);
       } catch (Throwable e) {
-        responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
+        responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request) + " in " + name);
       }
     }

-    private void sendReply(PendingAppend ready) {
-        Preconditions.assertTrue(ready.hasReply());
-        if (ready == COMPLETED) {
-          close();
-        } else {
-          LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
-          responseObserver.onNext(
-              ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
+    @Override
+    public void onError(Throwable t) {
+      // for now we just log a msg; the stream is not closed here
+      GrpcUtil.warn(LOG, () -> name + ": onError", t);
+    }
+
+    /** Fails the stream at most once (see setClose); returns true iff this call closed it. */
+    boolean responseError(Throwable t, Supplier<String> message) {
+      if (setClose()) {
+        t = JavaUtils.unwrapCompletionException(t);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(name + ": Failed " + message.get(), t);
         }
+        responseError(GrpcUtil.wrapException(t));
+        return true;
+      }
+      return false;
+    }
+  }
+
+  private class UnorderedRequestStreamObserver extends RequestStreamObserver {
+    /** Pending requests keyed by callId (seqNum is not set for unordered requests); guarded by this. */
+    private final Map<Long, CompletableFuture<Void>> futures = new HashMap<>();
+
+    UnorderedRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+      super(responseObserver);
+    }
+
+    @Override
+    void processClientRequest(RaftClientRequest request) { // no sliding window: reply as soon as it completes
+      final CompletableFuture<Void> f = processClientRequest(request, reply -> {
+        if (!reply.isSuccess()) {
+          LOG.info("Failed {}, reply={}", request, reply);
+        }
+        final RaftClientReplyProto proto = ClientProtoUtils.toRaftClientReplyProto(reply);
+        responseNext(proto);
+      });
+      final long callId = request.getCallId();
+      put(callId, f); // tracked until done so that onCompleted can wait for it
+      f.thenAccept(dummy -> remove(callId));
+    }
+
+    private synchronized void put(long callId, CompletableFuture<Void> f) {
+      futures.put(callId, f);
+    }
+    private synchronized void remove(long callId) {
+      futures.remove(callId);
+    }
+
+    private synchronized CompletableFuture<Void> allOfFutures() {
+      return JavaUtils.allOf(futures.values());
+    }
+
+    @Override
+    public void onCompleted() { // complete the response stream only after all pending requests are done
+      allOfFutures().thenAccept(dummy -> {
+        if (setClose()) {
+          LOG.debug("{}: close", getName());
+          responseCompleted();
+        }
+      });
+    }
+  }
+
+  private class AppendRequestStreamObserver extends RequestStreamObserver {
+    private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+        = new SlidingWindow.Server<>(getName(), COMPLETED);
+
+    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+      super(responseObserver);
+    }
+
+    void processClientRequest(PendingAppend pending) { // invoked by the sliding window; reply fed back by seq
+      final long seq = pending.getSeqNum();
+      processClientRequest(pending.getRequest(),
+          reply -> slidingWindow.receiveReply(seq, reply, this::sendReply, this::processClientRequest));
+    }
+
+    @Override
+    void processClientRequest(RaftClientRequest r) { // each incoming request enters the sliding window
+      slidingWindow.receivedRequest(new PendingAppend(r), this::processClientRequest);
+    }
+
+    private void sendReply(PendingAppend ready) { // the COMPLETED sentinel closes the stream
+      Preconditions.assertTrue(ready.hasReply());
+      if (ready == COMPLETED) {
+        close();
+      } else {
+        LOG.debug("{}: sendReply seq={}, {}", getName(), ready.getSeqNum(), ready.getReply());
+        responseNext(ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
+      }
     }
 
     @Override
     public void onError(Throwable t) {
       // for now we just log a msg
-      GrpcUtil.warn(LOG, () -> name + ": onError", t);
+      GrpcUtil.warn(LOG, () -> getName() + ": onError", t); // unlike the base class, also close the window
       slidingWindow.close();
     }
 
@@ -174,22 +282,20 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
     }
 
     private void close() {
-      if (isClosed.compareAndSet(false, true)) {
-        LOG.debug("{}: close", name);
-        responseObserver.onCompleted();
+      if (setClose()) { // at most once; shares the closed flag with responseError
+        LOG.debug("{}: close", getName());
+        responseCompleted();
         slidingWindow.close();
       }
     }

-    void responseError(Throwable t, Supplier<String> message) {
-      if (isClosed.compareAndSet(false, true)) {
-        t = JavaUtils.unwrapCompletionException(t);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(name + ": Failed " + message.get(), t);
-        }
-        responseObserver.onError(GrpcUtil.wrapException(t));
+    @Override
+    boolean responseError(Throwable t, Supplier<String> message) {
+      if (super.responseError(t, message)) { // only the call that closed the stream closes the window
         slidingWindow.close();
+        return true;
       }
+      return false;
     }
   }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index 25fcc85..a63c2af 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -24,6 +24,7 @@ import org.apache.ratis.grpc.GrpcConfigKeys;
 import org.apache.ratis.grpc.GrpcTlsConfig;
 import org.apache.ratis.grpc.GrpcUtil;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.thirdparty.io.grpc.StatusRuntimeException;
 import org.apache.ratis.thirdparty.io.grpc.stub.StreamObserver;
 import org.apache.ratis.proto.RaftProtos.GroupInfoRequestProto;
 import org.apache.ratis.proto.RaftProtos.GroupListRequestProto;
@@ -57,10 +58,6 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
     this.tlsConfig = tlsConfig;
   }
 
-  public GrpcClientRpc(ClientId clientId, RaftProperties properties) {
-    this(clientId, properties, null);
-  }
-
   @Override
   public CompletableFuture<RaftClientReply> sendRequestAsync(
       RaftClientRequest request) {
@@ -75,6 +72,19 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
   }
 
   @Override
+  public CompletableFuture<RaftClientReply> sendRequestAsyncUnordered(RaftClientRequest request) {
+    final RaftPeerId serverId = request.getServerId();
+    try {
+      final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
+      // Reuse the same grpc stream for all unordered async calls.
+      return proxy.getUnorderedAsyncStreamObservers().onNext(request);
+    } catch (Throwable t) {
+      LOG.error("{}: Failed to send unordered request {}", clientId, request, t);
+      return JavaUtils.completeExceptionally(t);
+    }
+  }
+
+  @Override
   public RaftClientReply sendRequest(RaftClientRequest request)
       throws IOException {
     final RaftPeerId serverId = request.getServerId();
@@ -149,4 +159,20 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
     }
     return proto;
   }
+
+  @Override
+  public boolean handleException(RaftPeerId serverId, Throwable e, boolean reconnect) {
+    final Throwable cause = e.getCause();
+    if (e instanceof IOException && cause instanceof StatusRuntimeException) { // gRPC error wrapped in IOException
+      if (!((StatusRuntimeException) cause).getStatus().isOk()) {
+        reconnect = true;
+      }
+    } else if (e instanceof IllegalArgumentException) {
+      if (e.getMessage() != null && e.getMessage().contains("null frame before EOS")) {
+        reconnect = true;
+      }
+    }
+    LOG.debug("{}->{}: reconnect? {}, e={}, cause={}", clientId, serverId, reconnect, e, cause);
+    return super.handleException(serverId, e, reconnect);
+  }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
index ed96468..b2d2e45 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcLogAppender.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -180,7 +180,7 @@ public class GrpcLogAppender extends LogAppender {
   private void timeoutAppendRequest(AppendEntriesRequestProto request) {
     AppendEntriesRequestProto pendingRequest = pendingRequests.remove(request.getServerRequest().getCallId());
     if (pendingRequest != null) {
-      LOG.warn( "{}: appendEntries Timeout, request={}", this, ProtoUtils.toString(pendingRequest.getServerRequest()));
+      LOG.warn("{}: appendEntries Timeout, request={}", this, ServerProtoUtils.toString(pendingRequest));
     }
   }
 
@@ -258,8 +258,8 @@ public class GrpcLogAppender extends LogAppender {
 
     @Override
     public void onCompleted() {
-      LOG.info("{} stops appending log entries to follower {}", server.getId(),
-          follower);
+      LOG.info("{}: follower {} response Completed", server.getId(), follower);
+      resetClient(null); // NOTE(review): presumably drops the current stream so a new one is created — confirm
     }
   }
 
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index 950a73e..f177609 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -31,6 +31,10 @@ service RaftClientProtocolService {
   // A client-to-server stream RPC to append data
   rpc append(stream ratis.common.RaftClientRequestProto)
       returns (stream ratis.common.RaftClientReplyProto) {}
+
+  // A client-to-server stream RPC for unordered async requests (replies may arrive in any order)
+  rpc unordered(stream ratis.common.RaftClientRequestProto)
+      returns (stream ratis.common.RaftClientReplyProto) {}
 }
 
 service RaftServerProtocolService {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 5e9dbc6..8ece134 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -308,6 +308,8 @@ public class LeaderState {
           e = JavaUtils.unwrapCompletionException(e);
           if (e instanceof NotReplicatedException) {
             return new RaftClientReply(request, (NotReplicatedException)e, server.getCommitInfos());
+          } else if (e instanceof NotLeaderException) {
+            return new RaftClientReply(request, (NotLeaderException)e, server.getCommitInfos());
           } else {
             throw new CompletionException(e);
           }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
index 512fd18..29dee9b 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/ServerProtoUtils.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -61,6 +61,10 @@ public interface ServerProtoUtils {
     return TermIndex.toString(entry.getTerm(), entry.getIndex());
   }
 
+  static String toTermIndexString(TermIndexProto proto) { // same format as TermIndex.toString(term, index)
+    return TermIndex.toString(proto.getTerm(), proto.getIndex());
+  }
+
   static String toLogEntryString(LogEntryProto entry) {
     if (entry == null) {
       return null;
@@ -86,7 +90,17 @@ public interface ServerProtoUtils {
         : "" + Arrays.stream(entries).map(ServerProtoUtils::toLogEntryString)
             .collect(Collectors.toList());
   }
-
+  static String toShortString(List<LogEntryProto> entries) { // e.g. "size=3, first=<first entry>"
+    return entries.isEmpty() ? "<empty>"
+        : "size=" + entries.size() + ", first=" + toLogEntryString(entries.get(0));
+  }
+  static String toString(AppendEntriesRequestProto proto) { // one-line summary for log messages
+    return ProtoUtils.toString(proto.getServerRequest()) + "-t" + proto.getLeaderTerm()
+        + ", previous=" + toTermIndexString(proto.getPreviousLog())
+        + ", leaderCommit=" + proto.getLeaderCommit()
+        + ", initializing? " + proto.getInitializing()
+        + ", entries: " + toShortString(proto.getEntriesList());
+  }
   static String toString(AppendEntriesReplyProto reply) {
     return toString(reply.getServerReply()) + "," + reply.getResult()
         + ",nextIndex:" + reply.getNextIndex() + ",term:" + reply.getTerm()
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
index 155122e..b1971e5 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/RaftLog.java
@@ -88,6 +88,10 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
     state.assertOpen();
   }
 
+  public boolean isOpened() { // boolean query on the same state that assertOpen() checks
+    return state.isOpened();
+  }
+
   /**
    * Update the last committed index.
    * @param majorityIndex the index that has achieved majority.
@@ -352,7 +356,7 @@ public abstract class RaftLog implements RaftLogSequentialOps, Closeable {
 
   @Override
   public String toString() {
-    return getName() + ":" + state;
+    return getName() + ":" + state + ":c" + getLastCommittedIndex(); // ":c<n>" = last committed index
   }
 
   public static class Metadata {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
index 588b819..5d151c6 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/storage/SegmentedRaftLog.java
@@ -413,4 +413,16 @@ public class SegmentedRaftLog extends RaftLog {
   RaftLogCache getRaftLogCache() {
     return cache;
   }
+
+  @Override
+  public String toString() {
+    try(AutoCloseableLock readLock = readLock()) { // hold the read lock while reading the indices
+      if (isOpened()) { // f = latest flushed index, i = last entry index (0 if none)
+        return super.toString() + ",f" + getLatestFlushedIndex()
+            + ",i" + Optional.ofNullable(getLastEntryTermIndex()).map(TermIndex::getIndex).orElse(0L);
+      } else {
+        return super.toString();
+      }
+    }
+  }
 }
diff --git a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
index 8d143eb..63aad09 100644
--- a/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/WatchRequestTests.java
@@ -72,23 +72,13 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
   static class TestParameters {
     final int numMessages;
     final RaftClient writeClient;
-    final RaftClient watchMajorityClient;
-    final RaftClient watchAllClient;
-    final RaftClient watchMajorityCommittedClient;
-    final RaftClient watchAllCommittedClient;
     final MiniRaftCluster cluster;
     final Logger log;
 
     TestParameters(int numMessages, RaftClient writeClient,
-        RaftClient watchMajorityClient, RaftClient watchAllClient,
-        RaftClient watchMajorityCommittedClient, RaftClient watchAllCommittedClient,
         MiniRaftCluster cluster, Logger log) {
       this.numMessages = numMessages;
       this.writeClient = writeClient;
-      this.watchMajorityClient = watchMajorityClient;
-      this.watchAllClient = watchAllClient;
-      this.watchMajorityCommittedClient = watchMajorityCommittedClient;
-      this.watchAllCommittedClient = watchAllCommittedClient;
       this.cluster = cluster;
       this.log = log;
     }
@@ -106,10 +96,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
           final long logIndex = reply.getLogIndex();
           log.info("SEND_WATCH: message={}, logIndex={}", message, logIndex);
           watchFuture.complete(new WatchReplies(logIndex,
-              watchMajorityClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
-              watchAllClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
-              watchMajorityCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED),
-              watchAllCommittedClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED),
+              writeClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY),
+              writeClient.sendWatchAsync(logIndex, ReplicationLevel.ALL),
+              writeClient.sendWatchAsync(logIndex, ReplicationLevel.MAJORITY_COMMITTED),
+              writeClient.sendWatchAsync(logIndex, ReplicationLevel.ALL_COMMITTED),
               log));
         });
       }
@@ -121,18 +111,13 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     }
   }
 
-  static void runTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG) throws Exception {
-    try(final RaftClient writeClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
-        final RaftClient watchMajorityClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
-        final RaftClient watchAllClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
-        final RaftClient watchMajorityCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId());
-        final RaftClient watchAllCommittedClient = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
+  static void runTest(CheckedConsumer<TestParameters, Exception> testCase, MiniRaftCluster cluster, Logger LOG)
+      throws Exception {
+    try(final RaftClient client = cluster.createClient(RaftTestUtil.waitForLeader(cluster).getId())) {
       final int[] numMessages = {1, 10, 100};
       for(int i = 0; i < 5; i++) {
         final int n = numMessages[ThreadLocalRandom.current().nextInt(numMessages.length)];
-        final TestParameters p = new TestParameters(
-            n, writeClient, watchMajorityClient, watchAllClient,
-            watchMajorityCommittedClient, watchAllCommittedClient, cluster, LOG);
+        final TestParameters p = new TestParameters(n, client, cluster, LOG);
         LOG.info("{}) {}, {}", i, p, cluster.printServers());
         testCase.accept(p);
       }
@@ -159,26 +144,30 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     }
 
     RaftClientReply getMajority() throws Exception {
-      final RaftClientReply reply = majority.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      log.info("watchMajorityReply({}) = {}", logIndex, reply);
-      return reply;
+      return get(majority, "majority");
     }

     RaftClientReply getMajorityCommitted() throws Exception {
-      final RaftClientReply reply = majorityCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      log.info("watchMajorityCommittedReply({}) = {}", logIndex, reply);
-      return reply;
+      return get(majorityCommitted, "majorityCommitted");
     }

     RaftClientReply getAll() throws Exception {
-      final RaftClientReply reply = all.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      log.info("watchAllReply({}) = {}", logIndex, reply);
-      return reply;
+      return get(all, "all");
     }

     RaftClientReply getAllCommitted() throws Exception {
-      final RaftClientReply reply = allCommitted.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
-      log.info("watchAllCommittedReply({}) = {}", logIndex, reply);
+      return get(allCommitted, "allCommitted");
+    }
+
+    RaftClientReply get(CompletableFuture<RaftClientReply> f, String name) throws Exception { // bounded wait
+      final RaftClientReply reply;
+      try {
+        reply = f.get(GET_TIMEOUT_SECOND, TimeUnit.SECONDS);
+      } catch (Exception e) {
+        log.error("Failed to get {}({})", name, logIndex); // the exception is rethrown to the caller
+        throw e;
+      }
+      log.info("{}-Watch({}) returns {}", name, logIndex, reply);
       return reply;
     }
   }
@@ -324,11 +313,10 @@ public abstract class WatchRequestTests<CLUSTER extends MiniRaftCluster>
     Assert.assertEquals(numMessages, replies.size());
     Assert.assertEquals(numMessages, watches.size());
 
-    // since only one follower is blocked, requests can be committed MAJORITY but neither ALL nor ALL_COMMITTED.
+    // since only one follower's commit is blocked, requests can be committed MAJORITY and ALL but not ALL_COMMITTED.
     checkMajority(replies, watches, LOG);
 
     TimeUnit.SECONDS.sleep(1);
-    assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.all));
     assertNotDone(watches.stream().map(CompletableFuture::join).map(w -> w.allCommitted));
 
     // Now change leader
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
index 7b9061b..77e2eda 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestWatchRequestWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,9 +17,21 @@
  */
 package org.apache.ratis.grpc;
 
+import org.apache.log4j.Level;
 import org.apache.ratis.WatchRequestTests;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.impl.UnorderedAsync;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.grpc.client.GrpcClientRpc;
+import org.apache.ratis.util.LogUtils;
 
 public class TestWatchRequestWithGrpc
     extends WatchRequestTests<MiniRaftClusterWithGrpc>
     implements MiniRaftClusterWithGrpc.FactoryGet {
+  static { // the loggers are static, so set their levels once per class rather than per test instance
+    LogUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
+    LogUtils.setLogLevel(GrpcClientRpc.LOG, Level.ALL);
+    LogUtils.setLogLevel(UnorderedAsync.LOG, Level.ALL);
+    LogUtils.setLogLevel(RaftClient.LOG, Level.ALL);
+  }
 }
\ No newline at end of file