You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2017/12/28 06:44:53 UTC

incubator-ratis git commit: RATIS-140. Raft client should reuse the gRPC stream for all async calls.

Repository: incubator-ratis
Updated Branches:
  refs/heads/master f7f97e050 -> 7872f3296


RATIS-140. Raft client should reuse the gRPC stream for all async calls.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ratis/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ratis/commit/7872f329
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ratis/tree/7872f329
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ratis/diff/7872f329

Branch: refs/heads/master
Commit: 7872f32962b5fdb5229be6bfc36eb73ec01ff79f
Parents: f7f97e0
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Thu Dec 28 14:44:04 2017 +0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Thu Dec 28 14:44:04 2017 +0800

----------------------------------------------------------------------
 .../ratis/client/RaftClientConfigKeys.java      |   2 +-
 .../ratis/client/impl/ClientProtoUtils.java     |   6 +-
 .../ratis/client/impl/RaftClientImpl.java       |  96 +++--
 .../org/apache/ratis/util/SlidingWindow.java    | 403 +++++++++++++++++++
 .../apache/ratis/grpc/client/GrpcClientRpc.java |   9 +-
 .../grpc/client/RaftClientProtocolClient.java   |  96 ++++-
 .../grpc/client/RaftClientProtocolService.java  | 137 +++----
 .../java/org/apache/ratis/RaftAsyncTests.java   |   3 +-
 .../java/org/apache/ratis/RaftTestUtil.java     |   5 -
 9 files changed, 648 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
index 03f12cb..bb76910 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/RaftClientConfigKeys.java
@@ -59,7 +59,7 @@ public interface RaftClientConfigKeys {
 
     static int schedulerThreads(RaftProperties properties) {
       return getInt(properties::getInt, SCHEDULER_THREADS_KEY,
-          SCHEDULER_THREADS_DEFAULT);
+          SCHEDULER_THREADS_DEFAULT, requireMin(1));
     }
 
     static void setSchedulerThreads(RaftProperties properties, int schedulerThreads) {

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
index 97439ac..a7aaf54 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/ClientProtoUtils.java
@@ -209,10 +209,14 @@ public interface ClientProtoUtils {
   }
 
   static Message toMessage(final ClientMessageEntryProto p) {
+    return toMessage(p.getContent());
+  }
+
+  static Message toMessage(final ByteString bytes) {
     return new Message() {
       @Override
       public ByteString getContent() {
-        return p.getContent();
+        return bytes;
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
----------------------------------------------------------------------
diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
index ba1a107..6ee415d 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientImpl.java
@@ -21,18 +21,18 @@ import org.apache.ratis.client.RaftClient;
 import org.apache.ratis.client.RaftClientConfigKeys;
 import org.apache.ratis.client.RaftClientRpc;
 import org.apache.ratis.conf.RaftProperties;
-import org.apache.ratis.util.IOUtils;
-import org.apache.ratis.util.CollectionUtils;
-import org.apache.ratis.util.Preconditions;
-import org.apache.ratis.util.TimeDuration;
 import org.apache.ratis.protocol.*;
+import org.apache.ratis.util.*;
 
 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Objects;
 import java.util.concurrent.*;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Function;
+import java.util.function.LongFunction;
 import java.util.function.Supplier;
 import java.util.stream.Stream;
 
@@ -44,6 +44,45 @@ final class RaftClientImpl implements RaftClient {
     return callIdCounter.getAndIncrement() & Long.MAX_VALUE;
   }
 
+  static class PendingAsyncRequest implements SlidingWindow.Request<RaftClientReply> {
+    private final long seqNum;
+    private final LongFunction<RaftClientRequest> requestConstructor;
+    private final CompletableFuture<RaftClientReply> replyFuture = new CompletableFuture<>();
+
+    PendingAsyncRequest(long seqNum, LongFunction<RaftClientRequest> requestConstructor) {
+      this.seqNum = seqNum;
+      this.requestConstructor = requestConstructor;
+    }
+
+    RaftClientRequest newRequest() {
+      return requestConstructor.apply(seqNum);
+    }
+
+    @Override
+    public long getSeqNum() {
+      return seqNum;
+    }
+
+    @Override
+    public boolean hasReply() {
+      return replyFuture.isDone();
+    }
+
+    @Override
+    public void setReply(RaftClientReply reply) {
+      replyFuture.complete(reply);
+    }
+
+    CompletableFuture<RaftClientReply> getReplyFuture() {
+      return replyFuture;
+    }
+
+    @Override
+    public String toString() {
+      return "[seq=" + getSeqNum() + "]";
+    }
+  }
+
   private final ClientId clientId;
   private final RaftClientRpc clientRpc;
   private final Collection<RaftPeer> peers;
@@ -52,7 +91,7 @@ final class RaftClientImpl implements RaftClient {
 
   private volatile RaftPeerId leaderId;
 
-  private final AtomicLong asyncSeqNum = new AtomicLong();
+  private final SlidingWindow.Client<PendingAsyncRequest, RaftClientReply> slidingWindow;
   private final ScheduledExecutorService scheduler;
   private final Semaphore asyncRequestSemaphore;
 
@@ -68,13 +107,10 @@ final class RaftClientImpl implements RaftClient {
 
     asyncRequestSemaphore = new Semaphore(RaftClientConfigKeys.Async.maxOutstandingRequests(properties));
     scheduler = Executors.newScheduledThreadPool(RaftClientConfigKeys.Async.schedulerThreads(properties));
+    slidingWindow = new SlidingWindow.Client<>(getId());
     clientRpc.addServers(peers);
   }
 
-  private long nextSeqNum() {
-    return asyncSeqNum.getAndIncrement() & Long.MAX_VALUE;
-  }
-
   @Override
   public ClientId getId() {
     return clientId;
@@ -100,9 +136,10 @@ final class RaftClientImpl implements RaftClient {
           "Interrupted when sending " + message, e));
     }
     final long callId = nextCallId();
-    final long seqNum = nextSeqNum();
-    return sendRequestWithRetryAsync(
-        () -> new RaftClientRequest(clientId, leaderId, groupId, callId, seqNum, message, readOnly)
+    final LongFunction<PendingAsyncRequest> constructor = seqNum -> new PendingAsyncRequest(seqNum,
+        seq -> new RaftClientRequest(clientId, leaderId, groupId, callId, seq, message, readOnly));
+    return slidingWindow.submitNewRequest(constructor, this::sendRequestWithRetryAsync
+    ).getReplyFuture(
     ).thenApply(reply -> handleStateMachineException(reply, CompletionException::new)
     ).whenComplete((r, e) -> asyncRequestSemaphore.release());
   }
@@ -164,13 +201,14 @@ final class RaftClientImpl implements RaftClient {
   }
 
   private CompletableFuture<RaftClientReply> sendRequestWithRetryAsync(
-      Supplier<RaftClientRequest> supplier) {
-    return sendRequestAsync(supplier.get()).thenComposeAsync(reply -> {
-      final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
+      PendingAsyncRequest pending) {
+    final RaftClientRequest request = pending.newRequest();
+    final CompletableFuture<RaftClientReply> f = pending.getReplyFuture();
+    return sendRequestAsync(request).thenCompose(reply -> {
       if (reply == null) {
         final TimeUnit unit = retryInterval.getUnit();
-        scheduler.schedule(() -> sendRequestWithRetryAsync(supplier)
-            .thenApply(r -> f.complete(r)), retryInterval.toLong(unit), unit);
+        scheduler.schedule(() -> slidingWindow.retry(pending, this::sendRequestWithRetryAsync),
+            retryInterval.toLong(unit), unit);
       } else {
         f.complete(reply);
       }
@@ -204,14 +242,23 @@ final class RaftClientImpl implements RaftClient {
     LOG.debug("{}: send* {}", clientId, request);
     return clientRpc.sendRequestAsync(request).thenApply(reply -> {
       LOG.debug("{}: receive* {}", clientId, reply);
-      return handleNotLeaderException(request, reply);
+      reply = handleNotLeaderException(request, reply);
+      if (reply != null) {
+        slidingWindow.receiveReply(
+            request.getSeqNum(), reply, this::sendRequestWithRetryAsync);
+      }
+      return reply;
     }).exceptionally(e -> {
       LOG.debug("{}: Failed {} with {}", clientId, request, e);
-      final Throwable cause = e.getCause();
-      if (cause instanceof GroupMismatchException) {
-        return new RaftClientReply(request, (RaftException) cause);
-      } else if (cause instanceof IOException) {
-        handleIOException(request, (IOException) cause, null);
+      if (e instanceof CompletionException) {
+        e = e.getCause();
+      }
+      if (e instanceof GroupMismatchException) {
+        throw new CompletionException(e);
+      } else if (e instanceof IOException) {
+        handleIOException(request, (IOException)e, null);
+      } else {
+        throw new CompletionException(e);
       }
       return null;
     });
@@ -281,6 +328,7 @@ final class RaftClientImpl implements RaftClient {
       LOG.trace("Stack trace", new Throwable("TRACE"));
     }
 
+    slidingWindow.resetFirstSeqNum();
     if (ioe instanceof LeaderNotReadyException) {
       return;
     }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
----------------------------------------------------------------------
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
new file mode 100644
index 0000000..6ded6f7
--- /dev/null
+++ b/ratis-common/src/main/java/org/apache/ratis/util/SlidingWindow.java
@@ -0,0 +1,403 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.util.Iterator;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import java.util.function.LongFunction;
+
+/**
+ * A single-client-to-multiple-server sliding window.
+ * The client talks to only one server at any time.
+ * When the current server fails, the client fails over to another server.
+ */
+public interface SlidingWindow {
+  Logger LOG = LoggerFactory.getLogger(SlidingWindow.class);
+
+  interface Request<REPLY> {
+    long getSeqNum();
+
+    void setReply(REPLY reply);
+
+    boolean hasReply();
+  }
+
+  /** A seqNum-to-request map, sorted by seqNum. */
+  class RequestMap<REQUEST extends Request<REPLY>, REPLY> implements Iterable<REQUEST> {
+    private final Object name;
+    /** Request map: seqNum -> request */
+    private final SortedMap<Long, REQUEST> requests = new TreeMap<>();
+
+    RequestMap(Object name) {
+      this.name = name;
+      if (LOG.isDebugEnabled()) {
+        JavaUtils.runRepeatedly(() -> log(), 5, 10, TimeUnit.SECONDS);
+      }
+    }
+
+    Object getName() {
+      return name;
+    }
+
+    boolean isEmpty() {
+      return requests.isEmpty();
+    }
+
+    /**
+     * If the request with the given seqNum is non-replied, return it.
+     * Otherwise, return null.
+     *
+     * A request is non-replied if
+     * (1) it is in the request map, and
+     * (2) it does not have a reply.
+     */
+    REQUEST getNonRepliedRequest(long seqNum, String op) {
+      final REQUEST request = requests.get(seqNum);
+      if (request == null) {
+        LOG.debug("{}: {}, seq={} not found in {}", getName(), op, seqNum, this);
+        return null;
+      }
+      if (request.hasReply()) {
+        LOG.debug("{}: {}, seq={} already has replied in {}", getName(), op, seqNum, this);
+        return null;
+      }
+      return request;
+    }
+
+    long firstSeqNum() {
+      return requests.firstKey();
+    }
+
+    long lastSeqNum() {
+      return requests.lastKey();
+    }
+
+    /** Iterate the requests in the order of seqNum. */
+    @Override
+    public Iterator<REQUEST> iterator() {
+      return requests.values().iterator();
+    }
+
+    void putNewRequest(REQUEST request) {
+      final long seqNum = request.getSeqNum();
+      CollectionUtils.putNew(seqNum, request, requests, () -> getName() + ":requests");
+    }
+
+    /**
+     * Set reply for the request with the given seqNum if it is non-replied.
+     * Otherwise, do nothing.
+     *
+     * @return true iff this method does set the reply for the request.
+     */
+    boolean setReply(long seqNum, REPLY reply, String op) {
+      final REQUEST request = getNonRepliedRequest(seqNum, op);
+      if (request == null) {
+        LOG.debug("{}: DUPLICATED reply {} for seq={} in {}", getName(), reply, seqNum, this);
+        return false;
+      }
+
+      LOG.debug("{}: set reply {} for seq={} in {}", getName(), reply, seqNum, this);
+      request.setReply(reply);
+      return true;
+    }
+
+    synchronized void clear() {
+      LOG.debug("close {}", this);
+      requests.clear();
+    }
+
+    synchronized void log() {
+      LOG.debug(this.toString());
+      for(REQUEST r : requests.values()) {
+        LOG.debug("  {}: hasReply? {}", r.getSeqNum(), r.hasReply());
+      }
+    }
+
+    @Override
+    public String toString() {
+      return getName() + ": requests" + asString(requests);
+    }
+
+    private static String asString(SortedMap<Long, ?> map) {
+      return map.isEmpty()? "[]": "[" + map.firstKey() + ".." + map.lastKey() + "]";
+    }
+  }
+
+  /**
+   * Client side sliding window.
+   * A client may
+   * (1) allocate seqNum for new requests;
+   * (2) send requests/retries to the server;
+   * (3) receive replies/exceptions from the server;
+   * (4) return the replies/exceptions to client.
+   *
+   * Depending on the replies/exceptions, the client may retry the requests
+   * to the same or a different server.
+   */
+  class Client<REQUEST extends Request<REPLY>, REPLY> {
+    /** The requests in the sliding window. */
+    private final RequestMap<REQUEST, REPLY> requests;
+    /** Delayed requests. */
+    private final SortedMap<Long, Long> delayedRequests = new TreeMap<>();
+
+    /** The seqNum for the next new request. */
+    private long nextSeqNum = 0;
+    /** The seqNum of the first request. */
+    private long firstSeqNum = -1;
+    /** Is the first request replied? */
+    private boolean firstReplied;
+
+    public Client(Object name) {
+      this.requests = new RequestMap<REQUEST, REPLY>(name) {
+        @Override
+        synchronized void log() {
+          LOG.debug(toString());
+          for (REQUEST r : requests) {
+            LOG.debug("  {}: {}", r.getSeqNum(), r.hasReply() ? "replied"
+                : delayedRequests.containsKey(r.getSeqNum()) ? "delayed" : "submitted");
+          }
+        }
+      };
+    }
+
+    @Override
+    public synchronized String toString() {
+      return requests + ", nextSeqNum=" + nextSeqNum
+          + ", firstSubmitted=" + firstSeqNum + ", replied? " + firstReplied
+          + ", delayed=" + delayedRequests.keySet();
+    }
+
+    /**
+     * A new request arrives, create it with {@link #nextSeqNum}
+     * and then try sending it to the server.
+     *
+     * @param requestConstructor use seqNum to create a new request.
+     * @return the new request.
+     */
+    public synchronized REQUEST submitNewRequest(
+        LongFunction<REQUEST> requestConstructor, Consumer<REQUEST> sendMethod) {
+      if (!requests.isEmpty()) {
+        Preconditions.assertTrue(nextSeqNum == requests.lastSeqNum() + 1,
+            () -> "nextSeqNum=" + nextSeqNum + " but " + this);
+      }
+
+      final long seqNum = nextSeqNum++;
+      final REQUEST r = requestConstructor.apply(seqNum);
+      requests.putNewRequest(r);
+
+      final boolean submitted = sendOrDelayRequest(r, sendMethod);
+      LOG.debug("{}: submitting a new request {} in {}? {}",
+          requests.getName(), r, this, submitted? "submitted": "delayed");
+      return r;
+    }
+
+    private boolean sendOrDelayRequest(REQUEST request, Consumer<REQUEST> sendMethod) {
+      final long seqNum = request.getSeqNum();
+      Preconditions.assertTrue(requests.getNonRepliedRequest(seqNum, "sendOrDelayRequest") == request);
+
+      if (firstReplied) {
+        // already received the reply for the first request, submit any request.
+        sendMethod.accept(request);
+        return true;
+      }
+
+      if (firstSeqNum == -1 && seqNum == requests.firstSeqNum()) {
+        // first request is not yet submitted and this is the first request, submit it.
+        LOG.debug("{}: detect firstSubmitted {} in {}", requests.getName(), request, this);
+        firstSeqNum = seqNum;
+        sendMethod.accept(request);
+        return true;
+      }
+
+      // delay other requests
+      CollectionUtils.putNew(seqNum, seqNum, delayedRequests, () -> requests.getName() + ":delayedRequests");
+      return false;
+    }
+
+    /** Receive a retry from an existing request (may be out-of-order). */
+    public synchronized void retry(REQUEST request, Consumer<REQUEST> sendMethod) {
+      if (requests.getNonRepliedRequest(request.getSeqNum(), "retry") != request) {
+        // out-dated or invalid retry
+        LOG.debug("{}: Ignore retry {} in {}", requests.getName(), request, this);
+        return;
+      }
+      final boolean submitted = sendOrDelayRequest(request, sendMethod);
+      LOG.debug("{}: submitting a retry {} in {}? {}",
+          requests.getName(), request, this, submitted? "submitted": "delayed");
+    }
+
+    private void removeRepliedFromHead() {
+      for (final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) {
+        final REQUEST r = i.next();
+        if (!r.hasReply()) {
+          return;
+        }
+      }
+    }
+
+    /**
+     * Receive a reply with the given seqNum (may be out-of-order).
+     * It may trigger the client to send delayed requests.
+     */
+    public synchronized void receiveReply(
+        long seqNum, REPLY reply, Consumer<REQUEST> sendMethod) {
+      if (!requests.setReply(seqNum, reply, "receiveReply")) {
+        return; // request already replied
+      }
+      if (seqNum == firstSeqNum) {
+        firstReplied = true; // received the reply for the first submitted request
+      }
+      removeRepliedFromHead();
+      trySendDelayed(sendMethod);
+    }
+
+    private void trySendDelayed(Consumer<REQUEST> sendMethod) {
+      if (firstReplied) {
+        // after the first reply is received, all other requests can be submitted (out-of-order)
+        if (!delayedRequests.isEmpty()) {
+          for (Long seqNum : delayedRequests.keySet()) {
+            sendMethod.accept(requests.getNonRepliedRequest(seqNum, "trySendDelayed"));
+          }
+          delayedRequests.clear();
+        }
+      } else {
+        // Otherwise, submit the first only if it is a delayed request
+        final Iterator<REQUEST> i = requests.iterator();
+        if (i.hasNext()) {
+          final REQUEST r = i.next();
+          final Long delayed = delayedRequests.remove(r.getSeqNum());
+          if (delayed != null) {
+            sendOrDelayRequest(r, sendMethod);
+          }
+        }
+      }
+    }
+
+    /** Reset the {@link #firstSeqNum} since the stream has an error. */
+    public synchronized void resetFirstSeqNum() {
+      firstSeqNum = -1;
+      firstReplied = false;
+      LOG.debug("After resetFirstSeqNum: {}", this);
+    }
+  }
+
+  /**
+   * Server side sliding window.
+   * A server may
+   * (1) receive requests from client;
+   * (2) submit the requests for processing;
+   * (3) receive replies from the processing unit;
+   * (4) send replies to the client.
+   */
+  class Server<REQUEST extends Request<REPLY>, REPLY> implements Closeable {
+    /** The requests in the sliding window. */
+    private final RequestMap<REQUEST, REPLY> requests;
+    /** The end of requests */
+    private final REQUEST end;
+
+    private long nextToProcess = -1;
+
+    public Server(Object name, REQUEST end) {
+      this.requests = new RequestMap<>(name);
+      this.end = end;
+      Preconditions.assertTrue(end.getSeqNum() == Long.MAX_VALUE);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return requests + ", nextToProcess=" + nextToProcess;
+    }
+
+    /** A request (or a retry) arrives (may be out-of-order except for the first request). */
+    public synchronized void receivedRequest(REQUEST request, Consumer<REQUEST> processingMethod) {
+      final long seqNum = request.getSeqNum();
+      if (nextToProcess == -1) {
+        nextToProcess = seqNum;
+        LOG.debug("{}: got seq={} (first request), set nextToProcess in {}", requests.getName(), seqNum, this);
+      } else {
+        LOG.debug("{}: got seq={} in {}", requests.getName(), seqNum, this);
+      }
+      requests.putNewRequest(request);
+      processRequestsFromHead(processingMethod);
+    }
+
+    private void processRequestsFromHead(Consumer<REQUEST> processingMethod) {
+      for(REQUEST r : requests) {
+        if (r.getSeqNum() != nextToProcess) {
+          return;
+        }
+        processingMethod.accept(r);
+        nextToProcess++;
+      }
+    }
+
+    /**
+     * Receives a reply for the given seqNum (may be out-of-order) from the processor.
+     * It may trigger sending replies to client or processing more requests.
+     */
+    public synchronized void receiveReply(
+        long seqNum, REPLY reply, Consumer<REQUEST> replyMethod, Consumer<REQUEST> processingMethod) {
+      if (!requests.setReply(seqNum, reply, "receiveReply")) {
+        return; // request already replied
+      }
+      sendRepliesFromHead(replyMethod);
+      processRequestsFromHead(processingMethod);
+    }
+
+    private void sendRepliesFromHead(
+        Consumer<REQUEST> replyMethod
+    ) {
+      for(final Iterator<REQUEST> i = requests.iterator(); i.hasNext(); i.remove()) {
+        final REQUEST r = i.next();
+        if (!r.hasReply()) {
+          return;
+        }
+        replyMethod.accept(r);
+        if (r == end) {
+          return;
+        }
+      }
+    }
+
+    /**
+     * Signal the end of requests.
+     * @return true if no more outstanding requests.
+     */
+    public synchronized boolean endOfRequests() {
+      if (requests.isEmpty()) {
+        return true;
+      } else {
+        LOG.debug("{}: put end-of-request in {}", requests.getName(), this);
+        requests.putNewRequest(end);
+        return false;
+      }
+    }
+
+    @Override
+    public void close() {
+      requests.clear();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index c5c188e..ea1f204 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -58,7 +58,9 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
       RaftClientRequest request) {
     final RaftPeerId serverId = request.getServerId();
     try {
-      return sendRequestAsync(request, getProxies().getProxy(serverId));
+      final RaftClientProtocolClient proxy = getProxies().getProxy(serverId);
+      // Reuse the same grpc stream for all async calls.
+      return proxy.getAppendStreamObservers().onNext(request);
     } catch (IOException e) {
       return JavaUtils.completeExceptionally(e);
     }
@@ -83,7 +85,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
       return ClientProtoUtils.toServerInformationReply(
           proxy.serverInformation(proto));
     } else {
-      final CompletableFuture<RaftClientReply> f = sendRequestAsync(request, proxy);
+      final CompletableFuture<RaftClientReply> f = sendRequest(request, proxy);
       // TODO: timeout support
       try {
         return f.get();
@@ -96,12 +98,13 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<RaftClientProtocolClie
     }
   }
 
-  private CompletableFuture<RaftClientReply> sendRequestAsync(
+  private CompletableFuture<RaftClientReply> sendRequest(
       RaftClientRequest request, RaftClientProtocolClient proxy) throws IOException {
     final RaftClientRequestProto requestProto =
         toRaftClientRequestProto(request);
     final CompletableFuture<RaftClientReplyProto> replyFuture =
         new CompletableFuture<>();
+    // create a new grpc stream for each non-async call.
     final StreamObserver<RaftClientRequestProto> requestObserver =
         proxy.append(new StreamObserver<RaftClientReplyProto>() {
           @Override

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
index ace90f2..0b05475 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolClient.java
@@ -17,9 +17,9 @@
  */
 package org.apache.ratis.grpc.client;
 
+import org.apache.ratis.client.impl.ClientProtoUtils;
 import org.apache.ratis.grpc.RaftGrpcUtil;
-import org.apache.ratis.protocol.ClientId;
-import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.*;
 import org.apache.ratis.shaded.io.grpc.ManagedChannel;
 import org.apache.ratis.shaded.io.grpc.ManagedChannelBuilder;
 import org.apache.ratis.shaded.io.grpc.StatusRuntimeException;
@@ -31,12 +31,17 @@ import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceBlockingStub;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceStub;
 import org.apache.ratis.util.CheckedSupplier;
+import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
 public class RaftClientProtocolClient implements Closeable {
@@ -49,6 +54,8 @@ public class RaftClientProtocolClient implements Closeable {
   private final RaftClientProtocolServiceStub asyncStub;
   private final AdminProtocolServiceBlockingStub adminBlockingStub;
 
+  private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
+
   public RaftClientProtocolClient(ClientId id, RaftPeer target) {
     this.name = JavaUtils.memoize(() -> id + "->" + target.getId());
     this.target = target;
@@ -65,6 +72,10 @@ public class RaftClientProtocolClient implements Closeable {
 
   @Override
   public void close() {
+    final AsyncStreamObservers observers = appendStreamObservers.get();
+    if (observers != null) {
+      observers.close();
+    }
     channel.shutdownNow();
   }
 
@@ -98,7 +109,88 @@ public class RaftClientProtocolClient implements Closeable {
     return asyncStub.append(responseHandler);
   }
 
+  AsyncStreamObservers getAppendStreamObservers() {
+    return appendStreamObservers.updateAndGet(a -> a != null? a : new AsyncStreamObservers());
+  }
+
   public RaftPeer getTarget() {
     return target;
   }
+
+  class AsyncStreamObservers implements Closeable {
+    /** Request map: callId -> future */
+    private final AtomicReference<Map<Long, CompletableFuture<RaftClientReply>>> replies = new AtomicReference<>(new ConcurrentHashMap<>());
+    private final StreamObserver<RaftClientReplyProto> replyStreamObserver = new StreamObserver<RaftClientReplyProto>() {
+      @Override
+      public void onNext(RaftClientReplyProto proto) {
+        final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
+        if (map == null) {
+          LOG.warn("replyStreamObserver onNext map == null");
+          return;
+        }
+        final long callId = proto.getRpcReply().getCallId();
+        try {
+          final RaftClientReply reply = ClientProtoUtils.toRaftClientReply(proto);
+          final NotLeaderException nle = reply.getNotLeaderException();
+          if (nle != null) {
+            completeReplyExceptionally(nle, NotLeaderException.class.getName());
+            return;
+          }
+          map.remove(callId).complete(reply);
+        } catch (Throwable t) {
+          map.get(callId).completeExceptionally(t);
+        }
+      }
+
+      @Override
+      public void onError(Throwable t) {
+        final IOException ioe = RaftGrpcUtil.unwrapIOException(t);
+        completeReplyExceptionally(ioe, "onError");
+      }
+
+      @Override
+      public void onCompleted() {
+        completeReplyExceptionally(null, "completed");
+      }
+    };
+    private final StreamObserver<RaftClientRequestProto> requestStreamObserver = append(replyStreamObserver);
+
+    CompletableFuture<RaftClientReply> onNext(RaftClientRequest request) {
+      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.get();
+      if (map == null) {
+        return JavaUtils.completeExceptionally(new IOException("Already closed."));
+      }
+      final CompletableFuture<RaftClientReply> f = new CompletableFuture<>();
+      CollectionUtils.putNew(request.getCallId(), f, map,
+          () -> getName() + ":" + getClass().getSimpleName());
+      try {
+        requestStreamObserver.onNext(ClientProtoUtils.toRaftClientRequestProto(request));
+      } catch(Throwable t) {
+        f.completeExceptionally(t);
+      }
+      return f;
+    }
+
+    @Override
+    public void close() {
+      requestStreamObserver.onCompleted();
+      completeReplyExceptionally(null, "close");
+    }
+
+    private void completeReplyExceptionally(Throwable t, String event) {
+      appendStreamObservers.compareAndSet(this, null);
+      final Map<Long, CompletableFuture<RaftClientReply>> map = replies.getAndSet(null);
+      if (map == null) {
+        return;
+      }
+      for (Map.Entry<Long, CompletableFuture<RaftClientReply>> entry : map.entrySet()) {
+        final CompletableFuture<RaftClientReply> f = entry.getValue();
+        if (!f.isDone()) {
+          f.completeExceptionally(t != null? t
+              : new IOException(getName() + ": Stream " + event
+                  + ": no reply for async request cid=" + entry.getKey()));
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
----------------------------------------------------------------------
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
index f3ebe0f..6d19920 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/RaftClientProtocolService.java
@@ -25,17 +25,21 @@ import org.apache.ratis.shaded.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.shaded.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.shaded.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.shaded.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
+import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
+import org.apache.ratis.util.SlidingWindow;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
+import java.io.IOException;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Supplier;
 
 public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase {
-  static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
+  public static final Logger LOG = LoggerFactory.getLogger(RaftClientProtocolService.class);
 
-  private static class PendingAppend implements Comparable<PendingAppend> {
+  private static class PendingAppend implements SlidingWindow.Request<RaftClientReply> {
     private final RaftClientRequest request;
     private volatile RaftClientReply reply;
 
@@ -43,25 +47,27 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
       this.request = request;
     }
 
-    boolean isReady() {
+    @Override
+    public boolean hasReply() {
       return reply != null || this == COMPLETED;
     }
 
-    void setReply(RaftClientReply reply) {
+    @Override
+    public void setReply(RaftClientReply reply) {
       this.reply = reply;
     }
 
-    RaftClientRequest getRequest() {
-      return request;
+    RaftClientReply getReply() {
+      return reply;
     }
 
-    long getSeqNum() {
-      return request != null? request.getSeqNum(): Long.MAX_VALUE;
+    RaftClientRequest getRequest() {
+      return request;
     }
 
     @Override
-    public int compareTo(PendingAppend that) {
-      return Long.compare(this.getSeqNum(), that.getSeqNum());
+    public long getSeqNum() {
+      return request != null? request.getSeqNum(): Long.MAX_VALUE;
     }
 
     @Override
@@ -97,97 +103,84 @@ public class RaftClientProtocolService extends RaftClientProtocolServiceImplBase
     return new AppendRequestStreamObserver(responseObserver);
   }
 
+  private final AtomicInteger streamCount = new AtomicInteger();
+
   private class AppendRequestStreamObserver implements
       StreamObserver<RaftClientRequestProto> {
-    private final List<PendingAppend> pendingList = new LinkedList<>();
+    private final String name = getId() + "-" +  streamCount.getAndIncrement();
     private final StreamObserver<RaftClientReplyProto> responseObserver;
+    private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+        = new SlidingWindow.Server<>(name, COMPLETED);
 
     AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> ro) {
+      LOG.debug("new AppendRequestStreamObserver {}", name);
       this.responseObserver = ro;
     }
 
+    void processClientRequestAsync(PendingAppend pending) { // Submits the pending request to the protocol and hands the eventual reply to the sliding window.
+      try {
+        protocol.submitClientRequestAsync(pending.getRequest()
+        ).thenAcceptAsync(reply -> slidingWindow.receiveReply( // the window is given sendReply and this method as callbacks
+            pending.getSeqNum(), reply, this::sendReply, this::processClientRequestAsync)
+        ).exceptionally(exception -> { // reached when the submitted future fails or when the reply handling above throws
+          // TODO: the exception may be from either raft or state machine.
+          // Currently we skip all the following responses when getting an
+          // exception from the state machine.
+          responseError(exception, () -> "processClientRequestAsync for " + pending.getRequest());
+          return null;
+        });
+      } catch (IOException e) { // the submit call itself failed before returning a future
+        throw new CompletionException("Failed processClientRequestAsync for " + pending.getRequest(), e);
+      }
+    }
+
     @Override
     public void onNext(RaftClientRequestProto request) {
       try {
         final RaftClientRequest r = ClientProtoUtils.toRaftClientRequest(request);
         final PendingAppend p = new PendingAppend(r);
-        final long replySeq = p.getSeqNum();
-        synchronized (pendingList) {
-          pendingList.add(p);
-        }
-
-        protocol.submitClientRequestAsync(r
-        ).whenCompleteAsync((reply, exception) -> {
-          if (exception != null) {
-            // TODO: the exception may be from either raft or state machine.
-            // Currently we skip all the following responses when getting an
-            // exception from the state machine.
-            responseObserver.onError(RaftGrpcUtil.wrapException(exception));
-          } else {
-            synchronized (pendingList) {
-              Preconditions.assertTrue(!pendingList.isEmpty(),
-                  "PendingList is empty when handling onNext for seqNum %s", replySeq);
-              final long headSeqNum = pendingList.get(0).getSeqNum();
-              // stream seqNum is consecutive
-              final PendingAppend pendingForReply = pendingList.get(
-                  (int) (replySeq - headSeqNum));
-              Preconditions.assertTrue(pendingForReply != null &&
-                      pendingForReply.getSeqNum() == replySeq,
-                  "pending for reply is: %s, the pending list: %s",
-                  pendingForReply, pendingList);
-              pendingForReply.setReply(reply);
-
-              if (headSeqNum == replySeq) {
-                Collection<PendingAppend> readySet = new ArrayList<>();
-                // if this is head, we send back all the ready responses
-                Iterator<PendingAppend> iter = pendingList.iterator();
-                PendingAppend pending;
-                while (iter.hasNext() && ((pending = iter.next()).isReady())) {
-                  readySet.add(pending);
-                  iter.remove();
-                }
-                sendReadyReplies(readySet);
-              }
-            }
-          }
-        });
+        slidingWindow.receivedRequest(p, this::processClientRequestAsync);
       } catch (Throwable e) {
-        LOG.info("{} got exception when handling client append request {}: {}",
-            getId(), request.getRpcRequest(), e);
-        responseObserver.onError(RaftGrpcUtil.wrapException(e));
+        responseError(e, () -> "onNext for " + ClientProtoUtils.toString(request));
       }
     }
 
-    private void sendReadyReplies(Collection<PendingAppend> readySet) {
-      readySet.forEach(ready -> {
-        Preconditions.assertTrue(ready.isReady());
+    private void sendReply(PendingAppend ready) {
+        Preconditions.assertTrue(ready.hasReply());
         if (ready == COMPLETED) {
-          responseObserver.onCompleted();
+          close();
         } else {
+          LOG.debug("{}: sendReply seq={}, {}", name, ready.getSeqNum(), ready.getReply());
           responseObserver.onNext(
-              ClientProtoUtils.toRaftClientReplyProto(ready.reply));
+              ClientProtoUtils.toRaftClientReplyProto(ready.getReply()));
         }
-      });
     }
 
     @Override
     public void onError(Throwable t) {
       // for now we just log a msg
-      LOG.warn("{} onError: client Append cancelled", getId(), t);
-      synchronized (pendingList) {
-        pendingList.clear();
-      }
+      LOG.warn(name + ": onError", t);
+      slidingWindow.close();
     }
 
     @Override
     public void onCompleted() {
-      synchronized (pendingList) {
-        if (pendingList.isEmpty()) {
-          responseObserver.onCompleted();
-        } else {
-          pendingList.add(COMPLETED);
-        }
+      if (slidingWindow.endOfRequests()) {
+        close();
       }
     }
+
+    private void close() { // Completes the reply stream to the client and closes the sliding window.
+      LOG.debug("{}: close", name);
+      responseObserver.onCompleted();
+      slidingWindow.close();
+    }
+
+    void responseError(Throwable t, Supplier<String> message) { // Logs the failure, reports it to the client via onError, and closes the sliding window.
+      t = JavaUtils.unwrapCompletionException(t); // presumably replaces a CompletionException wrapper with its cause; confirm in JavaUtils
+      LOG.warn(name + ": Failed " + message.get(), t); // message is a Supplier, so the text is built only here
+      responseObserver.onError(RaftGrpcUtil.wrapException(t));
+      slidingWindow.close();
+    }
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
index b8bc636..e5f41b7 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftAsyncTests.java
@@ -140,10 +140,11 @@ public abstract class RaftAsyncTests<CLUSTER extends MiniRaftCluster> extends Ba
   @Test
   public void testBasicAppendEntriesAsync() throws Exception {
     LOG.info("Running testBasicAppendEntriesAsync");
+    RaftClientConfigKeys.Async.setMaxOutstandingRequests(properties, 100);
     final CLUSTER cluster = getFactory().newCluster(NUM_SERVERS, properties);
     cluster.start();
     waitForLeader(cluster);
-    RaftBasicTests.runTestBasicAppendEntries(true, 10, cluster, LOG);
+    RaftBasicTests.runTestBasicAppendEntries(true, 1000, cluster, LOG);
     cluster.shutdown();
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ratis/blob/7872f329/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
----------------------------------------------------------------------
diff --git a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
index c8dfc0d..c55445a 100644
--- a/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/RaftTestUtil.java
@@ -189,11 +189,6 @@ public interface RaftTestUtil {
       }
     }
 
-    if (async) {
-      Collections.sort(entries, Comparator
-          .comparing(e -> e.getSmLogEntry().getData().toStringUtf8()));
-    }
-
     long logIndex = 0;
     Assert.assertEquals(expectedMessages.length, entries.size());
     for (int i = 0; i < expectedMessages.length; i++) {