You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@ratis.apache.org by sz...@apache.org on 2019/03/19 07:17:51 UTC

[incubator-ratis] branch master updated: RATIS-499. Leader should close the sliding window if it steps down.

This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-ratis.git


The following commit(s) were added to refs/heads/master by this push:
     new 74b491c  RATIS-499. Leader should close the sliding window if it steps down.
74b491c is described below

commit 74b491cc76aa5e26029c3f6d77c1210809ecc110
Author: Tsz Wo Nicholas Sze <sz...@apache.org>
AuthorDate: Tue Mar 19 15:17:18 2019 +0800

    RATIS-499. Leader should close the sliding window if it steps down.
---
 .../ratis/client/impl/RaftClientTestUtil.java      |  11 +-
 .../apache/ratis/protocol/RaftClientRequest.java   |   2 +-
 .../org/apache/ratis/util/CollectionUtils.java     |  41 ++++---
 .../grpc/client/GrpcClientProtocolClient.java      |  22 ++--
 .../ratis/grpc/client/GrpcClientProtocolProxy.java |   2 +-
 .../grpc/client/GrpcClientProtocolService.java     |  81 ++++++++++---
 .../apache/ratis/grpc/client/GrpcClientRpc.java    |  12 +-
 .../org/apache/ratis/grpc/server/GrpcService.java  |  23 ++--
 ratis-proto/src/main/proto/Grpc.proto              |   8 +-
 .../org/apache/ratis/server/RaftServerRpc.java     |   6 +-
 .../org/apache/ratis/server/impl/LeaderState.java  |   1 +
 .../org/apache/ratis/server/impl/RoleInfo.java     |   4 +
 .../ratis/server/impl/RaftServerTestUtil.java      |   7 +-
 .../apache/ratis/grpc/TestRaftServerWithGrpc.java  | 135 +++++++++++++++++++--
 14 files changed, 277 insertions(+), 78 deletions(-)

diff --git a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
index 20647de..7426d32 100644
--- a/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
+++ b/ratis-client/src/main/java/org/apache/ratis/client/impl/RaftClientTestUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -18,6 +18,10 @@
 package org.apache.ratis.client.impl;
 
 import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.proto.RaftProtos.SlidingWindowEntry;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientRequest;
+import org.apache.ratis.protocol.RaftPeerId;
 
 /** Interface for testing raft client. */
 public interface RaftClientTestUtil {
@@ -33,4 +37,9 @@ public interface RaftClientTestUtil {
   static long getCallId(RaftClient client) {
     return ((RaftClientImpl) client).getCallId();
   }
+
+  static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId server,
+      long callId, Message message, RaftClientRequest.Type type, SlidingWindowEntry slidingWindowEntry) {
+    return ((RaftClientImpl)client).newRaftClientRequest(server, callId, message, type, slidingWindowEntry);
+  }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
index a007e83..18253fa 100644
--- a/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
+++ b/ratis-common/src/main/java/org/apache/ratis/protocol/RaftClientRequest.java
@@ -199,7 +199,7 @@ public class RaftClientRequest extends RaftClientMessage {
 
   @Override
   public String toString() {
-    return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry)
+    return super.toString() + ", cid=" + callId + ", seq=" + ProtoUtils.toString(slidingWindowEntry) + ", "
         + type + ", " + getMessage();
   }
 }
diff --git a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
index a215d3d..39d9908 100644
--- a/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
+++ b/ratis-common/src/main/java/org/apache/ratis/util/CollectionUtils.java
@@ -1,23 +1,20 @@
 /*
- * *
- *  * Licensed to the Apache Software Foundation (ASF) under one
- *  * or more contributor license agreements.  See the NOTICE file
- *  * distributed with this work for additional information
- *  * regarding copyright ownership.  The ASF licenses this file
- *  * to you under the Apache License, Version 2.0 (the
- *  * "License"); you may not use this file except in compliance
- *  * with the License.  You may obtain a copy of the License at
- *  *
- *  *     http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
  *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
  */
-
 package org.apache.ratis.util;
 
 import java.util.*;
@@ -91,16 +88,22 @@ public interface CollectionUtils {
     return as(Arrays.asList(array), converter);
   }
 
-  static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<String> name) {
+  static <K, V> V putNew(K key, V value, Map<K, V> map, Supplier<Object> name) {
     final V returned = map.put(key, value);
     Preconditions.assertTrue(returned == null,
         () -> "Entry already exists for key " + key + " in map " + name.get());
     return value;
   }
 
-  static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> map, Supplier<String> name) {
+  static <K, V> void replaceExisting(K key, V oldValue, V newValue, Map<K, V> map, Supplier<Object> name) {
     final boolean replaced = map.replace(key, oldValue, newValue);
     Preconditions.assertTrue(replaced,
         () -> "Entry not found for key " + key + " in map " + name.get());
   }
+
+  static <K, V> void removeExisting(K key, V value, Map<K, V> map, Supplier<Object> name) {
+    final boolean removed = map.remove(key, value);
+    Preconditions.assertTrue(removed,
+        () -> "Entry not found for key " + key + " in map " + name.get());
+  }
 }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
index 0be963f..2ed5df0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolClient.java
@@ -84,7 +84,7 @@ public class GrpcClientProtocolClient implements Closeable {
   private final RaftClientProtocolServiceStub asyncStub;
   private final AdminProtocolServiceBlockingStub adminBlockingStub;
 
-  private final AtomicReference<AsyncStreamObservers> appendStreamObservers = new AtomicReference<>();
+  private final AtomicReference<AsyncStreamObservers> orderedStreamObservers = new AtomicReference<>();
 
   private final AtomicReference<AsyncStreamObservers> unorderedStreamObservers = new AtomicReference<>();
 
@@ -128,7 +128,7 @@ public class GrpcClientProtocolClient implements Closeable {
 
   @Override
   public void close() {
-    Optional.ofNullable(appendStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
+    Optional.ofNullable(orderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     Optional.ofNullable(unorderedStreamObservers.getAndSet(null)).ifPresent(AsyncStreamObservers::close);
     channel.shutdownNow();
   }
@@ -169,20 +169,18 @@ public class GrpcClientProtocolClient implements Closeable {
     }
   }
 
-  StreamObserver<RaftClientRequestProto> append(
-      StreamObserver<RaftClientReplyProto> responseHandler) {
-    return asyncStub.append(responseHandler);
+  StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseHandler) {
+    return asyncStub.ordered(responseHandler);
   }
 
-  StreamObserver<RaftClientRequestProto> appendWithTimeout(
-      StreamObserver<RaftClientReplyProto> responseHandler) {
+  StreamObserver<RaftClientRequestProto> orderedWithTimeout(StreamObserver<RaftClientReplyProto> responseHandler) {
     return asyncStub.withDeadlineAfter(requestTimeoutDuration.getDuration(), requestTimeoutDuration.getUnit())
-        .append(responseHandler);
+        .unordered(responseHandler);
   }
 
-  AsyncStreamObservers getAppendStreamObservers() {
-    return appendStreamObservers.updateAndGet(
-        a -> a != null? a : new AsyncStreamObservers(appendStreamObservers, this::append));
+  AsyncStreamObservers getOrderedStreamObservers() {
+    return orderedStreamObservers.updateAndGet(
+        a -> a != null? a : new AsyncStreamObservers(orderedStreamObservers, this::ordered));
   }
 
   AsyncStreamObservers getUnorderedAsyncStreamObservers() {
@@ -329,7 +327,7 @@ public class GrpcClientProtocolClient implements Closeable {
         final CompletableFuture<RaftClientReply> f = entry.getValue();
         if (!f.isDone()) {
           f.completeExceptionally(t != null? t
-              : new IOException(getName() + ": Stream " + event
+              : new AlreadyClosedException(getName() + ": Stream " + event
                   + ": no reply for async request cid=" + entry.getKey()));
         }
       }
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
index 3a3941b..666b1c3 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolProxy.java
@@ -84,7 +84,7 @@ public class GrpcClientProtocolProxy implements Closeable {
 
     RpcSession(CloseableStreamObserver responseHandler) {
       this.responseHandler = responseHandler;
-      this.requestObserver = proxy.append(responseHandler);
+      this.requestObserver = proxy.ordered(responseHandler);
     }
 
     void onError() {
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
index 2883795..b0a7578 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientProtocolService.java
@@ -25,6 +25,7 @@ import org.apache.ratis.proto.RaftProtos.RaftClientReplyProto;
 import org.apache.ratis.proto.RaftProtos.RaftClientRequestProto;
 import org.apache.ratis.proto.RaftProtos.SetConfigurationRequestProto;
 import org.apache.ratis.proto.grpc.RaftClientProtocolServiceGrpc.RaftClientProtocolServiceImplBase;
+import org.apache.ratis.util.CollectionUtils;
 import org.apache.ratis.util.JavaUtils;
 import org.apache.ratis.util.Preconditions;
 import org.apache.ratis.util.SlidingWindow;
@@ -36,6 +37,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Consumer;
@@ -44,11 +46,11 @@ import java.util.function.Supplier;
 public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase {
   public static final Logger LOG = LoggerFactory.getLogger(GrpcClientProtocolService.class);
 
-  private static class PendingAppend implements SlidingWindow.ServerSideRequest<RaftClientReply> {
+  private static class PendingOrderedRequest implements SlidingWindow.ServerSideRequest<RaftClientReply> {
     private final RaftClientRequest request;
     private volatile RaftClientReply reply;
 
-    PendingAppend(RaftClientRequest request) {
+    PendingOrderedRequest(RaftClientRequest request) {
       this.request = request;
     }
 
@@ -85,11 +87,34 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
       return request != null? getSeqNum() + ":" + reply: "COMPLETED";
     }
   }
-  private static final PendingAppend COMPLETED = new PendingAppend(null);
+  private static final PendingOrderedRequest COMPLETED = new PendingOrderedRequest(null);
+
+  static class OrderedStreamObservers {
+    private final Map<Integer, OrderedRequestStreamObserver> map = new ConcurrentHashMap<>();
+
+    void putNew(OrderedRequestStreamObserver so) {
+      CollectionUtils.putNew(so.getId(), so, map, () -> getClass().getSimpleName());
+    }
+
+    void removeExisting(OrderedRequestStreamObserver so) {
+      CollectionUtils.removeExisting(so.getId(), so, map, () -> getClass().getSimpleName());
+    }
+
+    void closeAllExisting() {
+      // Iteration not synchronized:
+      // Okay if an existing object is removed by another mean during the iteration since it must be already closed.
+      // Also okay if a new object is added during the iteration since this method closes only the existing objects.
+      for(OrderedRequestStreamObserver so : map.values()) {
+        so.close();
+      }
+    }
+  }
 
   private final Supplier<RaftPeerId> idSupplier;
   private final RaftClientAsynchronousProtocol protocol;
 
+  private final OrderedStreamObservers orderedStreamObservers = new OrderedStreamObservers();
+
   public GrpcClientProtocolService(Supplier<RaftPeerId> idSupplier, RaftClientAsynchronousProtocol protocol) {
     this.idSupplier = idSupplier;
     this.protocol = protocol;
@@ -108,9 +133,15 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
   }
 
   @Override
-  public StreamObserver<RaftClientRequestProto> append(
-      StreamObserver<RaftClientReplyProto> responseObserver) {
-    return new AppendRequestStreamObserver(responseObserver);
+  public StreamObserver<RaftClientRequestProto> ordered(StreamObserver<RaftClientReplyProto> responseObserver) {
+    final OrderedRequestStreamObserver so = new OrderedRequestStreamObserver(responseObserver);
+    orderedStreamObservers.putNew(so);
+    return so;
+  }
+
+  public void closeAllOrderedRequestStreamObservers() {
+    LOG.debug("{}: closeAllOrderedRequestStreamObservers", getId());
+    orderedStreamObservers.closeAllExisting();
   }
 
   @Override
@@ -121,7 +152,8 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
   private final AtomicInteger streamCount = new AtomicInteger();
 
   private abstract class RequestStreamObserver implements StreamObserver<RaftClientRequestProto> {
-    private final String name = getId() + "-" + getClass().getSimpleName() + streamCount.getAndIncrement();
+    private final int id = streamCount.getAndIncrement();
+    private final String name = getId() + "-" + getClass().getSimpleName() + id;
     private final StreamObserver<RaftClientReplyProto> responseObserver;
     private final AtomicBoolean isClosed = new AtomicBoolean();
 
@@ -130,6 +162,10 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
       this.responseObserver = responseObserver;
     }
 
+    int getId() {
+      return id;
+    }
+
     String getName() {
       return name;
     }
@@ -139,11 +175,25 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
     }
 
     synchronized void responseCompleted() {
-      responseObserver.onCompleted();
+      try {
+        responseObserver.onCompleted();
+      } catch(Exception e) {
+        // response stream may possibly be already closed/failed so that the exception can be safely ignored.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(getName() + ": Failed onCompleted, exception is ignored", e);
+        }
+      }
     }
 
     synchronized void responseError(Throwable t) {
-      responseObserver.onError(t);
+      try {
+        responseObserver.onError(t);
+      } catch(Exception e) {
+        // response stream may possibly be already closed/failed so that the exception can be safely ignored.
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(getName() + ": Failed onError, exception is ignored", e);
+        }
+      }
     }
 
 
@@ -243,15 +293,15 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
     }
   }
 
-  private class AppendRequestStreamObserver extends RequestStreamObserver {
-    private final SlidingWindow.Server<PendingAppend, RaftClientReply> slidingWindow
+  private class OrderedRequestStreamObserver extends RequestStreamObserver {
+    private final SlidingWindow.Server<PendingOrderedRequest, RaftClientReply> slidingWindow
         = new SlidingWindow.Server<>(getName(), COMPLETED);
 
-    AppendRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
+    OrderedRequestStreamObserver(StreamObserver<RaftClientReplyProto> responseObserver) {
       super(responseObserver);
     }
 
-    void processClientRequest(PendingAppend pending) {
+    void processClientRequest(PendingOrderedRequest pending) {
       final long seq = pending.getSeqNum();
       processClientRequest(pending.getRequest(),
           reply -> slidingWindow.receiveReply(seq, reply, this::sendReply, this::processClientRequest));
@@ -259,10 +309,10 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
 
     @Override
     void processClientRequest(RaftClientRequest r) {
-      slidingWindow.receivedRequest(new PendingAppend(r), this::processClientRequest);
+      slidingWindow.receivedRequest(new PendingOrderedRequest(r), this::processClientRequest);
     }
 
-    private void sendReply(PendingAppend ready) {
+    private void sendReply(PendingOrderedRequest ready) {
       Preconditions.assertTrue(ready.hasReply());
       if (ready == COMPLETED) {
         close();
@@ -291,6 +341,7 @@ public class GrpcClientProtocolService extends RaftClientProtocolServiceImplBase
         LOG.debug("{}: close", getName());
         responseCompleted();
         slidingWindow.close();
+        orderedStreamObservers.removeExisting(this);
       }
     }
 
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
index a63c2af..deaaac0 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/client/GrpcClientRpc.java
@@ -65,7 +65,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
     try {
       final GrpcClientProtocolClient proxy = getProxies().getProxy(serverId);
       // Reuse the same grpc stream for all async calls.
-      return proxy.getAppendStreamObservers().onNext(request);
+      return proxy.getOrderedStreamObservers().onNext(request);
     } catch (IOException e) {
       return JavaUtils.completeExceptionally(e);
     }
@@ -113,6 +113,9 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
         throw new InterruptedIOException(
             "Interrupted while waiting for response of request " + request);
       } catch (ExecutionException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace(clientId + ": failed " + request, e);
+        }
         throw IOUtils.toIOException(e);
       }
     }
@@ -122,11 +125,10 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
       RaftClientRequest request, GrpcClientProtocolClient proxy) throws IOException {
     final RaftClientRequestProto requestProto =
         toRaftClientRequestProto(request);
-    final CompletableFuture<RaftClientReplyProto> replyFuture =
-        new CompletableFuture<>();
+    final CompletableFuture<RaftClientReplyProto> replyFuture = new CompletableFuture<>();
     // create a new grpc stream for each non-async call.
     final StreamObserver<RaftClientRequestProto> requestObserver =
-        proxy.appendWithTimeout(new StreamObserver<RaftClientReplyProto>() {
+        proxy.orderedWithTimeout(new StreamObserver<RaftClientReplyProto>() {
           @Override
           public void onNext(RaftClientReplyProto value) {
             replyFuture.complete(value);
@@ -141,7 +143,7 @@ public class GrpcClientRpc extends RaftClientRpcWithProxy<GrpcClientProtocolClie
           public void onCompleted() {
             if (!replyFuture.isDone()) {
               replyFuture.completeExceptionally(
-                  new IOException(clientId + ": Stream completed but no reply for request " + request));
+                  new AlreadyClosedException(clientId + ": Stream completed but no reply for request " + request));
             }
           }
         });
diff --git a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
index 5139aff..de65acc 100644
--- a/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
+++ b/ratis-grpc/src/main/java/org/apache/ratis/grpc/server/GrpcService.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -81,6 +81,8 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
   private final Server server;
   private final Supplier<InetSocketAddress> addressSupplier;
 
+  private final GrpcClientProtocolService clientProtocolService;
+
   private GrpcService(RaftServer server, GrpcTlsConfig tlsConfig) {
     this(server, server::getId,
         GrpcConfigKeys.Server.port(server.getProperties()),
@@ -103,11 +105,13 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
           + " > " + GrpcConfigKeys.MESSAGE_SIZE_MAX_KEY + " = " + grpcMessageSizeMax);
     }
 
+    this.clientProtocolService = new GrpcClientProtocolService(idSupplier, raftServer);
+
     NettyServerBuilder nettyServerBuilder = NettyServerBuilder.forPort(port)
         .maxInboundMessageSize(grpcMessageSizeMax.getSizeInt())
         .flowControlWindow(flowControlWindow.getSizeInt())
         .addService(new GrpcServerProtocolService(idSupplier, raftServer))
-        .addService(new GrpcClientProtocolService(idSupplier, raftServer))
+        .addService(clientProtocolService)
         .addService(new GrpcAdminProtocolService(raftServer));
 
     if (tlsConfig != null) {
@@ -159,22 +163,25 @@ public class GrpcService extends RaftServerRpcWithProxy<GrpcServerProtocolClient
   }
 
   @Override
+  public void notifyNotLeader() {
+    clientProtocolService.closeAllOrderedRequestStreamObservers();
+  }
+
+  @Override
   public InetSocketAddress getInetSocketAddress() {
     return addressSupplier.get();
   }
 
   @Override
-  public AppendEntriesReplyProto appendEntries(
-      AppendEntriesRequestProto request) throws IOException {
+  public AppendEntriesReplyProto appendEntries(AppendEntriesRequestProto request) {
     throw new UnsupportedOperationException(
-        "Blocking AppendEntries call is not supported");
+        "Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
   }
 
   @Override
-  public InstallSnapshotReplyProto installSnapshot(
-      InstallSnapshotRequestProto request) throws IOException {
+  public InstallSnapshotReplyProto installSnapshot(InstallSnapshotRequestProto request) {
     throw new UnsupportedOperationException(
-        "Blocking InstallSnapshot call is not supported");
+        "Blocking " + JavaUtils.getCurrentStackTraceElement().getMethodName() + " call is not supported");
   }
 
   @Override
diff --git a/ratis-proto/src/main/proto/Grpc.proto b/ratis-proto/src/main/proto/Grpc.proto
index f177609..497e3fd 100644
--- a/ratis-proto/src/main/proto/Grpc.proto
+++ b/ratis-proto/src/main/proto/Grpc.proto
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -28,11 +28,11 @@ service RaftClientProtocolService {
   rpc setConfiguration(ratis.common.SetConfigurationRequestProto)
       returns(ratis.common.RaftClientReplyProto) {}
 
-  // A client-to-server stream RPC to append data
-  rpc append(stream ratis.common.RaftClientRequestProto)
+  // A client-to-server stream RPC to ordered async requests
+  rpc ordered(stream ratis.common.RaftClientRequestProto)
       returns (stream ratis.common.RaftClientReplyProto) {}
 
-  // A client-to-server stream RPC for unordered async requested
+  // A client-to-server stream RPC for unordered async requests
   rpc unordered(stream ratis.common.RaftClientRequestProto)
       returns (stream ratis.common.RaftClientReplyProto) {}
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
index a85e606..c37433e 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/RaftServerRpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -62,4 +62,8 @@ public interface RaftServerRpc extends RaftServerProtocol, RpcType.Get, Closeabl
 
   /** Handle the given exception.  For example, try reconnecting. */
   void handleException(RaftPeerId serverId, Exception e, boolean reconnect);
+
+  /** The server role changes from leader to a non-leader role. */
+  default void notifyNotLeader() {
+  }
 }
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
index 8ece134..e828fd8 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/LeaderState.java
@@ -246,6 +246,7 @@ public class LeaderState {
     } catch (IOException e) {
       LOG.warn(server.getId() + ": Caught exception in sendNotLeaderResponses", e);
     }
+    server.getServerRpc().notifyNotLeader();
   }
 
   void notifySenders() {
diff --git a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
index 88d525c..52f4b81 100644
--- a/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
+++ b/ratis-server/src/main/java/org/apache/ratis/server/impl/RoleInfo.java
@@ -53,6 +53,10 @@ class RoleInfo {
     this.transitionTime = new AtomicReference<>(Timestamp.currentTime());
   }
 
+  RaftPeerRole getRaftPeerRole() {
+    return role;
+  }
+
   void transitionRole(RaftPeerRole newRole) {
     this.role = newRole;
     this.transitionTime.set(Timestamp.currentTime());
diff --git a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
index ea9b9a2..676713f 100644
--- a/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
+++ b/ratis-server/src/test/java/org/apache/ratis/server/impl/RaftServerTestUtil.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -19,6 +19,7 @@ package org.apache.ratis.server.impl;
 
 import org.apache.log4j.Level;
 import org.apache.ratis.MiniRaftCluster;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
 import org.apache.ratis.protocol.ClientId;
 import org.apache.ratis.protocol.RaftGroupId;
 import org.apache.ratis.protocol.RaftPeer;
@@ -93,6 +94,10 @@ public class RaftServerTestUtil {
     return entry.isFailed();
   }
 
+  public static RaftPeerRole getRole(RaftServerImpl server) {
+    return server.getRole().getRaftPeerRole();
+  }
+
   public static Stream<LogAppender> getLogAppenders(RaftServerImpl server) {
     return server.getRole().getLeaderState().map(LeaderState::getLogAppenders).orElse(null);
   }
diff --git a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
index 2ec7ae8..177e0f7 100644
--- a/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
+++ b/ratis-test/src/test/java/org/apache/ratis/grpc/TestRaftServerWithGrpc.java
@@ -1,4 +1,4 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -17,32 +17,74 @@
  */
 package org.apache.ratis.grpc;
 
+import org.apache.log4j.Level;
 import org.apache.ratis.BaseTest;
+import org.apache.ratis.MiniRaftCluster;
 import org.apache.ratis.RaftTestUtil;
+import org.apache.ratis.RaftTestUtil.SimpleMessage;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.client.RaftClientConfigKeys;
+import org.apache.ratis.client.RaftClientRpc;
+import org.apache.ratis.client.impl.RaftClientTestUtil;
 import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.client.GrpcClientProtocolClient;
+import org.apache.ratis.grpc.client.GrpcClientProtocolService;
+import org.apache.ratis.proto.RaftProtos.RaftPeerRole;
+import org.apache.ratis.protocol.AlreadyClosedException;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftClientRequest;
 import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.protocol.TimeoutIOException;
+import org.apache.ratis.server.RaftServerRpc;
+import org.apache.ratis.server.impl.RaftServerImpl;
+import org.apache.ratis.server.impl.RaftServerTestUtil;
 import org.apache.ratis.server.impl.ServerImplUtils;
+import org.apache.ratis.statemachine.SimpleStateMachine4Testing;
 import org.apache.ratis.statemachine.StateMachine;
+import org.apache.ratis.util.LogUtils;
+import org.apache.ratis.util.ProtoUtils;
+import org.apache.ratis.util.TimeDuration;
+import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.io.IOException;
 import java.nio.channels.OverlappingFileLockException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-public class TestRaftServerWithGrpc extends BaseTest {
+public class TestRaftServerWithGrpc extends BaseTest implements MiniRaftClusterWithGrpc.FactoryGet {
+  {
+    LogUtils.setLogLevel(GrpcClientProtocolService.LOG, Level.ALL);
+    LogUtils.setLogLevel(GrpcClientProtocolClient.LOG, Level.ALL);
+  }
+
+  @Before
+  public void setup() {
+    final RaftProperties p = getProperties();
+    p.setClass(MiniRaftCluster.STATEMACHINE_CLASS_KEY, SimpleStateMachine4Testing.class, StateMachine.class);
+    RaftClientConfigKeys.Rpc.setRequestTimeout(p, TimeDuration.valueOf(1, TimeUnit.SECONDS));
+  }
 
   @Test
   public void testServerRestartOnException() throws Exception {
-    RaftProperties properties = new RaftProperties();
-    final MiniRaftClusterWithGrpc cluster
-        = MiniRaftClusterWithGrpc.FACTORY.newCluster(1, properties);
-    cluster.start();
-    RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
-    GrpcConfigKeys.Server.setPort(properties, cluster.getLeader().getServerRpc().getInetSocketAddress().getPort());
+    runWithNewCluster(1, this::runTestServerRestartOnException);
+  }
+
+  void runTestServerRestartOnException(MiniRaftClusterWithGrpc cluster) throws Exception {
+    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+    final RaftPeerId leaderId = leader.getId();
+
+    final RaftProperties p = getProperties();
+    GrpcConfigKeys.Server.setPort(p, leader.getServerRpc().getInetSocketAddress().getPort());
+
     // Create a raft server proxy with server rpc bound to a different address
     // compared to leader. This helps in locking the raft storage directory to
     // be used by next raft server proxy instance.
     final StateMachine stateMachine = cluster.getLeader().getStateMachine();
-    ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, properties, null);
+    ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null);
     // Close the server rpc for leader so that new raft server can be bound to it.
     cluster.getLeader().getServerRpc().close();
 
@@ -51,9 +93,82 @@ public class TestRaftServerWithGrpc extends BaseTest {
     // the raft server proxy created earlier. Raft server proxy should close
     // the rpc server on failure.
     testFailureCase("start a new server with the same address",
-        () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, properties, null).start(),
+        () -> ServerImplUtils.newRaftServer(leaderId, cluster.getGroup(), gid -> stateMachine, p, null).start(),
         IOException.class, OverlappingFileLockException.class);
     // Try to start a raft server rpc at the leader address.
     cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
   }
+
+  @Test
+  public void testUnsupportedMethods() throws Exception {
+    runWithNewCluster(1, this::runTestUnsupportedMethods);
+  }
+
+  void runTestUnsupportedMethods(MiniRaftClusterWithGrpc cluster) throws Exception {
+    final RaftPeerId leaderId = RaftTestUtil.waitForLeader(cluster).getId();
+    final RaftServerRpc rpc = cluster.getServer(leaderId).getFactory().newRaftServerRpc(cluster.getServer(leaderId));
+
+    testFailureCase("appendEntries",
+        () -> rpc.appendEntries(null),
+        UnsupportedOperationException.class);
+
+    testFailureCase("installSnapshot",
+        () -> rpc.installSnapshot(null),
+        UnsupportedOperationException.class);
+  }
+
+  @Test
+  public void testLeaderRestart() throws Exception {
+    runWithNewCluster(3, this::runTestLeaderRestart);
+  }
+
+  void runTestLeaderRestart(MiniRaftClusterWithGrpc cluster) throws Exception {
+    final RaftServerImpl leader = RaftTestUtil.waitForLeader(cluster);
+
+    try (final RaftClient client = cluster.createClient()) {
+      // send a request to make sure leader is ready
+      final CompletableFuture<RaftClientReply> f = client.sendAsync(new SimpleMessage("testing"));
+      Assert.assertTrue(f.get().isSuccess());
+    }
+
+    try (final RaftClient client = cluster.createClient()) {
+      final RaftClientRpc rpc = client.getClientRpc();
+
+      final AtomicLong seqNum = new AtomicLong();
+      {
+        // send a request using rpc directly
+        final RaftClientRequest request = newRaftClientRequest(client, leader.getId(), seqNum.incrementAndGet());
+        final CompletableFuture<RaftClientReply> f = rpc.sendRequestAsync(request);
+        Assert.assertTrue(f.get().isSuccess());
+      }
+
+      // send another request which will be blocked
+      final SimpleStateMachine4Testing stateMachine = SimpleStateMachine4Testing.get(leader);
+      stateMachine.blockStartTransaction();
+      final RaftClientRequest requestBlocked = newRaftClientRequest(client, leader.getId(), seqNum.incrementAndGet());
+      final CompletableFuture<RaftClientReply> futureBlocked = rpc.sendRequestAsync(requestBlocked);
+
+      // change leader
+      RaftTestUtil.changeLeader(cluster, leader.getId());
+      Assert.assertNotEquals(RaftPeerRole.LEADER, RaftServerTestUtil.getRole(leader));
+
+      // the blocked request should fail
+      testFailureCase("request should fail", futureBlocked::get,
+          ExecutionException.class, AlreadyClosedException.class);
+      stateMachine.unblockStartTransaction();
+
+      // send one more request which should timeout.
+      final RaftClientRequest requestTimeout = newRaftClientRequest(client, leader.getId(), seqNum.incrementAndGet());
+      final CompletableFuture<RaftClientReply> f = rpc.sendRequestAsync(requestTimeout);
+      testFailureCase("request should timeout", f::get,
+          ExecutionException.class, TimeoutIOException.class);
+    }
+
+  }
+
+  static RaftClientRequest newRaftClientRequest(RaftClient client, RaftPeerId serverId, long seqNum) {
+    final SimpleMessage m = new SimpleMessage("m" + seqNum);
+    return RaftClientTestUtil.newRaftClientRequest(client, serverId, seqNum, m,
+        RaftClientRequest.writeRequestType(), ProtoUtils.toSlidingWindowEntry(seqNum, seqNum == 1L));
+  }
 }